Changes

Summary

  1. [SPARK-34290][SQL][FOLLOWUP] Cleanup truncate table not supported for (commit: eee0273) (details)
  2. [SPARK-35636][PYTHON][DOCS][FOLLOW-UP] Restructure reference API files (commit: 921abc5) (details)
  3. [SPARK-35312][SS] Introduce new Option in Kafka source to specify (commit: 2a33117) (details)
  4. [SPARK-35659][SS] Avoid write null to StateStore (commit: 1226b9b) (details)
  5. [SPARK-35384][SQL][FOLLOWUP] Fix Scala doc for removed method parameters (commit: 66e38f4) (details)
  6. [SPARK-35602][SS] Update state schema to be able to accept long length (commit: 93a9dc4) (details)
  7. [SPARK-35512][PYTHON] Fix OverflowError(cannot convert float infinity to (commit: e79dd89) (details)
  8. [SPARK-30993][FOLLOWUP][SQL] Refactor LocalDateTimeUDT as YearUDT in (commit: 1b1a8e4) (details)
  9. [SPARK-35682][PYTHON][TESTS] Pin mypy==0.812 in GitHub Actions CI (commit: d12c147) (details)
  10. [SPARK-35647][PYTHON][DOCS] Restructure User Guide in PySpark (commit: afff421) (details)
  11. [SPARK-35378][SQL] Eagerly execute commands in QueryExecution instead of (commit: 8013f98) (details)
  12. [SPARK-35436][SS] RocksDBFileManager - save checkpoint to DFS (commit: 9f010a8) (details)
  13. [SPARK-35687][SQL][TEST] PythonUDFSuite move assume into its methods (commit: 825b620) (details)
  14. [SPARK-35664][SQL] Support java.time.LocalDateTime as an external type (commit: 84c5ca3) (details)
  15. Revert "[SPARK-35668][INFRA] Use "concurrency" syntax on Github Actions (commit: 3be7b29) (details)
  16. [SPARK-35058][SQL] Group exception messages in hive/client (commit: ebb4858) (details)
  17. [SPARK-35674][SQL][TESTS] Test timestamp without time zone in UDF (commit: 43f6b4a) (details)
  18. [SPARK-35694][INFRA] Increase the default JVM stack size of SBT/Maven (commit: 0b5683a) (details)
  19. [SPARK-35650][SQL] Enhance `RepartitionByExpression` to make it coalesce (commit: ce16369) (details)
  20. [SPARK-35390][SQL] Handle type coercion when resolving V2 functions (commit: 7d8181b) (details)
  21. [SPARK-35693][SS][TEST] Add plan check for stream-stream join unit test (commit: f4c8968) (details)
  22. [SPARK-35661][SQL] Allow deserialized off-heap memory entry (commit: 224ebae) (details)
  23. [SPARK-35697][SQL][TESTS] Test TimestampWithoutTZType as ordered and (commit: 313dc2d) (details)
  24. [SPARK-35423][ML] PCA results should be consistent, If the Matrix (commit: 519be23) (details)
  25. [SPARK-34382][SQL] Support LATERAL subqueries (commit: f49bf1a) (details)
  26. [SPARK-35698][SQL] Support casting of timestamp without time zone to (commit: 74b3df8) (details)
  27. [SPARK-35601][PYTHON] Complete arithmetic operators involving bool (commit: 3c66c11) (details)
  28. [SPARK-35705][PYTHON] Adjust pandas-on-spark (commit: e9d6015) (details)
  29. [MINOR][SQL] Modify the example of rand and randn (commit: 94b66f5) (details)
  30. [SPARK-35194][SQL][FOLLOWUP] Change Seq to collections.Seq in (commit: 7e99b65) (details)
  31. [MINOR][SQL] No need to normolize name for built-in functions (commit: 87d2ffb) (details)
  32. [SPARK-35675][SQL] EnsureRequirements remove shuffle should respect (commit: 8dde20a) (details)
  33. [SPARK-35679][SQL] instantToMicros overflow (commit: aa3de40) (details)
  34. [SPARK-35474] Enable disallow_untyped_defs mypy check for (commit: cadd3a0) (details)
  35. [SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in (commit: 5280f02) (details)
  36. [SPARK-34524][SQL][FOLLOWUP] Remove unused checkAlterTablePartition in (commit: 88f1d82) (details)
  37. [SPARK-35711][SQL] Support casting of timestamp without time zone to (commit: 4180692) (details)
  38. [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for (commit: e2e3fe7) (details)
Commit eee02739ed5a3b9a882604b2fbe72f9ad8abda21 by yao
[SPARK-34290][SQL][FOLLOWUP] Cleanup truncate table not supported for V2Table error

### What changes were proposed in this pull request?

Cleanup unreachable code.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #32791 from pan3793/cleanup.

Authored-by: Cheng Pan <379377944@qq.com>
Signed-off-by: Kent Yao <yao@apache.org>
(commit: eee0273)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala (diff)
Commit 921abc51cf83bdfe93f7d188535476d9c9979781 by gurwls223
[SPARK-35636][PYTHON][DOCS][FOLLOW-UP] Restructure reference API files according to the layout

### What changes were proposed in this pull request?

This PR proposes to restructure the reference API files according to the layout; see https://github.com/apache/spark/pull/32799. Now the pandas APIs on Spark are under a separate directory which is at the same level as other modules such as Spark SQL.

```bash
tree reference
```

**Before:**

```
reference
├── index.rst
├── ps_extensions.rst
├── ps_frame.rst
├── ps_general_functions.rst
├── ps_groupby.rst
├── ps_indexing.rst
├── ps_io.rst
├── ps_ml.rst
├── ps_series.rst
├── ps_window.rst
├── pyspark.ml.rst
├── pyspark.mllib.rst
├── pyspark.pandas.rst
├── pyspark.resource.rst
├── pyspark.rst
├── pyspark.sql.rst
├── pyspark.ss.rst
└── pyspark.streaming.rst
```

**After:**

```
reference
├── index.rst
├── pyspark.ml.rst
├── pyspark.mllib.rst
├── pyspark.pandas
│   ├── extensions.rst
│   ├── frame.rst
│   ├── general_functions.rst
│   ├── groupby.rst
│   ├── index.rst
│   ├── indexing.rst
│   ├── io.rst
│   ├── ml.rst
│   ├── series.rst
│   └── window.rst
├── pyspark.resource.rst
├── pyspark.rst
├── pyspark.sql.rst
├── pyspark.ss.rst
└── pyspark.streaming.rst
```

### Why are the changes needed?

To make the directory structure easier to follow.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually built and tested the docs.

Closes #32812 from HyukjinKwon/SPARK-35646-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: 921abc5)
The file was added python/docs/source/reference/pyspark.pandas/extensions.rst
The file was removed python/docs/source/reference/ps_groupby.rst
The file was removed python/docs/source/reference/ps_extensions.rst
The file was added python/docs/source/reference/pyspark.pandas/general_functions.rst
The file was added python/docs/source/reference/pyspark.pandas/io.rst
The file was added python/docs/source/reference/pyspark.pandas/indexing.rst
The file was modified python/docs/source/conf.py (diff)
The file was removed python/docs/source/reference/ps_general_functions.rst
The file was removed python/docs/source/reference/ps_indexing.rst
The file was added python/docs/source/reference/pyspark.pandas/index.rst
The file was modified .gitignore (diff)
The file was added python/docs/source/reference/pyspark.pandas/window.rst
The file was added python/docs/source/reference/pyspark.pandas/series.rst
The file was modified python/docs/source/reference/index.rst (diff)
The file was removed python/docs/source/reference/ps_frame.rst
The file was added python/docs/source/reference/pyspark.pandas/ml.rst
The file was removed python/docs/source/reference/ps_ml.rst
The file was added python/docs/source/reference/pyspark.pandas/groupby.rst
The file was removed python/docs/source/reference/pyspark.pandas.rst
The file was removed python/docs/source/reference/ps_series.rst
The file was removed python/docs/source/reference/ps_window.rst
The file was removed python/docs/source/reference/ps_io.rst
The file was added python/docs/source/reference/pyspark.pandas/frame.rst
Commit 2a331177ba895b91662d94f134d7a02e59e13bfe by kabhwan.opensource
[SPARK-35312][SS] Introduce new Option in Kafka source to specify minimum number of records to read per trigger

### What changes were proposed in this pull request?
This patch introduces a new option, minOffsetsPerTrigger, to specify the minimum number of offsets to read per trigger, together with maxTriggerDelay to avoid waiting indefinitely for a trigger.

This new option allows skipping a trigger/batch when the number of records available in Kafka is low. This is a very useful feature in cases where there is a sudden burst of data at certain intervals in a day and the data volume is low for the rest of the day.
The 'maxTriggerDelay' option helps avoid an indefinite delay in scheduling a trigger: the trigger will fire irrespective of the number of available records once the maxTriggerDelay time has elapsed since the last trigger. It is an optional parameter with a default value of 15 mins, and it is only applicable when minOffsetsPerTrigger is set.

The minOffsetsPerTrigger option is of course optional, but once specified it takes precedence over maxOffsetsPerTrigger, which is honored only after minOffsetsPerTrigger is satisfied.
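
As an illustration, here is a minimal sketch of setting these options on the Kafka source (option names follow the description above; the broker, topic, and values are only examples):

```scala
// Hedged sketch: rate-limit options on the Kafka streaming source.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "events")
  .option("minOffsetsPerTrigger", "100000") // skip the trigger until at least this many offsets are available
  .option("maxTriggerDelay", "15m")         // ...but never wait longer than this since the last trigger
  .option("maxOffsetsPerTrigger", "500000") // honored only after minOffsetsPerTrigger is satisfied
  .load()
```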

### Why are the changes needed?
There are many scenarios where there is a sudden burst of data at certain intervals in a day and the data volume is low for the rest of the day. Tuning such jobs is difficult: decreasing the trigger processing time increases the number of batches (and hence cluster resource usage) and adds to small-file issues, while increasing the trigger processing time adds consumer lag. This patch tries to address this issue.

### How was this patch tested?
This patch was tested by adding test cases, as well as manually on a cluster where the job ran for a full day with a data burst happening once a day.
Here is a picture of the data burst and the resulting consumer lag:
<img width="1198" alt="Screenshot 2021-04-29 at 11 39 35 PM" src="https://user-images.githubusercontent.com/1044003/116997587-9b2ab180-acfa-11eb-91fd-524802ce3316.png">

This is how the job behaved at burst time, running every 4.5 mins (which is the specified trigger time):
<img width="1154" alt="Burst Time" src="https://user-images.githubusercontent.com/1044003/116997919-12f8dc00-acfb-11eb-9b0a-98387fc67560.png">

This is the job behavior during non-burst time, where it skips 2 to 3 triggers and runs once every 9 to 13.5 mins:
<img width="1154" alt="Non Burst Time" src="https://user-images.githubusercontent.com/1044003/116998244-8b5f9d00-acfb-11eb-8340-33d47149ef81.png">

Here are some more stats from the two runs, i.e. one normal run and the other with minOffsetsPerTrigger set:

| Run | Data Size | Number of Batch Runs | Number of Files |
| ------------- | ------------- |------------- |------------- |
| Normal Run | 54.2 GB | 320 | 21968 |
| Run with minOffsetsPerTrigger | 54.2 GB | 120 | 12104 |

Closes #32653 from satishgopalani/SPARK-35312.

Authored-by: Satish Gopalani <satish.gopalani@pubmatic.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(commit: 2a33117)
The file was modified sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java (diff)
The file was modified external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala (diff)
The file was modified external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala (diff)
The file was modified external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala (diff)
The file was added sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMinRows.java
The file was modified external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala (diff)
The file was modified external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala (diff)
The file was modified docs/structured-streaming-kafka-integration.md (diff)
The file was added sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/CompositeReadLimit.java
Commit 1226b9badd2bc6681e4c533e0dfbc09443a86167 by viirya
[SPARK-35659][SS] Avoid write null to StateStore

### What changes were proposed in this pull request?

This patch removes the usage of putting null into StateStore.

### Why are the changes needed?

According to the `get` method doc in the `StateStore` API, it returns a non-null row if the key exists. So basically we should avoid writing null to `StateStore`: otherwise you cannot distinguish whether a returned null row means the key doesn't exist or the value is actually null. Due to the defined behavior of `get`, it is quite easy to cause an NPE if the caller believes the key exists and doesn't expect to get a null.
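
To illustrate the ambiguity, here is a minimal hedged sketch (not the actual join state manager code) of how a caller observes this contract:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.state.StateStore

// Per the StateStore contract, get(key) returns a non-null row iff the key exists.
// If null values were ever written, the caller can no longer tell "key absent"
// from "value stored as null", and code that assumes the key exists may hit an NPE.
def lookup(store: StateStore, key: UnsafeRow): Option[UnsafeRow] =
  Option(store.get(key)) // None is only unambiguous when nulls are never written
```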

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added test.

Closes #32796 from viirya/fix-ss-joinstatemanager.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(commit: 1226b9b)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala (diff)
Commit 66e38f48fed3bdc68487b8c284f98b3d06f55070 by dhyun
[SPARK-35384][SQL][FOLLOWUP] Fix Scala doc for removed method parameters

### What changes were proposed in this pull request?

Fix Scala doc for removed parameters for `InvokeLike.invoke`.

### Why are the changes needed?

#32532 forgot to update the Scala doc after removing 2 parameters for `InvokeLike.invoke`. This fixes it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A

Closes #32827 from sunchao/SPARK-35384-followup.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: 66e38f4)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala (diff)
Commit 93a9dc479c098ef0989d64f38c2157f20ec4f32d by kabhwan.opensource
[SPARK-35602][SS] Update state schema to be able to accept long length JSON

### What changes were proposed in this pull request?

This PR fixes an issue where neither the key nor the value of a state schema can accept long (>65535 bytes) JSON.
To solve the problem explained below, the JSON-represented schema is divided into chunks of at most 65535 bytes, and each chunk is written with `DataOutputStream.writeUTF`.

As the solution changes the format of the schema, the version is also changed from `1` to `2`, but the old version schema is still accepted to ensure backward compatibility.
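
A minimal sketch of the chunking idea (this is not the actual `SchemaHelper` code, and it assumes the schema JSON is ASCII so that character count equals the encoded byte length):

```scala
import java.io.{DataInputStream, DataOutputStream}

// Split a long JSON string into pieces small enough for writeUTF's 2-byte length field.
def writeJsonChunked(out: DataOutputStream, json: String, chunkSize: Int = 65535): Unit = {
  val chunks = json.grouped(chunkSize).toArray
  out.writeInt(chunks.length)   // number of chunks is written first
  chunks.foreach(out.writeUTF)  // each chunk stays within the 65535-byte limit
}

def readJsonChunked(in: DataInputStream): String = {
  val numChunks = in.readInt()
  (0 until numChunks).map(_ => in.readUTF()).mkString
}
```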

### Why are the changes needed?

In the current implementation, writing the state schema fails with a `UTFDataFormatException` if the length of the schema exceeds 65535 bytes.
This is due to a limitation of `DataOutputStream.writeUTF`.
`writeUTF` writes a 2-byte length field first, meaning the maximum content length is limited to `2^16-1` = `65535` bytes.
https://docs.oracle.com/javase/8/docs/api/java/io/DataOutputStream.html#writeUTF-java.lang.String-

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #32788 from sarutak/fix-UTFDataFormatException.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(commit: 93a9dc4)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala (diff)
The file was added sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala (diff)
Commit e79dd89cf6b513264d8205df1d4561cb07406d79 by gurwls223
[SPARK-35512][PYTHON] Fix OverflowError(cannot convert float infinity to integer) in partitionBy function

### What changes were proposed in this pull request?
Limit the batch size for `add_shuffle_key` in the `partitionBy` function to fix `OverflowError: cannot convert float infinity to integer`.

### Why are the changes needed?
It's not easy to write a UT, but I can use some simple code to explain the bug.
* Original code
```
        def add_shuffle_key(split, iterator):

            buckets = defaultdict(list)
            c, batch = 0, min(10 * numPartitions, 1000)

            for k, v in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
                c += 1

                # check used memory and avg size of chunk of objects
                if (c % 1000 == 0 and get_used_memory() > limit
                        or c > batch):
                    n, size = len(buckets), 0
                    for split in list(buckets.keys()):
                        yield pack_long(split)
                        d = outputSerializer.dumps(buckets[split])
                        del buckets[split]
                        yield d
                        size += len(d)

                    avg = int(size / n) >> 20
                    # let 1M < avg < 10M
                    if avg < 1:
                        batch *= 1.5
                    elif avg > 10:
                        batch = max(int(batch / 1.5), 1)
                    c = 0
```
If `get_used_memory() > limit` is always `True` and `avg < 1` is always `True`, the variable `batch` will grow to infinity. Then `batch = max(int(batch / 1.5), 1)` may raise an `OverflowError` if `avg > 10` at some point.
* sample code to reproduce the bug
```
import sys

limit = 100
used_memory = 200
numPartitions = 64
c, batch = 0, min(10 * numPartitions, 1000)

while True:
    c += 1
    if (c % 1000 == 0 and used_memory > limit or c > batch):
        batch = batch * 1.5
        d = max(int(batch / 1.5), 1)
        print(c, batch)
```

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
It's not easy to write a UT; here is sample code to test the fix:
```
import sys

limit = 100
used_memory = 200
numPartitions = 64
c, batch = 0, min(10 * numPartitions, 1000)

while True:
    c += 1
    if (c % 1000 == 0 and used_memory > limit or c > batch):
        batch = min(sys.maxsize, batch * 1.5)
        d = max(int(batch / 1.5), 1)
        print(c, batch)
```

Closes #32667 from nolanliou/fix_partitionby.

Authored-by: liuqi <nolan.liou@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: e79dd89)
The file was modified python/pyspark/rdd.py (diff)
Commit 1b1a8e4eeeec9674f144bfbc011d63620dcccad2 by gengliang
[SPARK-30993][FOLLOWUP][SQL] Refactor LocalDateTimeUDT as YearUDT in UserDefinedTypeSuite

### What changes were proposed in this pull request?

Refactor LocalDateTimeUDT as YearUDT in UserDefinedTypeSuite

### Why are the changes needed?

As we are going to support java.time.LocalDateTime as an external type of the TimestampWithoutTZ type (https://github.com/apache/spark/pull/32814), registering java.time.LocalDateTime as a UDT will cause test failures: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139469/testReport/
This PR is to unblock https://github.com/apache/spark/pull/32814.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #32824 from gengliangwang/UDTFollowUp.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(commit: 1b1a8e4)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala (diff)
Commit d12c1472f680883b67ab8441e8408b7e7867c58f by gurwls223
[SPARK-35682][PYTHON][TESTS] Pin mypy==0.812 in GitHub Actions CI

### What changes were proposed in this pull request?

It seems a new mypy version (0.901) was released and it broke the CI: https://github.com/python/mypy/releases.

```
python/pyspark/pandas/indexes/base.py:2007: error: Argument 1 to "from_tuples" of "MultiIndex" has incompatible type "Index"; expected "List[Tuple[Any, ...]]"
python/pyspark/testing/pandasutils.py:41: error: Library stubs not installed for "tabulate" (or incompatible with Python 3.6)
python/pyspark/testing/pandasutils.py:41: note: Hint: "python3 -m pip install types-tabulate"
python/pyspark/testing/pandasutils.py:41: note: (or run "mypy --install-types" to install all missing stub packages)
python/pyspark/testing/pandasutils.py:41: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports
Found 2 errors in 2 files (checked 312 source files)
```

I tried to fix these instances and pin it to the latest version (0.901). However, I realised that `python/pyspark/pandas/indexes/base.py:2007` has a logic issue (see https://github.com/databricks/koalas/pull/1325#discussion_r647889901 and https://github.com/databricks/koalas/pull/1325#discussion_r647890007) which cannot be fixed quickly.

Therefore, I decided to pin it to the previous version we used, for now, in order to unblock other PRs' builds.

### Why are the changes needed?

To unblock other PRs.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

I tested it locally, but it has to be tested and pass in GitHub Actions in this PR.

Closes #32829 from HyukjinKwon/SPARK-35682.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: d12c147)
The file was modified .github/workflows/build_and_test.yml (diff)
Commit afff42178cf8e5d85a32c9f8cb5077f1f1dacf6b by gurwls223
[SPARK-35647][PYTHON][DOCS] Restructure User Guide in PySpark documentation

### What changes were proposed in this pull request?

This PR proposes to restructure User Guide in PySpark documentation for pandas APIs on Spark.

**Before**

![Screen Shot 2021-06-08 at 8 47 41 PM](https://user-images.githubusercontent.com/6477701/121179493-cb85e280-c89a-11eb-8b93-552ebe7cd0a8.png)

**After**

![Screen Shot 2021-06-08 at 8 46 58 PM](https://user-images.githubusercontent.com/6477701/121179419-b3ae5e80-c89a-11eb-82a0-6dabbf1de12d.png)

Note that I mostly just moved the contents around, except for minor changes:
- Removing some questions in the FAQ that don't make sense in Apache Spark
- Renaming a subtitle "Working with pandas and PySpark" to "From/to pandas and PySpark DataFrames"

Renaming Koalas to either pandas-on-Spark or pandas APIs on Spark will be done in SPARK-35591.

### Why are the changes needed?

For better readability.

### Does this PR introduce _any_ user-facing change?

Yes, it restructures the documentation as shown above.

### How was this patch tested?

I manually built and tested the docs.

Closes #32820 from HyukjinKwon/SPARK-35647.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: afff421)
The file was added python/docs/source/user_guide/pandas_on_spark/pandas_pyspark.rst
The file was added python/docs/source/user_guide/pandas_on_spark/index.rst
The file was removed python/docs/source/user_guide/ps_faq.rst
The file was modified python/docs/source/user_guide/arrow_pandas.rst (diff)
The file was removed python/docs/source/user_guide/ps_typehints.rst
The file was removed python/docs/source/user_guide/ps_transform_apply.rst
The file was added python/docs/source/user_guide/pandas_on_spark/types.rst
The file was added python/docs/source/user_guide/sql/index.rst
The file was removed python/docs/source/user_guide/ps_pandas_pyspark.rst
The file was modified python/docs/source/user_guide/index.rst (diff)
The file was removed python/docs/source/user_guide/ps_best_practices.rst
The file was added python/docs/source/user_guide/pandas_on_spark/transform_apply.rst
The file was removed python/docs/source/user_guide/ps_types.rst
The file was added python/docs/source/user_guide/pandas_on_spark/typehints.rst
The file was removed python/docs/source/user_guide/ps_options.rst
The file was removed python/docs/source/user_guide/ps_from_to_dbms.rst
The file was added python/docs/source/user_guide/sql/arrow_pandas.rst
The file was added python/docs/source/user_guide/pandas_on_spark/faq.rst
The file was added python/docs/source/user_guide/pandas_on_spark/from_to_dbms.rst
The file was modified python/docs/source/user_guide/python_packaging.rst (diff)
The file was added python/docs/source/user_guide/pandas_on_spark/best_practices.rst
The file was added python/docs/source/user_guide/pandas_on_spark/options.rst
Commit 8013f985a4d07a948b0c22638314162819bfb2be by wenchen
[SPARK-35378][SQL] Eagerly execute commands in QueryExecution instead of caller sides

### What changes were proposed in this pull request?
Currently, Spark eagerly executes commands on the caller side of `QueryExecution`, which is a bit hacky as `QueryExecution` is not aware of it and leads to confusion.

For example, if you run `sql("show tables").collect()`, you will see two queries with identical query plans in the web UI.
![image](https://user-images.githubusercontent.com/3182036/121193729-a72d0480-c8a0-11eb-8b12-379019607ad5.png)
![image](https://user-images.githubusercontent.com/3182036/121193822-bc099800-c8a0-11eb-9d2a-34ab1329e2f7.png)
![image](https://user-images.githubusercontent.com/3182036/121193845-c0ce4c00-c8a0-11eb-96d0-ef604a4dfab0.png)

The first query is triggered at `Dataset.logicalPlan`, which eagerly executes the command.
The second query is triggered at `Dataset.collect`, which is the normal query execution.

From the web UI, it's hard to tell that these two queries are caused by eager command execution.

This PR proposes to move the eager command execution to `QueryExecution`, and turn the command plan into `CommandResult` to indicate that the command has been executed already. Now `sql("show tables").collect()` still triggers two queries, but the query plans are not identical. The second query becomes:
![image](https://user-images.githubusercontent.com/3182036/121194850-b3659180-c8a1-11eb-9abf-2980f84f089d.png)

In addition to the UI improvements, this PR also has other benefits:
1. Simplifies the code, as the caller side no longer needs to worry about eager command execution. `QueryExecution` takes care of it.
2. It helps https://github.com/apache/spark/pull/32442, where there can be more plan nodes above commands, and we need to replace commands with something like a local relation that produces unsafe rows.
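
A hedged sketch of the observable behavior described above:

```scala
// The command is now executed eagerly inside QueryExecution when the Dataset is
// created, and the collected result comes from a CommandResult node instead of
// re-running the command.
val df = spark.sql("SHOW TABLES") // command runs eagerly here
df.collect()                      // second query reads the CommandResult; its plan differs from the first
```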

### Why are the changes needed?
Explained above.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existing tests

Closes #32513 from beliefer/SPARK-35378.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 8013f98)
The file was added sql/core/src/main/scala/org/apache/spark/sql/execution/CommandResultExec.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala (diff)
The file was modified docs/sql-migration-guide.md (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala (diff)
The file was added sql/core/src/main/scala/org/apache/spark/sql/expressions/CommandResult.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala (diff)
Commit 9f010a8eb20502292b3bca42d17ce1dc357343b1 by kabhwan.opensource
[SPARK-35436][SS] RocksDBFileManager - save checkpoint to DFS

### What changes were proposed in this pull request?
The implementation for the save operation of RocksDBFileManager.

### Why are the changes needed?
Save all the files in the given local checkpoint directory as a committed version in DFS.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New UT added.

Closes #32582 from xuanyuanking/SPARK-35436.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(commit: 9f010a8)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/util/Utils.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala (diff)
Commit 825b62086254ee5edeaf16fccf632674711b1bd8 by gurwls223
[SPARK-35687][SQL][TEST] PythonUDFSuite move assume into its methods

### What changes were proposed in this pull request?

Move `assume` into methods at `PythonUDFSuite`.

### Why are the changes needed?

When we run the Spark tests with a command such as:
`./build/mvn -Phadoop-2.7 -Phive -Phive-thriftserver -Pyarn -Pkubernetes clean test`

we get this exception:
```
PythonUDFSuite:
org.apache.spark.sql.execution.python.PythonUDFSuite *** ABORTED ***
   java.lang.RuntimeException: Unable to load a Suite class that was discovered in the runpath: org.apache.spark.sql.execution.python.PythonUDFSuite
   at org.scalatest.tools.DiscoverySuite$.getSuiteInstance(DiscoverySuite.scala:81)
   at org.scalatest.tools.DiscoverySuite.$anonfun$nestedSuites$1(DiscoverySuite.scala:38)
   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
   at scala.collection.Iterator.foreach(Iterator.scala:941)
   at scala.collection.Iterator.foreach$(Iterator.scala:941)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
```

The test environment has no PySpark module, so it failed.
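
A minimal sketch (not the actual `PythonUDFSuite`) of the pattern: calling `assume` inside each test method so the suite can still be discovered and loaded when PySpark is unavailable, with the affected tests simply canceled. The availability check below is a hypothetical stand-in for the real PySpark detection.

```scala
import org.scalatest.funsuite.AnyFunSuite

class PythonDependentSuite extends AnyFunSuite {
  // Hypothetical check standing in for the real "is PySpark available" logic.
  private def pythonAvailable: Boolean =
    scala.util.Try(new ProcessBuilder("python3", "--version").start().waitFor() == 0).getOrElse(false)

  test("runs only when python is available") {
    assume(pythonAvailable) // cancels this test instead of aborting suite loading
    assert(1 + 1 == 2)
  }
}
```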

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

manual

Closes #32833 from ulysses-you/SPARK-35687.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: 825b620)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala (diff)
Commit 84c5ca33f95e982a15efd514f103e4b85c273567 by gengliang
[SPARK-35664][SQL] Support java.time.LocalDateTime as an external type of TimestampWithoutTZ type

### What changes were proposed in this pull request?

In the PR, I propose to extend the Spark SQL API to accept `java.time.LocalDateTime` as an external type of the recently added Catalyst type - `TimestampWithoutTZ`. The Java class `java.time.LocalDateTime` has semantics similar to the ANSI SQL timestamp without time zone type, and it is the most suitable external type for `TimestampWithoutTZType`. In more detail:

* Added `TimestampWithoutTZConverter` which converts java.time.LocalDateTime instances to/from internal representation of the Catalyst type `TimestampWithoutTZType` (to Long type). The `TimestampWithoutTZConverter` object uses new methods of DateTimeUtils:
  * localDateTimeToMicros() converts the input date-time to microseconds since the epoch.
  * microsToLocalDateTime() obtains a java.time.LocalDateTime
* Support new type `TimestampWithoutTZType` in RowEncoder via the methods createDeserializerForLocalDateTime() and createSerializerForLocalDateTime().
* Extended the Literal API to construct literals from `java.time.LocalDateTime` instances.

### Why are the changes needed?

To allow users to parallelize collections of `java.time.LocalDateTime`, construct timestamp without time zone columns, and collect such columns back to the driver side.

### Does this PR introduce _any_ user-facing change?

The PR extends existing functionality. So, users can parallelize instances of the java.time.LocalDateTime class and collect them back.
```
scala> val ds = Seq(java.time.LocalDateTime.parse("1970-01-01T00:00:00")).toDS
ds: org.apache.spark.sql.Dataset[java.time.LocalDateTime] = [value: timestampwithouttz]

scala> ds.collect()
res0: Array[java.time.LocalDateTime] = Array(1970-01-01T00:00)
```
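
Based on the note above that the Literal API was extended, a hedged sketch of another usage (the column alias is assumed):

```scala
// Constructing a literal column from a java.time.LocalDateTime value.
import org.apache.spark.sql.functions.lit

val tsCol = lit(java.time.LocalDateTime.parse("1970-01-01T00:00:00")).as("ts_ntz")
```
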
### How was this patch tested?

New unit tests

Closes #32814 from gengliangwang/LocalDateTime.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(commit: 84c5ca3)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala (diff)
The file was modified sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala (diff)
The file was modified sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala (diff)
Commit 3be7b29cd8beaf6d34640dec12a10bc033deecdc by gurwls223
Revert "[SPARK-35668][INFRA] Use "concurrency" syntax on Github Actions workflow"

This reverts commit f3dc549d9c4af90a4e01e9a3b8b6724aa4ceddca.
(commit: 3be7b29)
The file was modified .github/workflows/labeler.yml (diff)
The file was modified .github/workflows/benchmark.yml (diff)
The file was modified .github/workflows/notify_test_workflow.yml (diff)
The file was added .github/workflows/cancel_duplicate_workflow_runs.yml
The file was modified .github/workflows/test_report.yml (diff)
Commit ebb4858f7185c6525adc4b23bc89f0a8262bf940 by wenchen
[SPARK-35058][SQL] Group exception messages in hive/client

### What changes were proposed in this pull request?
This PR groups exception messages in `sql/hive/src/main/scala/org/apache/spark/sql/hive/client`.

### Why are the changes needed?
It will largely help with the standardization of error messages and their maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.

Closes #32763 from beliefer/SPARK-35058.

Lead-authored-by: beliefer <beliefer@163.com>
Co-authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: ebb4858)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala (diff)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala (diff)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala (diff)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala (diff)
Commit 43f6b4a8100fc2a55a6e4abbbc85ebac0dd4d8f0 by gengliang
[SPARK-35674][SQL][TESTS] Test timestamp without time zone in UDF

### What changes were proposed in this pull request?

Write tests for timestamp without time zone in UDF as input parameters and results.

### Why are the changes needed?

It follows https://github.com/apache/spark/pull/31779 to improve test coverage.

### Does this PR introduce _any_ user-facing change?

No
### How was this patch tested?

Unit test

Closes #32840 from gengliangwang/tswtzUDF.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(commit: 43f6b4a)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala (diff)
Commit 0b5683a4d5971ddcbe738ee6a412bad3019fdb68 by gengliang
[SPARK-35694][INFRA] Increase the default JVM stack size of SBT/Maven

### What changes were proposed in this pull request?

The Jenkins SBT/Maven builds keep failing with a stack overflow error:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139542

We should increase the JVM stack size to 16MB.
Also, https://github.com/apache/spark/pull/32521 set the stack size to 256MB for the Java 11 build, which might be too big since every thread allocates this memory for its stack. This PR also sets it to 16MB to make the config consistent.

### Why are the changes needed?

Fix SBT/Maven build.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Jenkins and GA tests.

Closes #32838 from gengliangwang/increaseSBTStackSize.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(commit: 0b5683a)
The file was modified .github/workflows/build_and_test.yml (diff)
The file was modified build/sbt (diff)
The file was modified pom.xml (diff)
Commit ce1636948b1fb8cfb8cc921896dc003949da1085 by wenchen
[SPARK-35650][SQL] Enhance `RepartitionByExpression` to make it coalesce partitions efficiently by AQE

### What changes were proposed in this pull request?

This PR enhances `RepartitionByExpression` so that AQE can coalesce its partitions efficiently. It is usually used to merge small files.
The basic logic is: Spark first tries to coalesce partitions; if they cannot be coalesced, it uses the local shuffle reader to read the data and avoid exchanging it over the network.

Usage:
```sql
SELECT /*+ REPARTITION */ * FROM t
```
```scala
df.repartition()
```

For example:
coalesce small output files | local shuffle reader
--- | ---
![image](https://user-images.githubusercontent.com/5399861/120772533-fc8cad00-c552-11eb-977e-5bb61b84cbe2.png)| ![image](https://user-images.githubusercontent.com/5399861/120772324-c6e7c400-c552-11eb-9daa-f6b5021fd1b9.png)

### Why are the changes needed?

Coalesce partitions efficiently.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #32781 from wangyum/SPARK-35650.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: ce16369)
The file was modified docs/sql-performance-tuning.md (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala (diff)
Commit 7d8181b62f17a202ba584c7bba65b61ec4724db2 by wenchen
[SPARK-35390][SQL] Handle type coercion when resolving V2 functions

### What changes were proposed in this pull request?

Handle type coercion when resolving V2 functions. In particular:
- prior to evaluating function arguments, insert a cast whenever an argument type doesn't match the expected input type.
- use `BoundFunction.inputTypes()` to look up the magic method for scalar functions

### Why are the changes needed?

For V2 functions, the actual argument types do not necessarily match the declared input types, and Spark should handle type coercion whenever it is needed.
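
For instance, a hedged sketch with an assumed catalog and function name (the behavior is per the description above):

```scala
// A V2 function bound with LongType inputs can now be called with INT arguments:
// the analyzer inserts casts during resolution instead of failing to resolve.
spark.sql("SELECT testcat.ns.long_add(1, 2)").show() // both INT literals are cast to BIGINT
```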

### Does this PR introduce _any_ user-facing change?

Yes. Now V2 function resolution should be able to handle type coercion properly.

### How was this patch tested?

Added a few new tests.

Closes #32764 from sunchao/SPARK-35390.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 7d8181b)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (diff)
The file was modified sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ScalarFunction.java (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ApplyFunctionExpression.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/V2Aggregator.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala (diff)
The file was modified sql/core/src/test/java/test/org/apache/spark/sql/connector/catalog/functions/JavaLongAdd.java (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala (diff)
Commit f4c896885dfae193267da1375268da70851b9619 by wenchen
[SPARK-35693][SS][TEST] Add plan check for stream-stream join unit test

### What changes were proposed in this pull request?

The changed [unit test](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala#L566) was introduced in https://github.com/apache/spark/pull/21587 to fix the planner side of things for stream-stream join. Ideally checking the query result should catch the bug, but it is better to add a plan check to make the purpose of the unit test clearer and to catch future bugs from planner changes.

### Why are the changes needed?

Improve unit test.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Changed test itself.

Closes #32836 from c21/ss-test.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: f4c8968)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala (diff)
Commit 224ebae2730802b7b852eb0fe7e9d7d7ec59677d by wenchen
[SPARK-35661][SQL] Allow deserialized off-heap memory entry

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/32534

#32534 proposed a use case to use `DeserializedMemoryEntry` to store off-heap data, and let Spark release the memory via the `AutoCloseable` interface. However, there is one more problem: `DeserializedMemoryEntry` always reports its size as on-heap size, which is inaccurate. If the Spark cluster is configured with small on-heap size and large off-heap size, this will trigger a lot of spilling.

This PR makes `DeserializedMemoryEntry` truly support off-heap data. Now the caller side can cache off-heap data with a new storage level `OFF_HEAP_ONLY_DESER`.

### Why are the changes needed?

correct the memory counting for off-heap data.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

updated test

Closes #32800 from cloud-fan/follow.

Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 224ebae)
The file was modified core/src/main/scala/org/apache/spark/storage/BlockManager.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/storage/StorageLevel.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala (diff)
Commit 313dc2d4ed581e613078273cf4c9165186490765 by wenchen
[SPARK-35697][SQL][TESTS] Test TimestampWithoutTZType as ordered and atomic type

### What changes were proposed in this pull request?
Add `TimestampWithoutTZType` to `DataTypeTestUtils.ordered`/`atomicTypes`, and implement value generation for the type in `LiteralGenerator`/`RandomDataGenerator`. In this way, the type will be tested automatically in:
1. ArithmeticExpressionSuite:
    - "function least"
    - "function greatest"
2. PredicateSuite
    - "BinaryComparison consistency check"
    - "AND, OR, EqualTo, EqualNullSafe consistency check"
3. ConditionalExpressionSuite
    - "if"
4. RandomDataGeneratorSuite
    - "Basic types"
5. CastSuite
    - "null cast"
    - "up-cast"
    - "SPARK-27671: cast from nested null type in struct"
6. OrderingSuite
    - "GenerateOrdering with TimestampWithoutTZType"
7. PredicateSuite
    - "IN with different types"
8. UnsafeRowSuite
    - "calling get(ordinal, datatype) on null columns"
9. SortSuite
    - "sorting on TimestampWithoutTZType ..."

### Why are the changes needed?
To improve test coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected test suites.

Closes #32843 from gengliangwang/atomicTest.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 313dc2d)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeTestUtils.scala (diff)
Commit 519be238be3d6c02c06f8c5b967e6e3ce108c653 by srowen
[SPARK-35423][ML] PCA results should be consistent, If the Matrix contains both Sparse and Dense vectors

### What changes were proposed in this pull request?
If the dataset contains a mix of sparse and dense vectors, the output of PCA differs. The issue here is that we check only the first row's Vector type. If the first row is dense and all the remaining rows are sparse, we compute PCA via the dense path. Similarly, if only the first row is sparse and all the remaining rows are dense, we compute via the sparse path.

The following datasets produce different PCA results, even though the data is the same except that the first row is sparse.
```
val data1 = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
```

```
+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+

```
```
val data1 = Array(
  Vectors.dense(0.0, 1.0, 0.0, 7.0, 0.0 ),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
```

```
+------------------------------------------------------------+
|pcaFeatures                                                 |
+------------------------------------------------------------+
|[1.6485728230883814,-4.0132827005162985,-1.0091435193998504]|
|[-4.645104331781533,-1.1167972663619048,-1.0091435193998501]|
|[-6.428880535676488,-5.337951427775359,-1.009143519399851]  |
+------------------------------------------------------------+
```

### Why are the changes needed?
To fix inconsistent results when the dataset contains both sparse and dense vectors. We need to treat the entire matrix as sparse ONLY if all the rows are sparse; otherwise we need to consider the matrix as dense. This PR can be a follow-up to https://github.com/apache/spark/pull/23126.
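
A minimal sketch of that rule (not the actual `RowMatrix` change): decide the computation path from all rows rather than the first one.

```scala
import org.apache.spark.mllib.linalg.{SparseVector, Vector}

// Treat the matrix as sparse only if every row is sparse; any dense row
// means the dense path should be used.
def useSparsePath(rows: Seq[Vector]): Boolean =
  rows.nonEmpty && rows.forall(_.isInstanceOf[SparseVector])
```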

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UTs

Closes #32734 from shahidki31/shahid/pca.

Authored-by: shahid <shahidki31@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
(commit: 519be23)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala (diff)
The file was modified mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala (diff)
The file was modified mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala (diff)
Commit f49bf1a072dd31b74503bbc50b3ed4d7df050402 by wenchen
[SPARK-34382][SQL] Support LATERAL subqueries

### What changes were proposed in this pull request?
This PR adds support for lateral subqueries. A lateral subquery is a subquery preceded by the `LATERAL` keyword in the FROM clause of a query that can reference columns in the preceding FROM items. For example:
```sql
SELECT * FROM t1, LATERAL (SELECT * FROM t2 WHERE t1.a = t2.c)
```
A new subquery expression `LateralSubquery` is used to represent a lateral subquery. It is similar to `ScalarSubquery` but can return multiple rows and columns. A new logical unary node `LateralJoin` is used to represent a lateral join.

Here is the analyzed plan for the above query:
```scala
Project [a, b, c, d]
+- LateralJoin lateral-subquery [a], Inner
   :  +- Project [c, d]
   :     +- Filter (outer(a) = c)
   :        +- Relation [c, d]
   +- Relation [a, b]
```

Similar to a correlated subquery, a lateral subquery can be viewed as a dependent (nested loop) join where the evaluation of the right subtree depends on the current value of the left subtree.  The same technique to decorrelate a subquery is used to decorrelate a lateral join:
```scala
Project [a, b, c, d]
+- LateralJoin lateral-subquery [a && a = c], Inner  // pull up correlated predicates as join conditions
   :  +- Project [c, d]
   :     +- Relation [c, d]
   +- Relation [a, b]
```
Then the lateral join can be rewritten into a normal join:
```scala
Join Inner (a = c)
:- Relation [a, b]
+- Relation [c, d]
```

#### Follow-ups:
1. Similar to rewriting correlated scalar subqueries, rewriting lateral joins is also subject to the COUNT bug (See SPARK-15370 for more details). This is **not** handled in the current PR as it requires a sizeable amount of refactoring. It will be addressed in a subsequent PR (SPARK-35551).
2. Currently Spark does not support using outer query references to resolve star expressions in subqueries. This is not specific to lateral subqueries and can be handled in a separate PR (SPARK-35618)

### Why are the changes needed?
To support an ANSI SQL feature.

### Does this PR introduce _any_ user-facing change?
Yes. It allows users to use lateral subqueries in the FROM clause of a query.

### How was this patch tested?
- Parser test: `PlanParserSuite.scala`
- Analyzer test: `ResolveSubquerySuite.scala`
- Optimizer test: `PullupCorrelatedPredicatesSuite.scala`
- SQL test: `join-lateral.sql`, `postgreSQL/join.sql`

Closes #32303 from allisonwang-db/spark-34382-lateral.

Lead-authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: f49bf1a)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala (diff)
The file was added sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala (diff)
The file was modified docs/sql-ref-ansi-compliance.md (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala (diff)
The file was modified sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql (diff)
The file was modified sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala (diff)
The file was modified sql/core/src/test/resources/sql-tests/results/postgreSQL/join.sql.out (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala (diff)
The file was modified sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-join.sql.out (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala (diff)
The file was added sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala (diff)
The file was modified sql/core/src/test/resources/sql-tests/inputs/udf/postgreSQL/udf-join.sql (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala (diff)
The file was modified sql/core/src/test/resources/sql-tests/inputs/postgreSQL/groupingsets.sql (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala (diff)
Commit 74b3df86f347ed6279bf127153a2c8c4927af21e by gengliang
[SPARK-35698][SQL] Support casting of timestamp without time zone to strings

### What changes were proposed in this pull request?

Extend the Cast expression and support TimestampWithoutTZType in casting to StringType.
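
As a rough, self-contained illustration of the kind of conversion involved (this uses `java.time` directly and is not Spark's `Cast` implementation; Spark's exact output formatting may differ, e.g. trailing zeros are trimmed):

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// A timestamp without time zone is a wall-clock value, so casting it to a
// string renders the local date-time with no zone offset attached.
val ts = LocalDateTime.of(2021, 6, 10, 12, 30, 45, 123456000)
val asString = ts.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"))
// asString == "2021-06-10 12:30:45.123456"
```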

### Why are the changes needed?

To conform to the ANSI SQL standard, which requires support for such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32846 from gengliangwang/tswtzToString.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(commit: 74b3df8)
The file was modifiedsql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala (diff)
The file was modifiedsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala (diff)
Commit 3c66c11aa60f43f2cdfed221c55bc99e66653695 by ueshin
[SPARK-35601][PYTHON] Complete arithmetic operators involving bool literals, Series, and Index

### What changes were proposed in this pull request?

Completing arithmetic operators involving bool literals, Series, and Index consists of two main tasks:
- Support arithmetic operations against bool literals
- Support operators (+, *) between bool Series/Indexes.

### Why are the changes needed?

Arithmetic operators involving bool literals, Series, and Index are currently incomplete.
We ought to match pandas' behavior.

### Does this PR introduce _any_ user-facing change?

Yes.

Newly supported operations example:
```py
>>> ps.Series([1, 2, 3]) + True
0    2
1    3
2    4
dtype: int64
>>> ps.Series([1, 2, 3]) + False
0    1
1    2
2    3
dtype: int64
>>> ps.Series([True, False, True]) + True
0    True
1    True
2    True
dtype: bool
>>> ps.Series([True, False, True]) + False
0     True
1    False
2     True
dtype: bool
>>> ps.Series([True, False, True]) * True
0     True
1    False
2     True
dtype: bool
>>> ps.Series([True, False, True]) * False
0    False
1    False
2    False
dtype: bool
>>> ps.set_option('compute.ops_on_diff_frames', True)
>>> ps.Series([True, True, False]) + ps.Series([True, False, True])
0    True
1    True
2    True
dtype: bool
>>> ps.Series([True, True, False]) * ps.Series([True, False, True])
0     True
1    False
2    False
dtype: bool
```
Before the change, the operations above were not supported and raised a `TypeError` such as:
```py
>>> ps.Series([True, False, True]) + True
Traceback (most recent call last):
...
TypeError: Addition can not be applied to booleans and the given type.
>>> ps.Series([True, False, True]) + False
Traceback (most recent call last):
...
TypeError: Addition can not be applied to booleans and the given type.
```

### How was this patch tested?

Unit tests.

Closes #32785 from xinrong-databricks/datatypeops_arith_bool.

Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(commit: 3c66c11)
The file was modifiedpython/pyspark/pandas/tests/data_type_ops/testing_utils.py (diff)
The file was modifiedpython/pyspark/pandas/data_type_ops/num_ops.py (diff)
The file was modifiedpython/pyspark/pandas/data_type_ops/boolean_ops.py (diff)
The file was modifiedpython/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py (diff)
The file was modifiedpython/pyspark/pandas/tests/data_type_ops/test_num_ops.py (diff)
The file was modifiedpython/pyspark/pandas/data_type_ops/base.py (diff)
Commit e9d60156c48d2e3add916fee7c37fecfb4f2f49f by gurwls223
[SPARK-35705][PYTHON] Adjust pandas-on-spark `test_groupby_multiindex_columns` test for different pandas versions

### What changes were proposed in this pull request?

Adjust the pandas-on-Spark `test_groupby_multiindex_columns` test so that it passes with different pandas versions.

### Why are the changes needed?

pandas introduced the following bugs:

- For pandas 1.1.3 and 1.1.4:
  Type error: `only integer scalar arrays can be converted to a scalar index`
- For pandas < 1.0.4:
  Type error: `Can only tuple-index with a MultiIndex`

We ought to adjust the `test_groupby_multiindex_columns` test by comparing against a predefined return value, rather than against the pandas return value, for the pandas versions above.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

Closes #32851 from xinrong-databricks/SPARK-35705.

Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: e9d6015)
The file was modifiedpython/pyspark/pandas/tests/test_groupby.py (diff)
Commit 94b66f5e28d53498ae55be54c6c581fe69ae8398 by gurwls223
[MINOR][SQL] Modify the example of rand and randn

### What changes were proposed in this pull request?

This PR fixes the examples of `rand` and `randn`.

### Why are the changes needed?

SPARK-23643 (#20793) fixed an issue related to the seed, which changed the results of `rand` and `randn`.
Now the results of `SELECT rand(0)` and `SELECT randn(null)` are `0.7604953758285915` and `1.6034991609278433` respectively, and they should be deterministic because the number of partitions is always 1 (the leaf node is `OneRowRelation`).
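
For reference, a hedged sketch of reproducing the values quoted above (assumes an existing `SparkSession` named `spark`; the expected outputs are taken from this description, not re-verified here):

```scala
// Both queries run over OneRowRelation, i.e. a single partition, so the
// seeded results are deterministic.
spark.sql("SELECT rand(0)").show(false)     // expected: 0.7604953758285915
spark.sql("SELECT randn(null)").show(false) // expected: 1.6034991609278433
```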

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Built the doc and confirmed it.
![rand-doc](https://user-images.githubusercontent.com/4736016/121359059-145a9b80-c96e-11eb-84c2-2f2b313614f3.png)

Closes #32844 from sarutak/rand-example.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: 94b66f5)
The file was modifiedsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala (diff)
Commit 7e99b65295ec0234d463fcd5e858a8d1a4dd5c36 by wenchen
[SPARK-35194][SQL][FOLLOWUP] Change Seq to collections.Seq in NestedColumnAliasing to work with Scala 2.13

### What changes were proposed in this pull request?

This PR changes an occurrence of `Seq` to `collections.Seq` in `NestedColumnAliasing`.

### Why are the changes needed?

In the current master, `NestedColumnAliasing` doesn't work with Scala 2.13 and the relevant tests fail.
The following are examples.

* `NestedColumnAliasingSuite`
* Subclasses of `SchemaPruningSuite`
* `ColumnPruningSuite`

```
NestedColumnAliasingSuite:
[info] - Pushing a single nested field projection *** FAILED *** (14 milliseconds)
[info]   scala.MatchError: (none#211451,ArrayBuffer(name#211451.middle)) (of class scala.Tuple2)
[info]   at org.apache.spark.sql.catalyst.optimizer.NestedColumnAliasing$.$anonfun$getAttributeToExtractValues$5(NestedColumnAliasing.scala:258)
[info]   at scala.collection.StrictOptimizedMapOps.flatMap(StrictOptimizedMapOps.scala:31)
[info]   at scala.collection.StrictOptimizedMapOps.flatMap$(StrictOptimizedMapOps.scala:30)
[info]   at scala.collection.immutable.HashMap.flatMap(HashMap.scala:39)
[info]   at org.apache.spark.sql.catalyst.optimizer.NestedColumnAliasing$.getAttributeToExtractValues(NestedColumnAliasing.scala:258)
```
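
The underlying incompatibility is that `scala.Seq` aliases `scala.collection.Seq` in Scala 2.12 but `scala.collection.immutable.Seq` in Scala 2.13, so a mutable `ArrayBuffer` no longer matches a `Seq` type pattern. A small self-contained sketch of the distinction (not the Spark code itself):

```scala
import scala.collection.mutable.ArrayBuffer

val pair: (Int, Any) = (1, ArrayBuffer("a", "b"))

val rendered = pair match {
  // `case (k, vs: Seq[_])` matches on Scala 2.12 but throws a MatchError on
  // 2.13, because scala.Seq is immutable there and ArrayBuffer is mutable.
  // Matching on scala.collection.Seq works on both versions, which is the
  // essence of the fix.
  case (k, vs: scala.collection.Seq[_]) => s"$k -> $vs"
  case other                            => other.toString
}
println(rendered)
```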

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Ran tests mentioned above and all passed with Scala 2.13.

Closes #32848 from sarutak/followup-SPARK-35194-2.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 7e99b65)
The file was modifiedsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala (diff)
Commit 87d2ffbbcfc6c791b825e01e10319a3fa12a2dcc by wenchen
[MINOR][SQL] No need to normolize name for built-in functions

### What changes were proposed in this pull request?
Add an `internalRegisterFunction` for the built-in function registry so that we can skip the unnecessary function normalization.

### Why are the changes needed?
small refactor

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests.

Closes #32842 from linhongliu-db/function-refactor.

Lead-authored-by: Linhong Liu <linhong.liu@databricks.com>
Co-authored-by: Linhong Liu <67896261+linhongliu-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 87d2ffb)
The file was modifiedsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala (diff)
Commit 8dde20a993c895a5960f66a066684f224116958b by yao
[SPARK-35675][SQL] EnsureRequirements remove shuffle should respect PartitioningCollection

### What changes were proposed in this pull request?

Add `PartitioningCollection` in EnsureRequirements during remove shuffle.

### Why are the changes needed?

Currently `EnsureRequirements` only checks whether the child has a semantically equal `HashPartitioning` when removing a redundant shuffle. We can enhance this case using `PartitioningCollection`, as sketched below.
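
A self-contained toy sketch of the idea (these are simplified stand-ins, not Spark's actual `Partitioning` classes): if the child's output partitioning is a collection of alternatives, the extra shuffle can be dropped as long as any alternative satisfies the requirement.

```scala
sealed trait Partitioning
case class HashPartitioning(columns: Seq[String], numPartitions: Int) extends Partitioning
case class PartitioningCollection(alternatives: Seq[Partitioning]) extends Partitioning

// Collection-aware check: recurse into the alternatives instead of only
// comparing the top-level partitioning.
def satisfies(child: Partitioning, required: Partitioning): Boolean = child match {
  case PartitioningCollection(alts) => alts.exists(satisfies(_, required))
  case p                            => p == required
}

val required = HashPartitioning(Seq("a"), 10)
val child = PartitioningCollection(
  Seq(HashPartitioning(Seq("a"), 10), HashPartitioning(Seq("b"), 10)))
satisfies(child, required) // true: no extra shuffle needed in this toy example
```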

### Does this PR introduce _any_ user-facing change?

Yes, the query plan might change.

### How was this patch tested?

Add test.

Closes #32815 from ulysses-you/shuffle-node.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
(commit: 8dde20a)
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala (diff)
The file was modifiedsql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala (diff)
Commit aa3de4077302fe7e0b23b01a338c7feab0e5974e by max.gekk
[SPARK-35679][SQL] instantToMicros overflow

### Why are the changes needed?
With `Long.MinValue` cast to an instant, `secs` is floored in `microsToInstant` and the multiplication by `MICROS_PER_SECOND` overflows:

```
def microsToInstant(micros: Long): Instant = {
  val secs = Math.floorDiv(micros, MICROS_PER_SECOND)
  // Unfolded Math.floorMod(us, MICROS_PER_SECOND) to reuse the result of
  // the above calculation of `secs` via `floorDiv`.
  val mos = micros - secs * MICROS_PER_SECOND  <- it will overflow here
  Instant.ofEpochSecond(secs, mos * NANOS_PER_MICROS)
}
```

But this overflow is acceptable because it does not change the result.

However, when converting the instant back to a microsecond value, it raises an overflow error:

```
def instantToMicros(instant: Instant): Long = {
  val us = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND) <- it overflows here
  val result = Math.addExact(us, NANOSECONDS.toMicros(instant.getNano))
  result
}
```

Code to reproduce this error
```
instantToMicros(microsToInstant(Long.MinValue))
```
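
One possible way to avoid the overflow (a sketch; not necessarily identical to the patch as merged) is to special-case the minimum representable seconds value so the multiplication stays in range:

```scala
import java.time.Instant
import java.util.concurrent.TimeUnit.NANOSECONDS

val MICROS_PER_SECOND = 1000000L
// Smallest whole-second value that can still be represented in microseconds.
val MIN_SECONDS = Math.floorDiv(Long.MinValue, MICROS_PER_SECOND)

def instantToMicros(instant: Instant): Long = {
  val secs = instant.getEpochSecond
  if (secs == MIN_SECONDS) {
    // Shift by one second before multiplying so Math.multiplyExact cannot
    // overflow, then add the second back as part of the fractional micros.
    val us = Math.multiplyExact(secs + 1, MICROS_PER_SECOND)
    Math.addExact(us, NANOSECONDS.toMicros(instant.getNano) - MICROS_PER_SECOND)
  } else {
    val us = Math.multiplyExact(secs, MICROS_PER_SECOND)
    Math.addExact(us, NANOSECONDS.toMicros(instant.getNano))
  }
}
// With this handling, round-tripping the minimum value no longer throws:
// instantToMicros(microsToInstant(Long.MinValue)) == Long.MinValue
```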

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Test added

Closes #32839 from dgd-contributor/SPARK-35679_instantToMicro.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(commit: aa3de40)
The file was modifiedsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala (diff)
The file was modifiedsql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala (diff)
Commit cadd3a0588eeed42c6742ae1b7a2eaa85bd8a3af by ueshin
[SPARK-35474] Enable disallow_untyped_defs mypy check for pyspark.pandas.indexing

### What changes were proposed in this pull request?

Adds more type annotations in the file:
`python/pyspark/pandas/indexing.py`
and fixes the mypy check failures.

### Why are the changes needed?

We should enable more disallow_untyped_defs mypy checks.

### Does this PR introduce _any_ user-facing change?

Yes.
This PR adds more type annotations in pandas APIs on Spark module, which can impact interaction with development tools for users.

### How was this patch tested?

The mypy check with a new configuration and existing tests should pass.
`./dev/lint-python`

Closes #32738 from pingsutw/SPARK-35474.

Authored-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(commit: cadd3a0)
The file was modifiedpython/mypy.ini (diff)
The file was modifiedpython/pyspark/pandas/indexing.py (diff)
The file was modifiedpython/pyspark/pandas/generic.py (diff)
Commit 5280f02747eed9849e4a64562d38aee11e21616f by wenchen
[SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery

### What changes were proposed in this pull request?

Use `UnresolvedHint.resolved = child.resolved` instead of `UnresolvedHint.resolved = false`, so that a plan containing an `UnresolvedHint` child can be optimized by the rules in the `Resolution` batch.

For instance, before this PR, the following plan cannot be optimized by `ResolveReferences`:
```
!'Project [*]
+- SubqueryAlias __auto_generated_subquery_name
    +- UnresolvedHint use_hash
       +- Project [42 AS 42#10]
          +- OneRowRelation
```

### Why are the changes needed?

Fix a bug with hints in subqueries.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32841 from cfmcgrady/SPARK-35673.

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 5280f02)
The file was modifiedsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala (diff)
The file was modifiedsql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala (diff)
The file was modifiedsql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala (diff)
The file was modifiedsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala (diff)
Commit 88f1d82a467ca8cc366caaa59e2c8c8523cc5378 by wenchen
[SPARK-34524][SQL][FOLLOWUP] Remove unused checkAlterTablePartition in CheckAnalysis.scala

### What changes were proposed in this pull request?

#31637 removed the usage of `CheckAnalysis.checkAlterTablePartition` but didn't remove the function.

### Why are the changes needed?

To remove an unused function.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #32855 from imback82/SPARK-34524-followup.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 88f1d82)
The file was modifiedsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala (diff)
Commit 41806921352c54550144034d24500ae807bd774a by gengliang
[SPARK-35711][SQL] Support casting of timestamp without time zone to timestamp type

### What changes were proposed in this pull request?

Extend the Cast expression and support TimestampWithoutTZType in casting to TimestampType.

### Why are the changes needed?

To conform to the ANSI SQL standard, which requires support for such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32864 from gengliangwang/castToTimestamp.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(commit: 4180692)
The file was modifiedsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala (diff)
The file was modifiedsql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala (diff)
Commit e2e3fe77823387f6d4164eede05bf077b4235c87 by viirya
[SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values

### What changes were proposed in this pull request?
Use the key/value `LambdaFunction` to convert the elements instead of using `CatalystTypeConverters.createToScalaConverter`. This is how it is done in `MapObjects`, which correctly handles arrays with case classes.

### Why are the changes needed?
Before these changes the added test cases would fail with the following:
```
[info] - encode/decode for map with case class as value: Map(1 -> IntAndString(1,a)) (interpreted path) *** FAILED *** (64 milliseconds)
[info]   Encoded/Decoded data does not match input data
[info]
[info]   in:  Map(1 -> IntAndString(1,a))
[info]   out: Map(1 -> [1,a])
[info]   types: scala.collection.immutable.Map$Map1 [info]
[info]   Encoded Data: [org.apache.spark.sql.catalyst.expressions.UnsafeMapData5ecf5d9e]
[info]   Schema: value#823
[info]   root
[info]   -- value: map (nullable = true)
[info]       |-- key: integer
[info]       |-- value: struct (valueContainsNull = true)
[info]       |    |-- i: integer (nullable = false)
[info]       |    |-- s: string (nullable = true)
[info]
[info]
[info]   fromRow Expressions:
[info]   catalysttoexternalmap(lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179), if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString), input[0, map<int,struct<i:int,s:string>>, true], interface scala.collection.immutable.Map
[info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
[info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
[info]   :- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :- if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
[info]   :  :- isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))
[info]   :  :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :  :- null
[info]   :  +- newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
[info]   :     :- assertnotnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i)
[info]   :     :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i
[info]   :     :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s.toString
[info]   :        +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s
[info]   :           +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   +- input[0, map<int,struct<i:int,s:string>>, true] (ExpressionEncoderSuite.scala:627)
```
So using a map with case classes as keys or values on the interpreted path would incorrectly deserialize data from the Catalyst representation.
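
A hedged reproduction of the user-visible symptom, mirroring the test data quoted above (note that the interpreted deserializer is only exercised when codegen is not used, so a plain `collect()` may not hit the bug in every configuration):

```scala
import org.apache.spark.sql.SparkSession

case class IntAndString(i: Int, s: String)

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

val ds = Seq(Map(1 -> IntAndString(1, "a"))).toDS()
// With the fix, deserialization returns Map(1 -> IntAndString(1,a))
// rather than Map(1 -> [1,a]) on the interpreted path.
ds.collect().head
```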

### Does this PR introduce _any_ user-facing change?
Yes, it fixes the bug.

### How was this patch tested?
Existing and new unit tests in the ExpressionEncoderSuite

Closes #32783 from eejbyfeldt/fix-interpreted-path-for-map-with-case-classes.

Authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(commit: e2e3fe7)
The file was modifiedsql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala (diff)
The file was modifiedsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala (diff)