Changes

Summary

  1. [SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas (commit: 9b875ce) (details)
  2. [SPARK-34419][SQL] Move PartitionTransforms.scala to scala directory (commit: cd38287) (details)
  3. [SPARK-34408][PYTHON] Refactor spark.udf.register to share the same path to generate UDF instance (commit: 92a8346) (details)
  4. [SPARK-34420][SQL] Throw exception if non-streaming Deduplicate is not replaced by aggregate (commit: e005385) (details)
  5. [SPARK-34418][SQL][TESTS] Check partitions existence after v1 TRUNCATE TABLE (commit: 91be583) (details)
  6. [SPARK-34380][SQL] Support ifExists for ALTER TABLE ... UNSET TBLPROPERTIES for v2 command (commit: 9a566f8) (details)
  7. [SPARK-34428][BUILD] Update sbt version to 1.4.7 (commit: f2e1468) (details)
  8. [SPARK-33434][PYTHON][DOCS] Added RuntimeConfig to PySpark docs (commit: e3b6e4a) (details)
  9. [SPARK-34416][SQL] Adding support for user provided schema url in Avro (commit: 3d39dfa) (details)
Commit 9b875ceada60732899053fbd90728b4944d1c03d by cutlerb
[SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas

### What changes were proposed in this pull request?

Creating a Pandas DataFrame via Apache Arrow can currently use twice as much memory as the final result, because during the conversion both Pandas and Arrow retain a copy of the data. Arrow now has a "self-destruct" mode (Arrow >= 0.16) that avoids this by freeing each column after conversion. This PR integrates support for it in toPandas, handling a couple of edge cases:

self_destruct has no effect unless the memory is allocated appropriately, which is handled in the Arrow serializer here. Essentially, the issue is that self_destruct frees memory column-wise, but Arrow record batches are oriented row-wise:

```
Record batch 0: allocation 0: column 0 chunk 0, column 1 chunk 0, ...
Record batch 1: allocation 1: column 0 chunk 1, column 1 chunk 1, ...
```

In this scenario, Arrow will drop references to all of column 0's chunks, but no memory will actually be freed, as the chunks were just slices of an underlying allocation. The PR copies each column into its own allocation so that memory is instead arranged like so:

```
Record batch 0: allocation 0 column 0 chunk 0, allocation 1 column 1 chunk 0, ...
Record batch 1: allocation 2 column 0 chunk 1, allocation 3 column 1 chunk 1, ...
```
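
For reference, here is a rough pyarrow sketch of this per-column rearrangement and the self-destruct conversion (not the serializer's actual code; the toy table is illustrative):

```python
import pyarrow as pa

table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# Concatenate each column's chunks into a freshly allocated array (a copy is
# made whenever a column has more than one chunk), so that dropping a column
# actually releases its memory.
columns = [col.combine_chunks() for col in table.columns]
rebatched = pa.Table.from_arrays(columns, names=table.column_names)

# self_destruct frees each Arrow column as soon as it has been converted;
# split_blocks and use_threads=False keep the conversion column-by-column.
pdf = rebatched.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
```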

The optimization is disabled by default, and can be enabled with the Spark SQL conf "spark.sql.execution.arrow.pyspark.selfDestruct.enabled" set to "true". We can't always apply this optimization because it's more likely to generate a dataframe with immutable buffers, which Pandas doesn't always handle well, and because it is slower overall (since it only converts one column at a time instead of in parallel).

### Why are the changes needed?

This lets us load larger datasets. In particular, with N bytes of memory, we previously could never load a dataset bigger than N/2 bytes; now the overhead is closer to N/1.25 or so.

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new SQL conf: "spark.sql.execution.arrow.pyspark.selfDestruct.enabled".
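
A minimal usage sketch (the DataFrame is illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# The self-destruct path only applies to the Arrow-based conversion,
# and it is disabled by default.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")

df = spark.range(10_000_000).selectExpr("id", "id * 2 AS doubled")
pdf = df.toPandas()  # peak memory stays closer to the size of the final result
```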

### How was this patch tested?

See the [mailing list](http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Reducing-memory-usage-of-toPandas-with-Arrow-quot-self-destruct-quot-option-td30149.html); it was tested with Python's memory_profiler. Unit tests were added to check that memory stays within certain bounds and that results are correct with the option enabled.

Closes #29818 from lidavidm/spark-32953.

Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
(commit: 9b875ce)
The file was modified python/pyspark/sql/pandas/conversion.py (diff)
The file was modified python/pyspark/sql/tests/test_arrow.py (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (diff)
Commit cd38287ce2f39fbfaf0aa7ccfce7f2cd93f6cead by viirya
[SPARK-34419][SQL] Move PartitionTransforms.scala to scala directory

### What changes were proposed in this pull request?

Move `PartitionTransforms.scala` from `sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions` to `sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions`.

### Why are the changes needed?

We should put Java/Scala files in their corresponding directories.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A

Closes #31546 from sunchao/SPARK-34419.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(commit: cd38287)
The file was added sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala
The file was removed sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala
Commit 92a83463c9419eef5c22b9cb382e0ef6a77fe35d by gurwls223
[SPARK-34408][PYTHON] Refactor spark.udf.register to share the same path to generate UDF instance

### What changes were proposed in this pull request?

This PR proposes to use `_create_udf` wherever we need to create a `UserDefinedFunction`, to make the code easier to maintain.
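
The public behavior is unchanged; for context, both entry points below now share the same UDF-creation path internally (a sketch, with illustrative function names):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# Column-expression style: creates a UserDefinedFunction directly.
upper_col = udf(lambda s: s.upper(), StringType())

# SQL-registration style: spark.udf.register builds the UserDefinedFunction
# through the same internal helper before registering it for SQL use.
spark.udf.register("upper_sql", lambda s: s.upper(), StringType())
spark.sql("SELECT upper_sql('hello')").show()
```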

### Why are the changes needed?

For better code readability and maintainability.

### Does this PR introduce _any_ user-facing change?

No, refactoring.

### How was this patch tested?

Ran the existing unit tests. CI in this PR should test it out too.

Closes #31537 from HyukjinKwon/SPARK-34408.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(commit: 92a8346)
The file was modified python/pyspark/sql/udf.py (diff)
Commit e0053853c90d39ef6de9d59fb933525e20bae1fa by viirya
[SPARK-34420][SQL] Throw exception if non-streaming Deduplicate is not replaced by aggregate

### What changes were proposed in this pull request?

This patch proposes to throw an exception in the query planner if a non-streaming `Deduplicate` has not been replaced by an aggregate.

### Why are the changes needed?

We replace some logical operations in the query optimizer, and for those the query planner throws exceptions if the corresponding logical nodes are unexpectedly left unreplaced. `Deduplicate` is missing from this check, which opens a possible hole. For code consistency and to prevent unexpected query planning errors, we should add a similar exception case to the query planner.
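
For context, the non-streaming `Deduplicate` node comes from `dropDuplicates` on a batch DataFrame and is expected to be rewritten into an aggregate before execution (a sketch, assuming an active SparkSession `spark`; the data is illustrative):

```python
df = spark.createDataFrame([(1, "a"), (1, "a"), (2, "b")], ["id", "v"])
# dropDuplicates on a batch DataFrame produces a logical Deduplicate node,
# which the optimizer rewrites into an Aggregate; the planner now fails fast
# if it ever sees a non-streaming Deduplicate that was not replaced.
df.dropDuplicates().show()
```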

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test.

Closes #31547 from viirya/minor-deduplicate.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(commit: e005385)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala (diff)
Commit 91be583fb88fbc41510b40c9123948b9c8edf53f by dhyun
[SPARK-34418][SQL][TESTS] Check partitions existence after v1 TRUNCATE TABLE

### What changes were proposed in this pull request?
Add a test and modify an existing one to check that partitions still exist after v1 `TRUNCATE TABLE`.
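
For illustration, the behavior being verified looks roughly like this (a sketch with a hypothetical table, assuming an active SparkSession `spark`; the real checks live in `TruncateTableSuite`):

```python
spark.sql("CREATE TABLE t (id INT, part INT) USING parquet PARTITIONED BY (part)")
spark.sql("INSERT INTO t PARTITION (part = 0) VALUES (0)")
spark.sql("INSERT INTO t PARTITION (part = 1) VALUES (1)")
spark.sql("TRUNCATE TABLE t")
# The rows are removed, but both partitions are expected to still be listed.
spark.sql("SHOW PARTITIONS t").show()
```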

### Why are the changes needed?
To improve test coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running new test:
```
$ build/sbt -Phive -Phive-thriftserver "test:testOnly *TruncateTableSuite"
```

Closes #31544 from MaxGekk/test-truncate-partitioned-table.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: 91be583)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/TruncateTableSuite.scala (diff)
Commit 9a566f83a0e126742473574476c6381f58394aed by dhyun
[SPARK-34380][SQL] Support ifExists for ALTER TABLE ... UNSET TBLPROPERTIES for v2 command

### What changes were proposed in this pull request?

This PR proposes to support the `ifExists` flag for the v2 `ALTER TABLE ... UNSET TBLPROPERTIES` command. Currently, the flag is not respected and the command behaves as if `ifExists = true`, always succeeding even when the properties do not exist.

### Why are the changes needed?

To support `ifExists` flag and align with v1 command behavior.

### Does this PR introduce _any_ user-facing change?

Yes, now if the property does not exist and `IF EXISTS` is not specified, the command will fail:
```
ALTER TABLE t UNSET TBLPROPERTIES ('unknown') // Fails with "Attempted to unset non-existent property 'unknown'"
ALTER TABLE t UNSET TBLPROPERTIES IF EXISTS ('unknown') // OK
```

### How was this patch tested?

Added new test

Closes #31494 from imback82/AlterTableUnsetPropertiesIfExists.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: 9a566f8)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (diff)
The file was added sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableProperties.scala
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala (diff)
Commit f2e14684961de2ef5a692a77f1e09655da3398f4 by dhyun
[SPARK-34428][BUILD] Update sbt version to 1.4.7

### What changes were proposed in this pull request?

This PR aims to update the sbt version to 1.4.7.

### Why are the changes needed?
This will bring the latest bug fixes and improvements.

- https://github.com/sbt/sbt/releases/tag/v1.4.7

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass the CIs.

Closes #31555 from williamhyun/sbt147.

Authored-by: William Hyun <williamhyun3@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: f2e1468)
The file was modified project/build.properties (diff)
Commit e3b6e4ad435b31aabd6781df63c50c1f92bdfeac by srowen
[SPARK-33434][PYTHON][DOCS] Added RuntimeConfig to PySpark docs

### What changes were proposed in this pull request?
Documentation for `SparkSession.conf.isModifiable` was missing from the Python API site, so we added a Configuration section to the Spark SQL page to expose docs for the `RuntimeConfig` class (the class containing `isModifiable`). A `:class:` reference to `RuntimeConfig` was also added to the `SparkSession.conf` docstring to create a link there.
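
For example, the `RuntimeConfig` methods that the new page documents (assuming an active `SparkSession` named `spark`):

```python
spark.conf.set("spark.sql.shuffle.partitions", "50")
spark.conf.get("spark.sql.shuffle.partitions")           # '50'
spark.conf.isModifiable("spark.sql.shuffle.partitions")  # True
spark.conf.isModifiable("spark.sql.warehouse.dir")       # False: static conf
```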

### Why are the changes needed?
No docs were generated for `pyspark.sql.conf.RuntimeConfig`.

### Does this PR introduce _any_ user-facing change?
Yes: a new Configuration section was added to the Spark SQL page, and a `Returns` section was added to the `SparkSession.conf` docstring that now links to the `pyspark.sql.conf.RuntimeConfig` page. This is a change compared to both the released Spark version and the unreleased master branch.

### How was this patch tested?
First built the Python docs:
```bash
cd $SPARK_HOME/docs
SKIP_SCALADOC=1 SKIP_RDOC=1 SKIP_SQLDOC=1 jekyll serve
```
Then verified all pages and links:
1. Configuration link displayed on the API Reference page, and it clicks through to Spark SQL page:
http://localhost:4000/api/python/reference/index.html
![image](https://user-images.githubusercontent.com/1160861/107601918-a2f02380-6bed-11eb-9b8f-974a0681a2a9.png)

2. Configuration section displayed on the Spark SQL page, and the RuntimeConfig link clicks through to the RuntimeConfig page:
http://localhost:4000/api/python/reference/pyspark.sql.html#configuration
![image](https://user-images.githubusercontent.com/1160861/107602058-0d08c880-6bee-11eb-8cbb-ad8c47588085.png)

3. RuntimeConfig page displayed:
http://localhost:4000/api/python/reference/api/pyspark.sql.conf.RuntimeConfig.html
![image](https://user-images.githubusercontent.com/1160861/107602278-94eed280-6bee-11eb-95fc-445ea62ac1a4.png)

4. SparkSession.conf page displays the RuntimeConfig link, and it navigates to the RuntimeConfig page:
http://localhost:4000/api/python/reference/api/pyspark.sql.SparkSession.conf.html
![image](https://user-images.githubusercontent.com/1160861/107602435-1f373680-6bef-11eb-985a-b72432464940.png)

Closes #31483 from Eric-Lemmon/SPARK-33434-document-isModifiable.

Authored-by: Eric Lemmon <eric@lemmon.cc>
Signed-off-by: Sean Owen <srowen@gmail.com>
(commit: e3b6e4a)
The file was modified python/pyspark/sql/session.py (diff)
The file was modified python/docs/source/reference/pyspark.sql.rst (diff)
Commit 3d39dfa8c36cd3e2422918ff07aff4ff57eadbdb by dhyun
[SPARK-34416][SQL] Adding support for user provided schema url in Avro

### What changes were proposed in this pull request?

Added an option to provide the Avro schema by URL.

### Why are the changes needed?
(copied from Jira ticket)

We have a use case in which we read a huge table in Avro format, with about 30k columns.

Using the default Hive reader, `AvroGenericRecordReader`, it just hangs forever; after 4 hours not even one task had finished.

We tried instead to use `spark.read.format("com.databricks.spark.avro").load(..)` but we failed on:

```
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema
..
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:85)
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:67)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:421)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
  ... 53 elided
```

because the file schema contains duplicate column names (when compared case-insensitively).

So we wanted to provide a user schema with non-duplicated fields, but the schema is huge (a few MBs), so it is not practical to provide it inline in JSON format.

So we patched spark-avro to also accept an `avroSchemaUrl` option in addition to `avroSchema`, and it worked perfectly.
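
A usage sketch of the option described above (the paths are illustrative, assuming an active SparkSession `spark`):

```python
df = (
    spark.read.format("avro")
    .option("avroSchemaUrl", "hdfs:///schemas/huge_table.avsc")
    .load("/data/huge_table")
)
```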

### How was this patch tested?
Added a unit test to AvroSuite and tested locally with the patched version.

Closes #31543 from uzadude/avro_schema.

Authored-by: oraviv <oraviv@paypal.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: 3d39dfa)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala (diff)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala (diff)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala (diff)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala (diff)
The file was modified external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala (diff)
The file was added external/avro/src/test/resources/test_sub.avsc
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala (diff)