Changes

Summary

  1. [SPARK-26425][SS] Add more constraint checks to avoid checkpoint corruption (commit: d936cb3) (details)
  2. [SPARK-32903][SQL] GeneratePredicate should be able to eliminate common sub-expressions (commit: bd38e0b) (details)
  3. [SPARK-32508][SQL] Disallow empty part col values in partition spec before static partition writing (commit: 92b75dc) (details)
  4. [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls (commit: e5e54a3) (details)
  5. [SPARK-32287][CORE] Fix flaky o.a.s.ExecutorAllocationManagerSuite on GithubActions (commit: a54a6a0) (details)
  6. [SPARK-24994][SQL][FOLLOW-UP] Handle foldable, timezone and cleanup (commit: 482a79a) (details)
  7. [SPARK-32887][DOC] Correct the typo for SHOW TABLE (commit: 88e87bc) (details)
  8. [SPARK-32926][TESTS] Add Scala 2.13 build test in GitHub Action (commit: a8442c2) (details)
  9. [SPARK-32909][SQL] Pass all `sql/hive-thriftserver` module UTs in Scala 2.13 (commit: 5817c58) (details)
  10. [SPARK-32889][SQL] orc table column name supports special characters (commit: ea3b979) (details)
  11. [SPARK-32635][SQL] Fix foldable propagation (commit: 4ced588) (details)
  12. [SPARK-32902][SQL] Logging plan changes for AQE (commit: 68e0d5f) (details)
  13. [SPARK-18409][ML][FOLLOWUP] LSH approxNearestNeighbors optimization 2 (commit: 9d6221b) (details)
  14. [SPARK-32908][SQL] Fix target error calculation in `percentile_approx()` (commit: 75dd864) (details)
  15. [SPARK-32906][SQL] Struct field names should not change after normalizing floats (commit: b49aaa3) (details)
  16. [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function (commit: 8b09536) (details)
  17. [SPARK-32905][CORE][YARN] ApplicationMaster fails to receive UpdateDelegationTokens message (commit: 9e9d4b6) (details)
Commit d936cb328d1562d280a2dff29e31fefa1ad8bdd6 by kabhwan.opensource
[SPARK-26425][SS] Add more constraint checks to avoid checkpoint
corruption
### What changes were proposed in this pull request?
Credits to tdas who reported and described the fix to
[SPARK-26425](https://issues.apache.org/jira/browse/SPARK-26425). I just
followed the description of the issue.
This patch adds more checks on the commit log as well as the file streaming
source so that multiple concurrent runs of a streaming query don't mess up
the query/checkpoint state. It addresses two spots with slightly different issues:
1. FileStreamSource.fetchMaxOffset()
In Structured Streaming we don't allow multiple streaming queries to
run with the same checkpoint (including concurrent runs of the same query), so
the query should fail if it cannot write the metadata of a specific batch ID
because the same batch ID has already been written by another run.
2. commit log
As described in the JIRA issue, an assertion is already applied to the
`offsetLog` for the same reason:
https://github.com/apache/spark/blob/8167714cab93a5c06c23f92c9077fe8b9677ab28/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L394-L402
This patch applies the same check to the commit log.
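For illustration, a minimal sketch of the kind of guard being added, assuming a metadata log whose `add` returns false when another run has already written that batch (as the HDFS-backed metadata log does); the names below are illustrative rather than the exact `MicroBatchExecution` code:
```scala
// Minimal sketch, not the exact MicroBatchExecution code: fail fast when the batch was
// already committed by a concurrent run of the same query.
def commitBatch(addToCommitLog: Long => Boolean, batchId: Long): Unit = {
  if (!addToCommitLog(batchId)) {
    throw new IllegalStateException(
      s"Concurrent update to the commit log detected for batch $batchId; " +
        "multiple runs of the same streaming query may share this checkpoint")
  }
}
```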
### Why are the changes needed?
This prevents inconsistent behavior in the streaming query and makes the
query fail instead.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
N/A, as the change is simple and obvious, and it's really hard to
artificially reproduce the issue.
Closes #25965 from HeartSaVioR/SPARK-26425.
Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR)
<kabhwan.opensource@gmail.com> Signed-off-by: Jungtaek Lim (HeartSaVioR)
<kabhwan.opensource@gmail.com>
(commit: d936cb3)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala (diff)
Commit bd38e0be83528ec9ce0e5f533d4b3b25203dc917 by wenchen
[SPARK-32903][SQL] GeneratePredicate should be able to eliminate common
sub-expressions
### What changes were proposed in this pull request?
This patch proposes to make GeneratePredicate eliminate common
sub-expressions.
### Why are the changes needed?
Codegen objects such as GenerateMutableProjection and GenerateUnsafeProjection
can eliminate common sub-expressions, but GeneratePredicate currently doesn't.
We encountered a customer issue where a Filter pushed down through a Project
caused a performance regression compared with the non-pushed-down case. One
expression used in the Filter predicate was evaluated many times: due to the
complex schema, the query nodes are not whole-stage codegen'd, so the Filter
runs `Filter.doExecute` and calls GeneratePredicate. The common expression was
evaluated many times and became the performance bottleneck. GeneratePredicate
should be able to eliminate common sub-expressions in such cases.
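For illustration, a hypothetical repro of the pattern described above (the data and expressions are made up): the same sub-expression appears twice in a predicate evaluated via `Filter.doExecute`, so without elimination it would be computed twice per row.
```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: `sqrt(a)` appears twice in the predicate. With common-subexpression
// elimination in GeneratePredicate it is evaluated once per row instead of once per occurrence.
object CseDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("cse-demo").getOrCreate()
  val df = spark.range(1000).selectExpr("id", "cast(id * 2 as double) AS a")
  df.filter("sqrt(a) > 1.0 AND sqrt(a) < 40.0").show()
  spark.stop()
}
```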
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests.
Closes #29776 from viirya/filter-pushdown.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen
Fan <wenchen@databricks.com>
(commit: bd38e0b)
The file was added sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenSubexpressionEliminationSuite.scala
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala (diff)
Commit 92b75dc260eb43d906a425f9f9d8d63b78c48cee by wenchen
[SPARK-32508][SQL] Disallow empty part col values in partition spec
before static partition writing
### What changes were proposed in this pull request? When writing to a static
partition, check in advance that no partition column value is empty.
### Why are the changes needed? When writing to a static partition with an
empty partition column value, the error is currently reported only after all
tasks have completed.
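As a hedged sketch of the scenario (the table and column names are hypothetical), this is the kind of statement that is now rejected before any write tasks run:
```scala
// Hypothetical example: an empty value in a static partition spec.
spark.sql("CREATE TABLE sales (amount INT, region STRING) USING parquet PARTITIONED BY (region)")
// Previously this failed only after all tasks completed; now it is rejected up front.
spark.sql("INSERT OVERWRITE TABLE sales PARTITION (region = '') SELECT 1")
```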
### Does this PR introduce _any_ user-facing change? No
### How was this patch tested? add ut
Closes #29316 from cxzl25/SPARK-32508.
Authored-by: sychen <sychen@ctrip.com> Signed-off-by: Wenchen Fan
<wenchen@databricks.com>
(commit: 92b75dc)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala (diff)
Commit e5e54a3614ffd2a9150921e84e5b813d5cbf285a by herman
[SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are
nulls
### What changes were proposed in this pull request?
This PR changes the way `UnsafeExternalSorter.SpillableIterator` checks
whether it has already spilled, by checking whether `inMemSorter` is
null. It also allows it to spill `UnsafeSorterIterator`s other than
`UnsafeInMemorySorter.SortedIterator`.
### Why are the changes needed?
Before this PR, `UnsafeExternalSorter.SpillableIterator` could not spill
when there were NULLs in the input and radix sorting was used. Spark
determined whether `UnsafeExternalSorter.SpillableIterator` had not
spilled yet by checking whether `upstream` was an instance of
`UnsafeInMemorySorter.SortedIterator`. When radix sorting is used and
there are NULLs in the input, however, `upstream` is an instance of
`UnsafeExternalSorter.ChainedIterator` instead, so Spark assumed that the
`SpillableIterator` had already spilled and therefore could not spill
again when it was supposed to.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A test was added to `UnsafeExternalSorterSuite` (and therefore also to
`UnsafeExternalSorterRadixSortSuite`). I manually confirmed that the
test failed in `UnsafeExternalSorterRadixSortSuite` without this patch.
Closes #29772 from tomvanbussel/SPARK-32900.
Authored-by: Tom van Bussel <tom.vanbussel@databricks.com>
Signed-off-by: herman <herman@databricks.com>
(commit: e5e54a3)
The file was modified core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java (diff)
The file was modified core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java (diff)
The file was modified core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java (diff)
The file was modified core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java (diff)
The file was modified core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java (diff)
The file was modified core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java (diff)
Commit a54a6a0113115112f589d09c875f1cba5fd0bbca by wenchen
[SPARK-32287][CORE] Fix flaky o.a.s.ExecutorAllocationManagerSuite on
GithubActions
### What changes were proposed in this pull request?
To fix the flaky `ExecutorAllocationManagerSuite`: avoid the first
`schedule()` invocation after `ExecutorAllocationManager` starts.
### Why are the changes needed?
`ExecutorAllocationManagerSuite` is still flaky, see:
https://github.com/apache/spark/pull/29722/checks?check_run_id=1117979237
By checking the logs below, we can see that there's a race condition
between thread `pool-1-thread-1-ScalaTest-running` and thread
`spark-dynamic-executor-allocation`. The only way the
`spark-dynamic-executor-allocation` thread can become active is the first
invocation of `schedule()` (since `TEST_SCHEDULE_INTERVAL` (30s) is really
long, a second invocation cannot happen). Thus, we should avoid the first
invocation too.
```
20/09/15 12:41:20.831 pool-1-thread-1-ScalaTest-running-ExecutorAllocationManagerSuite INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 2 for resource profile id: 0)
20/09/15 12:41:20.832 spark-dynamic-executor-allocation INFO ExecutorAllocationManager: Requesting 2 new executors because tasks are backlogged (new desired total will be 4 for resource profile id: 0)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The flakiness can't be reproduced locally, so it's hard to say whether it has
been completely fixed by now. We need time to see the result.
Closes #29773 from Ngone51/fix-SPARK-32287.
Authored-by: yi.wu <yi.wu@databricks.com> Signed-off-by: Wenchen Fan
<wenchen@databricks.com>
(commit: a54a6a0)
The file was modified core/src/main/scala/org/apache/spark/internal/config/Tests.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala (diff)
Commit 482a79a5e39d54048533d42e1ca1266fbe95fffb by dhyun
[SPARK-24994][SQL][FOLLOW-UP] Handle foldable, timezone and cleanup
### What changes were proposed in this pull request?
This is a follow-up to #29565 and addresses a few issues in the last PR:
- the style issue pointed out by [this comment](https://github.com/apache/spark/pull/29565#discussion_r487646749)
- skip the optimization when `fromExp` is foldable (per [this comment](https://github.com/apache/spark/pull/29565#discussion_r487646973)), as there could be a more efficient rule to apply for that case
- pass timezone info to the generated cast on the literal value
- a bunch of cleanups and test improvements
Originally I planned to handle this when implementing
[SPARK-32858](https://issues.apache.org/jira/browse/SPARK-32858) but now
think it's better to isolate these changes from that. A background sketch of
the rule follows below.
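For background, a hedged sketch of what the `UnwrapCastInBinaryComparison` rule does (illustrative context, not code from this PR; it assumes an active `spark` session):
```scala
// The rule rewrites predicates such as `cast(s AS INT) > 5` into a comparison on the
// narrower column itself when that is safe, so the cast does not get in the way of
// later optimizations; this follow-up skips the rewrite when the cast's child is
// foldable and passes timezone info to the cast generated for the literal.
val df = spark.range(10).selectExpr("cast(id AS SHORT) AS s")
df.filter("cast(s AS INT) > 5").explain(true)
```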
### Why are the changes needed?
To fix a few left over issues in the above PR.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a test for the foldable case. Otherwise relying on existing tests.
Closes #29775 from sunchao/SPARK-24994-followup.
Authored-by: Chao Sun <sunchao@apache.org> Signed-off-by: Dongjoon Hyun
<dhyun@apple.com>
(commit: 482a79a)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala (diff)
Commit 88e87bc8ebfa5aa1a8cc8928672749517ae0c41f by dhyun
[SPARK-32887][DOC] Correct the typo for SHOW TABLE
### What changes were proposed in this pull request? Correct the typo in
the SHOW TABLE document.
### Why are the changes needed? The example in the current SHOW TABLE
document results in a parse error, so it is misleading to users.
### Does this PR introduce _any_ user-facing change? Yes, the document
of SHOW TABLE is corrected now.
### How was this patch tested? N/A
Closes #29758 from Udbhav30/showtable.
Authored-by: Udbhav30 <u.agrawal30@gmail.com> Signed-off-by: Dongjoon
Hyun <dhyun@apple.com>
(commit: 88e87bc)
The file was modified docs/sql-ref-syntax-aux-show-table.md (diff)
Commit a8442c282665c93384d3465c440be588394e8ab4 by dhyun
[SPARK-32926][TESTS] Add Scala 2.13 build test in GitHub Action
### What changes were proposed in this pull request?
The PR aims to add Scala 2.13 build test coverage into GitHub Action for
Apache Spark 3.1.0.
### Why are the changes needed?
The branch is ready for Scala 2.13 and this will prevent any regression.
### Does this PR introduce _any_ user-facing change? No
### How was this patch tested?
Pass the GitHub Action.
Closes #29793 from dongjoon-hyun/SPARK-32926.
Authored-by: Dongjoon Hyun <dhyun@apple.com> Signed-off-by: Dongjoon
Hyun <dhyun@apple.com>
(commit: a8442c2)
The file was modified .github/workflows/build_and_test.yml (diff)
Commit 5817c584b8a259f5c9be13a26f2adec905474ce6 by dhyun
[SPARK-32909][SQL] Pass all `sql/hive-thriftserver` module UTs in Scala
2.13
### What changes were proposed in this pull request?
This PR fixes failed and aborted cases in the sql/hive-thriftserver module in
Scala 2.13. The main changes are as follows:
- Use `s.c.Seq` instead of `Seq` in `HiveResult` because the input type
may be `mutable.ArraySeq`, but `Seq` means `immutable.Seq` in Scala
2.13 (a sketch follows below).
- Reset the class loader after `HiveMetastoreLazyInitializationSuite`
completes, because the context class loader is a
`NonClosableMutableURLClassLoader` while
`HiveMetastoreLazyInitializationSuite` runs, and it propagates to
`HiveThriftServer2ListenerSuite`, triggering the following problem in Scala
2.13:
```
HiveThriftServer2ListenerSuite:
*** RUN ABORTED ***
  java.lang.LinkageError: loader constraint violation: loader (instance of net/bytebuddy/dynamic/loading/MultipleParentClassLoader) previously initiated loading for a different type with name "org/apache/hive/service/ServiceStateChangeListener"
  at org.mockito.codegen.HiveThriftServer2$MockitoMock$1850222569.<clinit>(Unknown Source)
  at sun.reflect.GeneratedSerializationConstructorAccessor530.newInstance(Unknown Source)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:48)
  at org.objenesis.ObjenesisBase.newInstance(ObjenesisBase.java:73)
  at org.mockito.internal.creation.instance.ObjenesisInstantiator.newInstance(ObjenesisInstantiator.java:19)
  at org.mockito.internal.creation.bytebuddy.SubclassByteBuddyMockMaker.createMock(SubclassByteBuddyMockMaker.java:47)
  at org.mockito.internal.creation.bytebuddy.ByteBuddyMockMaker.createMock(ByteBuddyMockMaker.java:25)
  at org.mockito.internal.util.MockUtil.createMock(MockUtil.java:35)
  at org.mockito.internal.MockitoCore.mock(MockitoCore.java:63)
  ...
```
After this PR, `HiveThriftServer2Suites` and
`HiveThriftServer2ListenerSuite` are fixed and all 461 tests pass.
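A minimal sketch of the Scala 2.13 `Seq` change behind the first bullet above (a standalone illustration, not the `HiveResult` code):
```scala
// In Scala 2.13, scala.Seq aliases scala.collection.immutable.Seq, so a mutable.ArraySeq
// argument no longer conforms to it. Accepting scala.collection.Seq (s.c.Seq) keeps both
// cross-builds compiling.
import scala.collection.{Seq => AnySeq} // common parent of mutable and immutable Seq
import scala.collection.mutable.ArraySeq

def render(values: AnySeq[Any]): String = values.mkString(", ")

render(ArraySeq(1, 2, 3)) // compiles on both Scala 2.12 and 2.13
```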
### Why are the changes needed? We need to support a Scala 2.13 build.
### Does this PR introduce _any_ user-facing change? No
### How was this patch tested?
- Scala 2.12: Pass the Jenkins or GitHub Action
- Scala 2.13: All tests passed.
Do the following:
```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests -pl sql/hive-thriftserver -am -Phive-thriftserver -Pscala-2.13
mvn test -pl sql/hive-thriftserver -Phive -Phive-thriftserver -Pscala-2.13
```
**Before**
```
HiveThriftServer2ListenerSuite:
*** RUN ABORTED ***
```
**After**
```
Tests: succeeded 461, failed 0, canceled 0, ignored 17, pending 0
All tests passed.
```
Closes #29783 from LuciferYang/sql-thriftserver-tests.
Authored-by: yangjie01 <yangjie01@baidu.com> Signed-off-by: Dongjoon
Hyun <dhyun@apple.com>
(commit: 5817c58)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala (diff)
The file was modified sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala (diff)
Commit ea3b979e95f6ce11e7f6e401625a51ede3e649fc by dhyun
[SPARK-32889][SQL] orc table column name supports special characters
### What changes were proposed in this pull request? Make ORC table
column names support special characters like `$`.
### Why are the changes needed? Special characters like `$` are allowed
in ORC table column names by Hive, but executing the command
"CREATE TABLE tbl(`$` INT, b INT) using orc" in Spark raises the error
below, so it is not compatible with Hive.
```
Column name "$" contains invalid character(s). Please use alias to rename it.;
Column name "$" contains invalid character(s). Please use alias to rename it.;
org.apache.spark.sql.AnalysisException: Column name "$" contains invalid character(s). Please use alias to rename it.;
  at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$.checkFieldName(OrcFileFormat.scala:51)
  at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$.$anonfun$checkFieldNames$1(OrcFileFormat.scala:59)
  at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$.$anonfun$checkFieldNames$1$adapted(OrcFileFormat.scala:59)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
```
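For reference, the statement quoted above, which succeeds once this change is in (assuming an active `spark` session; the table name is illustrative):
```scala
// Column names containing special characters such as `$` are now accepted by the ORC datasource.
spark.sql("CREATE TABLE tbl (`$` INT, b INT) USING orc")
```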
### Does this PR introduce _any_ user-facing change? No
### How was this patch tested? Add unit test
Closes #29761 from jzc928/orcColSpecialChar.
Authored-by: jzc <jzc@jzcMacBookPro.local> Signed-off-by: Dongjoon Hyun
<dhyun@apple.com>
(commit: ea3b979)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala (diff)
Commit 4ced58862c707aa916f7a55d15c3887c94c9b210 by yamamuro
[SPARK-32635][SQL] Fix foldable propagation
### What changes were proposed in this pull request? This PR rewrites the
`FoldablePropagation` rule to replace attribute references in a node
with foldables coming only from the node's children.
Before this PR, in the case of this example (with
`spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation` set):
```scala
val a = Seq("1").toDF("col1").withColumn("col2", lit("1"))
val b = Seq("2").toDF("col1").withColumn("col2", lit("2"))
val aub = a.union(b)
val c = aub.filter($"col1" === "2").cache()
val d = Seq("2").toDF("col4")
val r = d.join(aub, $"col2" === $"col4").select("col4")
val l = c.select("col2")
val df = l.join(r, $"col2" === $"col4", "LeftOuter")
df.show()
```
foldable propagation happens incorrectly:
```
Before FoldablePropagation:

Join LeftOuter, (col2#6 = col4#34)
:- Project [col2#6]
:  +- InMemoryRelation [col1#4, col2#6], StorageLevel(disk, memory, deserialized, 1 replicas)
:        +- Union
:           :- *(1) Project [value#1 AS col1#4, 1 AS col2#6]
:           :  +- *(1) Filter (isnotnull(value#1) AND (value#1 = 2))
:           :     +- *(1) LocalTableScan [value#1]
:           +- *(2) Project [value#10 AS col1#13, 2 AS col2#15]
:              +- *(2) Filter (isnotnull(value#10) AND (value#10 = 2))
:                 +- *(2) LocalTableScan [value#10]
+- Project [col4#34]
   +- Join Inner, (col2#6 = col4#34)
      :- Project [value#31 AS col4#34]
      :  +- LocalRelation [value#31]
      +- Project [col2#6]
         +- Union false, false
            :- Project [1 AS col2#6]
            :  +- LocalRelation [value#1]
            +- Project [2 AS col2#15]
               +- LocalRelation [value#10]

After FoldablePropagation (only the top Project changes, incorrectly replacing the attribute with a foldable):

Join LeftOuter, (col2#6 = col4#34)
:- Project [1 AS col2#6]
:  +- InMemoryRelation [col1#4, col2#6], StorageLevel(disk, memory, deserialized, 1 replicas)
:        +- Union
            ... (remaining subtree identical to the plan above)
```
and so the result is wrong:
```
+----+----+
|col2|col4|
+----+----+
|   1|null|
+----+----+
```
After this PR foldable propagation will not happen incorrectly and the
result is correct:
```
+----+----+
|col2|col4|
+----+----+
|   2|   2|
+----+----+
```
### Why are the changes needed? To fix a correctness issue.
### Does this PR introduce _any_ user-facing change? Yes, fixes a
correctness issue.
### How was this patch tested? Existing and new UTs.
Closes #29771 from peter-toth/SPARK-32635-fix-foldable-propagation.
Authored-by: Peter Toth <peter.toth@gmail.com> Signed-off-by: Takeshi
Yamamuro <yamamuro@apache.org>
(commit: 4ced588)
The file was modified sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala (diff)
The file was modified sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala (diff)
Commit 68e0d5f2962d4045bd159b5430a8f1ae2dfde4c3 by yamamuro
[SPARK-32902][SQL] Logging plan changes for AQE
### What changes were proposed in this pull request?
Recently, we added code to log plan changes in the preparation phase in
`QueryExecution` for execution
(https://github.com/apache/spark/pull/29544). This PR intends to apply
the same plan-change logging to AQE.
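For context, a hedged sketch of enabling the plan-change logging so the new AQE logs show up at a visible level; the configuration key is an assumption here (recent versions use `spark.sql.planChangeLog.level`, older ones `spark.sql.optimizer.planChangeLog.level`):
```scala
// Assumption: this is the plan-change log level key used by the logging above; adjust it
// to your Spark version if it differs.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.planChangeLog.level", "WARN")
// With AQE enabled, its plan changes are now logged at WARN during execution.
spark.sql("SELECT id % 10 AS k, count(*) FROM range(1000) GROUP BY k").collect()
```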
### Why are the changes needed?
Easy debugging for AQE plans
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests.
Closes #29774 from maropu/PlanChangeLogForAQE.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org> Signed-off-by:
Takeshi Yamamuro <yamamuro@apache.org>
(commit: 68e0d5f)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala (diff)
Commit 9d6221b9368ab3d23c63a9f24a2ba42a6f709d54 by ruifengz
[SPARK-18409][ML][FOLLOWUP] LSH approxNearestNeighbors optimization 2
### What changes were proposed in this pull request? 1. Simplify the
aggregation by getting `count` via `summary.count`. 2. Ignore NaN values
like the old implementation:
```
val relativeError = 0.05
val approxQuantile = numNearestNeighbors.toDouble / count + relativeError
val modelDatasetWithDist = modelDataset.withColumn(distCol, hashDistCol)
if (approxQuantile >= 1) {
  modelDatasetWithDist
} else {
  val hashThreshold = modelDatasetWithDist.stat
    .approxQuantile(distCol, Array(approxQuantile), relativeError)
  // Filter the dataset where the hash value is less than the threshold.
  modelDatasetWithDist.filter(hashDistCol <= hashThreshold(0))
}
```
### Why are the changes needed? Simplify the aggregation.
### Does this PR introduce _any_ user-facing change? No
### How was this patch tested? Existing test suites.
Closes #29778 from zhengruifeng/lsh_nit.
Authored-by: zhengruifeng <ruifengz@foxmail.com> Signed-off-by:
zhengruifeng <ruifengz@foxmail.com>
(commit: 9d6221b)
The file was modified mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala (diff)
Commit 75dd86400c3c2348a4139586fbbead840512b909 by gurwls223
[SPARK-32908][SQL] Fix target error calculation in `percentile_approx()`
### What changes were proposed in this pull request? 1. Change the
target error calculation according to the paper [Space-Efficient Online
Computation of Quantile
Summaries](http://infolab.stanford.edu/~datar/courses/cs361a/papers/quantiles.pdf),
which says that the error is `e = max(gi, deltai)/2` (see page 59). There is
also a clear explanation in [ε-approximate
quantiles](http://www.mathcs.emory.edu/~cheung/Courses/584/Syllabus/08-Quantile/Greenwald.html#proofprop1).
2. Added a test to check different accuracies. 3. Added an input CSV
file `percentile_approx-input.csv.bz2` to the resource folder
`sql/catalyst/src/main/resources` for the test.
### Why are the changes needed? To fix incorrect percentile calculation,
see an example in SPARK-32908.
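A small usage sketch of the function in question (values are arbitrary; assumes an active `spark` session); the third argument is the accuracy knob that the new test varies:
```scala
// percentile_approx(column, percentage, accuracy): a larger accuracy means a smaller
// target error at the cost of more memory.
val df = spark.range(1, 1001).toDF("v")
df.selectExpr("percentile_approx(v, 0.5, 100) AS approx_median").show()
```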
### Does this PR introduce _any_ user-facing change? Yes
### How was this patch tested?
- By running existing tests in `QuantileSummariesSuite` and in
`ApproximatePercentileQuerySuite`.
- Added new test `SPARK-32908: maximum target error in
percentile_approx` to `ApproximatePercentileQuerySuite`.
Closes #29784 from MaxGekk/fix-percentile_approx-2.
Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: HyukjinKwon
<gurwls223@apache.org>
(commit: 75dd864)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala (diff)
The file was added sql/core/src/test/resources/test-data/percentile_approx-input.csv.bz2
The file was modified sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala (diff)
Commit b49aaa33e13814a448be51a7e65a29cb515b8248 by viirya
[SPARK-32906][SQL] Struct field names should not change after
normalizing floats
### What changes were proposed in this pull request?
This PR intends to fix a minor bug when normalizing floats for struct
types;
```
scala> import org.apache.spark.sql.execution.aggregate.HashAggregateExec
scala> val df = Seq(Tuple1(Tuple1(-0.0d)), Tuple1(Tuple1(0.0d))).toDF("k")
scala> val agg = df.distinct()
scala> agg.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[k#40], functions=[])
+- Exchange hashpartitioning(k#40, 200), true, [id=#62]
   +- *(1) HashAggregate(keys=[knownfloatingpointnormalized(if (isnull(k#40)) null else named_struct(col1, knownfloatingpointnormalized(normalizenanandzero(k#40._1)))) AS k#40], functions=[])
      +- *(1) LocalTableScan [k#40]

scala> val aggOutput = agg.queryExecution.sparkPlan.collect { case a: HashAggregateExec => a.output.head }
scala> aggOutput.foreach { attr => println(attr.prettyJson) }

### Final Aggregate ###
[ {
  "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
  "num-children" : 0,
  "name" : "k",
  "dataType" : {
    "type" : "struct",
    "fields" : [ {
      "name" : "_1",
                ^^^
      "type" : "double",
      "nullable" : false,
      "metadata" : { }
    } ]
  },
  "nullable" : true,
  "metadata" : { },
  "exprId" : {
    "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
    "id" : 40,
    "jvmId" : "a824e83f-933e-4b85-a1ff-577b5a0e2366"
  },
  "qualifier" : [ ]
} ]

### Partial Aggregate ###
[ {
  "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
  "num-children" : 0,
  "name" : "k",
  "dataType" : {
    "type" : "struct",
    "fields" : [ {
      "name" : "col1",
                ^^^^
      "type" : "double",
      "nullable" : true,
      "metadata" : { }
    } ]
  },
  "nullable" : true,
  "metadata" : { },
  "exprId" : {
    "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
    "id" : 40,
    "jvmId" : "a824e83f-933e-4b85-a1ff-577b5a0e2366"
  },
  "qualifier" : [ ]
} ]
```
### Why are the changes needed?
bugfix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added tests.
Closes #29780 from maropu/FixBugInNormalizedFloatingNumbers.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org> Signed-off-by:
Liang-Chi Hsieh <viirya@gmail.com>
(commit: b49aaa3)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala (diff)
Commit 8b09536cdf5c5477114cc11601c8b68c70408279 by wenchen
[SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
### What changes were proposed in this pull request? The `NTH_VALUE`
function is part of ANSI SQL. For example:
```
CREATE TEMPORARY TABLE empsalary (
   depname varchar,
   empno bigint,
   salary int,
   enroll_date date
);
INSERT INTO empsalary VALUES
('develop', 10, 5200, '2007-08-01'),
('sales', 1, 5000, '2006-10-01'),
('personnel', 5, 3500, '2007-12-10'),
('sales', 4, 4800, '2007-08-08'),
('personnel', 2, 3900, '2006-12-23'),
('develop', 7, 4200, '2008-01-01'),
('develop', 9, 4500, '2008-01-01'),
('sales', 3, 4800, '2007-08-01'),
('develop', 8, 6000, '2006-10-01'),
('develop', 11, 5200, '2007-08-15');
select first_value(salary) over(order by salary range between 1000 preceding and 1000 following),
lead(salary) over(order by salary range between 1000 preceding and 1000 following),
nth_value(salary, 1) over(order by salary range between 1000 preceding and 1000 following),
salary from empsalary;
first_value | lead | nth_value | salary
-------------+------+-----------+--------
       3500 | 3900 |      3500 |   3500
       3500 | 4200 |      3500 |   3900
       3500 | 4500 |      3500 |   4200
       3500 | 4800 |      3500 |   4500
       3900 | 4800 |      3900 |   4800
       3900 | 5000 |      3900 |   4800
       4200 | 5200 |      4200 |   5000
       4200 | 5200 |      4200 |   5200
       4200 | 6000 |      4200 |   5200
       5000 |      |      5000 |   6000
(10 rows)
```
Some mainstream databases support the syntax:
**PostgreSQL:**
https://www.postgresql.org/docs/8.4/functions-window.html
**Vertica:**
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Functions/Analytic/NTH_VALUEAnalytic.htm?tocpath=SQL%20Reference%20Manual%7CSQL%20Functions%7CAnalytic%20Functions%7C_____23
**Oracle:**
https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/NTH_VALUE.html#GUID-F8A0E88C-67E5-4AA6-9515-95D03A7F9EA0
**Redshift**
https://docs.aws.amazon.com/redshift/latest/dg/r_WF_NTH.html
**Presto** https://prestodb.io/docs/current/functions/window.html
**MySQL**
https://www.mysqltutorial.org/mysql-window-functions/mysql-nth_value-function/
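A hedged DataFrame-API counterpart of the SQL example above, assuming an `empsalary` DataFrame with the same columns (the `nth_value` Column function comes from the `functions.scala` change in this PR):
```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, nth_value}

// Same 1000-preceding/1000-following range frame as the SQL example.
val w = Window.orderBy("salary").rangeBetween(-1000, 1000)
empsalary.select(nth_value(col("salary"), 1).over(w).as("nth_value"), col("salary")).show()
```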
### Why are the changes needed? The `NTH_VALUE` function is part of ANSI
SQL and is very useful.
### Does this PR introduce _any_ user-facing change? No
### How was this patch tested? Existing and new UTs.
Closes #29604 from beliefer/support-nth_value.
Lead-authored-by: gengjiaan <gengjiaan@360.cn> Co-authored-by: beliefer
<beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 8b09536)
The file was modified sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part1.sql (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala (diff)
The file was modified sql/core/src/test/resources/sql-tests/inputs/window.sql (diff)
The file was modified sql/core/src/test/resources/sql-tests/results/window.sql.out (diff)
The file was modified sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part3.sql.out (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala (diff)
The file was modified sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part2.sql (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala (diff)
The file was modified sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part3.sql (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/functions.scala (diff)
The file was modified sql/core/src/test/resources/sql-functions/sql-expression-schema.md (diff)
Commit 9e9d4b6994a29fb139fd50d24b5418a900c7f072 by wenchen
[SPARK-32905][CORE][YARN] ApplicationMaster fails to receive
UpdateDelegationTokens message
### What changes were proposed in this pull request?
With a long-running application in kerberized mode, the AMEndpoint
handles the `UpdateDelegationTokens` message incorrectly: it is a
OneWayMessage that should be handled in the `receive` function.
```
20-09-15 18:53:01 INFO yarn.YarnAllocator: Received 22 containers from YARN, launching executors on 0 of them.
20-09-16 12:52:28 ERROR netty.Inbox: Ignoring error
org.apache.spark.SparkException: NettyRpcEndpointRef(spark-client://YarnAM) does not implement 'receive'
  at org.apache.spark.rpc.RpcEndpoint$$anonfun$receive$1.applyOrElse(RpcEndpoint.scala:70)
  at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
  at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
  at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
  at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
  at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
20-09-17 06:52:28 ERROR netty.Inbox: Ignoring error
org.apache.spark.SparkException: NettyRpcEndpointRef(spark-client://YarnAM) does not implement 'receive'
  at org.apache.spark.rpc.RpcEndpoint$$anonfun$receive$1.applyOrElse(RpcEndpoint.scala:70)
  at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
  at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
  at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
  at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
  at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
```
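To make the `receive` vs `receiveAndReply` distinction concrete, a toy sketch (deliberately not Spark's real RPC classes): one-way messages are dispatched to `receive`, so an endpoint that only handles them on the ask path hits the "does not implement 'receive'" error shown above.
```scala
// Toy model of the dispatch rule, with made-up names; Spark's real API lives in
// org.apache.spark.rpc.RpcEndpoint.
case class UpdateDelegationTokens(tokens: Array[Byte])

trait ToyEndpoint {
  // Endpoints that do not override `receive` reject one-way messages.
  def receive: PartialFunction[Any, Unit] = {
    case msg => throw new UnsupportedOperationException(s"does not implement 'receive' for $msg")
  }
}

object AmEndpoint extends ToyEndpoint {
  override def receive: PartialFunction[Any, Unit] = {
    case UpdateDelegationTokens(tokens) =>
      println(s"Updating delegation tokens (${tokens.length} bytes)")
  }
}

AmEndpoint.receive(UpdateDelegationTokens(Array[Byte](1, 2, 3)))
```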
### Why are the changes needed?
Bugfix: without a proper token refresher, long-running apps will
potentially fail in a kerberized cluster.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Passing Jenkins, and verified manually.
I am running the sub-module `kyuubi-spark-sql-engine` of
https://github.com/yaooqinn/kyuubi
The simplest way to reproduce the bug and verify this fix is to follow
these steps
#### 1. Build the `kyuubi-spark-sql-engine` module
```
mvn clean package -pl :kyuubi-spark-sql-engine
```
#### 2. Configure Spark with Kerberos settings for your secured cluster
#### 3. Start it in the background
```
nohup bin/spark-submit --class org.apache.kyuubi.engine.spark.SparkSQLEngine \
  ../kyuubi-spark-sql-engine-1.0.0-SNAPSHOT.jar > kyuubi.log &
```
#### 4. Check the AM log
- "Updating delegation tokens ..." indicates SUCCESS
- "Inbox: Ignoring error ...... does not implement 'receive'" indicates FAILURE
Closes #29777 from yaooqinn/SPARK-32905.
Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Wenchen Fan
<wenchen@databricks.com>
(commit: 9e9d4b6)
The file was modified resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala (diff)