Changes

Summary

  1. [SPARK-36105][SQL] OptimizeLocalShuffleReader support reading data of (details)
  2. [SPARK-36269][SQL] Fix only set data columns to Hive column names config (details)
  3. [SPARK-34402][SQL] Group exception about data format schema (details)
  4. [SPARK-32920][FOLLOW-UP] Fix shuffleMergeFinalized directly calling (details)
  5. [SPARK-36217][SQL] Rename CustomShuffleReader and (details)
Commit 094ae3708f54f5cb208141b345077c0e10cb08f3 by wenchen
[SPARK-36105][SQL] OptimizeLocalShuffleReader support reading data of multiple mappers in one task

### What changes were proposed in this pull request?
Added another partition spec to allow OptimizeLocalShuffleReader rule to read data from multiple mappers if the parallelism is less than the number of mappers.

### Why are the changes needed?
Optimization to the OptimizeLocalShuffleReader rule

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests

Closes #33310 from michaelzhang-db/supportDataFromMultipleMappers.

Authored-by: michaelzhang-db <michael.zhang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala (diff)
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala (diff)
The file was modifiedsql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala (diff)
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala (diff)
Commit e5616e32eecb516a6b46ae9bc5c2c850c18210a2 by wenchen
[SPARK-36269][SQL] Fix only set data columns to Hive column names config

### What changes were proposed in this pull request?

When reading Hive table, we set the Hive column id and column name configs (`hive.io.file.readcolumn.ids` and `hive.io.file.readcolumn.names`). We should set non-partition columns (data columns) for both configs, as Spark always [appends partition columns in its own Hive reader](https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala#L240). The column id config has only non-partition columns, but column name config has both partition and non-partition columns. We should keep them to be consistent with only non-partition columns. This does not cause issue for public OSS Hive file format for now. But for customized internal Hive file format, it causes the issue as we are expecting these two configs to be same.

### Why are the changes needed?

Fix the code logic to be more consistent.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing Hive tests.

Closes #33489 from c21/hive-col.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
The file was modifiedsql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala (diff)
Commit a63802f2c69e5fb271e694bfe3e2e15f09d33320 by wenchen
[SPARK-34402][SQL] Group exception about data format schema

### What changes were proposed in this pull request?
Group exception about data format schema of different format, orc/parquet

### Why are the changes needed?
group exception

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Not need

Closes #33296 from AngersZhuuuu/SPARK-34402.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
The file was modifiedsql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala (diff)
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala (diff)
Commit ba1a7ce5ec5bd15f81a70a3c5fcd4dc6893589ed by mridulatgmail.com
[SPARK-32920][FOLLOW-UP] Fix shuffleMergeFinalized directly calling rdd.getNumPartitions as RDD is not serialized to executor

### What changes were proposed in this pull request?

`ShuffleMapTask` should not push blocks if a shuffle is already merge finalized. Currently block push is disabled for retry cases. Also fix `shuffleMergeFinalized` calling `rdd.getNumPartitions` as RDD is not serialized causing issues.

### Why are the changes needed?

No

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #33426 from venkata91/SPARK-32920-follow-up.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
The file was modifiedcore/src/main/scala/org/apache/spark/Dependency.scala (diff)
The file was modifiedcore/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala (diff)
Commit 6e3d404cec0ab741bee21553268b94184055aa5a by wenchen
[SPARK-36217][SQL] Rename CustomShuffleReader and OptimizeLocalShuffleReader in AQE

### What changes were proposed in this pull request?

This PR proposes to rename:

- Rename `*Reader`/`*reader` to `*Read`/`*read` for rules and execution plan (user-facing doc/config name remain untouched)
  - `*ShuffleReaderExec` ->`*ShuffleReadExec`
  - `isLocalReader` -> `isLocalRead`
  - ...
- Rename `CustomShuffle*` prefix to `AQEShuffle*`
- Rename `OptimizeLocalShuffleReader` rule to `OptimizeShuffleWithLocalRead`

### Why are the changes needed?

There are multiple problems in the current naming:

- `CustomShuffle*` -> `AQEShuffle*`
    it sounds like it is a pluggable API. However, this is actually only used by AQE.
- `OptimizeLocalShuffleReader` -> `OptimizeShuffleWithLocalRead`
    it is the name of a rule but it can be misread as a reader, which is counterintuative
- `*ReaderExec` -> `*ReadExec`
    Reader execution reads a bit odd. It should better be read execution (like `ScanExec`, `ProjectExec` and `FilterExec`). I can't find the reason to name it with something that performs an action. See also the generated plans:

    Before:

    ```
    ...
    * HashAggregate (12)
       +- CustomShuffleReader (11)
          +- ShuffleQueryStage (10)
             +- Exchange (9)
    ...
    ```

    After:

    ```
    ...
    * HashAggregate (12)
       +- AQEShuffleRead (11)
          +- ShuffleQueryStage (10)
             +- Exchange (9)
    ..
    ```

### Does this PR introduce _any_ user-facing change?

No, internal refactoring.

### How was this patch tested?

Existing unittests should cover the changes.

Closes #33429 from HyukjinKwon/SPARK-36217.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
The file was removedcore/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala
The file was removedsql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderRule.scala
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala (diff)
The file was addedsql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeShuffleWithLocalRead.scala
The file was addedcore/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala
The file was modifiedsql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala (diff)
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala (diff)
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala (diff)
The file was modifiedcore/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala (diff)
The file was addedsql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala (diff)
The file was modifiedsql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala (diff)
The file was addedsql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadRule.scala
The file was removedsql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
The file was removedsql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala (diff)
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala (diff)
The file was modifiedsql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala (diff)