Changes

Summary

  1. [SPARK-33086][FOLLOW-UP] Remove unused Optional import from pyspark.resource.profile stub (commit: 83f8e13)
  2. [SPARK-33106][BUILD] Fix resolvers clash in SBT (commit: c78971b)
  3. [SPARK-21708][BUILD][FOLLOWUP] Rename hdpVersion to hadoopVersionValue (commit: 50b2a49)
  4. [SPARK-32047][SQL] Add JDBC connection provider disable possibility (commit: 4af1ac9)
  5. [SPARK-33107][BUILD][FOLLOW-UP] Remove com.twitter:parquet-hadoop-bundle:1.6.0 and orc.classifier (commit: 543d59d)
  6. [SPARK-33117][BUILD] Update zstd-jni to 1.4.5-6 (commit: 9896288)
  7. [SPARK-33092][SQL] Support subexpression elimination in ProjectExec (commit: 78c0967)
  8. [SPARK-32704][SQL][FOLLOWUP] Corrects version values of plan logging configs in SQLConf (commit: a0e3244)
  9. [SPARK-33111][ML] aft transform optimization (commit: ed2fe8d)
  10. [SPARK-33016][SQL] Potential SQLMetrics missed which might cause WEB UI display issue while AQE is on (commit: b27a287)
Commit 83f8e13956d5602ff4d37b742da427aa07537c1f by gurwls223
[SPARK-33086][FOLLOW-UP] Remove unused Optional import from
pyspark.resource.profile stub
### What changes were proposed in this pull request?
Remove unused `typing.Optional` import from `pyspark.resource.profile`
stub.
### Why are the changes needed?
Since SPARK-32319 we don't allow unused imports. However, this one
slipped through both local and CI tests for some reason.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests and mypy check.
Closes #30002 from zero323/SPARK-33086-FOLLOWUP.
Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(commit: 83f8e13)
The file was modified python/pyspark/resource/profile.pyi (diff)
Commit c78971b1c7214357a275481a5af468259bcf406f by dhyun
[SPARK-33106][BUILD] Fix resolvers clash in SBT
### What changes were proposed in this pull request?
Rename manually added resolver for local Ivy repo. Create configuration
to publish to local Ivy repo similar to Maven one. Use `publishLocal` to
publish both to local Maven and Ivy repos instead of custom task
`localPublish` (renamed from `publish-local` of sbt 0.13.x).
### Why are the changes needed?
There are two resolvers (bootResolvers' "local" and the manually added
"local") that point to the same local Ivy repo but have different
configurations, which led to excessive warnings in logs and,
potentially, resolution issues. This changeset fixes that case, observable
in sbt output as
```
[warn] Multiple resolvers having different access mechanism configured
with same name 'local'. To avoid conflict, Remove duplicate project
resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
```
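A minimal sketch of the renaming idea, not the exact change in
project/SparkBuild.scala; the resolver name "ivy-local" and the repository
path below are illustrative assumptions:
```scala
import sbt._
import sbt.Keys._

// Sketch: give the manually added local Ivy resolver a name other than
// "local" so it no longer clashes with sbt's built-in "local" resolver.
lazy val localIvySettings: Seq[Setting[_]] = Seq(
  resolvers += Resolver.file(
    "ivy-local", // any distinct name avoids the warning quoted above
    file(sys.props("user.home") + "/.ivy2/local")
  )(Resolver.ivyStylePatterns)
)
```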
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Executed `build/sbt`'s `publishLocal` task on an individual module and on
the root project.
Closes #30006 from gemelen/feature/local-resolvers.
Authored-by: Denis Pyshev <git@gemelen.net>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: c78971b)
The file was modified project/SparkBuild.scala (diff)
Commit 50b2a497f37c7a51b34dee1c0cb80910687ad4a2 by dhyun
[SPARK-21708][BUILD][FOLLOWUP] Rename hdpVersion to hadoopVersionValue
This PR aims to rename `hdpVersion` to `hadoopVersionValue`, using a more
general variable name. No user-facing change. Tested by passing the CI.
Closes #30008 from williamhyun/sbt.
Authored-by: William Hyun <williamhyun3@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: 50b2a49)
The file was modified project/SparkBuild.scala (diff)
Commit 4af1ac93846a0dfdcc57ec7604ed51d7787bd6fd by gurwls223
[SPARK-32047][SQL] Add JDBC connection provider disable possibility
### What changes were proposed in this pull request?
At the moment there is no possibility to turn off JDBC authentication
providers which exist on the classpath. This can be problematic because
service providers are loaded with the service loader. In this PR I've added
the `spark.sql.sources.disabledJdbcConnProviderList` configuration
(default: empty); see the usage sketch below.
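A hedged usage sketch for the new config; the provider names "db2" and
"mssql" are illustrative assumptions, not a claim about which providers are
on a given classpath:
```scala
import org.apache.spark.sql.SparkSession

// Disable selected JDBC connection providers via the config added in this PR;
// the value is a comma-separated list of provider names to ignore.
val spark = SparkSession.builder()
  .appName("disabled-jdbc-connection-providers")
  .master("local[*]")
  .config("spark.sql.sources.disabledJdbcConnProviderList", "db2,mssql")
  .getOrCreate()
```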
### Why are the changes needed?
There is otherwise no possibility to turn off JDBC authentication providers.
### Does this PR introduce _any_ user-facing change?
Yes, it introduces a new configuration option.
### How was this patch tested?
* Existing + newly added unit tests.
* Existing integration tests.
Closes #29964 from gaborgsomogyi/SPARK-32047.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(commit: 4af1ac9)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuiteBase.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala (diff)
Commit 543d59dfbffadeb4e11f06d6bbf857f21ac03f73 by dhyun
[SPARK-33107][BUILD][FOLLOW-UP] Remove
com.twitter:parquet-hadoop-bundle:1.6.0 and orc.classifier
### What changes were proposed in this pull request?
This pr removes `com.twitter:parquet-hadoop-bundle:1.6.0` and
`orc.classifier`.
### Why are the changes needed?
To make code more clear and readable.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing test.
Closes #30005 from wangyum/SPARK-33107.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: 543d59d)
The file was modified pom.xml (diff)
The file was modified sql/hive/pom.xml (diff)
The file was modified examples/pom.xml (diff)
The file was modified sql/core/pom.xml (diff)
Commit 9896288b881788660cfaa3f45e90496105889bde by dhyun
[SPARK-33117][BUILD] Update zstd-jni to 1.4.5-6
### What changes were proposed in this pull request?
This PR aims to upgrade ZStandard library for Apache Spark 3.1.0.
### Why are the changes needed?
This will bring the latest bug fixes.
- https://github.com/luben/zstd-jni/commit/2662fbdc320ce482a24c20b8fcac8b1d5b79fe33
- https://github.com/luben/zstd-jni/commit/bbe140b758be2e0ba64566e16d44cafd6e4ba142
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CI.
Closes #30010 from dongjoon-hyun/SPARK-33117.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: 9896288)
The file was modified pom.xml (diff)
The file was modified dev/deps/spark-deps-hadoop-2.7-hive-2.3 (diff)
The file was modified dev/deps/spark-deps-hadoop-3.2-hive-2.3 (diff)
Commit 78c0967bbe27d3872aa73ff9e6fafb095fd149c1 by yamamuro
[SPARK-33092][SQL] Support subexpression elimination in ProjectExec
### What changes were proposed in this pull request?
This patch proposes to add subexpression elimination support into
`ProjectExec`. It can be controlled by the
`spark.sql.subexpressionElimination.enabled` config.
Before this change:
```scala
val df = spark.read.option("header", true).csv("/tmp/test.csv")
df.withColumn("my_map", expr("str_to_map(foo, '&', '=')"))
  .select(col("my_map")("foo"), col("my_map")("bar"), col("my_map")("baz"))
  .debugCodegen
```
L27-40: first `str_to_map`. L68-81: second `str_to_map`. L109-122: third
`str_to_map`.
```
/* 024 */   private void project_doConsume_0(InternalRow
inputadapter_row_0, UTF8String project_expr_0_0, boolean
project_exprIsNull_0_0) throws java.io.IOException {
/* 025 */     boolean project_isNull_0 = true;
/* 026 */     UTF8String project_value_0 = null;
/* 027 */     boolean project_isNull_1 = true;
/* 028 */     MapData project_value_1 = null;
/* 029 */
/* 030 */     if (!project_exprIsNull_0_0) {
/* 031 */       project_isNull_1 = false; // resultCode could change
nullability.
/* 032 */
/* 033 */       UTF8String[] project_kvs_0 =
project_expr_0_0.split(((UTF8String) references[1] /* literal */), -1);
/* 034 */       for(UTF8String kvEntry: project_kvs_0) {
/* 035 */         UTF8String[] kv = kvEntry.split(((UTF8String)
references[2] /* literal */), 2);
/* 036 */       
((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0]
/* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 037 */       }
/* 038 */       project_value_1 =
((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0]
/* mapBuilder */).build();
/* 039 */
/* 040 */     }
/* 041 */     if (!project_isNull_1) {
/* 042 */       project_isNull_0 = false; // resultCode could change
nullability.
/* 043 */
/* 044 */       final int project_length_0 =
project_value_1.numElements();
/* 045 */       final ArrayData project_keys_0 =
project_value_1.keyArray();
/* 046 */       final ArrayData project_values_0 =
project_value_1.valueArray();
/* 047 */
/* 048 */       int project_index_0 = 0;
/* 049 */       boolean project_found_0 = false;
/* 050 */       while (project_index_0 < project_length_0 &&
!project_found_0) {
/* 051 */         final UTF8String project_key_0 =
project_keys_0.getUTF8String(project_index_0);
/* 052 */         if (project_key_0.equals(((UTF8String) references[3]
/* literal */))) {
/* 053 */           project_found_0 = true;
/* 054 */         } else {
/* 055 */           project_index_0++;
/* 056 */         }
/* 057 */       }
/* 058 */
/* 059 */       if (!project_found_0 ||
project_values_0.isNullAt(project_index_0)) {
/* 060 */         project_isNull_0 = true;
/* 061 */       } else {
/* 062 */         project_value_0 =
project_values_0.getUTF8String(project_index_0);
/* 063 */       }
/* 064 */
/* 065 */     }
/* 066 */     boolean project_isNull_6 = true;
/* 067 */     UTF8String project_value_6 = null;
/* 068 */     boolean project_isNull_7 = true;
/* 069 */     MapData project_value_7 = null;
/* 070 */
/* 071 */     if (!project_exprIsNull_0_0) {
/* 072 */       project_isNull_7 = false; // resultCode could change
nullability.
/* 073 */
/* 074 */       UTF8String[] project_kvs_1 =
project_expr_0_0.split(((UTF8String) references[5] /* literal */), -1);
/* 075 */       for(UTF8String kvEntry: project_kvs_1) {
/* 076 */         UTF8String[] kv = kvEntry.split(((UTF8String)
references[6] /* literal */), 2);
/* 077 */       
((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[4]
/* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 078 */       }
/* 079 */       project_value_7 =
((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[4]
/* mapBuilder */).build();
/* 080 */
/* 081 */     }
/* 082 */     if (!project_isNull_7) {
/* 083 */       project_isNull_6 = false; // resultCode could change
nullability.
/* 084 */
/* 085 */       final int project_length_1 =
project_value_7.numElements();
/* 086 */       final ArrayData project_keys_1 =
project_value_7.keyArray();
/* 087 */       final ArrayData project_values_1 =
project_value_7.valueArray();
/* 088 */
/* 089 */       int project_index_1 = 0;
/* 090 */       boolean project_found_1 = false;
/* 091 */       while (project_index_1 < project_length_1 &&
!project_found_1) {
/* 092 */         final UTF8String project_key_1 =
project_keys_1.getUTF8String(project_index_1);
/* 093 */         if (project_key_1.equals(((UTF8String) references[7]
/* literal */))) {
/* 094 */           project_found_1 = true;
/* 095 */         } else {
/* 096 */           project_index_1++;
/* 097 */         }
/* 098 */       }
/* 099 */
/* 100 */       if (!project_found_1 ||
project_values_1.isNullAt(project_index_1)) {
/* 101 */         project_isNull_6 = true;
/* 102 */       } else {
/* 103 */         project_value_6 =
project_values_1.getUTF8String(project_index_1);
/* 104 */       }
/* 105 */
/* 106 */     }
/* 107 */     boolean project_isNull_12 = true;
/* 108 */     UTF8String project_value_12 = null;
/* 109 */     boolean project_isNull_13 = true;
/* 110 */     MapData project_value_13 = null;
/* 111 */
/* 112 */     if (!project_exprIsNull_0_0) {
/* 113 */       project_isNull_13 = false; // resultCode could change
nullability.
/* 114 */
/* 115 */       UTF8String[] project_kvs_2 =
project_expr_0_0.split(((UTF8String) references[9] /* literal */), -1);
/* 116 */       for(UTF8String kvEntry: project_kvs_2) {
/* 117 */         UTF8String[] kv = kvEntry.split(((UTF8String)
references[10] /* literal */), 2);
/* 118 */       
((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[8]
/* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 119 */       }
/* 120 */       project_value_13 =
((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[8]
/* mapBuilder */).build();
/* 121 */
/* 122 */     }
...
```
After this change, L27-40 evaluates the common map variable once:
```
/* 024 */   private void project_doConsume_0(InternalRow
inputadapter_row_0, UTF8String project_expr_0_0, boolean
project_exprIsNull_0_0) throws java.io.IOException {
/* 025 */     // common sub-expressions
/* 026 */
/* 027 */     boolean project_isNull_0 = true;
/* 028 */     MapData project_value_0 = null;
/* 029 */
/* 030 */     if (!project_exprIsNull_0_0) {
/* 031 */       project_isNull_0 = false; // resultCode could change
nullability.
/* 032 */
/* 033 */       UTF8String[] project_kvs_0 =
project_expr_0_0.split(((UTF8String) references[1] /* literal */), -1);
/* 034 */       for(UTF8String kvEntry: project_kvs_0) {
/* 035 */         UTF8String[] kv = kvEntry.split(((UTF8String)
references[2] /* literal */), 2);
/* 036 */       
((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0]
/* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 037 */       }
/* 038 */       project_value_0 =
((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0]
/* mapBuilder */).build();
/* 039 */
/* 040 */     }
/* 041 */
/* 042 */     boolean project_isNull_4 = true;
/* 043 */     UTF8String project_value_4 = null;
/* 044 */
/* 045 */     if (!project_isNull_0) {
/* 046 */       project_isNull_4 = false; // resultCode could change
nullability.
/* 047 */
/* 048 */       final int project_length_0 =
project_value_0.numElements();
/* 049 */       final ArrayData project_keys_0 =
project_value_0.keyArray();
/* 050 */       final ArrayData project_values_0 =
project_value_0.valueArray();
/* 051 */
/* 052 */       int project_index_0 = 0;
/* 053 */       boolean project_found_0 = false;
/* 054 */       while (project_index_0 < project_length_0 &&
!project_found_0) {
/* 055 */         final UTF8String project_key_0 =
project_keys_0.getUTF8String(project_index_0);
/* 056 */         if (project_key_0.equals(((UTF8String) references[3]
/* literal */))) {
/* 057 */           project_found_0 = true;
/* 058 */         } else {
/* 059 */           project_index_0++;
/* 060 */         }
/* 061 */       }
/* 062 */
/* 063 */       if (!project_found_0 ||
project_values_0.isNullAt(project_index_0)) {
/* 064 */         project_isNull_4 = true;
/* 065 */       } else {
/* 066 */         project_value_4 =
project_values_0.getUTF8String(project_index_0);
/* 067 */       }
/* 068 */
/* 069 */     }
/* 070 */     boolean project_isNull_6 = true;
/* 071 */     UTF8String project_value_6 = null;
/* 072 */
/* 073 */     if (!project_isNull_0) {
/* 074 */       project_isNull_6 = false; // resultCode could change
nullability.
/* 075 */
/* 076 */       final int project_length_1 =
project_value_0.numElements();
/* 077 */       final ArrayData project_keys_1 =
project_value_0.keyArray();
/* 078 */       final ArrayData project_values_1 =
project_value_0.valueArray();
/* 079 */
/* 080 */       int project_index_1 = 0;
/* 081 */       boolean project_found_1 = false;
/* 082 */       while (project_index_1 < project_length_1 &&
!project_found_1) {
/* 083 */         final UTF8String project_key_1 =
project_keys_1.getUTF8String(project_index_1);
/* 084 */         if (project_key_1.equals(((UTF8String) references[4]
/* literal */))) {
/* 085 */           project_found_1 = true;
/* 086 */         } else {
/* 087 */           project_index_1++;
/* 088 */         }
/* 089 */       }
/* 090 */
/* 091 */       if (!project_found_1 ||
project_values_1.isNullAt(project_index_1)) {
/* 092 */         project_isNull_6 = true;
/* 093 */       } else {
/* 094 */         project_value_6 =
project_values_1.getUTF8String(project_index_1);
/* 095 */       }
/* 096 */
/* 097 */     }
/* 098 */     boolean project_isNull_8 = true;
/* 099 */     UTF8String project_value_8 = null;
/* 100 */
...
```
When the code is split into a separate method:
```
/* 026 */   private void project_doConsume_0(InternalRow
inputadapter_row_0, UTF8String project_expr_0_0, boolean
project_exprIsNull_0_0) throws java.io.IOException {
/* 027 */     // common sub-expressions
/* 028 */
/* 029 */     MapData project_subExprValue_0 =
project_subExpr_0(project_exprIsNull_0_0, project_expr_0_0);
/* 030 */
...
/* 140 */   private MapData project_subExpr_0(boolean
project_exprIsNull_0_0, org.apache.spark.unsafe.types.UTF8String
project_expr_0_0) {
/* 141 */     boolean project_isNull_0 = true;
/* 142 */     MapData project_value_0 = null;
/* 143 */
/* 144 */     if (!project_exprIsNull_0_0) {
/* 145 */       project_isNull_0 = false; // resultCode could change
nullability.
/* 146 */
/* 147 */       UTF8String[] project_kvs_0 =
project_expr_0_0.split(((UTF8String) references[1] /* literal */), -1);
/* 148 */       for(UTF8String kvEntry: project_kvs_0) {
/* 149 */         UTF8String[] kv = kvEntry.split(((UTF8String)
references[2] /* literal */), 2);
/* 150 */       
((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0]
/* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 151 */       }
/* 152 */       project_value_0 =
((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0]
/* mapBuilder */).build();
/* 153 */
/* 154 */     }
/* 155 */     project_subExprIsNull_0 = project_isNull_0;
/* 156 */     return project_value_0;
/* 157 */   }
```
### Why are the changes needed?
Users occasionally write repeated expressions in a projection. It is also
possible that the query optimizer rewrites a query so that the same
expression is evaluated many times in a Project. Currently ProjectExec does
not support subexpression elimination in whole-stage codegen. We can support
it to reduce redundant evaluation, as in the sketch below.
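As a hedged illustration of the kind of query that benefits (the data, column
names, and session setup below are made up, not taken from the PR):
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr}

// The same str_to_map subexpression is used three times in one projection;
// with spark.sql.subexpressionElimination.enabled=true, ProjectExec's
// generated code can now evaluate it once, as in the listings above.
val spark = SparkSession.builder().master("local[*]").appName("subexpr-demo").getOrCreate()
import spark.implicits._

val df = Seq("a=1&b=2&c=3").toDF("foo")
df.withColumn("my_map", expr("str_to_map(foo, '&', '=')"))
  .select(col("my_map")("a"), col("my_map")("b"), col("my_map")("c"))
  .explain("codegen") // print the generated code
```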
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
`spark.sql.subexpressionElimination.enabled` is enabled by default, so all
existing tests should pass with this change.
Closes #29975 from viirya/SPARK-33092.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
(commit: 78c0967)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala (diff)
Commit a0e324460e5d05cc8beeba5b1b0d1887b71254ea by yamamuro
[SPARK-32704][SQL][FOLLOWUP] Corrects version values of plan logging
configs in SQLConf
### What changes were proposed in this pull request?
This PR intends to correct version values (`3.0.0` -> `3.1.0`) of three
configs below in `SQLConf`:
- spark.sql.planChangeLog.level
- spark.sql.planChangeLog.rules
- spark.sql.planChangeLog.batches
This PR comes from
https://github.com/apache/spark/pull/29544#discussion_r503049350.
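A hedged usage sketch for the three configs listed above; the log level,
rule name, and session setup are illustrative assumptions:
```scala
import org.apache.spark.sql.SparkSession

// Log plan changes at WARN level, restricted to a single optimizer rule.
val spark = SparkSession.builder().master("local[*]").appName("plan-log-demo").getOrCreate()
spark.conf.set("spark.sql.planChangeLog.level", "WARN")
spark.conf.set("spark.sql.planChangeLog.rules",
  "org.apache.spark.sql.catalyst.optimizer.PushDownPredicates")
```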
### Why are the changes needed?
Bugfix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
N/A
Closes #30015 from maropu/pr29544-FOLLOWUP.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
(commit: a0e3244)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (diff)
Commit ed2fe8d80635014681ec18b29e33e6ecfaf883d7 by srowen
[SPARK-33111][ML] aft transform optimization
### What changes were proposed in this pull request?
1. When `predictionCol` and `quantilesCol` are both set, we only need one
prediction for each row: the prediction is just the variable `lambda` in
`predictQuantiles`.
2. In the computation of the variable `quantiles` in `predictQuantiles`, the
pre-computed vector `val baseQuantiles = $(quantileProbabilities).map(q =>
math.exp(math.log(-math.log1p(-q)) * scale))` can be reused for each row
(see the sketch below).
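A minimal standalone sketch of the reuse idea in item 2; the probability and
scale values are illustrative, and this is not the AFTSurvivalRegressionModel
code itself:
```scala
// Computed once, since it does not depend on the input row.
val quantileProbabilities = Array(0.1, 0.5, 0.9) // illustrative quantile probabilities
val scale = 1.0                                   // illustrative AFT scale parameter
val baseQuantiles = quantileProbabilities.map(q => math.exp(math.log(-math.log1p(-q)) * scale))

// Per row, only the row-specific prediction (lambda) is needed.
def quantilesForRow(lambda: Double): Array[Double] = baseQuantiles.map(_ * lambda)
```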
### Why are the changes needed?
To avoid redundant computation in transform, like what we did in
`ProbabilisticClassificationModel`, `GaussianMixtureModel`, etc.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing test suite.
Closes #30000 from zhengruifeng/aft_predict_transform_opt.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
(commit: ed2fe8d)
The file was modified mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala (diff)
Commit b27a287ff293c02dcad0c45cca71a5244664d7f5 by wenchen
[SPARK-33016][SQL] Potential SQLMetrics missed which might cause WEB UI
display issue while AQE is on
### What changes were proposed in this pull request?
In the following scenario, with AQE on, SQLMetrics could be incorrect:
1. Stages A and B are created, and the UI is updated through the
onAdaptiveExecutionUpdate event.
2. Stages A and B are running. A subquery in stage A keeps updating metrics
through the onAdaptiveSQLMetricUpdate event.
3. Stage B completes while stage A's subquery is still running and updating
metrics.
4. Completion of stage B triggers new stage creation and a UI update through
the onAdaptiveExecutionUpdate event again (just like step 1).
So we decided to make a trade-off: keep more duplicate SQLMetrics rather than
deleting them when AQE updates the UI with a new plan.
### Why are the changes needed?
Make SQLMetrics behavior 100% correct.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Updated SQLAppStatusListenerSuite.
Closes #29965 from leanken/leanken-SPARK-33016.
Authored-by: xuewei.linxuewei <xuewei.linxuewei@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: b27a287)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala (diff)