fix: CometScanExec on Spark 3.5.2 #915

Merged (7 commits) on Sep 24, 2024

@@ -141,8 +141,30 @@ case class CometScanExec(
    if (wrapped == null) Map.empty else wrapped.metadata

  override def verboseStringWithOperatorId(): String = {
-    getTagValue(QueryPlan.OP_ID_TAG).foreach(id => wrapped.setTagValue(QueryPlan.OP_ID_TAG, id))
-    wrapped.verboseStringWithOperatorId()
+    val metadataStr = metadata.toSeq.sorted
Contributor


Since the original issue was that OP_ID_TAG has been removed, is it not sufficient to just remove the offending line?

Contributor Author


Just removing the line that sets OP_ID_TAG leads to a formatted explain of:

== Physical Plan ==
* ColumnarToRow (2)
+- CometScan parquet  (1)


(unknown) Scan parquet 
Output [20]: [_1#0, _2#1, _3#2, _4#3, _5#4L, _6#5, _7#6, _8#7, _9#8, _10#9, _11#10L, _12#11, _13#12, _14#13, _15#14, _16#15, _17#16, _18#17, _19#18, _20#19]
Batched: true
Location: InMemoryFileIndex [file:/Users/abinford/projects/arrow-datafusion-comet/spark/target/tmp/spark-c0b82b7c-3de1-431b-96bf-56cb37f3a463/test.parquet]
ReadSchema: struct<_1:boolean,_2:tinyint,_3:smallint,_4:int,_5:bigint,_6:float,_7:double,_8:string,_9:smallint,_10:int,_11:bigint,_12:decimal(20,0),_13:string,_14:binary,_15:decimal(5,2),_16:decimal(18,10),_17:decimal(38,37),_18:timestamp,_19:timestamp,_20:date>

(2) ColumnarToRow [codegen id : 1]
Input [20]: [_1#0, _2#1, _3#2, _4#3, _5#4L, _6#5, _7#6, _8#7, _9#8, _10#9, _11#10L, _12#11, _13#12, _14#13, _15#14, _16#15, _17#16, _18#17, _19#18, _20#19]

which is the whole reason it was added in the first place: to fix the unknown operator ID (bc35fa5).

Contributor Author


Plus, from what I could see, every other operator is prefixed with Comet in the formatted explain, so it's weird for Scan to be the one thing that doesn't actually match up to the physical plan.

Contributor


I see, that makes sense.
On an unrelated note, I wonder why CometScanExec extends DataSourceScanExec instead of FileSourceScanExec (if it did, we would have gotten the verboseString for free).

+      .filterNot {
+        case (_, value) if (value.isEmpty || value.equals("[]")) => true
+        case (key, _) if (key.equals("DataFilters") || key.equals("Format")) => true
+        case (_, _) => false
+      }
+      .map {
+        case (key, _) if (key.equals("Location")) =>
+          val location = relation.location
+          val numPaths = location.rootPaths.length
+          val abbreviatedLocation = if (numPaths <= 1) {
+            location.rootPaths.mkString("[", ", ", "]")
+          } else {
+            "[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]"
+          }
+          s"$key: ${location.getClass.getSimpleName} ${redact(abbreviatedLocation)}"
+        case (key, value) => s"$key: ${redact(value)}"
+      }
+
+    s"""
+       |$formattedNodeName
+       |${ExplainUtils.generateFieldString("Output", output)}
+       |${metadataStr.mkString("\n")}
+       |""".stripMargin
   }

lazy val inputRDD: RDD[InternalRow] = {
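
For readers who want to try the metadata formatting from the hunk above outside of Spark, here is a minimal standalone sketch. It is not the PR's code: `MetadataFormatSketch`, `abbreviate`, `formatMetadata`, and the `rootPaths` and `indexName` parameters are hypothetical stand-ins for `metadata`, `relation.location.rootPaths`, and `location.getClass.getSimpleName`, the sample values are made up, and the `redact(...)` step is left out because it depends on Spark.

```scala
// Standalone sketch of the metadata filtering and formatting shown in the hunk.
// All names here are illustrative assumptions; redaction is intentionally omitted.
object MetadataFormatSketch {

  // Show the single root path in full, or the first one plus a count of the rest.
  def abbreviate(rootPaths: Seq[String]): String =
    if (rootPaths.length <= 1) rootPaths.mkString("[", ", ", "]")
    else "[" + rootPaths.head + s", ... ${rootPaths.length - 1} entries]"

  def formatMetadata(
      metadata: Map[String, String],
      rootPaths: Seq[String],
      indexName: String): Seq[String] =
    metadata.toSeq.sorted
      .filterNot {
        // Drop entries with nothing to show.
        case (_, value) if value.isEmpty || value == "[]" => true
        // Drop keys that are excluded from the verbose string.
        case (key, _) if key == "DataFilters" || key == "Format" => true
        case _ => false
      }
      .map {
        // Location is rebuilt from the root paths rather than the metadata value.
        case ("Location", _) => s"Location: $indexName ${abbreviate(rootPaths)}"
        case (key, value) => s"$key: $value"
      }

  def main(args: Array[String]): Unit = {
    val lines = formatMetadata(
      Map(
        "Batched" -> "true",
        "Format" -> "SomeFormat", // made-up value, filtered out by key
        "PushedFilters" -> "[]", // filtered out because the value is "[]"
        "Location" -> "unused",
        "ReadSchema" -> "struct<a:int>"),
      Seq("file:/tmp/a", "file:/tmp/b", "file:/tmp/c"),
      "InMemoryFileIndex")
    lines.foreach(println)
  }
}
```

With an empty `rootPaths`, `abbreviate` yields `[]`, which lines up with the `Location: InMemoryFileIndex []` lines in the golden files further down.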

@@ -41,7 +41,7 @@
+- CometScan parquet spark_catalog.default.customer (34)


-(1) Scan parquet spark_catalog.default.store_returns
+(1) CometScan parquet spark_catalog.default.store_returns
Output [4]: [sr_customer_sk#1, sr_store_sk#2, sr_return_amt#3, sr_returned_date_sk#4]
Batched: true
Location: InMemoryFileIndex []
@@ -53,7 +53,7 @@ ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:decimal(7,2)
Input [4]: [sr_customer_sk#1, sr_store_sk#2, sr_return_amt#3, sr_returned_date_sk#4]
Condition : (isnotnull(sr_store_sk#2) AND isnotnull(sr_customer_sk#1))

-(3) Scan parquet spark_catalog.default.date_dim
+(3) CometScan parquet spark_catalog.default.date_dim
Output [2]: [d_date_sk#6, d_year#7]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
@@ -99,7 +99,7 @@ Functions [1]: [sum(UnscaledValue(sr_return_amt#3))]
Input [3]: [ctr_customer_sk#9, ctr_store_sk#10, ctr_total_return#11]
Condition : isnotnull(ctr_total_return#11)

-(13) Scan parquet spark_catalog.default.store_returns
+(13) CometScan parquet spark_catalog.default.store_returns
Output [4]: [sr_customer_sk#12, sr_store_sk#13, sr_return_amt#14, sr_returned_date_sk#15]
Batched: true
Location: InMemoryFileIndex []
@@ -168,7 +168,7 @@ Arguments: [ctr_store_sk#10], [ctr_store_sk#19], Inner, (cast(ctr_total_return#1
Input [5]: [ctr_customer_sk#9, ctr_store_sk#10, ctr_total_return#11, (avg(ctr_total_return) * 1.2)#23, ctr_store_sk#19]
Arguments: [ctr_customer_sk#9, ctr_store_sk#10], [ctr_customer_sk#9, ctr_store_sk#10]

-(28) Scan parquet spark_catalog.default.store
+(28) CometScan parquet spark_catalog.default.store
Output [2]: [s_store_sk#24, s_state#25]
Batched: true
Location [not included in comparison]/{warehouse_dir}/store]
@@ -196,7 +196,7 @@ Arguments: [ctr_store_sk#10], [s_store_sk#24], Inner, BuildRight
Input [3]: [ctr_customer_sk#9, ctr_store_sk#10, s_store_sk#24]
Arguments: [ctr_customer_sk#9], [ctr_customer_sk#9]

-(34) Scan parquet spark_catalog.default.customer
+(34) CometScan parquet spark_catalog.default.customer
Output [2]: [c_customer_sk#26, c_customer_id#27]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer]
@@ -237,7 +237,7 @@ BroadcastExchange (45)
+- CometScan parquet spark_catalog.default.date_dim (41)


-(41) Scan parquet spark_catalog.default.date_dim
+(41) CometScan parquet spark_catalog.default.date_dim
Output [2]: [d_date_sk#6, d_year#7]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]

@@ -46,7 +46,7 @@ TakeOrderedAndProject (45)
+- CometScan parquet spark_catalog.default.customer_demographics (36)


-(1) Scan parquet spark_catalog.default.customer
+(1) CometScan parquet spark_catalog.default.customer
Output [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer]
@@ -57,14 +57,14 @@ ReadSchema: struct<c_customer_sk:int,c_current_cdemo_sk:int,c_current_addr_sk:in
Input [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
Condition : (isnotnull(c_current_addr_sk#5) AND isnotnull(c_current_cdemo_sk#4))

-(3) Scan parquet spark_catalog.default.store_sales
+(3) CometScan parquet spark_catalog.default.store_sales
Output [2]: [ss_customer_sk#6, ss_sold_date_sk#7]
Batched: true
Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(ss_sold_date_sk#7), dynamicpruningexpression(ss_sold_date_sk#7 IN dynamicpruning#8)]
ReadSchema: struct<ss_customer_sk:int>

-(4) Scan parquet spark_catalog.default.date_dim
+(4) CometScan parquet spark_catalog.default.date_dim
Output [3]: [d_date_sk#9, d_year#10, d_moy#11]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
@@ -104,7 +104,7 @@ Arguments: [c_customer_sk#3], [ss_customer_sk#6], LeftSemi, BuildRight
(12) ColumnarToRow [codegen id : 5]
Input [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]

-(13) Scan parquet spark_catalog.default.web_sales
+(13) CometScan parquet spark_catalog.default.web_sales
Output [2]: [ws_bill_customer_sk#12, ws_sold_date_sk#13]
Batched: true
Location: InMemoryFileIndex []
@@ -136,7 +136,7 @@ Right keys [1]: [ws_bill_customer_sk#12]
Join type: ExistenceJoin(exists#2)
Join condition: None

-(20) Scan parquet spark_catalog.default.catalog_sales
+(20) CometScan parquet spark_catalog.default.catalog_sales
Output [2]: [cs_ship_customer_sk#16, cs_sold_date_sk#17]
Batched: true
Location: InMemoryFileIndex []
@@ -176,7 +176,7 @@ Condition : (exists#2 OR exists#1)
Output [2]: [c_current_cdemo_sk#4, c_current_addr_sk#5]
Input [5]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5, exists#2, exists#1]

-(29) Scan parquet spark_catalog.default.customer_address
+(29) CometScan parquet spark_catalog.default.customer_address
Output [2]: [ca_address_sk#20, ca_county#21]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer_address]
@@ -208,7 +208,7 @@ Join condition: None
Output [1]: [c_current_cdemo_sk#4]
Input [3]: [c_current_cdemo_sk#4, c_current_addr_sk#5, ca_address_sk#20]

-(36) Scan parquet spark_catalog.default.customer_demographics
+(36) CometScan parquet spark_catalog.default.customer_demographics
Output [9]: [cd_demo_sk#22, cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer_demographics]
@@ -268,7 +268,7 @@ BroadcastExchange (50)
+- CometScan parquet spark_catalog.default.date_dim (46)


-(46) Scan parquet spark_catalog.default.date_dim
+(46) CometScan parquet spark_catalog.default.date_dim
Output [3]: [d_date_sk#9, d_year#10, d_moy#11]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]

@@ -70,7 +70,7 @@
+- ReusedExchange (59)


-(1) Scan parquet spark_catalog.default.customer
+(1) CometScan parquet spark_catalog.default.customer
Output [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer]
@@ -81,7 +81,7 @@ ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_
Input [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
Condition : (isnotnull(c_customer_sk#1) AND isnotnull(c_customer_id#2))

-(3) Scan parquet spark_catalog.default.store_sales
+(3) CometScan parquet spark_catalog.default.store_sales
Output [4]: [ss_customer_sk#9, ss_ext_discount_amt#10, ss_ext_list_price#11, ss_sold_date_sk#12]
Batched: true
Location: InMemoryFileIndex []
@@ -106,7 +106,7 @@ Arguments: [c_customer_sk#1], [ss_customer_sk#9], Inner, BuildRight
Input [12]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8, ss_customer_sk#9, ss_ext_discount_amt#10, ss_ext_list_price#11, ss_sold_date_sk#12]
Arguments: [c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8, ss_ext_discount_amt#10, ss_ext_list_price#11, ss_sold_date_sk#12], [c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8, ss_ext_discount_amt#10, ss_ext_list_price#11, ss_sold_date_sk#12]

-(8) Scan parquet spark_catalog.default.date_dim
+(8) CometScan parquet spark_catalog.default.date_dim
Output [2]: [d_date_sk#14, d_year#15]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
@@ -148,7 +148,7 @@ Functions [1]: [sum(UnscaledValue((ss_ext_list_price#11 - ss_ext_discount_amt#10
Input [2]: [customer_id#17, year_total#18]
Condition : (isnotnull(year_total#18) AND (year_total#18 > 0.00))

-(17) Scan parquet spark_catalog.default.customer
+(17) CometScan parquet spark_catalog.default.customer
Output [8]: [c_customer_sk#19, c_customer_id#20, c_first_name#21, c_last_name#22, c_preferred_cust_flag#23, c_birth_country#24, c_login#25, c_email_address#26]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer]
@@ -159,7 +159,7 @@ ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_
Input [8]: [c_customer_sk#19, c_customer_id#20, c_first_name#21, c_last_name#22, c_preferred_cust_flag#23, c_birth_country#24, c_login#25, c_email_address#26]
Condition : (isnotnull(c_customer_sk#19) AND isnotnull(c_customer_id#20))

-(19) Scan parquet spark_catalog.default.store_sales
+(19) CometScan parquet spark_catalog.default.store_sales
Output [4]: [ss_customer_sk#27, ss_ext_discount_amt#28, ss_ext_list_price#29, ss_sold_date_sk#30]
Batched: true
Location: InMemoryFileIndex []
@@ -184,7 +184,7 @@ Arguments: [c_customer_sk#19], [ss_customer_sk#27], Inner, BuildRight
Input [12]: [c_customer_sk#19, c_customer_id#20, c_first_name#21, c_last_name#22, c_preferred_cust_flag#23, c_birth_country#24, c_login#25, c_email_address#26, ss_customer_sk#27, ss_ext_discount_amt#28, ss_ext_list_price#29, ss_sold_date_sk#30]
Arguments: [c_customer_id#20, c_first_name#21, c_last_name#22, c_preferred_cust_flag#23, c_birth_country#24, c_login#25, c_email_address#26, ss_ext_discount_amt#28, ss_ext_list_price#29, ss_sold_date_sk#30], [c_customer_id#20, c_first_name#21, c_last_name#22, c_preferred_cust_flag#23, c_birth_country#24, c_login#25, c_email_address#26, ss_ext_discount_amt#28, ss_ext_list_price#29, ss_sold_date_sk#30]

-(24) Scan parquet spark_catalog.default.date_dim
+(24) CometScan parquet spark_catalog.default.date_dim
Output [2]: [d_date_sk#32, d_year#33]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
@@ -235,7 +235,7 @@ Arguments: [customer_id#17], [customer_id#35], Inner, BuildRight
Input [5]: [customer_id#17, year_total#18, customer_id#35, customer_preferred_cust_flag#36, year_total#37]
Arguments: [customer_id#17, year_total#18, customer_preferred_cust_flag#36, year_total#37], [customer_id#17, year_total#18, customer_preferred_cust_flag#36, year_total#37]

-(35) Scan parquet spark_catalog.default.customer
+(35) CometScan parquet spark_catalog.default.customer
Output [8]: [c_customer_sk#38, c_customer_id#39, c_first_name#40, c_last_name#41, c_preferred_cust_flag#42, c_birth_country#43, c_login#44, c_email_address#45]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer]
@@ -246,7 +246,7 @@ ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_
Input [8]: [c_customer_sk#38, c_customer_id#39, c_first_name#40, c_last_name#41, c_preferred_cust_flag#42, c_birth_country#43, c_login#44, c_email_address#45]
Condition : (isnotnull(c_customer_sk#38) AND isnotnull(c_customer_id#39))

-(37) Scan parquet spark_catalog.default.web_sales
+(37) CometScan parquet spark_catalog.default.web_sales
Output [4]: [ws_bill_customer_sk#46, ws_ext_discount_amt#47, ws_ext_list_price#48, ws_sold_date_sk#49]
Batched: true
Location: InMemoryFileIndex []
@@ -314,7 +314,7 @@ Arguments: [customer_id#17], [customer_id#54], Inner, BuildRight
Input [6]: [customer_id#17, year_total#18, customer_preferred_cust_flag#36, year_total#37, customer_id#54, year_total#55]
Arguments: [customer_id#17, year_total#18, customer_preferred_cust_flag#36, year_total#37, year_total#55], [customer_id#17, year_total#18, customer_preferred_cust_flag#36, year_total#37, year_total#55]

-(52) Scan parquet spark_catalog.default.customer
+(52) CometScan parquet spark_catalog.default.customer
Output [8]: [c_customer_sk#56, c_customer_id#57, c_first_name#58, c_last_name#59, c_preferred_cust_flag#60, c_birth_country#61, c_login#62, c_email_address#63]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer]
@@ -325,7 +325,7 @@ ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_
Input [8]: [c_customer_sk#56, c_customer_id#57, c_first_name#58, c_last_name#59, c_preferred_cust_flag#60, c_birth_country#61, c_login#62, c_email_address#63]
Condition : (isnotnull(c_customer_sk#56) AND isnotnull(c_customer_id#57))

-(54) Scan parquet spark_catalog.default.web_sales
+(54) CometScan parquet spark_catalog.default.web_sales
Output [4]: [ws_bill_customer_sk#64, ws_ext_discount_amt#65, ws_ext_list_price#66, ws_sold_date_sk#67]
Batched: true
Location: InMemoryFileIndex []
@@ -405,7 +405,7 @@ BroadcastExchange (73)
+- CometScan parquet spark_catalog.default.date_dim (70)


-(70) Scan parquet spark_catalog.default.date_dim
+(70) CometScan parquet spark_catalog.default.date_dim
Output [2]: [d_date_sk#14, d_year#15]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
@@ -430,7 +430,7 @@ BroadcastExchange (77)
+- CometScan parquet spark_catalog.default.date_dim (74)


-(74) Scan parquet spark_catalog.default.date_dim
+(74) CometScan parquet spark_catalog.default.date_dim
Output [2]: [d_date_sk#32, d_year#33]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]

@@ -23,7 +23,7 @@ TakeOrderedAndProject (22)
+- CometScan parquet spark_catalog.default.date_dim (8)


-(1) Scan parquet spark_catalog.default.web_sales
+(1) CometScan parquet spark_catalog.default.web_sales
Output [3]: [ws_item_sk#1, ws_ext_sales_price#2, ws_sold_date_sk#3]
Batched: true
Location: InMemoryFileIndex []
@@ -35,7 +35,7 @@ ReadSchema: struct<ws_item_sk:int,ws_ext_sales_price:decimal(7,2)>
Input [3]: [ws_item_sk#1, ws_ext_sales_price#2, ws_sold_date_sk#3]
Condition : isnotnull(ws_item_sk#1)

-(3) Scan parquet spark_catalog.default.item
+(3) CometScan parquet spark_catalog.default.item
Output [6]: [i_item_sk#5, i_item_id#6, i_item_desc#7, i_current_price#8, i_class#9, i_category#10]
Batched: true
Location [not included in comparison]/{warehouse_dir}/item]
@@ -59,7 +59,7 @@ Arguments: [ws_item_sk#1], [i_item_sk#5], Inner, BuildRight
Input [9]: [ws_item_sk#1, ws_ext_sales_price#2, ws_sold_date_sk#3, i_item_sk#5, i_item_id#6, i_item_desc#7, i_current_price#8, i_class#9, i_category#10]
Arguments: [ws_ext_sales_price#2, ws_sold_date_sk#3, i_item_id#6, i_item_desc#7, i_current_price#8, i_class#9, i_category#10], [ws_ext_sales_price#2, ws_sold_date_sk#3, i_item_id#6, i_item_desc#7, i_current_price#8, i_class#9, i_category#10]

-(8) Scan parquet spark_catalog.default.date_dim
+(8) CometScan parquet spark_catalog.default.date_dim
Output [2]: [d_date_sk#11, d_date#12]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
@@ -134,7 +134,7 @@ BroadcastExchange (27)
+- CometScan parquet spark_catalog.default.date_dim (23)


-(23) Scan parquet spark_catalog.default.date_dim
+(23) CometScan parquet spark_catalog.default.date_dim
Output [2]: [d_date_sk#11, d_date#12]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]