Skip to content

Commit

Permalink
[SC-75521][DELTA] Strip the full temp view plan for Delta DML commands
Browse files Browse the repository at this point in the history
Strip the full temp view plan for Delta DML commands. This allows us to re-enable the test that merges into SQL temp views with MERGE; previously, resolution of the merge would fail.

new unit test

Author: Jose Torres <joseph.torres@databricks.com>

GitOrigin-RevId: b418f4bd194d6186390261cd8d32c4f2c9ed1048
  • Loading branch information
jose-torres authored and Yaohua628 committed Jul 6, 2021
1 parent ac5f9e1 commit 83277eb
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 14 deletions.
7 changes: 6 additions & 1 deletion core/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
import scala.collection.Map

import org.apache.spark.sql.delta.{DeltaErrors, PreprocessTableMerge}
import org.apache.spark.sql.delta.DeltaViewHelper
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.util.AnalysisHelper

Expand All @@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.internal.SQLConf

/**
* Builder to specify how to merge data from source DataFrame into the target Delta table.
Expand Down Expand Up @@ -217,8 +219,11 @@ class DeltaMergeBuilder private(
if (!resolvedMergeInto.resolved) {
throw DeltaErrors.analysisException("Failed to resolve\n", plan = Some(resolvedMergeInto))
}
val strippedMergeInto = resolvedMergeInto.copy(
target = DeltaViewHelper.stripTempViewForMerge(resolvedMergeInto.target, SQLConf.get)
)
// Preprocess the actions and verify
val mergeIntoCommand = PreprocessTableMerge(sparkSession.sessionState.conf)(resolvedMergeInto)
val mergeIntoCommand = PreprocessTableMerge(sparkSession.sessionState.conf)(strippedMergeInto)
sparkSession.sessionState.analyzer.checkAnalysis(mergeIntoCommand)
mergeIntoCommand.run(sparkSession)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ package org.apache.spark.sql.delta

import scala.collection.JavaConverters._


// scalastyle:off import.ordering.noEmptyLine

import org.apache.spark.sql.delta.DeltaErrors.{TemporallyUnstableInputException, TimestampEarlierThanCommitRetentionException}
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
import org.apache.spark.sql.delta.files.TahoeLogFileIndex
import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaDataSource
Expand Down Expand Up @@ -156,7 +154,8 @@ class DeltaAnalysis(session: SparkSession)
s"WHEN NOT MATCHED clause in MERGE INTO.")
}
// rewrites Delta from V2 to V1
val newTarget = stripTempViewWrapper(target).transformUp { case DeltaRelation(lr) => lr }
val newTarget =
stripTempViewForMergeWrapper(target).transformUp { case DeltaRelation(lr) => lr }
// Even if we're merging into a non-Delta target, we will catch it later and throw an
// exception.
val deltaMerge =
Expand All @@ -168,7 +167,7 @@ class DeltaAnalysis(session: SparkSession)
val d = if (deltaMerge.childrenResolved && !deltaMerge.resolved) {
DeltaMergeInto.resolveReferences(deltaMerge, conf)(tryResolveReferences(session))
} else deltaMerge
d.copy(target = stripTempViewWrapper(d.target))
d.copy(target = stripTempViewForMergeWrapper(d.target))

case AlterTableAddConstraintStatement(
original @ SessionCatalogAndIdentifier(catalog, ident), constraintName, expr) =>
Expand Down Expand Up @@ -362,8 +361,11 @@ class DeltaAnalysis(session: SparkSession)
/** Forwards `plan` to `DeltaViewHelper.stripTempView`, passing along the `conf` in scope. */
private def stripTempViewWrapper(plan: LogicalPlan): LogicalPlan =
  DeltaViewHelper.stripTempView(plan, conf)
}

/**
 * Forwards `plan` to `DeltaViewHelper.stripTempViewForMerge`, passing along the `conf` in scope.
 * Used for the target of a MERGE INTO.
 */
private def stripTempViewForMergeWrapper(plan: LogicalPlan): LogicalPlan =
  DeltaViewHelper.stripTempViewForMerge(plan, conf)
}

/** Matchers for dealing with a Delta table. */
object DeltaRelation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
package org.apache.spark.sql.delta

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias, View}
import org.apache.spark.sql.internal.SQLConf

object DeltaViewHelper {
Expand Down Expand Up @@ -86,4 +87,10 @@ object DeltaViewHelper {
case (_, originAttr) => originAttr
}
}

