# [Spark] Apply filters pushed down into DeltaCDFRelation (#3127)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR modifies `DeltaCDFRelation` to apply the filters that Spark pushes
down into it. This enables both partition pruning and row-group skipping
when reading the Change Data Feed.
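
For illustration, a batch CDF query like the one below (a minimal sketch, not code from this PR; the table name and version range are placeholders) now has its `id < 5` predicate handed to `DeltaCDFRelation` and applied during the scan:

```scala
// Illustrative sketch only: a batch Change Data Feed read with a filter.
// With this change the predicate below is pushed into DeltaCDFRelation,
// so partition pruning and row-group skipping can kick in while reading CDF files.
spark.read.format("delta")
  .option("readChangeFeed", "true")   // same option as DeltaOptions.CDC_READ_OPTION
  .option("startingVersion", 0)
  .option("endingVersion", 1)
  .table("my_table")                  // placeholder table name
  .where("id < 5")
  .select("id", "_change_type")
```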

## How was this patch tested?

Unit tests

## Does this PR introduce _any_ user-facing changes?

No
tomvanbussel authored May 22, 2024
1 parent 0ee9fd0 commit f2d6c8b
Showing 4 changed files with 31 additions and 5 deletions.

```diff
@@ -26,12 +26,12 @@ import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBi
 import org.apache.spark.sql.delta.files.{CdcAddFileIndex, TahoeChangeFileIndex, TahoeFileIndexWithSnapshotDescriptor, TahoeRemoveFileIndex}
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.schema.SchemaUtils
-import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSource, DeltaSQLConf}
+import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSource, DeltaSourceUtils, DeltaSQLConf}
 import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
 import org.apache.spark.sql.util.ScalaExtensions.OptionExt
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -147,6 +147,8 @@ object CDCReader extends CDCReaderImpl
 
   override val schema: StructType = cdcReadSchema(snapshotForBatchSchema.metadata.schema)
 
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = Array.empty
+
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     val df = changesToBatchDF(
       deltaLog,
@@ -158,7 +160,9 @@
       sqlContext.sparkSession,
       readSchemaSnapshot = Some(snapshotForBatchSchema))
 
-    df.select(requiredColumns.map(SchemaUtils.fieldNameToColumn): _*).rdd
+    val filter = new Column(DeltaSourceUtils.translateFilters(filters))
+    val projections = requiredColumns.map(SchemaUtils.fieldNameToColumn)
+    df.filter(filter).select(projections: _*).rdd
   }
 }
```
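
Taken together, the two additions above use the standard `BaseRelation` hooks: returning an empty array from `unhandledFilters` tells Spark the relation takes responsibility for every pushed filter, so `buildScan` must actually apply them. The sketch below is a simplified, hypothetical stand-in for that pattern; `filtersToColumn` is a cut-down analogue of `DeltaSourceUtils.translateFilters`, not the real implementation.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, IsNotNull, LessThan}

// Hypothetical, cut-down analogue of DeltaSourceUtils.translateFilters:
// fold the pushed-down Filters into one Column predicate, defaulting to `true`
// when nothing was pushed. Dropping unsupported filters, as the `case _` below
// does, is only acceptable in a sketch; when unhandledFilters reports everything
// as handled, every pushed filter must really be applied.
def filtersToColumn(filters: Array[Filter]): Column =
  filters.flatMap {
    case IsNotNull(a)      => Some(col(a).isNotNull)
    case EqualTo(a, v)     => Some(col(a) === lit(v))
    case LessThan(a, v)    => Some(col(a) < lit(v))
    case GreaterThan(a, v) => Some(col(a) > lit(v))
    case _                 => None // unsupported in this sketch
  }.reduceOption(_ && _).getOrElse(lit(true))

// buildScan-style usage: filter first, then project the required columns.
def scan(df: DataFrame, requiredColumns: Array[String], filters: Array[Filter]) =
  df.filter(filtersToColumn(filters)).select(requiredColumns.map(col): _*).rdd
```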

```diff
@@ -103,5 +103,5 @@ object DeltaSourceUtils {
         UnresolvedAttribute(attribute), expressions.Literal.create(s"%${value}%"))
     case sources.AlwaysTrue() => expressions.Literal.TrueLiteral
     case sources.AlwaysFalse() => expressions.Literal.FalseLiteral
-  }.reduce(expressions.And)
+  }.reduceOption(expressions.And).getOrElse(expressions.Literal.TrueLiteral)
 }
```
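
The switch from `reduce` to `reduceOption(...).getOrElse(...)` guards the empty case: `buildScan` may now call `translateFilters` with zero pushed filters, and `reduce` throws on an empty collection, whereas the new version falls back to a literal `true`. A tiny plain-Scala illustration:

```scala
// reduce on an empty collection throws; reduceOption lets you supply a neutral
// element instead (here, the analogue of Literal.TrueLiteral for filters).
Seq.empty[Int].reduceOption(_ + _).getOrElse(0)  // 0
// Seq.empty[Int].reduce(_ + _)                  // UnsupportedOperationException: empty.reduce
```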

```diff
@@ -115,7 +115,7 @@ class DeltaCDCSQLSuite extends DeltaCDCSuiteBase with DeltaColumnMappingTestUtil
             .withColumn("_change_type", lit("insert")))
       }
       assert(plans.map(_.executedPlan).toString
-        .contains("PushedFilters: [IsNotNull(id), LessThan(id,5)]"))
+        .contains("PushedFilters: [*IsNotNull(id), *LessThan(id,5)]"))
     }
   }
 
```

```diff
@@ -913,6 +913,28 @@ class DeltaCDCScalaSuite extends DeltaCDCSuiteBase {
     }
   }
 
+  test("filters should be pushed down") {
+    val tblName = "tbl"
+    withTable(tblName) {
+      createTblWithThreeVersions(tblName = Some(tblName))
+      val plans = DeltaTestUtils.withAllPlansCaptured(spark) {
+        val res = spark.read.format("delta")
+          .option(DeltaOptions.CDC_READ_OPTION, "true")
+          .option("startingVersion", 0)
+          .option("endingVersion", 1)
+          .table(tblName)
+          .select("id", "_change_type")
+          .where(col("id") < lit(5))
+        assert(res.columns === Seq("id", "_change_type"))
+        checkAnswer(
+          res,
+          spark.range(5)
+            .withColumn("_change_type", lit("insert")))
+      }
+      assert(plans.map(_.executedPlan).toString
+        .contains("PushedFilters: [*IsNotNull(id), *LessThan(id,5)]"))
+    }
+  }
 
   test("start version or timestamp is not provided") {
     val tblName = "tbl"
```
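
Outside the test harness, the same behaviour can be spot-checked with `explain()`: the physical plan's scan node lists `PushedFilters`, and the `*` prefix asserted in the suites above marks filters that the relation reports as handled, so Spark may no longer need its own `Filter` node on top. A sketch, reusing the placeholder table from the description example:

```scala
// Illustrative only: inspect the physical plan for the pushed-down CDF filters.
// The scan node should show something like:
//   PushedFilters: [*IsNotNull(id), *LessThan(id,5)]
// where '*' marks filters the relation claims to handle fully.
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 1)
  .table("my_table")        // placeholder table name
  .where("id < 5")
  .explain()
```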
