In Spark 1.3, the team introduced the Data Source API to make it easy to integrate various input formats with Spark SQL. Eventually this version of the API became insufficient, and the team had to add a lot of internal code to provide more efficient solutions for Spark SQL data sources. So in Spark 2.3, the second version of the Data Source API was released, which is meant to overcome the limitations of the previous version. In this article, I will demonstrate how to implement a custom data source for Spark SQL with both the V1 and V2 APIs, to illustrate their differences and the advantages of the new API.
DataSource V1 API
V1 API provides a set of abstract classes and traits. They are located in spark/sql/sources/interfaces.scala. Some basic APIs are:
```scala
trait RelationProvider {
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType
  // other members omitted
}

trait TableScan {
  def buildScan(): RDD[Row]
}
```
A RelationProvider defines a class that can create a relational data source for Spark SQL to work with. It can initialize itself with the provided options, such as a file path or authentication credentials. BaseRelation defines the data schema, which can be loaded from a database or a Parquet file, or specified by the user. A concrete relation also needs to mix in one of the Scan traits, implement the buildScan method, and return an RDD of Row.
JdbcSourceV1
Now let us use the V1 API to implement a JDBC data source. For simplicity, the table schema is hard-coded and only full table scan is supported. The complete example can be found on GitHub (link), and the sample data is here.
```scala
class JdbcSourceV1 extends RelationProvider {
```
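Expanding on the class above, a minimal end-to-end sketch of a V1 implementation might look like the following. The option key, the connection handling, and the two-column employee schema (emp_name, age) are illustrative assumptions; the code in the linked repository differs in detail.

```scala
import java.sql.DriverManager

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types._

class JdbcSourceV1 extends RelationProvider {
  // Spark passes in the options given to spark.read.option(...)
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation =
    new JdbcRelationV1(parameters("url"), sqlContext)
}

class JdbcRelationV1(url: String, context: SQLContext) extends BaseRelation with TableScan {
  override def sqlContext: SQLContext = context

  // Hard-coded schema, as described above
  override def schema: StructType = StructType(Seq(
    StructField("emp_name", StringType),
    StructField("age", IntegerType)))

  override def buildScan(): RDD[Row] =
    new JdbcRDD(context.sparkContext, url, "SELECT emp_name, age FROM employee")
}

class JdbcRDD(sc: SparkContext, url: String, sql: String) extends RDD[Row](sc, Nil) {
  // A single partition: the whole table is read by one task
  override protected def getPartitions: Array[Partition] =
    Array(new Partition { override def index: Int = 0 })

  // The actual data reading happens here
  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    val conn = DriverManager.getConnection(url)
    try {
      val rs = conn.createStatement().executeQuery(sql)
      val rows = scala.collection.mutable.ArrayBuffer.empty[Row]
      while (rs.next()) {
        rows += Row(rs.getString("emp_name"), rs.getInt("age"))
      }
      rows.iterator
    } finally {
      conn.close()
    }
  }
}
```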
The actual data reading happens in JdbcRDD#compute. It receives the connection options, possibly together with a pruned column list and where conditions, executes the query, and returns an iterator of Row objects corresponding to the defined schema. Now we can create a DataFrame from this custom data source.
```scala
val df = spark.read
```
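A sketch of how the source can then be loaded; the format string is simply the fully qualified class name of JdbcSourceV1, and the package name and option key here are illustrative assumptions.

```scala
// Package name and option key are assumptions for illustration
val df = spark.read
  .format("com.example.datasource.JdbcSourceV1")
  .option("url", "jdbc:mysql://localhost/spark")
  .load()

df.printSchema()
df.show()
```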
The outputs are:
```
root
```
Limitations of V1 API
As we can see, V1 API is quite straightforward and can meet the initial requirements of Spark SQL use cases. But as Spark moves forward, V1 API starts to show its limitations.
Coupled with Higher Level API
createRelation accepts a SQLContext as a parameter; buildScan returns an RDD of Row; and when implementing a writable data source, the insert method accepts a DataFrame.
```scala
trait InsertableRelation {
  def insert(data: DataFrame, overwrite: Boolean): Unit
}
```
These classes belong to the higher-level Spark API, and some of them have already been upgraded: SQLContext is superseded by SparkSession, and DataFrame is now merely an alias of Dataset[Row]. Data sources should not be required to reflect these changes.
Hard to Add New Push Down Operators
Besides TableScan, the V1 API provides PrunedScan to eliminate unnecessary columns, and PrunedFilteredScan to push predicates down to the data source. In JdbcSourceV1, these are reflected in the generated SQL statement.
```scala
class JdbcRelationV1 extends BaseRelation with PrunedFilteredScan {
```
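The push-down boils down to string-building: the pruned columns and the pushed filters are translated into the SELECT statement. A rough sketch, handling only a couple of filter types, could look like this:

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, IsNotNull}

object FilterPushdown {
  // Translate pruned columns and pushed filters into the SQL statement;
  // a real source would cover more filter types
  def buildQuery(requiredColumns: Array[String], filters: Array[Filter]): String = {
    val columnList = if (requiredColumns.isEmpty) "1" else requiredColumns.mkString(", ")
    val conditions = filters.collect {
      case EqualTo(attr, value)     => s"$attr = '$value'"
      case GreaterThan(attr, value) => s"$attr > $value"
      case IsNotNull(attr)          => s"$attr IS NOT NULL"
    }
    val whereClause = if (conditions.isEmpty) "" else conditions.mkString(" WHERE ", " AND ", "")
    s"SELECT $columnList FROM employee$whereClause"
  }
}

// Inside the relation, Spark calls:
//   def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
// and the returned RDD should execute the generated SQL.
```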
What if we need to push down a new operator like limit? It would introduce a whole new set of Scan traits, one for every combination with the existing ones.
```scala
trait LimitedScan {
```
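For example, pushing down limit while keeping the existing pruning and filtering capabilities would require a combinatorial set of hypothetical traits like these (they are not part of Spark):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.Filter

// Hypothetical traits: every new push-down operator has to be combined
// with all the existing Scan variants, so the number of traits explodes
trait LimitedScan {
  def buildScan(limit: Int): RDD[Row]
}

trait PrunedLimitedScan {
  def buildScan(requiredColumns: Array[String], limit: Int): RDD[Row]
}

trait PrunedFilteredLimitedScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter], limit: Int): RDD[Row]
}
```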
Hard to Pass Partition Info
For naturally partitioned data sources like HDFS and Kafka, the V1 API provides no native support for partitioning or data locality; we have to achieve this by extending the RDD class ourselves. For instance, a Kafka topic contains several partitions, and we want each data reading task to run on the server where the partition's leader broker resides.
```scala
case class KafkaPartition(partitionId: Int, leaderHost: String) extends Partition {
```
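A sketch of the approach (class names and the record-reading logic are illustrative): the custom RDD reports each Kafka partition's leader host as the preferred location for the corresponding task.

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// One RDD partition per Kafka partition, remembering where its leader broker lives
case class KafkaPartition(partitionId: Int, leaderHost: String) extends Partition {
  override def index: Int = partitionId
}

class KafkaRDD(sc: SparkContext, kafkaPartitions: Seq[KafkaPartition])
    extends RDD[Row](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    kafkaPartitions.toArray[Partition]

  // Tell the scheduler to run each task on the partition's leader broker
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[KafkaPartition].leaderHost)

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    // A real implementation would consume records from the leader broker here
    Iterator.empty
  }
}
```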
Besides, some databases like Cassandra distribute data by primary key. If the query pipeline groups on these columns, the optimizer can use this information to avoid shuffling. The V2 API supports this with a dedicated trait (SupportsReportPartitioning).
Lack of Transactional Writing
Spark tasks may fail, and with the V1 API a failure can leave partially written data behind. For file systems like HDFS, we can put a _SUCCESS file in the output directory to indicate whether the job finished successfully, but this has to be implemented by the user, whereas the V2 API provides explicit interfaces for transactional writes.
Lack of Columnar and Streaming Support
Columnar data and stream processing were both added to Spark SQL without using the V1 API. The current implementations, such as ParquetFileFormat and KafkaSource, are written as dedicated code against internal APIs. These features are also addressed by the V2 API.
DataSource V2 API
The V2 API starts with a marker interface, DataSourceV2. An implementing class also needs to mix in either ReadSupport or WriteSupport. The ReadSupport interface, for instance, creates a DataSourceReader from the initialization options; the DataSourceReader reports the schema of the data source and returns a list of DataReaderFactory instances; each factory creates the actual DataReader, which works like an iterator. In addition, DataSourceReader can mix in various Supports* interfaces to enable query optimizations such as operator push-down and columnar scan. The WriteSupport hierarchy is similar. All of these interfaces are written in Java for better interoperability.
```java
public interface DataSourceV2 {}

public interface ReadSupport extends DataSourceV2 {
  DataSourceReader createReader(DataSourceOptions options);
}

public interface DataSourceReader {
  StructType readSchema();
  List<DataReaderFactory<Row>> createDataReaderFactories();
}

public interface DataReaderFactory<T> extends Serializable {
  DataReader<T> createDataReader();
}

public interface DataReader<T> extends Closeable {
  boolean next() throws IOException;
  T get();
}
```
You may notice that DataSourceReader#createDataReaderFactories still relies on Row class, because currently only Row is supported, and V2 API is still marked as Evolving.
JdbcSourceV2
Let us rewrite the JDBC data source with the V2 API. The following is an abridged example of a full table scan; the complete code can be found on GitHub (link).
```scala
class JdbcDataSourceReader extends DataSourceReader {
```
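Expanding on the class above, a sketch of the whole V2 read path for a full table scan might look like the following; the package names, connection handling, and the two-column employee schema are again illustrative assumptions rather than the exact code in the repository.

```scala
import java.sql.DriverManager
import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types._

class JdbcSourceV2 extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader =
    new JdbcDataSourceReader(options.get("url").get())
}

class JdbcDataSourceReader(url: String) extends DataSourceReader {
  // Hard-coded schema, as in the V1 example
  override def readSchema(): StructType = StructType(Seq(
    StructField("emp_name", StringType),
    StructField("age", IntegerType)))

  // One factory => one RDD partition; a single full scan here
  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] =
    util.Arrays.asList[DataReaderFactory[Row]](
      new JdbcDataReaderFactory(url, "SELECT emp_name, age FROM employee"))
}

class JdbcDataReaderFactory(url: String, sql: String) extends DataReaderFactory[Row] {
  // Called on the executor to create the actual reader
  override def createDataReader(): DataReader[Row] = new JdbcDataReader(url, sql)
}

class JdbcDataReader(url: String, sql: String) extends DataReader[Row] {
  private val conn = DriverManager.getConnection(url)
  private val rs = conn.createStatement().executeQuery(sql)

  override def next(): Boolean = rs.next()

  override def get(): Row = Row(rs.getString("emp_name"), rs.getInt("age"))

  override def close(): Unit = conn.close()
}
```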
Prune Columns
DataSourceReader can mix in the SupportsPushDownRequiredColumns trait. Spark will invoke its pruneColumns method with the required StructType, and the DataSourceReader can pass it on to the underlying DataReader.
```scala
class JdbcDataSourceReader extends DataSourceReader with SupportsPushDownRequiredColumns {
```
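A sketch of how the pruned schema can flow into the generated SQL and into the rows returned by the reader; the employee schema and the shape of the factory are illustrative.

```scala
import java.sql.DriverManager
import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types._

class JdbcDataSourceReader(url: String)
    extends DataSourceReader with SupportsPushDownRequiredColumns {

  private val fullSchema = StructType(Seq(
    StructField("emp_name", StringType),
    StructField("age", IntegerType)))

  // Updated by Spark through pruneColumns; defaults to the full schema
  private var requiredSchema: StructType = fullSchema

  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.requiredSchema = requiredSchema
  }

  // Spark reads the (possibly pruned) schema back from here
  override def readSchema(): StructType = requiredSchema

  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {
    val columns = if (requiredSchema.isEmpty) "1" else requiredSchema.fieldNames.mkString(", ")
    val sql = s"SELECT $columns FROM employee"
    util.Arrays.asList[DataReaderFactory[Row]](new JdbcDataReaderFactory(url, sql, requiredSchema))
  }
}

class JdbcDataReaderFactory(url: String, sql: String, schema: StructType)
    extends DataReaderFactory[Row] {
  override def createDataReader(): DataReader[Row] = new JdbcDataReader(url, sql, schema)
}

class JdbcDataReader(url: String, sql: String, schema: StructType) extends DataReader[Row] {
  private val conn = DriverManager.getConnection(url)
  private val rs = conn.createStatement().executeQuery(sql)

  override def next(): Boolean = rs.next()

  // Build each row according to the (possibly pruned) schema
  override def get(): Row = Row(schema.fields.zipWithIndex.map {
    case (StructField(_, IntegerType, _, _), i) => rs.getInt(i + 1)
    case (_, i)                                 => rs.getString(i + 1)
  }: _*)

  override def close(): Unit = conn.close()
}
```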
We can examine the execution plan with df.explain(true). For the query SELECT emp_name, age FROM employee, for instance, the optimized logical plan shows that the column pruning is pushed down to the data source.
```
== Analyzed Logical Plan ==
```
Push Down Filters
Similarly, with SupportsPushDownFilters, we can add where conditions to the underlying SQL query.
```scala
class JdbcDataSourceReader extends DataSourceReader with SupportsPushDownFilters {
```
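A sketch with the same illustrative schema: the filters that can be translated are kept and turned into a WHERE clause, while the rest are handed back to Spark to evaluate.

```scala
import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader, SupportsPushDownFilters}
import org.apache.spark.sql.types._

class JdbcDataSourceReader(url: String)
    extends DataSourceReader with SupportsPushDownFilters {

  private val schema = StructType(Seq(
    StructField("emp_name", StringType),
    StructField("age", IntegerType)))

  // Filters we managed to translate into SQL
  private var supportedFilters: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition {
      case _: EqualTo | _: GreaterThan => true // only these two are translated here
      case _                           => false
    }
    supportedFilters = supported
    unsupported // Spark still evaluates whatever we hand back
  }

  override def pushedFilters(): Array[Filter] = supportedFilters

  override def readSchema(): StructType = schema

  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {
    val conditions = supportedFilters.collect {
      case EqualTo(attr, value)     => s"$attr = '$value'"
      case GreaterThan(attr, value) => s"$attr > $value"
    }
    val whereClause = if (conditions.isEmpty) "" else conditions.mkString(" WHERE ", " AND ", "")
    val sql = s"SELECT emp_name, age FROM employee$whereClause"
    // JdbcDataReaderFactory: the schema-aware factory from the column-pruning sketch above
    util.Arrays.asList[DataReaderFactory[Row]](new JdbcDataReaderFactory(url, sql, schema))
  }
}
```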
Multiple Partitions
createDataReaderFactories returns a list, and each reader outputs the data of one RDD partition. Say we want to parallelize the reading tasks: we can divide the records into two parts according to primary key ranges.
```scala
def createDataReaderFactories() = {
```
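For instance, with a hypothetical numeric primary key emp_id, the reader could hand out two factories covering two key ranges (the split point and the factory from the earlier sketches are illustrative):

```scala
// Inside JdbcDataSourceReader: two factories => two RDD partitions,
// split on a hypothetical numeric primary key column emp_id
override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] =
  util.Arrays.asList[DataReaderFactory[Row]](
    new JdbcDataReaderFactory(url, "SELECT emp_name, age FROM employee WHERE emp_id < 1000", requiredSchema),
    new JdbcDataReaderFactory(url, "SELECT emp_name, age FROM employee WHERE emp_id >= 1000", requiredSchema))
```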
Transactional Write
V2 API provides two sets of commit / abort methods to implement transactional writes.
```java
public interface DataSourceWriter {
  DataWriterFactory<Row> createWriterFactory();
  void commit(WriterCommitMessage[] messages);
  void abort(WriterCommitMessage[] messages);
}

public interface DataWriterFactory<T> extends Serializable {
  DataWriter<T> createDataWriter(int partitionId, int attemptNumber);
}

public interface DataWriter<T> {
  void write(T record) throws IOException;
  WriterCommitMessage commit() throws IOException;
  void abort() throws IOException;
}
```
DataSourceWriter runs on the Spark driver, while each DataWriter runs in an executor task. When a DataWriter succeeds, it sends a commit message to the driver, and after the DataSourceWriter collects all the commit messages, it performs the final commit. If a writer task fails, its abort method is called and the task is retried; when the retries hit the maximum, the whole job fails and abort is called on the DataSourceWriter.
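As an illustration of the executor side of this protocol, a DataWriter could buffer its rows and only touch the database in its task-level commit, so that a failed or aborted task leaves nothing behind. The table name and connection handling below are assumptions.

```scala
import java.sql.DriverManager

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Message sent back to the driver after a successful task-level commit
case class JdbcWriterCommitMessage(rowCount: Long) extends WriterCommitMessage

class JdbcDataWriter(url: String) extends DataWriter[Row] {
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[Row]

  override def write(record: Row): Unit = buffer += record

  // Task-level commit: write everything in one JDBC transaction
  override def commit(): WriterCommitMessage = {
    val conn = DriverManager.getConnection(url)
    conn.setAutoCommit(false)
    try {
      val stmt = conn.prepareStatement("INSERT INTO employee (emp_name, age) VALUES (?, ?)")
      buffer.foreach { row =>
        stmt.setString(1, row.getString(0))
        stmt.setInt(2, row.getInt(1))
        stmt.executeUpdate()
      }
      conn.commit()
    } finally {
      conn.close()
    }
    JdbcWriterCommitMessage(buffer.size.toLong)
  }

  // Task-level abort: nothing was committed, so just drop the buffered rows
  override def abort(): Unit = buffer.clear()
}
```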
Columnar and Streaming Support
These features are still experimental, and there is no concrete implementation here. Briefly, a DataSourceReader can mix in the SupportsScanColumnarBatch trait and create reader factories that produce ColumnarBatch, the class Spark uses to represent columnar data in memory. For streaming support, there are the MicroBatchReader and ContinuousReader traits. One can refer to the unit tests for more details.
References
- http://blog.madhukaraphatak.com/spark-datasource-v2-part-1/
- https://databricks.com/session/apache-spark-data-source-v2
- https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html
- https://developer.ibm.com/code/2018/04/16/introducing-apache-spark-data-sources-api-v2/