Changes

Summary

  1. [SPARK-32381][CORE][SQL] Move and refactor parallel listing & non-location sensitive listing to core (commit: 8ccfbc1)
  2. [SPARK-32889][SQL][TESTS][FOLLOWUP] Skip special column names test in Hive 1.2 (commit: d7aa3b5)
  3. [SPARK-32990][SQL] Migrate REFRESH TABLE to use UnresolvedTableOrView to resolve the identifier (commit: e9c98c9)
  4. [SPARK-32877][SQL][TEST] Add test for Hive UDF complex decimal type (commit: f2fc966)
  5. [SPARK-32885][SS] Add DataStreamReader.table API (commit: 9e6882f)
  6. [SPARK-32931][SQL] Unevaluable Expressions are not Foldable (commit: e887c63)
  7. [SPARK-32997][K8S] Support dynamic PVC creation and deletion in K8s driver (commit: 6c80547)
Commit 8ccfbc114e3e8d9fc919bf05602e02a506566e31 by hkarau
[SPARK-32381][CORE][SQL] Move and refactor parallel listing &
non-location sensitive listing to core
### What changes were proposed in this pull request?
This moves and refactors the parallel listing utilities from
`InMemoryFileIndex` to Spark core so they can be reused by modules besides
SQL. Along the way, it also does some cleanups/refactorings:
- Created a `HadoopFSUtils` class under core
- Moved `InMemoryFileIndex.bulkListLeafFiles` into
`HadoopFSUtils.parallelListLeafFiles`. It now depends on a
`SparkContext` instead of a SQL `SparkSession`. It also takes a few
parameters that used to be read from `SparkSession.conf`:
`ignoreMissingFiles`, `ignoreLocality`, `parallelismThreshold`,
`parallelismMax` and `filterFun` (for additional filtering support, though
this may be merged with the `filter` parameter in the future); see the
sketch after this list.
- Moved `InMemoryFileIndex.listLeafFiles` into
`HadoopFSUtils.listLeafFiles` with similar changes to those above.
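For illustration, here is a minimal sketch of how a non-SQL caller might use the relocated utility. The parameter names follow the description above; the exact signature lives in `core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala` and may differ from this sketch:
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.util.HadoopFSUtils

// Hypothetical caller outside SQL: list leaf files under several root paths,
// distributing the listing over the cluster once the number of paths crosses
// the parallelism threshold. Parameter names are inferred from the PR
// description and may not match the final signature exactly.
def listAll(sc: SparkContext, roots: Seq[Path], hadoopConf: Configuration) =
  HadoopFSUtils.parallelListLeafFiles(
    sc,
    roots,
    hadoopConf,
    filter = null,                // optional Hadoop PathFilter
    ignoreMissingFiles = true,    // previously read from SparkSession.conf
    ignoreLocality = false,       // keep block locations for locality-aware scheduling
    parallelismThreshold = 32,    // list on the driver below this many paths
    parallelismMax = 10000)       // cap on the number of listing tasks
```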
### Why are the changes needed?
Currently the locality-aware parallel listing mechanism only applies to
`InMemoryFileIndex`. By moving this to core, we can potentially reuse
the same mechanism for other code paths as well.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Since this is mostly a refactoring, it relies on existing unit tests
such as those for `InMemoryFileIndex`.
Closes #29471 from sunchao/SPARK-32381.
Lead-authored-by: Chao Sun <sunchao@apache.org>
Co-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Chao Sun <sunchao@uber.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
(commit: 8ccfbc1)
The file was added core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala (diff)
Commit d7aa3b56e8dbdc5582565ce3427f368edbabc708 by dhyun
[SPARK-32889][SQL][TESTS][FOLLOWUP] Skip special column names test in
Hive 1.2
### What changes were proposed in this pull request?
This PR is a follow-up of SPARK-32889 to skip the special
column names test in the `hive-1.2` profile.
### Why are the changes needed?
Hive 1.2 is too old to support special column names because it doesn't
use Apache ORC. This will recover our `hive-1.2` Jenkins job.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7-hive-1.2/
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7-hive-1.2/
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the test with the Hive 1.2 profile.
Closes #29867 from dongjoon-hyun/SPARK-32889-2.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: d7aa3b5)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala (diff)
Commit e9c98c910aee10efe447dc4fff951e748441d10a by wenchen
[SPARK-32990][SQL] Migrate REFRESH TABLE to use UnresolvedTableOrView to
resolve the identifier
### What changes were proposed in this pull request?
This PR proposes to migrate `REFRESH TABLE` to use
`UnresolvedTableOrView` to resolve the table/view identifier. This
allows consistent resolution rules (temp view first, etc.) to be applied
for both v1/v2 commands. More info about the consistent resolution rule
proposal can be found in
[JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal
doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).
### Why are the changes needed?
The current behavior is not consistent between v1 and v2 commands when
resolving a temp view. In v2, the `t` in the following example is
resolved to a table:
```scala sql("CREATE TABLE testcat.ns.t (id bigint) USING foo")
sql("CREATE TEMPORARY VIEW t AS SELECT 2") sql("USE testcat.ns")
sql("REFRESH TABLE t") // 't' is resolved to testcat.ns.t
``` whereas in v1, the `t` is resolved to a temp view:
```scala sql("CREATE DATABASE test") sql("CREATE TABLE
spark_catalog.test.t (id bigint) USING csv") sql("CREATE TEMPORARY VIEW
t AS SELECT 2") sql("USE spark_catalog.test") sql("REFRESH TABLE t") //
't' is resolved to a temp view
```
### Does this PR introduce _any_ user-facing change?
After this PR, `REFRESH TABLE t` is resolved to a temp view `t` instead
of `testcat.ns.t`.
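For example (illustrative only, reusing the names from the snippet above), the catalog table can still be refreshed by qualifying it explicitly:
```scala
sql("REFRESH TABLE testcat.ns.t") // refreshes the catalog table explicitly
sql("REFRESH TABLE t")            // after this PR, resolves to the temp view `t`
```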
### How was this patch tested?
Added a new test.
Closes #29866 from imback82/refresh_table_consistent.
Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: e9c98c9)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala (diff)
Commit f2fc96667481169affbc20cec95b9fc1c19fc7c3 by dhyun
[SPARK-32877][SQL][TEST] Add test for Hive UDF complex decimal type
### What changes were proposed in this pull request?
Add a test to cover Hive UDFs whose input contains a complex decimal type.
Add a comment explaining why we can't make `HiveSimpleUDF` extend
`ImplicitTypeCasts`.
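As a hedged illustration of the scenario being covered (this is not the test actually added; the UDF class, function name, and literals are just one way to push a complex decimal type through a Hive UDF):
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Register Hive's built-in sort_array implementation as a user-registered Hive UDF
// and feed it an array<decimal> argument, exercising the complex-decimal input path.
spark.sql(
  "CREATE TEMPORARY FUNCTION test_sort_array AS " +
    "'org.apache.hadoop.hive.ql.udf.generic.GenericUDFSortArray'")
spark.sql(
  "SELECT test_sort_array(array(CAST(2.2 AS DECIMAL(10, 2)), CAST(1.1 AS DECIMAL(10, 2))))"
).show()
```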
### Why are the changes needed?
For better test coverage of the cases where we are (or are not) compatible with Hive.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add test.
Closes #29863 from ulysses-you/SPARK-32877-test.
Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: f2fc966)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala (diff)
Commit 9e6882feca0800d5d4f9920886cb5dae73bbe1d4 by wenchen
[SPARK-32885][SS] Add DataStreamReader.table API
### What changes were proposed in this pull request?
This PR aims to add a new `table` API in `DataStreamReader`, which is similar to the `table` API in `DataFrameReader`.
### Why are the changes needed?
Users can directly use this API to get a streaming DataFrame on a table. Below is a simple example:
Application 1 for initializing and starting the streaming job:
``` val path = "/home/yuanjian.li/runtime/to_be_deleted" val tblName =
"my_table"
// Write some data to `my_table`
spark.range(3).write.format("parquet").option("path",
path).saveAsTable(tblName)
// Read the table as a streaming source, write result to destination
directory val table = spark.readStream.table(tblName)
table.writeStream.format("parquet").option("checkpointLocation",
"/home/yuanjian.li/runtime/to_be_deleted_ck").start("/home/yuanjian.li/runtime/to_be_deleted_2")
```
Application 2 for appending new data:
```
// Append new data into the path
spark.range(5).write.format("parquet").option("path",
"/home/yuanjian.li/runtime/to_be_deleted").mode("append").save()
```
Check result:
```
// The desitination directory should contains all written data
spark.read.parquet("/home/yuanjian.li/runtime/to_be_deleted_2").show()
```
### Does this PR introduce _any_ user-facing change?
Yes, a new API is added.
### How was this patch tested?
New unit tests added and integration testing.
Closes #29756 from xuanyuanking/SPARK-32885.
Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 9e6882f)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala (diff)
The file was added sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala (diff)
The file was modified sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (diff)
The file was modified sql/core/src/test/resources/sql-tests/results/explain.sql.out (diff)
Commit e887c639a766fde0a74e7557d1ad2b2cc4b92f1b by wenchen
[SPARK-32931][SQL] Unevaluable Expressions are not Foldable
### What changes were proposed in this pull request?
Unevaluable expressions are not foldable because we don't have an `eval` implementation for them. This PR cleans up the code and enforces that invariant.
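A minimal toy sketch of the invariant (not code from this PR): an `Unevaluable` expression has no usable `eval`, so it must never report `foldable = true` and get picked up by `ConstantFolding`:
```scala
import org.apache.spark.sql.catalyst.expressions.{LeafExpression, Unevaluable}
import org.apache.spark.sql.types.{DataType, IntegerType}

// Hypothetical placeholder expression. Unevaluable supplies eval/doGenCode
// implementations that throw, and with this change it also fixes foldable to
// false, so ConstantFolding can never try to evaluate it during optimization.
case class MyPlaceholder() extends LeafExpression with Unevaluable {
  override def nullable: Boolean = false
  override def dataType: DataType = IntegerType
}
```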
### Why are the changes needed?
To ensure that we will not hit the weird cases that trigger `ConstantFolding`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The existing tests.
Closes #29798 from gatorsmile/refactorUneval.
Lead-authored-by: gatorsmile <gatorsmile@gmail.com>
Co-authored-by: Xiao Li <gatorsmile@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: e887c63)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala (diff)
Commit 6c805470a7e8d1f44747dc64c2e49ebd302f9ba4 by dhyun
[SPARK-32997][K8S] Support dynamic PVC creation and deletion in K8s
driver
### What changes were proposed in this pull request?
This PR aims to support dynamic PVC creation and deletion in the K8s driver.
**Configuration**
This PR reuses the existing PVC volume configs.
```
spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand
spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp2
spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=200Gi
spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data
spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false
```
**PVC**
```
$ kubectl get pvc | grep driver
tpcds-d6087874c6705564-driver-pvc-0   Bound   pvc-fae914a2-ca5c-4e1e-8aba-54a35357d072   200Gi   RWO   gp2   12m
```
**Disk**
```
$ k exec -it tpcds-d6087874c6705564-driver -- df -h | grep data
/dev/nvme5n1    197G   61M  197G   1% /data
```
```
$ k exec -it tpcds-d6087874c6705564-driver -- ls -al /data
total 28
drwxr-xr-x  5 root root  4096 Sep 25 18:06 .
drwxr-xr-x  1 root root    63 Sep 25 18:06 ..
drwxr-xr-x 66 root root  4096 Sep 25 18:09 blockmgr-2c9a8cc5-a05c-45fe-a58e-b8f42da88a57
drwx------  2 root root 16384 Sep 25 18:06 lost+found
drwx------  4 root root  4096 Sep 25 18:07 spark-0448efe7-da2c-4f3a-bd3c-769aadb11dd6
```
**NOTE** This should be used carefully because Apache Spark doesn't
delete the driver pod automatically. Since the driver PVC shares the
lifecycle of the driver pod, it will exist after job completion until
the pod is deleted. However, for users who are already using pre-populated
PVCs, this isn't a regression at all in terms of cost.
```
$ k get pod -l spark-role=driver
NAME                            READY   STATUS      RESTARTS   AGE
tpcds-d6087874c6705564-driver   0/1     Completed   0          35m
```
### Why are the changes needed?
Like executors, the driver also needs a larger PVC.
### Does this PR introduce _any_ user-facing change?
Yes. This is a new feature.
### How was this patch tested?
Pass the newly added test case.
Closes #29873 from dongjoon-hyun/SPARK-32997.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: 6c80547)
The file was modified resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala (diff)
The file was modified resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala (diff)