Skip to content

Commit 05b0543

Browse files
[SPARK-54240] Translate get array item catalyst expression to connector expression
### What changes were proposed in this pull request?
- Support conversion of the catalyst GetArrayItem expression to a connector expression, so that data sources can implement pushdown of this expression.

### Why are the changes needed?
- To allow data sources (built-in and third-party) to implement pushdown of get array item.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
No testing needed, since no pushdown of this expression has been implemented yet.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #52940 from urosstan-db/SPARK-54240-support-get-array-item-pushdown.

Lead-authored-by: Uros Stankovic <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Uros Stankovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 4eb56bb commit 05b0543

File tree

5 files changed

+84
-1
lines changed

5 files changed

+84
-1
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1693,6 +1693,12 @@
16931693
],
16941694
"sqlState" : "42846"
16951695
},
1696+
"EXPRESSION_TRANSLATION_TO_V2_IS_NOT_SUPPORTED" : {
1697+
"message" : [
1698+
"Expression <expr> cannot be translated to v2 expression."
1699+
],
1700+
"sqlState" : "0A000"
1701+
},
16961702
"EXPRESSION_TYPE_IS_NOT_ORDERABLE" : {
16971703
"message" : [
16981704
"Column expression <expr> cannot be sorted because its type <exprType> is not orderable."
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.expressions;
19+
20+
import org.apache.spark.annotation.Evolving;
21+
import org.apache.spark.sql.internal.connector.ExpressionWithToString;
22+
23+
/**
24+
* Connector (V2) expression that reads a single element of an array by its ordinal.
25+
*
* The expression keeps the array-producing child, the ordinal expression, and a flag
* that tells whether an out-of-bounds ordinal should raise an error or yield null.
*
26+
* @since 4.1.0
27+
*/
28+
29+
@Evolving
30+
public class GetArrayItem extends ExpressionWithToString {
31+
32+
private final Expression childArray;
33+
private final Expression ordinal;
34+
private final boolean failOnError;
35+
36+
/**
37+
* Creates a GetArrayItem expression.
38+
* @param childArray Expression producing the array the element is read from.
39+
* @param ordinal Expression producing the position of the element; indexing is zero-based.
40+
* @param failOnError If true, the expression should throw an exception when the ordinal is
41+
* out of bounds; otherwise it should return null.
42+
*/
43+
public GetArrayItem(Expression childArray, Expression ordinal, boolean failOnError) {
44+
this.childArray = childArray;
45+
this.ordinal = ordinal;
46+
this.failOnError = failOnError;
47+
}
48+
49+
/** Returns the expression producing the source array. */
public Expression childArray() { return this.childArray; }
50+
/** Returns the expression producing the (zero-based) element position. */
public Expression ordinal() { return this.ordinal; }
51+
/** Returns the out-of-bounds flag passed to the constructor. */
public boolean failOnError() { return this.failOnError; }
52+
53+
/** Returns the children in fixed order: the array expression first, then the ordinal. */
@Override
54+
public Expression[] children() { return new Expression[]{ childArray, ordinal }; }
55+
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.spark.sql.connector.expressions.Extract;
3030
import org.apache.spark.sql.connector.expressions.NamedReference;
3131
import org.apache.spark.sql.connector.expressions.GeneralScalarExpression;
32+
import org.apache.spark.sql.connector.expressions.GetArrayItem;
3233
import org.apache.spark.sql.connector.expressions.Literal;
3334
import org.apache.spark.sql.connector.expressions.NullOrdering;
3435
import org.apache.spark.sql.connector.expressions.SortDirection;
@@ -84,6 +85,8 @@ public String build(Expression expr) {
8485
} else if (expr instanceof SortOrder sortOrder) {
8586
return visitSortOrder(
8687
build(sortOrder.expression()), sortOrder.direction(), sortOrder.nullOrdering());
88+
} else if (expr instanceof GetArrayItem getArrayItem) {
89+
return visitGetArrayItem(getArrayItem);
8790
} else if (expr instanceof GeneralScalarExpression e) {
8891
String name = e.name();
8992
return switch (name) {
@@ -348,6 +351,13 @@ protected String visitTrim(String direction, String[] inputs) {
348351
}
349352
}
350353

354+
/**
 * Builds the SQL string for a {@link GetArrayItem} expression.
 *
 * This base implementation generates no SQL: it always throws, reporting the
 * string form of the expression under the
 * EXPRESSION_TRANSLATION_TO_V2_IS_NOT_SUPPORTED error condition. Builders that
 * can render array element access are expected to override this method.
 *
 * @param getArrayItem the array element access expression to render
 * @return never returns normally in this implementation
 * @throws SparkUnsupportedOperationException always
 */
protected String visitGetArrayItem(GetArrayItem getArrayItem) {
355+
throw new SparkUnsupportedOperationException(
356+
"EXPRESSION_TRANSLATION_TO_V2_IS_NOT_SUPPORTED",
357+
Map.of("expr", getArrayItem.toString())
358+
);
359+
}
360+
351361
protected String visitExtract(Extract extract) {
352362
return visitExtract(extract.field(), build(extract.source()));
353363
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
2424
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
2525
import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
2626
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction
27-
import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, Extract => V2Extract, FieldReference, GeneralScalarExpression, LiteralValue, NullOrdering, SortDirection, SortValue, UserDefinedScalarFunc}
27+
import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, Extract => V2Extract, FieldReference, GeneralScalarExpression, GetArrayItem => V2GetArrayItem, LiteralValue, NullOrdering, SortDirection, SortValue, UserDefinedScalarFunc}
2828
import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Avg, Count, CountStar, GeneralAggregateFunc, Max, Min, Sum, UserDefinedAggregateFunc}
2929
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate => V2Predicate}
3030
import org.apache.spark.sql.internal.SQLConf
@@ -326,6 +326,13 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) extends L
326326
case _: Sha2 => generateExpressionWithName("SHA2", expr, isPredicate)
327327
case _: StringLPad => generateExpressionWithName("LPAD", expr, isPredicate)
328328
case _: StringRPad => generateExpressionWithName("RPAD", expr, isPredicate)
329+
case GetArrayItem(child, ordinal, failOnError) =>
330+
(generateExpression(child), generateExpression(ordinal)) match {
331+
case (Some(v2ArrayChild), Some(v2Ordinal)) =>
332+
Some(new V2GetArrayItem(v2ArrayChild, v2Ordinal, failOnError))
333+
case _ =>
334+
None
335+
}
329336
// TODO supports other expressions
330337
case ApplyFunctionExpression(function, children) =>
331338
val childrenExpressions = children.flatMap(generateExpression(_))

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ToStringSQLBuilder.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql.internal.connector
1919

20+
import org.apache.spark.sql.connector.expressions.GetArrayItem
2021
import org.apache.spark.sql.connector.util.V2ExpressionSQLBuilder
2122

2223
/**
@@ -35,4 +36,8 @@ class ToStringSQLBuilder extends V2ExpressionSQLBuilder with Serializable {
3536
val distinct = if (isDistinct) "DISTINCT " else ""
3637
s"""$funcName($distinct${inputs.mkString(", ")})"""
3738
}
39+
40+
/**
 * Renders a [[GetArrayItem]] as `array[ordinal]`, using the `toString` of the
 * array child and of the ordinal expression. The `failOnError` flag is not part
 * of the rendered text.
 */
override protected def visitGetArrayItem(getArrayItem: GetArrayItem): String = {
41+
s"${getArrayItem.childArray.toString}[${getArrayItem.ordinal.toString}]"
42+
}
3843
}

0 commit comments

Comments
 (0)