Changes

Summary

  1. [SPARK-32785][SQL] Interval with dangling parts should not results null (commit: de44e9c) (details)
  2. [SPARK-32677][SQL] Load function resource before create (commit: 05fcf26) (details)
  3. [SPARK-32779][SQL] Avoid using synchronized API of SessionCatalog in withClient flow, this leads to DeadLock (commit: b0322bf) (details)
  4. [SPARK-32748][SQL] Support local property propagation in SubqueryBroadcastExec (commit: 04f7f6d) (details)
  5. [SPARK-32753][SQL] Only copy tags to node with no tags (commit: c43460c) (details)
  6. [SPARK-32798][PYTHON] Make unionByName optionally fill missing columns with nulls in PySpark (commit: 8bd3770) (details)
  7. [SPARK-32810][SQL] CSV/JSON data sources should avoid globbing paths when inferring schema (commit: 954cd9f) (details)
  8. [SPARK-32638][SQL][FOLLOWUP] Move the plan rewriting methods to QueryPlan (commit: 117a6f1) (details)
  9. [SPARK-32186][DOCS][PYTHON] Development - Debugging (commit: c336ae3) (details)
  10. [SPARK-32812][PYTHON][TESTS] Avoid initiating a process during the main process for run-tests.py (commit: c8c082c) (details)
  11. [SPARK-32764][SQL] -0.0 should be equal to 0.0 (commit: 4144b6d) (details)
  12. [SPARK-32736][CORE] Avoid caching the removed decommissioned executors in TaskSchedulerImpl (commit: 125cbe3) (details)
Commit de44e9cfa07e32d293d68355916ac0dbd31d5c54 by wenchen
[SPARK-32785][SQL] Interval with dangling parts should not results null
### What changes were proposed in this pull request?
Bug fix for incomplete interval values, e.g. `interval '1'`, `interval '1 day 2'`. Currently these cases result in NULL, but they should fail with an IllegalArgumentException.
### Why are the changes needed?
Correctness.
### Does this PR introduce _any_ user-facing change?
Yes, incomplete intervals now throw an exception.
#### before
```
bin/spark-sql -S -e "select interval '1', interval '+', interval '1 day -'"
NULL NULL NULL
```
#### after
```
-- !query
select interval '1'

-- !query schema
struct<>

-- !query output
org.apache.spark.sql.catalyst.parser.ParseException

Cannot parse the INTERVAL value: 1(line 1, pos 7)

== SQL ==
select interval '1'
```
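For readers following along in PySpark, a minimal hedged sketch of the new behavior (this is not from the PR; it assumes the error surfaces as `pyspark.sql.utils.ParseException`):
```python
from pyspark.sql import SparkSession
from pyspark.sql.utils import ParseException

spark = SparkSession.builder.master("local[1]").getOrCreate()

try:
    # With this change, an incomplete interval literal fails at parse time
    # instead of silently evaluating to NULL.
    spark.sql("select interval '1'").show()
except ParseException as e:
    print("rejected:", str(e).splitlines()[0])
```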
### How was this patch tested?
unit tests added
Closes #29635 from yaooqinn/SPARK-32785.
Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Wenchen Fan
<wenchen@databricks.com>
(commit: de44e9c)
The file was modified docs/sql-migration-guide.md (diff)
The file was modified sql/core/src/test/resources/sql-tests/inputs/interval.sql (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala (diff)
The file was modified sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out (diff)
The file was modified sql/core/src/test/resources/sql-tests/results/interval.sql.out (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala (diff)
Commit 05fcf26b7966781772338e1f6d53690ab52cc66f by wenchen
[SPARK-32677][SQL] Load function resource before create
### What changes were proposed in this pull request?
Change `CreateFunctionCommand` to add a class check before creating the function.
### Why are the changes needed?
We have different behaviors between creating a permanent function and a temporary function when the function class is invalid, e.g.,
```
create function f as 'test.non.exists.udf';
-- Time taken: 0.104 seconds
create temporary function f as 'test.non.exists.udf'
-- Error in query: Can not load class 'test.non.exists.udf' when registering the function 'f', please make sure it is on the classpath;
```
Hive fails in both cases.
### Does this PR introduce _any_ user-facing change?
Yes, users will get an exception when creating an invalid UDF.
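A hedged PySpark sketch of the new behavior for a permanent function (assuming an active SparkSession `spark`, and assuming the failure surfaces as an `AnalysisException`, as it already does for temporary functions):
```python
from pyspark.sql.utils import AnalysisException

try:
    # Previously this succeeded and only failed at first use; with this
    # change the missing class is detected at CREATE time.
    spark.sql("CREATE FUNCTION f AS 'test.non.exists.udf'")
except AnalysisException as e:
    print("rejected at create time:", str(e).splitlines()[0])
```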
### How was this patch tested?
New test.
Closes #29502 from ulysses-you/function.
Authored-by: ulysses <youxiduo@weidian.com> Signed-off-by: Wenchen Fan
<wenchen@databricks.com>
(commit: 05fcf26)
The file was modified sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala (diff)
The file was modified sql/core/src/test/resources/sql-tests/results/udaf.sql.out (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala (diff)
The file was modified python/pyspark/sql/tests/test_catalog.py (diff)
Commit b0322bf05ab582ecd915be374b6e3915742049d7 by gurwls223
[SPARK-32779][SQL] Avoid using synchronized API of SessionCatalog in
withClient flow, this leads to DeadLock
### What changes were proposed in this pull request?
There is no need to use the database name in the `loadPartition` API of `Shim_v3_0` to get the Hive table; Hive has an overloaded method that returns the table from the table name alone. By using that API, the dependency on `SessionCatalog` can be removed from the shim layer.
### Why are the changes needed?
To avoid a deadlock when communicating with Hive metastore 3.1.x:
```
Found one Java-level deadlock:
=============================
"worker3":
waiting to lock monitor 0x00007faf0be602b8 (object 0x00000007858f85f0,
a org.apache.spark.sql.hive.HiveSessionCatalog),
which is held by "worker0"
"worker0":
waiting to lock monitor 0x00007faf0be5fc88 (object 0x0000000785c15c80,
a org.apache.spark.sql.hive.HiveExternalCatalog),
which is held by "worker3"
Java stack information for the threads listed above:
===================================================
"worker3":
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCurrentDatabase(SessionCatalog.scala:256)
- waiting to lock <0x00000007858f85f0> (a
org.apache.spark.sql.hive.HiveSessionCatalog)
at
org.apache.spark.sql.hive.client.Shim_v3_0.loadPartition(HiveShim.scala:1332)
at
org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$loadPartition$1(HiveClientImpl.scala:870)
at
org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$4459/1387095575.apply$mcV$sp(Unknown
Source)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
at
org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$2227/313239499.apply(Unknown
Source)
at
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
at
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
- locked <0x0000000785ef9d78> (a
org.apache.spark.sql.hive.client.IsolatedClientLoader)
at
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
at
org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:860)
at
org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$loadPartition$1(HiveExternalCatalog.scala:911)
at
org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$4457/2037578495.apply$mcV$sp(Unknown
Source)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
- locked <0x0000000785c15c80> (a
org.apache.spark.sql.hive.HiveExternalCatalog)
at
org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:890)
at
org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.loadPartition(ExternalCatalogWithListener.scala:179)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:512)
at
org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
- locked <0x00000007b1690ff8> (a
org.apache.spark.sql.execution.command.ExecutedCommandExec)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at
org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown
Source)
at
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown
Source)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown
Source)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown
Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown
Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
at
org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown
Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
at
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
at java.lang.Thread.run(Thread.java:748)
"worker0":
at
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
- waiting to lock <0x0000000785c15c80> (a org.apache.spark.sql.hive.HiveExternalCatalog)
at
org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:851)
at
org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.tableExists(ExternalCatalogWithListener.scala:146)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:432)
- locked <0x00000007858f85f0> (a
org.apache.spark.sql.hive.HiveSessionCatalog)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:185)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:509)
at
org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
- locked <0x00000007b529af58> (a
org.apache.spark.sql.execution.command.ExecutedCommandExec)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at
org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown
Source)
at
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown
Source)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown
Source)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown
Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown
Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
at
org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown
Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
at
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
at java.lang.Thread.run(Thread.java:748)
Found 1 deadlock.
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested using the script below by executing it in spark-shell; no deadlock was found.
Launch spark-shell using `./bin/spark-shell --conf "spark.sql.hive.metastore.jars=maven" --conf spark.sql.hive.metastore.version=3.1 --conf spark.hadoop.datanucleus.schema.autoCreateAll=true`.
**code**
```
def testHiveDeadLock = {
  import scala.collection.mutable.ArrayBuffer
  import scala.util.Random
  println("test hive DeadLock")
  spark.sql("drop database if exists testDeadLock cascade")
  spark.sql("create database testDeadLock")
  spark.sql("use testDeadLock")
  val tableCount = 100
  val tableNamePrefix = "testdeadlock"
  for (i <- 0 until tableCount) {
    val tableName = s"$tableNamePrefix${i + 1}"
    spark.sql(s"drop table if exists $tableName")
    spark.sql(s"create table $tableName (a bigint) partitioned by (b bigint) stored as orc")
  }

  val threads = new ArrayBuffer[Thread]
  for (i <- 0 until tableCount) {
    threads.append(new Thread(new Runnable {
      override def run: Unit = {
        val tableName = s"$tableNamePrefix${i + 1}"
        val rand = Random
        val df = spark.range(0, 20000).toDF("a")
        val location = s"/tmp/${rand.nextLong.abs}"
        df.write.mode("overwrite").orc(location)
        spark.sql(
          s"""LOAD DATA LOCAL INPATH '$location' INTO TABLE $tableName partition (b=$i)""")
      }
    }, s"worker$i"))
    threads(i).start()
  }

  for (i <- 0 until tableCount) {
    println(s"Joining with thread $i")
    threads(i).join()
  }
  for (i <- 0 until tableCount) {
    val tableName = s"$tableNamePrefix${i + 1}"
    spark.sql(s"select count(*) from $tableName").show(false)
  }
  println("All done")
}

for (i <- 0 until 100) {
  testHiveDeadLock
  println(s"completed ${i}th iteration")
}
```
Closes #29649 from sandeep-katta/metastore3.1DeadLock.
Authored-by: sandeep.katta <sandeep.katta2007@gmail.com> Signed-off-by:
HyukjinKwon <gurwls223@apache.org>
(commit: b0322bf)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala (diff)
Commit 04f7f6dac0b9177e11482cca4e7ebf7b7564e45f by wenchen
[SPARK-32748][SQL] Support local property propagation in
SubqueryBroadcastExec
### What changes were proposed in this pull request?
Since
[SPARK-22590](https://github.com/apache/spark/commit/2854091d12d670b014c41713e72153856f4d3f6a),
local property propagation is supported through
`SQLExecution.withThreadLocalCaptured` in both `BroadcastExchangeExec`
and `SubqueryExec` when computing `relationFuture`. This pr adds the
support in `SubqueryBroadcastExec`.
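For context, a minimal hedged sketch of what local property propagation means for query execution (assuming an active SparkSession `spark`; the key shown is Spark's standard job-description property, used here only for illustration):
```python
# Local properties are per-thread key/value pairs set on the driver; Spark
# captures them and makes them visible to the jobs a query submits. This
# change extends that capture to the broadcast job computed by
# SubqueryBroadcastExec's relationFuture.
spark.sparkContext.setLocalProperty("spark.job.description", "dpp demo run")
spark.range(10).count()  # jobs triggered here carry the property set above
```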
### Why are the changes needed?
Local property propagation is missed in `SubqueryBroadcastExec`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add a new test.
Closes #29589 from wzhfy/thread_local.
Authored-by: Zhenhua Wang <wzh_zju@163.com> Signed-off-by: Wenchen Fan
<wenchen@databricks.com>
(commit: 04f7f6d)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala (diff)
Commit c43460cf82a075fd071717489798cde6a61b8515 by wenchen
[SPARK-32753][SQL] Only copy tags to node with no tags
### What changes were proposed in this pull request?
Only copy tags to nodes with no tags when transforming plans.
### Why are the changes needed?
cloud-fan [made a good point](https://github.com/apache/spark/pull/29593#discussion_r482013121) that it doesn't make sense to append tags to existing nodes when nodes are removed. That can cause bugs such as duplicate rows when deduplicating and repartitioning by the same column with AQE.
```
spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
val df = spark.sql("select id from v1 group by id distribute by id")
println(df.collect().toArray.mkString(","))
println(df.queryExecution.executedPlan)

// With AQE
[4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
AdaptiveSparkPlan(isFinalPlan=true)
+- CustomShuffleReader local
   +- ShuffleQueryStage 0
      +- Exchange hashpartitioning(id#183L, 10), true
         +- *(3) HashAggregate(keys=[id#183L], functions=[], output=[id#183L])
            +- Union
               :- *(1) Range (0, 10, step=1, splits=2)
               +- *(2) Range (0, 10, step=1, splits=2)

// Without AQE
[4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
*(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
+- Exchange hashpartitioning(id#206L, 10), true
   +- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
      +- Union
         :- *(1) Range (0, 10, step=1, splits=2)
         +- *(2) Range (0, 10, step=1, splits=2)
```
It's too expensive to detect node removal, so we compromise and only copy tags to nodes with no tags.
### Does this PR introduce _any_ user-facing change?
Yes, it fixes a bug.
### How was this patch tested?
Added a test.
Closes #29593 from manuzhang/spark-32753.
Authored-by: manuzhang <owenzhang1990@gmail.com> Signed-off-by: Wenchen
Fan <wenchen@databricks.com>
(commit: c43460c)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala (diff)
Commit 8bd3770552cced8356c8cf0329ddbaec67cec6d7 by gurwls223
[SPARK-32798][PYTHON] Make unionByName optionally fill missing columns
with nulls in PySpark
### What changes were proposed in this pull request?
This PR proposes to add a new argument, `allowMissingColumns`, to `unionByName`, allowing users to specify whether missing columns are allowed.
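A hedged usage sketch of the new argument (assuming an active SparkSession `spark`; the column names are made up for illustration):
```python
df1 = spark.createDataFrame([(1, 2)], ["a", "b"])
df2 = spark.createDataFrame([(3, 4)], ["b", "c"])

# Default behavior is unchanged: mismatched columns raise an error.
# With allowMissingColumns=True, missing columns are filled with nulls:
# the result has columns a, b, c with rows (1, 2, null) and (null, 3, 4).
df1.unionByName(df2, allowMissingColumns=True).show()
```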
### Why are the changes needed?
To expose the `allowMissingColumns` argument in the Python API as well; currently it is only exposed in the Scala/Java APIs.
### Does this PR introduce _any_ user-facing change?
Yes, it adds new examples using the new argument in the docstring.
### How was this patch tested?
Doctest added and manually tested
```
$ python/run-tests --testnames pyspark.sql.dataframe
Running PySpark tests. Output is in /.../spark/python/unit-tests.log
Will test against the following Python executables: ['/.../python3', 'python3.8']
Will test the following Python tests: ['pyspark.sql.dataframe']
/.../python3 python_implementation is CPython
/.../python3 version is: Python 3.8.5
python3.8 python_implementation is CPython
python3.8 version is: Python 3.8.5
Starting test(/.../python3): pyspark.sql.dataframe
Starting test(python3.8): pyspark.sql.dataframe
Finished test(python3.8): pyspark.sql.dataframe (35s)
Finished test(/.../python3): pyspark.sql.dataframe (35s)
Tests passed in 35 seconds
```
Closes #29657 from itholic/SPARK-32798.
Authored-by: itholic <haejoon309@naver.com> Signed-off-by: HyukjinKwon
<gurwls223@apache.org>
(commit: 8bd3770)
The file was modified python/pyspark/sql/dataframe.py (diff)
Commit 954cd9feaa1a3d4ad9a235811ae58e02a63e8386 by gurwls223
[SPARK-32810][SQL] CSV/JSON data sources should avoid globbing paths
when inferring schema
### What changes were proposed in this pull request?
In the PR, I propose to fix an issue with the CSV and JSON data sources in Spark SQL when both of the following are true:
* no user-specified schema
* some file paths contain escaped glob metacharacters, such as `[`, `]`, `{`, `}`, `*`, etc.
### Why are the changes needed?
To fix the issue when the following two queries try to read from the paths `[abc].csv` and `[abc].json`:
```scala
spark.read.csv("""/tmp/\[abc\].csv""").show
spark.read.json("""/tmp/\[abc\].json""").show
```
but would end up hitting an exception:
```
org.apache.spark.sql.AnalysisException: Path does not exist: file:/tmp/[abc].csv;
  at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:722)
  at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:392)
```
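A hedged PySpark equivalent of the reproducer (assuming an active SparkSession `spark` and files literally named `[abc].csv` / `[abc].json` under `/tmp`); after this fix, schema inference treats the escaped paths literally and the reads succeed:
```python
# Backslash-escaped glob metacharacters in the path are no longer expanded
# during schema inference, so these match the literal file names.
spark.read.csv(r"/tmp/\[abc\].csv").show()
spark.read.json(r"/tmp/\[abc\].json").show()
```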
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Added new test cases in `DataFrameReaderWriterSuite`.
Closes #29659 from MaxGekk/globbing-paths-when-inferring-schema.
Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: HyukjinKwon
<gurwls223@apache.org>
(commit: 954cd9f)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala (diff)
Commit 117a6f135bafdb117f61ba74999e6df6dbba5e08 by yamamuro
[SPARK-32638][SQL][FOLLOWUP] Move the plan rewriting methods to
QueryPlan
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/29485.
It moves the plan rewriting methods from `Analyzer` to `QueryPlan`, so that they can work with `SparkPlan` as well. This PR also improves the handling of a corner case (the attribute to be replaced stays together with an unresolved attribute) and makes the rewriting more general, so that `WidenSetOperationTypes` can rewrite the plan in one shot as before.
### Why are the changes needed?
Code cleanup and generalization.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #29643 from cloud-fan/cleanup.
Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Takeshi
Yamamuro <yamamuro@apache.org>
(commit: 117a6f1)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala (diff)
Commit c336ae39cdda5f06859374c464778732d3d96917 by gurwls223
[SPARK-32186][DOCS][PYTHON] Development - Debugging
### What changes were proposed in this pull request?
This PR proposes to document the way of debugging PySpark. It's pretty
much self-descriptive.
I made a demo site to review it more effectively:
https://hyukjin-spark.readthedocs.io/en/stable/development/debugging.html
### Why are the changes needed?
To let users know how to debug PySpark applications.
### Does this PR introduce _any_ user-facing change?
Yes, it adds a new page in the documentation about debugging PySpark.
### How was this patch tested?
Manually built the doc.
Closes #29639 from HyukjinKwon/SPARK-32186.
Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by:
HyukjinKwon <gurwls223@apache.org>
(commit: c336ae3)
The file was added docs/img/pyspark-remote-debug1.png
The file was modified python/docs/source/development/index.rst (diff)
The file was added docs/img/pyspark-remote-debug2.png
The file was added python/docs/source/development/debugging.rst
Commit c8c082ce380b2357623511c6625503fb3f1d65bf by gurwls223
[SPARK-32812][PYTHON][TESTS] Avoid initiating a process during the main
process for run-tests.py
### What changes were proposed in this pull request?
In certain environments, the `run-tests.py` script seems to fail as below:
```
Traceback (most recent call last):
  File "<string>", line 1, in <module>
...
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
Traceback (most recent call last):
...
    raise EOFError
EOFError
```
The reason is that `Manager.dict()` launches another process while the main process is being initiated.
It works in most environments for reasons that are unclear, but it is better to avoid this pattern, as Python itself advises.
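A minimal hedged sketch of the pattern at issue (not the actual `run-tests.py` code):
```python
from multiprocessing import Manager

# Problematic: creating a Manager at import time spawns a child process
# while the main module may still be bootstrapping, which can trigger the
# RuntimeError shown above.
# SHARED = Manager().dict()

def main():
    # Safer: create the Manager only after the main module has loaded.
    shared = Manager().dict()
    shared["failed_tests"] = 0
    print(dict(shared))

if __name__ == "__main__":
    main()
```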
### Why are the changes needed?
To prevent the test failure for Python.
### Does this PR introduce _any_ user-facing change?
No, it fixes a test script.
### How was this patch tested?
Manually ran the script after fixing.
```
Running PySpark tests. Output is in /.../python/unit-tests.log
Will test against the following Python executables: ['/.../python3', 'python3.8']
Will test the following Python tests: ['pyspark.sql.dataframe']
/.../python3 python_implementation is CPython
/.../python3 version is: Python 3.8.5
python3.8 python_implementation is CPython
python3.8 version is: Python 3.8.5
Starting test(/.../python3): pyspark.sql.dataframe
Starting test(python3.8): pyspark.sql.dataframe
Finished test(/.../python3): pyspark.sql.dataframe (33s)
Finished test(python3.8): pyspark.sql.dataframe (34s)
Tests passed in 34 seconds
```
Closes #29666 from itholic/SPARK-32812.
Authored-by: itholic <haejoon309@naver.com> Signed-off-by: HyukjinKwon
<gurwls223@apache.org>
(commit: c8c082c)
The file was modified python/run-tests.py (diff)
Commit 4144b6da5215e9264056c2f6720fd819e18e23a5 by dongjoon
[SPARK-32764][SQL] -0.0 should be equal to 0.0
### What changes were proposed in this pull request?
This is a Spark 3.0 regression introduced by https://github.com/apache/spark/pull/26761. We missed a corner case: `java.lang.Double.compare` treats 0.0 and -0.0 as different, which breaks SQL semantics.
This PR adds back `OrderingUtil` to provide custom compare methods that take care of 0.0 vs -0.0.
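To illustrate the corner case, a hedged Python sketch of the comparison semantics the new utility restores (this mirrors the intent, not the actual Scala code):
```python
import math

def sql_double_compare(x: float, y: float) -> int:
    """-0.0 and 0.0 compare as equal; NaN sorts greater than any other value."""
    if x == y:                 # IEEE 754 equality: 0.0 == -0.0 is True
        return 0
    if math.isnan(x):
        return 0 if math.isnan(y) else 1
    if math.isnan(y):
        return -1
    return -1 if x < y else 1

# java.lang.Double.compare(0.0, -0.0) returns a positive value, which breaks
# SQL semantics; the equality short-circuit above yields 0 instead.
print(sql_double_compare(0.0, -0.0))  # 0
```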
### Why are the changes needed?
Fix a correctness bug.
### Does this PR introduce _any_ user-facing change?
Yes, `SELECT 0.0 > -0.0` now correctly returns false, as in Spark 2.x.
### How was this patch tested?
new tests
Closes #29647 from cloud-fan/float.
Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by:
Dongjoon Hyun <dongjoon@apache.org>
(commit: 4144b6d)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/types/numerics.scala (diff)
The file was added sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtilSuite.scala
The file was added sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtil.scala
The file was modified sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala (diff)
Commit 125cbe3ae0d664ddc80b5b83cc82a43a0cefb5ca by wenchen
[SPARK-32736][CORE] Avoid caching the removed decommissioned executors
in TaskSchedulerImpl
### What changes were proposed in this pull request?
The motivation of this PR is to avoid caching the removed decommissioned
executors in `TaskSchedulerImpl`. The cache is introduced in
https://github.com/apache/spark/pull/29422. The cache will hold the
`isHostDecommissioned` info for a while. So if the task `FetchFailure`
event comes after the executor loss event, `DAGScheduler` can still get
the `isHostDecommissioned` from the cache and unregister the host
shuffle map status when the host is decommissioned too.
This PR tries to achieve the same goal without the cache. Instead of
saving the `workerLost` in `ExecutorUpdated` /
`ExecutorDecommissionInfo` / `ExecutorDecommissionState`, we could save
the `hostOpt` directly. When the host is decommissioned or lost too, the
`hostOpt` can be a specific host address. Otherwise, it's `None` to
indicate that only the executor is decommissioned or lost.
Now that we have the host info, we can also unregister the host shuffle
map status when `executorLost` is triggered for the decommissioned
executor.
Besides, this PR also includes a few cleanups around the touched code.
### Why are the changes needed?
It helps to unregister the shuffle map status earlier for both the decommission and the normal executor-loss cases.
It also saves memory in `TaskSchedulerImpl` and simplifies the code a little bit.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This PR only refactors the code. The original behaviour should be covered by `DecommissionWorkerSuite`.
Closes #29579 from Ngone51/impr-decom.
Authored-by: yi.wu <yi.wu@databricks.com> Signed-off-by: Wenchen Fan
<wenchen@databricks.com>
(commit: 125cbe3)
The file was modified core/src/main/scala/org/apache/spark/deploy/master/Master.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/internal/config/package.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala (diff)
The file was modified streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala (diff)
The file was modified streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala (diff)