Changes

Summary

  1. [SPARK-37442][SQL] InMemoryRelation statistics bug causing broadcast join failures with AQE enabled
  2. [SPARK-37450][SQL] Prune unnecessary fields from Generate
  3. [SPARK-37522][PYTHON][TESTS] Fix MultilayerPerceptronClassifierTest.test_raw_and_probability_prediction
Commit c37b726bd09d34e1115a8af1969485e60dc02592 by wenchen
[SPARK-37442][SQL] InMemoryRelation statistics bug causing broadcast join failures with AQE enabled

### What changes were proposed in this pull request?
Immediately materialize the underlying RDD cache (using `.count`) for an `InMemoryRelation` when `buildBuffers` is called.

### Why are the changes needed?

Currently, when `CachedRDDBuilder.buildBuffers` is called, `InMemoryRelation.computeStats` tries to read the accumulators to determine the relation's size. However, the accumulators are not accurate until the cached RDD has been executed and has finished. Until then, they can report any value between 0 bytes and the value the accumulator will hold once the cached RDD finishes. In AQE, join planning can happen during this window. If the planner reads the size as 0 bytes, it will likely plan a broadcast join, mistakenly believing the build side is very small. If the `InMemoryRelation` is actually very large, this causes problems during execution, such as job failure because the broadcast table exceeds the 8GB limit.
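The race described above can be illustrated with a toy model in plain Python. This is a minimal sketch, not Spark code: `ToyCachedRDD`, `materialize`, `estimated_size`, `plan_join` and `build_buffers` are hypothetical names (the last is only loosely modelled on `buildBuffers`), and the 10 MB threshold assumes the default `spark.sql.autoBroadcastJoinThreshold`. Reading the size before any partition is cached yields 0 bytes and a broadcast plan. Forcing materialization first, as the patch does with `.count`, yields the real size.

```python
# Toy model (not Spark code) of reading cache statistics before the cache is built.

# Assumed threshold: Spark's default spark.sql.autoBroadcastJoinThreshold (10 MB).
BROADCAST_THRESHOLD = 10 * 1024 * 1024


class ToyCachedRDD:
    """Hypothetical cached RDD whose size is tracked by an accumulator."""

    def __init__(self, partition_sizes):
        self.partition_sizes = partition_sizes
        self.cached_partitions = 0
        self.size_accumulator = 0  # grows only as partitions are actually cached

    def cache_next_partition(self):
        self.size_accumulator += self.partition_sizes[self.cached_partitions]
        self.cached_partitions += 1

    def materialize(self):
        """Cache every remaining partition (stand-in for forcing the cache with .count)."""
        while self.cached_partitions < len(self.partition_sizes):
            self.cache_next_partition()


def estimated_size(rdd):
    """Mimics computeStats: reads the accumulator, which is final only after caching."""
    return rdd.size_accumulator


def plan_join(build_side_bytes):
    """Pick a broadcast join when the build side looks small enough."""
    return "broadcast" if build_side_bytes <= BROADCAST_THRESHOLD else "shuffle"


def build_buffers(partition_sizes, eager):
    """Hypothetical analogue of buildBuffers; eager=True mimics the patched behavior."""
    rdd = ToyCachedRDD(partition_sizes)
    if eager:
        rdd.materialize()
    return rdd


sizes = [4 * 1024**3] * 3  # three 4 GiB partitions, 12 GiB in total

lazy = build_buffers(sizes, eager=False)
print(estimated_size(lazy), plan_join(estimated_size(lazy)))    # 0 broadcast

eager = build_buffers(sizes, eager=True)
print(estimated_size(eager), plan_join(estimated_size(eager)))  # 12884901888 shuffle
```

In the real bug the planner could also see a partial value between 0 and the final size. The toy only shows the 0-byte case.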

### Does this PR introduce _any_ user-facing change?

Yes. Previously, the cache was not materialized until the job started to run. Now, it is materialized when the RDD representing an `InMemoryRelation` is requested.

### How was this patch tested?

Tests added

Closes #34684 from ChenMichael/SPARK-37442-InMemoryRelation-statistics-inaccurate-during-join-planning.

Authored-by: Michael Chen <mike.chen@workday.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
Commit c758b44d2d1ef9dc87378ff866b7d3a93c552683 by viirya
[SPARK-37450][SQL] Prune unnecessary fields from Generate

### What changes were proposed in this pull request?

This patch proposes an optimization rule to prune unnecessary fields from `Generate` in some cases, e.g. under a count-only `Aggregate`.

### Why are the changes needed?

As shown in the JIRA, if a query counts nested elements (structs) in an array by exploding the array in `Generate`, Spark currently reads the full nested struct without any pruning because no particular nested field is specified, e.g.,

```
== Optimized Logical Plan ==
Aggregate [count(1) AS count(true)#20299L]
+- Project
   +- Generate explode(items#20293), [0], false, [item#20296]
      +- Filter ((size(items#20293, true) > 0) AND isnotnull(items#20293))
         +- Relation default.table[items#20293] parquet
```

An optimization can instead pick an arbitrary nested field from the struct. This prunes unnecessary field access while still counting the number of array elements correctly.
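A plain-Python model of `count(1)` over `explode` shows why a single field is enough. The count depends only on how many elements each array has, not on which struct fields are read. This is an illustrative sketch, not the optimizer rule: `scan`, `count_exploded`, the sample rows, and the choice of `id` as the kept field are all hypothetical.

```python
# Plain-Python model (not Spark code): rows with an array-of-structs column "items".
rows = [
    {"items": [{"id": 1, "name": "a", "payload": "x" * 1000},
               {"id": 2, "name": "b", "payload": "y" * 1000}]},
    {"items": []},
    {"items": None},
    {"items": [{"id": 3, "name": "c", "payload": "z" * 1000}]},
]


def scan(rows, fields=None):
    """Read the items column, keeping only `fields` of each struct (None = full struct)."""
    out = []
    for row in rows:
        items = row["items"]
        if items is None or fields is None:
            out.append(items)
        else:
            out.append([{f: item[f] for f in fields} for item in items])
    return out


def count_exploded(arrays):
    """count(1) over explode(items): one row per element; null or empty arrays add none."""
    return sum(len(a) for a in arrays if a)


full = scan(rows)            # unpruned: every nested field is read
pruned = scan(rows, ["id"])  # pruned: one arbitrary nested field is read
print(count_exploded(full), count_exploded(pruned))  # 3 3
```

Any field would give the same count. In this model the only difference is how much nested data `scan` carries along.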

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added test.

Closes #34701 from viirya/SPARK-37450.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
The file was added sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/GenerateOptimizationSuite.scala
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
Commit eec9fecf5e3c1d0631caed427d4d468f44f98de9 by dongjoon
[SPARK-37522][PYTHON][TESTS] Fix MultilayerPerceptronClassifierTest.test_raw_and_probability_prediction

### What changes were proposed in this pull request?

This PR aims to update a PySpark unit test case by increasing the relative tolerance (`rtol`) by 10%, from `0.1` to `0.11`.
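The tolerance in question is NumPy's relative tolerance. `numpy.allclose(a, b, rtol=1e-05, atol=1e-08)` passes when `absolute(a - b) <= atol + rtol * absolute(b)` holds for every element. The sketch below re-implements that check in plain Python so the effect of `rtol` is visible. The `actual` and `expected` values are made up for illustration and are not the test's data.

```python
def allclose(a, b, rtol=1e-05, atol=1e-08):
    """Plain-Python version of the element-wise check documented for numpy.allclose:
    |a - b| <= atol + rtol * |b| for every pair (b is the reference value)."""
    return len(a) == len(b) and all(
        abs(x - y) <= atol + rtol * abs(y) for x, y in zip(a, b)
    )


# Hypothetical values: the last element is off by about 10.5% of its reference.
expected = [-10.0, -8.0, 20.0]
actual = [-10.5, -8.2, 22.1]

print(allclose(actual, expected, rtol=0.1), allclose(actual, expected, rtol=0.11))  # False True
```

The check scales with `absolute(b)`, so the reference values determine how much absolute slack a given `rtol` allows.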

### Why are the changes needed?

On Java 17, the test fails:

```
$ java -version
openjdk version "17.0.1" 2021-10-19 LTS
OpenJDK Runtime Environment Zulu17.30+15-CA (build 17.0.1+12-LTS)
OpenJDK 64-Bit Server VM Zulu17.30+15-CA (build 17.0.1+12-LTS, mixed mode, sharing)

$ build/sbt test:package

$ python/run-tests --testname 'pyspark.ml.tests.test_algorithms MultilayerPerceptronClassifierTest.test_raw_and_probability_prediction' --python-executables=python3
...
======================================================================
FAIL: test_raw_and_probability_prediction (pyspark.ml.tests.test_algorithms.MultilayerPerceptronClassifierTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/dongjoon/APACHE/spark-merge/python/pyspark/ml/tests/test_algorithms.py", line 104, in test_raw_and_probability_prediction
    self.assertTrue(np.allclose(result.rawPrediction, expected_rawPrediction, rtol=0.102))
AssertionError: False is not true

----------------------------------------------------------------------
Ran 1 test in 7.385s

FAILED (failures=1)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested on native Apple Silicon with Java 17.

Closes #34784 from dongjoon-hyun/SPARK-37522.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
The file was modified python/pyspark/ml/tests/test_algorithms.py