/**
 * Strips the temp view wrapper from the target plan of a MERGE.
 *
 * As written here this is a pure delegate: it returns whatever `stripTempView` returns for the
 * same `plan` and `conf`.
 *
 * NOTE(review): an earlier comment on this method described checking that two expression lists
 * have the same names and types in the same order, and are either attributes or direct casts of
 * attributes. No such check is performed in this body -- confirm whether it lives in
 * `stripTempView` or is still to be added.
 *
 * @param plan the logical plan of the merge target
 * @param conf the SQL configuration forwarded to `stripTempView`
 * @return the plan produced by `stripTempView`
 */
def stripTempViewForMerge(plan: LogicalPlan, conf: SQLConf): LogicalPlan =
  stripTempView(plan, conf)
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class MergeIntoSQLSuite extends MergeIntoSuiteBase with DeltaSQLCommandTest {
}
}

ignore("merge into a SQL temp view") {
test("merge into a SQL temp view") {
withTable("tab") {
withTempView("v") {
withTempView("src") {
Expand All @@ -253,6 +253,40 @@ class MergeIntoSQLSuite extends MergeIntoSuiteBase with DeltaSQLCommandTest {
}
}

/**
 * Registers a test named "can't merge into invalid SQL temp view - $name".
 *
 * The registered test saves a two-row Delta table `tab` with columns (key, value), executes
 * `text` (callers are expected to use it to define the view `v` that the merge targets), creates
 * the temp view `src`, and then runs a MERGE INTO `v`. The merge must throw an
 * `AnalysisException` whose message contains `expectedError`.
 *
 * @param name suffix appended to the registered test name
 * @param text SQL statement executed after `tab` is written and before the merge
 * @param expectedError substring that must appear in the exception message
 */
protected def testInvalidSqlTempView(name: String)(text: String, expectedError: String): Unit = {
  test(s"can't merge into invalid SQL temp view - $name") {
    withTable("tab") {
      withTempView("v") {
        withTempView("src") {
          // Fixture: the Delta table, the view under test, and the merge source.
          Seq((0, 3), (1, 2)).toDF("key", "value").write.format("delta").saveAsTable("tab")
          sql(text)
          sql("CREATE TEMP VIEW src AS SELECT * FROM VALUES (1, 2), (3, 4) AS t(a, b)")

          val mergeStatement =
            s"""
               |MERGE INTO v
               |USING src
               |ON src.a = v.key AND src.b = v.value
               |WHEN MATCHED THEN
               | UPDATE SET v.value = src.b + 1
               |WHEN NOT MATCHED THEN
               | INSERT (v.key, v.value) VALUES (src.a, src.b)
               |""".stripMargin

          // The merge itself is what must fail analysis.
          val failure = intercept[AnalysisException](sql(mergeStatement))
          assert(failure.getMessage.contains(expectedError))
        }
      }
    }
  }
}

// The view exposes only `key`, while the MERGE issued by testInvalidSqlTempView also references
// `v.value`, so the statement is expected to fail with a "cannot resolve" analysis error.
testInvalidSqlTempView("subset cols")(
text = "CREATE TEMP VIEW v AS SELECT key FROM tab",
expectedError = "cannot resolve"
)



// This test is to capture the incorrect behavior caused by
// https://github.com/delta-io/delta/issues/618 .
// If this test fails then the issue has been fixed. Replace this test with a correct test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,14 @@ abstract class MergeIntoSuiteBase
}

Seq(true, false).foreach { skippingEnabled =>
Seq(true, false).foreach { isPartitioned =>
// TODO (SC-72770): enable test case when useSQLView = true
Seq(false).foreach { useSQLView =>
Seq(true, false).foreach { partitioned =>
Seq(true, false).foreach { useSQLView =>
test("basic case - merge to view on a Delta table by path, " +
s"isPartitioned: $isPartitioned skippingEnabled: $skippingEnabled") {
s"partitioned: $partitioned skippingEnabled: $skippingEnabled useSqlView: $useSQLView") {
withTable("delta_target", "source") {
withSQLConf(DeltaSQLConf.DELTA_STATS_SKIPPING.key -> skippingEnabled.toString) {
Seq((1, 1), (0, 3), (1, 6)).toDF("key1", "value").createOrReplaceTempView("source")
val partitions = if (isPartitioned) "key2" :: Nil else Nil
val partitions = if (partitioned) "key2" :: Nil else Nil
append(Seq((2, 2), (1, 4)).toDF("key2", "value"), partitions)
if (useSQLView) {
sql(s"CREATE OR REPLACE TEMP VIEW delta_target AS " +
Expand Down

0 comments on commit 83277eb

Please sign in to comment.