Changes

Summary

  1. [SPARK-35296][SQL] Allow Dataset.observe to work even if (commit: 44b695f)
  2. [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push (commit: b5a1503)
  3. [SPARK-35716][SQL] Support casting of timestamp without time zone to (commit: d21ff13)
  4. [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception (commit: b4b78ce)
  5. [SPARK-35692][K8S] Use AtomicInteger for executor id generating (commit: bc1edba)
  6. [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge (commit: a97885b)
  7. [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused (commit: cf07036)
  8. [SPARK-35709][DOCS] Remove the reference to third party Nomad (commit: 912d60b)
  9. [SPARK-34512][BUILD][SQL] Upgrade built-in Hive to 2.3.9 (commit: 463daab)
  10. [SPARK-35640][SQL] Refactor Parquet vectorized reader to remove (commit: e9ccf4a)
  11. [SPARK-35718][SQL] Support casting of Date to timestamp without time (commit: e9af457)
  12. [SPARK-35439][SQL][FOLLOWUP] ExpressionContainmentOrdering should not (commit: c463472)
  13. [SPARK-35694][INFRA][FOLLOWUP] Increase the default JVM stack size of (commit: 62be22e)
  14. [SPARK-35591][PYTHON][DOCS] Rename "Koalas" to "pandas API on Spark" in (commit: ebe529e)
  15. [SPARK-35652][SQL] joinWith on two table generated from same one (commit: 6e1aa15)
  16. [SPARK-35706][SQL] Consider making the ':' in STRUCT data type (commit: 57ce64c)
  17. [SPARK-35695][SQL] Collect observed metrics from cached and adaptive (commit: 692dc66)
  18. [SPARK-35704][SQL] Add fields to `DayTimeIntervalType` (commit: d53831f)
  19. [SPARK-35396][SQL][TESTS][FOLLOWUP] Add a UT to check if a user-defined (commit: e958833)
  20. [SPARK-35475][PYTHON] Fix disallow_untyped_defs mypy checks (commit: 4d21b94)
  21. [SPARK-35689][SS] Add log warn when keyWithIndexToValue returns null (commit: 703376e)
  22. [SPARK-35321][SQL] Don't register Hive permanent functions when creating (commit: 9c7250f)
  23. [SPARK-35738][PYTHON] Support 'y' properly in DataFrame with non-numeric (commit: 76e08a8)
  24. [SPARK-35746][UI] Fix taskid in the stage page task event timeline (commit: 450b415)
  25. [SPARK-35734][SQL] Format day-time intervals using type fields (commit: 80f7989)
  26. [SPARK-35701][SQL] Use copy-on-write semantics for SQLConf registered (commit: 0ba1d38)
Commit 44b695fbb06b0d89783b4838941c68543c5a5c8b by wenchen
[SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec in a task handles multiple partitions

### What changes were proposed in this pull request?

This PR fixes an issue where `Dataset.observe` doesn't work if `CollectMetricsExec` in a task handles multiple partitions.
If `coalesce` follows `observe` and the number of partitions shrinks after `coalesce`, `CollectMetricsExec` can handle multiple partitions in a task.

### Why are the changes needed?

The current implementation of `CollectMetricsExec` doesn't consider the case where it handles multiple partitions in a task.
Because a new `updater` is created for each partition even though those partitions belong to the same task, `collector.setState(updater)` raises an assertion error.
Here is a simple reproducible example:
```
$ bin/spark-shell --master "local[1]"
scala> spark.range(1, 4, 1, 3).observe("my_event", count($"id").as("count_val")).coalesce(2).collect
```
```
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125)
at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32786 from sarutak/fix-collectmetricsexec.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 44b695f)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala (diff)
Commit b5a15035851bfba12ef1c68d10103cec42cbac0c by mridulatgmail.com
[SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

### What changes were proposed in this pull request?

Summary of the changes made as part of this PR:

1. `DAGScheduler` changes to finalize a `ShuffleMapStage`, which involves talking to all the shuffle mergers (`ExternalShuffleService`) and getting all the completed merge statuses.
2. Once the `ShuffleMapStage` finalization is complete, mark the `ShuffleMapStage` as finalized, which marks the stage as complete and subsequently lets the child stage start.
3. Also added the relevant tests to `DAGSchedulerSuite` for changes made as part of [SPARK-32919](https://issues.apache.org/jira/browse/SPARK-32919).
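
A minimal, hedged model of the finalization flow in items 1 and 2 (the types and helpers below are hypothetical stand-ins, not the actual `DAGScheduler` API):

```scala
// Illustrative-only model: names and types are hypothetical.
final case class MergeStatus(mergerHost: String, shuffleId: Int, ok: Boolean)

def finalizeShuffleMerge(
    shuffleId: Int,
    mergers: Seq[String],
    askMergerToFinalize: (String, Int) => MergeStatus,
    markStageFinalized: Seq[MergeStatus] => Unit): Unit = {
  // Ask every shuffle merger (ExternalShuffleService) to stop accepting
  // pushed blocks and report its completed merge status.
  val statuses = mergers.map(m => askMergerToFinalize(m, shuffleId))
  // Only after all merge statuses are collected is the ShuffleMapStage
  // marked finalized, which completes the stage and lets the child stage start.
  markStageFinalized(statuses)
}
```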

Lead-authored-by: Min Shen mshen@linkedin.com
Co-authored-by: Venkata krishnan Sowrirajan vsowrirajan@linkedin.com
Co-authored-by: Chandni Singh chsingh@linkedin.com

### Why are the changes needed?

Refer to [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests to DAGSchedulerSuite

Closes #30691 from venkata91/SPARK-32920.

Lead-authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(commit: b5a1503)
The file was modified core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/MapOutputTracker.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala (diff)
The file was modified core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager (diff)
The file was modified core/src/main/scala/org/apache/spark/Dependency.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/internal/config/package.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala (diff)
Commit d21ff1318f614fc207d9cd3c485e4337faa8e878 by max.gekk
[SPARK-35716][SQL] Support casting of timestamp without time zone to date type

### What changes were proposed in this pull request?

Extend the Cast expression and support TimestampWithoutTZType in casting to DateType.

### Why are the changes needed?

To conform to the ANSI SQL standard, which requires support for such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32869 from gengliangwang/castToDate.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(commit: d21ff13)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala (diff)
Commit b4b78ce26567ce7ab83d47ce3b6af87c866bcacb by dhyun
[SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception

### What changes were proposed in this pull request?

A follow-up for SPARK-32975 to avoid the unexpected `None.get` exception.

When running SparkPi with Docker Desktop, since `podName` is an `Option`, we get:
```
21/06/09 01:09:12 ERROR Utils: Uncaught exception in thread main
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:529)
at scala.None$.get(Option.scala:527)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:110)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1417)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.start(ExecutorPodsAllocator.scala:111)
at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.start(KubernetesClusterSchedulerBackend.scala:99)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:581)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2686)
at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:948)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:942)
at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:30)
at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
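
A hedged sketch of the `Option` access pattern at issue (values illustrative):

```scala
val podName: Option[String] = None

// Unsafe: `podName.get` throws java.util.NoSuchElementException: None.get
// when the Option is empty, which is exactly the crash in the stack trace above.

// Safe alternatives: supply a fallback, or act only when a value is present.
val nameOrFallback = podName.getOrElse("spark-driver")  // illustrative fallback
podName.foreach(name => println(s"driver pod: $name"))
```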

### Why are the changes needed?

Fix a regression.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Manual.

Closes #32830 from yaooqinn/SPARK-32975.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: b4b78ce)
The file was modified resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala (diff)
Commit bc1edba8f6f5e5a5de0d3005ad685054d86e478f by dhyun
[SPARK-35692][K8S] Use AtomicInteger for executor id generating

### What changes were proposed in this pull request?

AtomicInteger is enough for executor IDs. In this PR, we use it to replace AtomicLong, like other cluster managers, e.g. YARN and Standalone.
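
A minimal sketch of the pattern (the holder object and names are illustrative):

```scala
import java.util.concurrent.atomic.AtomicInteger

object ExecutorIds {  // illustrative holder
  private val idGenerator = new AtomicInteger(0)

  // Thread-safe, monotonically increasing ids; Int leaves ample headroom
  // for the number of executors a single application will ever request.
  def next(): Int = idGenerator.incrementAndGet()
}
```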

### Why are the changes needed?

See the discussion here https://github.com/apache/spark/pull/32610#discussion_r648007320

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

pass CI with existing tests

Closes #32837 from yaooqinn/SPARK-35692.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: bc1edba)
The file was modified resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala (diff)
The file was modified resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala (diff)
Commit a97885bb2c81563a18c99df233fb9e99ff368c9c by mridulatgmail.com
[SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

### What changes were proposed in this pull request?
This is one of the patches for the SPIP SPARK-30602, which is needed for push-based shuffle.

### Summary of changes:
Executors will create the merge directories under the application temp directory provided by YARN. The access control of the folders will be set to 770 so that the Shuffle Service can create merged shuffle files and write merged shuffle data into those files.

Serve merged shuffle block fetch requests by reading the merged shuffle blocks.
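
A hedged sketch of creating a directory with 770 permissions as described (the path below is illustrative):

```scala
import java.nio.file.{Files, Paths}
import java.nio.file.attribute.PosixFilePermissions

// rwxrwx--- == 770: the owner (executor user) and group (shuffle service)
// may create and write files; everyone else gets no access.
val perms = PosixFilePermissions.fromString("rwxrwx---")
val mergeDir = Paths.get("/tmp/yarn-app-temp/merge_manager")  // illustrative path
Files.createDirectories(mergeDir, PosixFilePermissions.asFileAttribute(perms))
```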

### Why are the changes needed?
Refer to the SPIP in SPARK-30602.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.

Lead-authored-by: Min Shen mshen@linkedin.com
Co-authored-by: Chandni Singh chsingh@linkedin.com
Co-authored-by: Ye Zhou yezhou@linkedin.com

Closes #32007 from zhouyejoe/SPARK-33350.

Lead-authored-by: Ye Zhou <yezhou@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(commit: a97885b)
The file was modified core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/util/UtilsSuite.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/storage/BlockId.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/storage/BlockManager.scala (diff)
The file was modified common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java (diff)
The file was modified core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/MapOutputTracker.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/util/Utils.scala (diff)
Commit cf07036d9bfc5c07815ab006476fd505a6863dc4 by dhyun
[SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs

### What changes were proposed in this pull request?

Previously, the following two commits allow driver-owned on-demand PVC reuse.
- SPARK-35182 Support driver-owned on-demand PVC
- SPARK-35416 Support PersistentVolumeClaim Reuse

This PR aims to recover the shuffle data on those remounted PVCs. The lifecycle of the PVCs is tied to that of the Spark job. Since this is a K8s-specific feature, the `ShuffleDataIO` plugin is used.
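
As a hedged illustration, such a plugin is wired in through the shuffle IO plugin config (the config key is Spark's standard `ShuffleDataIO` plugin key; the class name matches the file added by this PR, but treat the exact wiring as an assumption):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative: point the shuffle IO plugin at the K8s-aware implementation
// so shuffle files found on remounted PVCs can be re-registered.
val spark = SparkSession.builder()
  .config("spark.shuffle.sort.io.plugin.class",
    "org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO")
  .getOrCreate()
```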

### Why are the changes needed?

Although a pod is killed, we can remount its PVCs and recover some data from them.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the newly added test cases.

Closes #32730 from dongjoon-hyun/SPARK-RECOVER-SHUFFLE-DATA.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: cf07036)
The file was modified core/src/main/scala/org/apache/spark/MapOutputTracker.scala (diff)
The file was added resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIO.scala
The file was added resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
The file was added resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
Commit 912d60b6dd2aa434fcde59e61fbe3adadd6c4975 by yamamuro
[SPARK-35709][DOCS] Remove the reference to third party Nomad integration project

### What changes were proposed in this pull request?
This PR updates the documentation by removing the reference to [hashicorp/nomad-spark](https://github.com/hashicorp/nomad-spark), which was deprecated in April 2020 and will not be developed any longer.

### Why are the changes needed?
To keep the documentation up to date and avoid confusion for potential users interested in running Spark with Nomad.

### Does this PR introduce _any_ user-facing change?
Yes. A change to the documentation.

### How was this patch tested?
Generated the documentation and checked that everything is alright in the output.

Closes #32860 from pptaszynski/doc/remove-spark-nomad-project-reference.

Authored-by: Pawel Ptaszynski <pawel.ptaszynski@bolt.eu>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
(commit: 912d60b)
The file was modified docs/cluster-overview.md (diff)
Commit 463daabd5afd9abfb8027ebcb2e608f169ad1e40 by dhyun
[SPARK-34512][BUILD][SQL] Upgrade built-in Hive to 2.3.9

### What changes were proposed in this pull request?

This PR upgrades the built-in Hive to 2.3.9. Hive 2.3.9 changes:
- [HIVE-17155] - findConfFile() in HiveConf.java has some issues with the conf path
- [HIVE-24797] - Disable validate default values when parsing Avro schemas
- [HIVE-24608] - Switch back to get_table in HMS client for Hive 2.3.x
- [HIVE-21200] - Vectorization: date column throwing java.lang.UnsupportedOperationException for parquet
- [HIVE-21563] - Improve Table#getEmptyTable performance by disabling registerAllFunctionsOnce
- [HIVE-19228] - Remove commons-httpclient 3.x usage

### Why are the changes needed?

Fix regression caused by AVRO-2035.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #32750 from wangyum/SPARK-34512.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: 463daab)
The file was modified dev/deps/spark-deps-hadoop-3.2-hive-2.3 (diff)
The file was modified docs/building-spark.md (diff)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala (diff)
The file was modified docs/sql-data-sources-hive-tables.md (diff)
The file was modified pom.xml (diff)
The file was modified dev/deps/spark-deps-hadoop-2.7-hive-2.3 (diff)
The file was modified docs/sql-migration-guide.md (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala (diff)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala (diff)
Commit e9ccf4a50cc9c55daa08af49134ceb66394ab4c8 by d_tsai
[SPARK-35640][SQL] Refactor Parquet vectorized reader to remove duplicated code paths

### What changes were proposed in this pull request?

1. Remove duplicated code in the form of `readXXX` in `VectorizedRleValuesReader`. For instance:
```java
  public void readIntegers(
      int total,
      WritableColumnVector c,
      int rowId,
      int level,
      VectorizedValuesReader data) throws IOException {
    int left = total;
    while (left > 0) {
      if (this.currentCount == 0) this.readNextGroup();
      int n = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          if (currentValue == level) {
            data.readIntegers(n, c, rowId);
          } else {
            c.putNulls(rowId, n);
          }
          break;
        case PACKED:
          for (int i = 0; i < n; ++i) {
            if (currentBuffer[currentBufferIdx++] == level) {
              c.putInt(rowId + i, data.readInteger());
            } else {
              c.putNull(rowId + i);
            }
          }
          break;
      }
      rowId += n;
      left -= n;
      currentCount -= n;
    }
  }
```
and replace with:
```java
  public void readBatch(
      int total,
      int offset,
      WritableColumnVector values,
      int maxDefinitionLevel,
      VectorizedValuesReader valueReader,
      ParquetVectorUpdater updater) throws IOException {
    int left = total;
    while (left > 0) {
      if (this.currentCount == 0) this.readNextGroup();
      int n = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          if (currentValue == maxDefinitionLevel) {
            updater.updateBatch(n, offset, values, valueReader);
          } else {
            values.putNulls(offset, n);
          }
          break;
        case PACKED:
          for (int i = 0; i < n; ++i) {
            if (currentBuffer[currentBufferIdx++] == maxDefinitionLevel) {
              updater.update(offset + i, values, valueReader);
            } else {
              values.putNull(offset + i);
            }
          }
          break;
      }
      offset += n;
      left -= n;
      currentCount -= n;
    }
  }
```
where the `ParquetVectorUpdater` is type-specific and has different implementations of `updateBatch` and `update`. Together, this also changes the code paths handling timestamp types to use the batch read API for decoding definition levels.

2. Similar to the above, this removes code duplication in `VectorizedColumnReader.decodeDictionaryIds`. Now different implementations are under `ParquetVectorUpdater.decodeSingleDictionaryId`.
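
A hedged sketch of the updater abstraction's shape (the real interface is the added `ParquetVectorUpdater.java`; this Scala trait merely mirrors the methods named above, with type parameters standing in for the reader's concrete types):

```scala
// Illustrative shape only; signatures in the actual Java interface may differ.
trait ParquetVectorUpdaterSketch[ColumnVector, ValuesReader, Dictionary] {
  /** Decode `total` values from `valueReader` into `values`, starting at `offset`. */
  def updateBatch(total: Int, offset: Int, values: ColumnVector, valueReader: ValuesReader): Unit

  /** Decode a single value into `values` at `offset`. */
  def update(offset: Int, values: ColumnVector, valueReader: ValuesReader): Unit

  /** Decode one dictionary-encoded value at `offset` using `dictionary`. */
  def decodeSingleDictionaryId(
      offset: Int,
      values: ColumnVector,
      dictionaryIds: ColumnVector,
      dictionary: Dictionary): Unit
}
```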

### Why are the changes needed?

`VectorizedRleValuesReader` and `VectorizedColumnReader` are becoming increasingly harder to maintain, as any change touching the above logic **will need to be replicated in 20+ places**. The issue becomes even more serious when we are going to implement column index (for instance, see how the change [here](https://github.com/apache/spark/pull/32753/files#diff-a01e174e178366aadf07f64ee690d47d343b2ca416a4a2b2ea735887c22d5934R191) has to be replicated multiple times) and complex type support (in progress) for the vectorized path.

In addition, dictionary decoding (see `VectorizedColumnReader.decodeDictionaryIds`) and non-dictionary decoding are currently handled separately, and therefore the same (very complicated) branching logic based on the input Spark & Parquet types has to be replicated in two places, which is another burden for code maintenance.

The original intention was performance. However, these days JIT compilers tend to be very effective at this and will inline virtual calls aggressively to eliminate the method invocation costs (see [this](https://shipilev.net/blog/2015/black-magic-method-dispatch/) and [this](http://insightfullogic.com/blog/2014/may/12/fast-and-megamorphic-what-influences-method-invoca/)). I've also done benchmarks using a modified `DataSourceReadBenchmark` and `DateTimeRebaseBenchmark`, and the result is almost exactly the same before and after the change. The results can be found [here](https://gist.github.com/sunchao/674afbf942ccc2370bdcfa33efb4471c), and [here's](https://github.com/sunchao/spark/tree/parquet-refactor) the source code.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #32777 from sunchao/SPARK-35640.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
(commit: e9ccf4a)
The file was added sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdater.java
The file was added sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala (diff)
The file was modified sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java (diff)
The file was modified sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java (diff)
Commit e9af4576d5463f53fd431d2d59e49991f3c9b287 by wenchen
[SPARK-35718][SQL] Support casting of Date to timestamp without time zone type

### What changes were proposed in this pull request?

Extend the Cast expression and support DateType in casting to TimestampWithoutTZType.

### Why are the changes needed?

To conform to the ANSI SQL standard, which requires support for such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32873 from gengliangwang/dateToTswtz.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: e9af457)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala (diff)
Commit c463472e850ecb9401dc1ddecd015c01a4def2a4 by yamamuro
[SPARK-35439][SQL][FOLLOWUP] ExpressionContainmentOrdering should not sort unrelated expressions

### What changes were proposed in this pull request?

This is a followup of #32586. We introduced `ExpressionContainmentOrdering` to sort common expressions according to their parent-child relations. For unrelated expressions, the ordering previously returned -1, which is not correct and can possibly lead to a transitivity issue.

### Why are the changes needed?

To fix the possible transitivity issue of `ExpressionContainmentOrdering`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test.

Closes #32870 from viirya/SPARK-35439-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
(commit: c463472)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (diff)
Commit 62be22e929a741fc47e319f4323c0c5afddcf56a by gurwls223
[SPARK-35694][INFRA][FOLLOWUP] Increase the default JVM stack size of SBT/Maven

### What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/32838, we set the default JVM stack size to 16M from 4M.
However, there are still StackOverflowError failures in builds:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139672/console

Let's update the value to 64M.

### Why are the changes needed?

Make test build stable.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually triggered test builds.

Closes #32879 from gengliangwang/increaseStackAgain.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: 62be22e)
The file was modified .github/workflows/build_and_test.yml (diff)
The file was modified pom.xml (diff)
The file was modified build/sbt (diff)
Commit ebe529e8e1849bc701a6ba89a9ed91b8edba3068 by gurwls223
[SPARK-35591][PYTHON][DOCS] Rename "Koalas" to "pandas API on Spark" in the documents

### What changes were proposed in this pull request?

This PR proposes to change the name "Koalas" to "pandas API on Spark" in the documents.

### Why are the changes needed?

Since we don't use the name "Koalas" anymore, we should use "pandas API on Spark" instead.

### Does this PR introduce _any_ user-facing change?

Yes, the name "Koalas" is renamed to "Pandas APIs on Spark" in the documents.

### How was this patch tested?

Manually built the docs and checked one by one.

Closes #32835 from itholic/SPARK-35591.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: ebe529e)
The file was modified python/docs/source/reference/pyspark.pandas/extensions.rst (diff)
The file was modified python/docs/source/user_guide/pandas_on_spark/from_to_dbms.rst (diff)
The file was modified python/docs/source/user_guide/pandas_on_spark/typehints.rst (diff)
The file was modified python/docs/source/user_guide/pandas_on_spark/best_practices.rst (diff)
The file was modified python/docs/source/development/ps_contributing.rst (diff)
The file was modified python/docs/source/user_guide/pandas_on_spark/types.rst (diff)
The file was modified python/docs/source/user_guide/pandas_on_spark/pandas_pyspark.rst (diff)
The file was modified python/docs/source/development/ps_design.rst (diff)
The file was modified python/docs/source/reference/pyspark.pandas/frame.rst (diff)
The file was modified python/docs/source/getting_started/ps_install.rst (diff)
The file was modified python/docs/source/reference/pyspark.pandas/ml.rst (diff)
The file was modified python/docs/source/reference/pyspark.pandas/series.rst (diff)
The file was modified python/docs/source/user_guide/pandas_on_spark/transform_apply.rst (diff)
The file was modified python/docs/source/user_guide/pandas_on_spark/options.rst (diff)
The file was modified python/docs/source/getting_started/ps_10mins.ipynb (diff)
Commit 6e1aa15679b5fed249c62b2340151a0299401b18 by wenchen
[SPARK-35652][SQL] joinWith on two table generated from same one

### What changes were proposed in this pull request?
It seems like Spark's inner join performs a cartesian join when self-joining using `joinWith`.

To reproduce this issue:
```
val df = spark.range(0,3)
df.joinWith(df, df("id") === df("id")).show()
```

Before this pull request, the result is:
```
+---+---+
| _1| _2|
+---+---+
|  0|  0|
|  0|  1|
|  0|  2|
|  1|  0|
|  1|  1|
|  1|  2|
|  2|  0|
|  2|  1|
|  2|  2|
+---+---+
```

The expected result is:
```
+---+---+
| _1| _2|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
+---+---+
```

### Why are the changes needed?
correctness

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
add test

Closes #32863 from dgd-contributor/SPARK-35652_join_and_joinWith_in_seft_joining.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 6e1aa15)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala (diff)
Commit 57ce64c511c70b5a26c31b4dc69fc7e93b8bc94c by wenchen
[SPARK-35706][SQL] Consider making the ':' in STRUCT data type definition optional

### What changes were proposed in this pull request?

The STRUCT type syntax is defined like this:

STRUCT<fieldName: fieldType [NOT NULL] [COMMENT stringLiteral] [, ...]>

So the field list is nearly the same as a column list.

If we could make the ':' optional, it would be much cleaner and less proprietary.
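
A hedged illustration of the two forms (run in spark-shell; casting an ad-hoc struct is just a convenient way to exercise the type parser):

```scala
// With the ':' separator (previously required)
spark.sql("SELECT CAST(struct(1, 'x') AS STRUCT<a: INT, b: STRING>)")

// With the ':' omitted (allowed after this change)
spark.sql("SELECT CAST(struct(1, 'x') AS STRUCT<a INT, b STRING>)")
```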

### Why are the changes needed?
ease of use

### Does this PR introduce _any_ user-facing change?
Yes, the ':' in a STRUCT field list is now optional, so a field list can be written nearly the same as a column list.

### How was this patch tested?
unit tests

Closes #32858 from jerqi/master.

Authored-by: RoryQi <1242949407@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 57ce64c)
The file was modified sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala (diff)
The file was modified docs/sql-ref-datatypes.md (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala (diff)
Commit 692dc66c4a3660665c1f156df6eeb9ce6f86195e by wenchen
[SPARK-35695][SQL] Collect observed metrics from cached and adaptive execution sub-trees

### What changes were proposed in this pull request?

Collect observed metrics from cached and adaptive execution sub-trees.

### Why are the changes needed?

Currently, persisting/caching prevents all observed metrics in that sub-tree from reaching the `QueryExecutionListeners`. Adaptive query execution can also prevent the metrics from reaching `QueryExecutionListeners`.
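
A hedged repro of the cached case (run in spark-shell; the metric name is illustrative):

```scala
import org.apache.spark.sql.functions.{count, lit}

// Before this fix, caching above the observe node meant the "my_metrics"
// row never reached registered QueryExecutionListeners.
spark.range(10)
  .observe("my_metrics", count(lit(1)).as("cnt"))
  .persist()
  .collect()
```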

### Does this PR introduce _any_ user-facing change?

Bugfix

### How was this patch tested?

New UTs

Closes #32862 from tanelk/SPARK-35695_collect_metrics_persist.

Lead-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Co-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 692dc66)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala (diff)
Commit d53831ff5cef618cfbc08dcea84ca9bb61ed9903 by max.gekk
[SPARK-35704][SQL] Add fields to `DayTimeIntervalType`

### What changes were proposed in this pull request?
Extend DayTimeIntervalType to support interval fields. Valid interval field values:
- 0 (DAY)
- 1 (HOUR)
- 2 (MINUTE)
- 3 (SECOND)

After the changes, the following day-time interval types are supported:
1. `DayTimeIntervalType(0, 0)` or `DayTimeIntervalType(DAY, DAY)`
2. `DayTimeIntervalType(0, 1)` or `DayTimeIntervalType(DAY, HOUR)`
3. `DayTimeIntervalType(0, 2)` or `DayTimeIntervalType(DAY, MINUTE)`
4. `DayTimeIntervalType(0, 3)` or `DayTimeIntervalType(DAY, SECOND)`. **It is the default one**. The second fraction precision is microseconds.
5. `DayTimeIntervalType(1, 1)` or `DayTimeIntervalType(HOUR, HOUR)`
6. `DayTimeIntervalType(1, 2)` or `DayTimeIntervalType(HOUR, MINUTE)`
7. `DayTimeIntervalType(1, 3)` or `DayTimeIntervalType(HOUR, SECOND)`
8. `DayTimeIntervalType(2, 2)` or `DayTimeIntervalType(MINUTE, MINUTE)`
9. `DayTimeIntervalType(2, 3)` or `DayTimeIntervalType(MINUTE, SECOND)`
10. `DayTimeIntervalType(3, 3)` or `DayTimeIntervalType(SECOND, SECOND)`
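
A hedged sketch of constructing these types (constant names as listed above, assuming they are exposed on the `DayTimeIntervalType` companion object):

```scala
import org.apache.spark.sql.types.DayTimeIntervalType
import org.apache.spark.sql.types.DayTimeIntervalType.{DAY, HOUR, MINUTE, SECOND}

val dayToSecond  = DayTimeIntervalType(DAY, SECOND)    // the default: INTERVAL DAY TO SECOND
val hourToMinute = DayTimeIntervalType(HOUR, MINUTE)   // INTERVAL HOUR TO MINUTE
val minuteOnly   = DayTimeIntervalType(MINUTE, MINUTE) // INTERVAL MINUTE
```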

### Why are the changes needed?
In the current implementation, Spark supports only `interval day to second`, but the SQL standard allows specifying the start and end fields. The changes will allow following the ANSI SQL standard more precisely.

### Does this PR introduce _any_ user-facing change?
Yes but `DayTimeIntervalType` has not been released yet.

### How was this patch tested?
By existing test suites.

Closes #32849 from MaxGekk/day-time-interval-type-units.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(commit: d53831f)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MutableProjectionSuite.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeTestUtils.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala (diff)
The file was modified sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala (diff)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/IntervalExpressionsSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala (diff)
The file was modified sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala (diff)
The file was modified sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/types/DayTimeIntervalType.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala (diff)
The file was modified sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala (diff)
The file was modified sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala (diff)
Commit e958833c727442fc9efa4fc92f93db16cd5c8476 by dhyun
[SPARK-35396][SQL][TESTS][FOLLOWUP] Add a UT to check if a user-defined cachedBatch is completely released

### What changes were proposed in this pull request?
This PR adds a UT to check whether a user-defined cached batch is completely released when `clearCache` is called.

### Why are the changes needed?
Add a new UT file RefCountedTestCachedBatchSerializerSuite.scala

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT is added: org.apache.spark.sql.execution.columnar.RefCountedTestCachedBatchSerializerSuite

Closes #32717 from xuechendi/support_manual_close_in_InMemoryRelation.

Authored-by: Chendi Xue <chendi.xue@intel.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: e958833)
The file was added sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/RefCountedTestCachedBatchSerializerSuite.scala
Commit 4d21b94d13daf958edf6bbf49c4aa17f5cb012b5 by ueshin
[SPARK-35475][PYTHON] Fix disallow_untyped_defs mypy checks

### What changes were proposed in this pull request?

Adds more type annotations in the file `python/pyspark/pandas/namespace.py` and fixes the mypy check failures.

### Why are the changes needed?

We should enable more disallow_untyped_defs mypy checks.
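
For context, a hedged sketch of the kind of per-module mypy setting involved (the section name is an assumption for illustration; the real change is in `python/mypy.ini` in the diff):

```ini
; illustrative only: require type annotations on all defs in one module
[mypy-pyspark.pandas.namespace]
disallow_untyped_defs = True
```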

### Does this PR introduce _any_ user-facing change?

Yes.
This PR adds more type annotations in pandas APIs on Spark module, which can impact interaction with development tools for users.

### How was this patch tested?

The mypy check with a new configuration and existing tests should pass.

Closes #32871 from ueshin/issues/SPARK-35475/disallow_untyped_defs.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(commit: 4d21b94)
The file was modified python/mypy.ini (diff)
The file was modified python/pyspark/pandas/frame.py (diff)
The file was modified python/pyspark/pandas/namespace.py (diff)
The file was modified python/pyspark/pandas/utils.py (diff)
Commit 703376e8a98b07326cef196962628d806cfdd9cf by kabhwan.opensource
[SPARK-35689][SS] Add log warn when keyWithIndexToValue returns null value

### What changes were proposed in this pull request?

This patch adds log warn when `keyWithIndexToValue` returns null value in `SymmetricHashJoinStateManager`.
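
A hedged sketch of the warning pattern (the function, message text, and names are illustrative, not the actual `SymmetricHashJoinStateManager` code):

```scala
// Illustrative: surface unexpected nulls from the state store instead of
// letting them fail silently further downstream.
def checkedValue[T >: Null](key: Any, index: Long, lookup: (Any, Long) => T): T = {
  val value = lookup(key, index)
  if (value == null) {
    // In Spark this would go through the Logging trait's logWarning.
    System.err.println(
      s"WARN: keyWithIndexToValue returned a null value for key=$key index=$index")
  }
  value
}
```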

### Why are the changes needed?

Once we get a null from the state store in `SymmetricHashJoinStateManager`, it is better to log the case meaningfully; that helps debugging.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #32828 from viirya/fix-ss-joinstatemanager-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(commit: 703376e)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala (diff)
Commit 9c7250fa736b29714a30ec515ceb48fd14e33e24 by yumwang
[SPARK-35321][SQL] Don't register Hive permanent functions when creating Hive client

### What changes were proposed in this pull request?

Instantiate a new Hive client through `Hive.getWithoutRegisterFns(conf, false)` instead of `Hive.get(conf)`, if the `Hive` version is >= '2.3.9' (the built-in version).

### Why are the changes needed?

[HIVE-10319](https://issues.apache.org/jira/browse/HIVE-10319) introduced a new API `get_all_functions` which is only supported in Hive 1.3.0/2.0.0 and up. As a result, when Spark 3.x talks to an HMS service of version 1.2 or lower, the following error will occur:
```
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.thrift.TApplicationException: Invalid method name: 'get_all_functions'
        at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3897)
        at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:248)
        at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
        ... 96 more
Caused by: org.apache.thrift.TApplicationException: Invalid method name: 'get_all_functions'
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_all_functions(ThriftHiveMetastore.java:3845)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_all_functions(ThriftHiveMetastore.java:3833)
```

The `get_all_functions` is called only when `doRegisterAllFns` is set to true:
```java
  private Hive(HiveConf c, boolean doRegisterAllFns) throws HiveException {
    conf = c;
    if (doRegisterAllFns) {
      registerAllFunctionsOnce();
    }
  }
```

What this does is register all Hive permanent functions defined in the HMS in Hive's `FunctionRegistry` class, by iterating through the results from `get_all_functions`. To Spark this seems unnecessary, as it loads Hive permanent (not built-in) UDFs by directly calling the HMS API, i.e., `get_function`. The `FunctionRegistry` is only used for loading Hive built-in functions that are not supported by Spark; at this time, that only applies to `histogram_numeric`.

[HIVE-21563](https://issues.apache.org/jira/browse/HIVE-21563) introduced a new API `getWithoutRegisterFns` which skips the above registration and is available in Hive 2.3.9. Therefore, Spark should adopt it to avoid the cost.
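
A hedged sketch of the version-gated client creation described above (the boolean gate and names are illustrative; the `getWithoutRegisterFns` call is shown as in this description):

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.Hive

// Illustrative: skip permanent-function registration when the built-in Hive
// client (>= 2.3.9) supports it; older clients fall back to Hive.get.
def createHiveClient(conf: HiveConf, builtinHiveAtLeast239: Boolean): Hive =
  if (builtinHiveAtLeast239) Hive.getWithoutRegisterFns(conf, false)
  else Hive.get(conf)
```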

### Does this PR introduce _any_ user-facing change?

Yes. With this fix, Spark should now be able to talk to an HMS server running Hive 1.2.x or lower.

### How was this patch tested?

Manually started an HMS server of Hive version 1.2.2. Without the PR, it failed with the above exception. With the PR, the error disappeared and I could successfully perform common operations such as create table, create database, list tables, etc.

Closes #32887 from sunchao/SPARK-35321-new.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(commit: 9c7250f)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala (diff)
Commit 76e08a8e3d7bbc78a8bc4369d9cc3d8eff72be64 by gurwls223
[SPARK-35738][PYTHON] Support 'y' properly in DataFrame with non-numeric columns with plots

### What changes were proposed in this pull request?

This PR proposes to port the fix https://github.com/databricks/koalas/pull/2172.

```python
ks.DataFrame({'a': [1, 2, 3], 'b':["a", "b", "c"], 'c': [4, 5, 6]}).plot(kind='hist', x='a', y='c', bins=200)
```

**Before:**

```
pyspark.sql.utils.AnalysisException: cannot resolve 'least(min(a), min(b), min(c))' due to data type mismatch: The expressions should all have the same type, got LEAST(bigint, string, bigint).;
'Aggregate [unresolvedalias(least(min(a#1L), min(b#2), min(c#3L)), Some(org.apache.spark.sql.Column$$Lambda$1556/0x0000000800d9484042fb0cc1)), unresolvedalias(greatest(max(a#1L), max(b#2), max(c#3L)), Some(org.apache.spark.sql.Column$$Lambda$1556/0x0000000800d9484042fb0cc1))]
+- Project [a#1L, b#2, c#3L]
   +- Project [__index_level_0__#0L, a#1L, b#2, c#3L, monotonically_increasing_id() AS __natural_order__#8L]
      +- LogicalRDD [__index_level_0__#0L, a#1L, b#2, c#3L], false
```

**After:**

```python
Figure({
    'data': [{'hovertemplate': 'variable=a<br>value=%{text}<br>count=%{y}',
              'name': 'a',
...
```

### Why are the changes needed?

To match the behaviour with pandas' and allow users to set `x` and `y` in a DataFrame with non-numeric columns.

### Does this PR introduce _any_ user-facing change?

No to end users, since the change is not released yet. Yes to devs, as described before.

### How was this patch tested?

Manually tested, added a test and tested in notebooks:

![Screen Shot 2021-06-11 at 9 11 25 PM](https://user-images.githubusercontent.com/6477701/121686038-a47a1b80-cafb-11eb-8f8e-8d968db7ebef.png)

![Screen Shot 2021-06-11 at 9 48 58 PM](https://user-images.githubusercontent.com/6477701/121688858-e22c7380-cafe-11eb-9d0a-adcbe560030f.png)

Closes #32884 from HyukjinKwon/fix-hist-plot.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: 76e08a8)
The file was modified python/pyspark/pandas/tests/plot/test_frame_plot_matplotlib.py (diff)
The file was modified python/pyspark/pandas/plot/plotly.py (diff)
Commit 450b415028c3b00f3a002126cd11318d3932e28f by sarutak
[SPARK-35746][UI] Fix taskid in the stage page task event timeline

### What changes were proposed in this pull request?
The task ID is shown incorrectly in the timeline plot on the Stage page.

### Why are the changes needed?
Map the event timeline plot entries to the correct tasks.
**Before:**
![image](https://user-images.githubusercontent.com/23054875/121761077-81775800-cb4b-11eb-8ec6-ee71926a6549.png)

**After**
![image](https://user-images.githubusercontent.com/23054875/121761195-02ceea80-cb4c-11eb-8ce6-07bb1cca190e.png)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually tested

Closes #32888 from shahidki31/shahid/fixtaskid.

Authored-by: shahid <shahidki31@gmail.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
(commit: 450b415)
The file was modified core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala (diff)
Commit 80f7989d9a761eff1a0b3c64ec3aabb81506953d by max.gekk
[SPARK-35734][SQL] Format day-time intervals using type fields

### What changes were proposed in this pull request?

This PR adds a feature that formats day-time intervals as strings using the start and end fields of `DayTimeIntervalType`.

### Why are the changes needed?

Currently, the fields are ignored, and any `DayTimeIntervalType` value is formatted as `INTERVAL DAY TO SECOND`.
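
A hedged illustration of the intended behavior, assuming a value of 1 day 2 hours 3 minutes (the exact rendering is illustrative):

```
DayTimeIntervalType(DAY, SECOND)    => INTERVAL '1 02:03:00' DAY TO SECOND
DayTimeIntervalType(HOUR, MINUTE)   => INTERVAL '26:03' HOUR TO MINUTE
DayTimeIntervalType(MINUTE, MINUTE) => INTERVAL '1563' MINUTE
```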

### Does this PR introduce _any_ user-facing change?

Yes. The format of day-time intervals is determined by the start and end fields.

### How was this patch tested?

New test.

Closes #32891 from sarutak/interval-format.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(commit: 80f7989)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala (diff)
Commit 0ba1d3852b3e6f89128403e988d6ca8772d1706c by viirya
[SPARK-35701][SQL] Use copy-on-write semantics for SQLConf registered configurations

### What changes were proposed in this pull request?

Using copy-on-write for `SQLConf.sqlConfEntries` and `SQLConf.staticConfKeys` to reduce contention in concurrent workloads.

### Why are the changes needed?

The global locks used to protect `SQLConf.sqlConfEntries` map and the `SQLConf.staticConfKeys` set can cause significant contention on the `SQLConf` instance in a concurrent setting.

Using copy-on-write versions should reduce the contention given that modifications to the configs are relatively rare.
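
A minimal, hedged sketch of the copy-on-write pattern (not Spark's actual code; names illustrative):

```scala
import java.util.concurrent.atomic.AtomicReference

// Reads are lock-free pointer dereferences; writers copy the map and
// atomically swap the reference, retrying on a concurrent modification.
final class CopyOnWriteRegistry[K, V] {
  private val ref = new AtomicReference(Map.empty[K, V])

  def get(key: K): Option[V] = ref.get.get(key)

  @annotation.tailrec
  final def register(key: K, value: V): Unit = {
    val current = ref.get
    if (!ref.compareAndSet(current, current + (key -> value))) register(key, value)
  }
}
```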

Closes #32865 from haiyangsun-db/SPARK-35701.

Authored-by: Haiyang Sun <haiyang.sun@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(commit: 0ba1d38)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (diff)