Changes

Summary

  1. [SPARK-32657][K8S] Update the log strings we check for & imports in (commit: 059fb65) (details)
  2. [SPARK-31999][SQL][FOLLOWUP] Adds negative test cases with typos (commit: 7048fff) (details)
  3. [SPARK-32658][CORE] Fix `PartitionWriterStream` partition length (commit: f793977) (details)
  4. [SPARK-30462][SS] Streamline the logic on file stream source and sink (commit: e6795cd) (details)
  5. [SPARK-32653][CORE] Decommissioned host/executor should be considered as (commit: 44a288f) (details)
  6. [SPARK-32607][SQL] Script Transformation ROW FORMAT DELIMITED (commit: 6dae11d) (details)
  7. [SPARK-32665][SQL][TEST] Deletes orphan directories under a warehouse (commit: d80d0ce) (details)
  8. [SPARK-28863][SQL][FOLLOWUP] Do not reuse the physical plan (commit: d378dc5) (details)
  9. [SPARK-32640][SQL] Downgrade Janino to fix a correctness bug (commit: 8b119f1) (details)
  10. [SPARK-32660][SQL][DOC] Show Avro related API in documentation (commit: de141a3) (details)
  11. [SPARK-32663][CORE] Avoid individual closing of pooled TransportClients (commit: 79b4dea) (details)
Commit 059fb6571eb8a74d468fc9903b76d5ff221a46e3 by dongjoon
[SPARK-32657][K8S] Update the log strings we check for & imports in
decommission K8s
### What changes were proposed in this pull request?
Update the log strings to match the new log messages.
### Why are the changes needed?
Tests are failing
### Does this PR introduce _any_ user-facing change?
No, test only change.
### How was this patch tested?
WIP: Make sure the DecommissionSuite passes in Jenkins.
Closes #29479 from
holdenk/SPARK-32657-Decommissioning-tests-update-log-string.
Authored-by: Holden Karau <hkarau@apple.com> Signed-off-by: Dongjoon
Hyun <dongjoon@apache.org>
(commit: 059fb65)
The file was modified resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala (diff)
Commit 7048fff2304bd44c3c6b7e57a485200a8959203d by wenchen
[SPARK-31999][SQL][FOLLOWUP] Adds negative test cases with typos
### What changes were proposed in this pull request?
Address the
[#comment](https://github.com/apache/spark/pull/28840#discussion_r471172006).
### Why are the changes needed?
Make code robust.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
Closes #29453 from ulysses-you/SPARK-31999-FOLLOWUP.
Authored-by: ulysses <youxiduo@weidian.com> Signed-off-by: Wenchen Fan
<wenchen@databricks.com>
(commit: 7048fff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala (diff)
Commit f793977e9ac2ef597fca4a95356affbfbf864f88 by wenchen
[SPARK-32658][CORE] Fix `PartitionWriterStream` partition length
overflow
### What changes were proposed in this pull request?
The `count` in `PartitionWriterStream` should be a long value instead of
an int. The issue was introduced by apache/spark@abef84a. When the
overflow happens, the shuffle index file records the wrong index for a
reduceId, leading to a `FetchFailedException: Stream is corrupted`
error.
Besides the fix, I also added some debug logs, so in the future it's
easier to debug similar issues.
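The overflow the fix addresses can be sketched in isolation. This is an illustrative demo, not Spark's actual `PartitionWriterStream` class: it shows why a partition byte counter must be a `long`, since an `int` wraps negative once a partition exceeds ~2.1 GB, which would put a corrupt offset into the shuffle index file.

```java
// Illustrative sketch (not Spark's actual class): why a partition byte
// counter must be a long rather than an int.
public class PartitionLengthOverflowDemo {
    public static void main(String[] args) {
        long bytesWritten = 3L * 1024 * 1024 * 1024; // 3 GiB partition

        int intCount = (int) bytesWritten; // overflows and wraps negative
        long longCount = bytesWritten;     // correct

        System.out.println("int counter:  " + intCount);  // prints -1073741824
        System.out.println("long counter: " + longCount); // prints 3221225472
    }
}
```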
### Why are the changes needed?
This is a regression and bug fix.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A Spark user reported this issue when migrating their workload to 3.0.
One of the jobs fails deterministically on Spark 3.0 without the patch,
and succeeds after the fix is applied.
Closes #29474 from jiangxb1987/fixPartitionWriteStream.
Authored-by: Xingbo Jiang <xingbo.jiang@databricks.com> Signed-off-by:
Wenchen Fan <wenchen@databricks.com>
(commit: f793977)
The file was modified core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala (diff)
The file was modified core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java (diff)
The file was modified core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java (diff)
Commit e6795cd3416bbe32505efd4c1fa3202f451bf74d by dongjoon
[SPARK-30462][SS] Streamline the logic on file stream source and sink
metadata log to avoid memory issue
### What changes were proposed in this pull request?
Many operations on CompactibleFileStreamLog read a metadata log file
and materialize all entries into memory. Due to the nature of the
compact operation, CompactibleFileStreamLog may have a huge compact log
file with a bunch of entries included, and for now they're just
monotonically increasing, which means the amount of memory needed to
materialize them also grows incrementally. This puts pressure on GC.
This patch proposes to streamline the logic on the file stream source
and sink whenever possible to avoid the memory issue. To make this
possible, we have to break the existing behavior of excluding entries -
currently the `compactLogs` method is called with all entries, which
forces us to materialize all entries into memory. This hopefully has no
effect on end users, because only the file stream sink has a condition
to exclude entries, and that condition has never been true
(DELETE_ACTION has never been set.)
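The core idea above can be sketched with a toy example. This is a hypothetical illustration, not the Spark implementation: hand each metadata log entry to a consumer as it is read, instead of first materializing the whole compact file into a list, so heap usage stays flat no matter how large the compact file grows.

```java
// Hypothetical sketch (not the Spark implementation) of streaming
// metadata log entries instead of materializing them all at once.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.function.Consumer;

public class StreamingLogRead {
    // Before: read all entries into a List, then iterate.
    // After: process each entry as it is read, so only one entry is
    // resident at a time regardless of compact file size.
    static void forEachEntry(BufferedReader reader, Consumer<String> consumer)
            throws IOException {
        String line;
        while ((line = reader.readLine()) != null) {
            consumer.accept(line); // one entry resident at a time
        }
    }

    public static void main(String[] args) throws IOException {
        String log = "add:/data/a.parquet\nadd:/data/b.parquet\n";
        forEachEntry(new BufferedReader(new StringReader(log)),
                entry -> System.out.println("processed " + entry));
    }
}
```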
Based on this observation, this patch also changes the existing UT a
bit: it simulated the situation where an "A" file is added and another
batch marks the "A" file as deleted. That situation simply doesn't work
with this change, but as mentioned earlier it hasn't been used. (I'm
not sure the UT reflects an actual run; I guess not.)
### Why are the changes needed?
The memory issue (OOME) is reported by both JIRA issue and user mailing
list.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
* Existing UTs
* Manual test done
The manual test leverages the simple apps which continuously writes the
file stream sink metadata log.
https://github.com/HeartSaVioR/spark-delegation-token-experiment/commit/bea7680e4c588f455f8c3181a96c9eff5002fa1a
The test is configured to have a batch metadata log file of 1.9M
(10,000 entries), with other Spark configuration left at the defaults
(compact interval = 10). The app runs as the driver, and the driver
heap is set to 3g.
> before the patch
<img width="1094" alt="Screen Shot 2020-06-23 at 3 37 44 PM"
src="https://user-images.githubusercontent.com/1317309/85375841-d94f3480-b571-11ea-817b-c6b48b34888a.png">
It ran for only 40 minutes, with the latest compact batch file reaching
1.3G. The process struggled with GC and eventually threw an OOME.
> after the patch
<img width="1094" alt="Screen Shot 2020-06-23 at 3 53 29 PM"
src="https://user-images.githubusercontent.com/1317309/85375901-eff58b80-b571-11ea-837e-30d107f677f9.png">
It sustained a 2-hour run (stopped manually; it was expected to run
longer), with the latest compact batch file reaching 2.2G. Actual
memory usage never went above 1.2G, and memory was cleaned up promptly
without notable GC activity.
Closes #28904 from HeartSaVioR/SPARK-30462.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(commit: e6795cd)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala (diff)
The file was modified sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8 (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala (diff)
Commit 44a288fc411bb760d20b67e934de932e0248ba3c by wenchen
[SPARK-32653][CORE] Decommissioned host/executor should be considered as
inactive in TaskSchedulerImpl
### What changes were proposed in this pull request?
Add a decommissioning status check when determining whether a host or
executor is active. A decommissioned host or executor should be
considered inactive.
### Why are the changes needed?
First of all, this PR is not a correctness fix but an improvement. The
main problem we want to fix is that a decommissioned host or executor
should be considered inactive.
`TaskSetManager.computeValidLocalityLevels` depends on
`TaskSchedulerImpl.isExecutorAlive/hasExecutorsAliveOnHost` to calculate
the locality levels. Therefore, the `TaskSetManager` could get the
corresponding locality levels of decommissioned hosts or executors if
they're not considered inactive. However, `CoarseGrainedSchedulerBackend`
won't construct a `WorkerOffer` for decommissioned executors. That means
the `TaskSetManager` might never get a chance to launch tasks at certain
locality levels, and only suffers unnecessary delay from delay
scheduling. This PR reduces that unnecessary delay by making
decommissioned hosts/executors inactive in `TaskSchedulerImpl`.
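The rule this change enforces can be sketched as follows. This is a hypothetical illustration with invented names (not Spark's actual `TaskSchedulerImpl` fields): "alive" now means registered *and* not decommissioned, so locality-level computation stops counting executors that will never receive offers.

```java
// Hypothetical sketch of the aliveness rule; names are illustrative,
// not Spark's actual scheduler internals.
import java.util.HashSet;
import java.util.Set;

public class SchedulerAliveness {
    private final Set<String> executors = new HashSet<>();
    private final Set<String> decommissioned = new HashSet<>();

    void addExecutor(String id) { executors.add(id); }
    void decommission(String id) { decommissioned.add(id); }

    // Alive means registered AND not decommissioned, so delay
    // scheduling does not wait on executors that cannot get offers.
    boolean isExecutorAlive(String id) {
        return executors.contains(id) && !decommissioned.contains(id);
    }

    public static void main(String[] args) {
        SchedulerAliveness s = new SchedulerAliveness();
        s.addExecutor("exec-1");
        s.decommission("exec-1");
        System.out.println(s.isExecutorAlive("exec-1")); // prints false
    }
}
```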
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests
Closes #29468 from Ngone51/fix-decom-alive.
Lead-authored-by: yi.wu <yi.wu@databricks.com> Co-authored-by: Devesh
Agrawal <devesh.agrawal@gmail.com> Co-authored-by: wuyi
<yi.wu@databricks.com> Signed-off-by: Wenchen Fan
<wenchen@databricks.com>
(commit: 44a288f)
The file was modified core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala (diff)
Commit 6dae11d03436b2aab8ca883d0ebfd5b97759b599 by wenchen
[SPARK-32607][SQL] Script Transformation ROW FORMAT DELIMITED
`TOK_TABLEROWFORMATLINES` only support '\n'
### What changes were proposed in this pull request?
Script Transform no-serde (`ROW FORMAT DELIMITED`) mode's `LINES
TERMINATED BY` only supports `\n`.
Tested in Hive: Hive 1.1
![image](https://user-images.githubusercontent.com/46485123/90309510-ce82a180-df1b-11ea-96ab-56e2b3229489.png)
Hive 2.3.7
![image](https://user-images.githubusercontent.com/46485123/90309504-c88cc080-df1b-11ea-853e-8f65e9ed2375.png)
### Why are the changes needed?
Strictly limit the usage to ensure data accuracy.
### Does this PR introduce _any_ user-facing change?
When users run Script Transform no-serde (ROW FORMAT DELIMITED) mode
with `LINES TERMINATED BY` not equal to `'\n'`, an error is now thrown.
### How was this patch tested?
Added UT
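The kind of check described above can be sketched like this. This is a hypothetical illustration (the class, method, and message below are invented, not Spark's actual parser code): reject any `LINES TERMINATED BY` value other than `"\n"` up front instead of silently producing wrong data.

```java
// Hypothetical sketch of a strict LINES TERMINATED BY validation;
// names and message are illustrative, not Spark's actual code.
public class LinesTerminatedCheck {
    static void validateLinesTerminatedBy(String value) {
        if (!"\n".equals(value)) {
            throw new IllegalArgumentException(
                "LINES TERMINATED BY only supports '\\n', got: " + value);
        }
    }

    public static void main(String[] args) {
        validateLinesTerminatedBy("\n"); // accepted
        try {
            validateLinesTerminatedBy("\r\n"); // rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```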
Closes #29438 from AngersZhuuuu/SPARK-32607.
Authored-by: angerszhu <angers.zhu@gmail.com> Signed-off-by: Wenchen Fan
<wenchen@databricks.com>
(commit: 6dae11d)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala (diff)
Commit d80d0ced9ab181fabbabe766c94b10110051fda1 by dongjoon
[SPARK-32665][SQL][TEST] Deletes orphan directories under a warehouse
dir in SQLQueryTestSuite
### What changes were proposed in this pull request?
If the previous `SQLQueryTestSuite` test run was killed, the next run
will fail for the following reason:
```
[info] org.apache.spark.sql.SQLQueryTestSuite *** ABORTED *** (17
seconds, 483 milliseconds)
[info]   org.apache.spark.sql.AnalysisException: Can not create the
managed table('`testdata`'). The associated
location('file:/Users/maropu/Repositories/spark/spark-master/sql/core/spark-warehouse/org.apache.spark.sql.SQLQueryTestSuite/testdata')
already exists.;
[info]   at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.validateTableLocation(SessionCatalog.scala:355)
[info]   at
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:170)
[info]   at
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
```
This PR adds code that deletes orphan directories under a warehouse dir
in `SQLQueryTestSuite` before creating test tables.
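The cleanup step can be sketched as follows. This is an illustrative snippet, not the suite's actual Scala code: recursively remove a leftover table directory under the warehouse dir so that a killed previous run cannot break `CREATE TABLE` with a "location already exists" error.

```java
// Illustrative sketch (not the suite's actual code): recursively
// delete a leftover warehouse directory before recreating test tables.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class WarehouseCleanup {
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) return;
        try (Stream<Path> paths = Files.walk(dir)) {
            // Sort in reverse so children are deleted before parents.
            paths.sorted(Comparator.reverseOrder())
                 .forEach(p -> p.toFile().delete());
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("spark-warehouse-test");
        Files.createFile(dir.resolve("testdata"));
        deleteRecursively(dir);
        System.out.println(Files.exists(dir)); // prints false
    }
}
```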
### Why are the changes needed?
To improve test convenience
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually checked
Closes #29488 from maropu/DeleteDirs.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org> Signed-off-by:
Dongjoon Hyun <dongjoon@apache.org>
(commit: d80d0ce)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala (diff)
Commit d378dc5f6db6fe37426728bea714f44b94a94861 by wenchen
[SPARK-28863][SQL][FOLLOWUP] Do not reuse the physical plan
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/29469
Instead of passing the physical plan to the fallbacked v1 source
directly and skipping analysis, optimization, planning altogether, this
PR proposes to pass the optimized plan.
### Why are the changes needed?
It's a bit risky to pass the physical plan directly. When the fallbacked
v1 source applies more operations to the input DataFrame, it will
re-apply the post-planning physical rules like `CollapseCodegenStages`,
`InsertAdaptiveSparkPlan`, etc., which is very tricky.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
existing test suite with some new tests
Closes #29489 from cloud-fan/follow.
Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen
Fan <wenchen@databricks.com>
(commit: d378dc5)
The file was added sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala (diff)
The file was added sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
The file was removed sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyPlanned.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala (diff)
The file was removed sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyPlannedSuite.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala (diff)
Commit 8b119f1663dc65c96f20606ac8405169d8f9d31d by dongjoon
[SPARK-32640][SQL] Downgrade Janino to fix a correctness bug
### What changes were proposed in this pull request?
This PR reverts https://github.com/apache/spark/pull/27860 to downgrade
Janino, as the new version has a bug.
### Why are the changes needed?
The symptom is about NaN comparison. For the code below:
```
if (double_value <= 0.0) {
...
} else {
...
}
```
If `double_value` is NaN, `NaN <= 0.0` is false and we should go to the
else branch. However, current Spark goes to the if branch and causes
correctness issues like SPARK-32640.
One way to fix it is:
```
boolean cond = double_value <= 0.0;
if (cond) {
...
} else {
...
}
```
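The expected JVM semantics, which the buggy Janino release violated in generated code, can be checked directly: any comparison against NaN is false, so a NaN value must take the else branch. The class below is an illustrative demo, not Spark's generated code.

```java
// Demonstrates the correct JVM branch semantics for NaN: every
// comparison against NaN is false, so NaN must take the else branch.
public class NaNBranchDemo {
    static String branch(double v) {
        if (v <= 0.0) {
            return "if";
        } else {
            return "else";
        }
    }

    public static void main(String[] args) {
        System.out.println(branch(-1.0));       // prints if
        System.out.println(branch(Double.NaN)); // prints else
    }
}
```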
I'm not familiar with Janino so I don't know what's going on there.
### Does this PR introduce _any_ user-facing change?
Yes, it fixes correctness bugs.
### How was this patch tested?
a new test
Closes #29495 from cloud-fan/revert.
Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by:
Dongjoon Hyun <dongjoon@apache.org>
(commit: 8b119f1)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala (diff)
The file was modified dev/deps/spark-deps-hadoop-2.7-hive-1.2 (diff)
The file was modified dev/deps/spark-deps-hadoop-2.7-hive-2.3 (diff)
The file was modified pom.xml (diff)
The file was modified dev/deps/spark-deps-hadoop-3.2-hive-2.3 (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala (diff)
Commit de141a32714fd2dbc4be2d540adabf328bbce2c4 by gengliang.wang
[SPARK-32660][SQL][DOC] Show Avro related API in documentation
### What changes were proposed in this pull request?
Currently, the Avro related APIs are missing in the documentation
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/index.html.
This PR is to:
1. Mark internal Avro related classes as private.
2. Show the Avro related API in the Spark official API documentation.
### Why are the changes needed?
Better documentation.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Build doc and preview:
![image](https://user-images.githubusercontent.com/1097932/90623042-d156ee00-e1ca-11ea-9edd-2c45b3001fd8.png)
![image](https://user-images.githubusercontent.com/1097932/90623047-d451de80-e1ca-11ea-94ba-02921b64d6f1.png)
![image](https://user-images.githubusercontent.com/1097932/90623058-d6b43880-e1ca-11ea-849a-b9ea9efe6527.png)
Closes #29476 from gengliangwang/avroAPIDoc.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
(commit: de141a3)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala (diff)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala (diff)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala (diff)
The file was modified external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java (diff)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala (diff)
The file was modified project/SparkBuild.scala (diff)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala (diff)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala (diff)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala (diff)
Commit 79b4dea1b08adc9d4b545a1af29ac50fa603936a by mridulatgmail.com
[SPARK-32663][CORE] Avoid individual closing of pooled TransportClients
(which must be closed through the pool)
### What changes were proposed in this pull request?
Removing the individual `close` method calls on the pooled
`TransportClient` instances. The pooled clients should be only closed
via `TransportClientFactory#close()`.
### Why are the changes needed?
Reusing a closed `TransportClient` leads to the exception
`java.nio.channels.ClosedChannelException`.
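The hazard described above can be sketched with a toy pool. This is a hypothetical illustration, not Spark's actual `TransportClientFactory`: the pool hands out shared client instances, so one caller's `close()` would break every other caller holding the same client; clients must only be closed by the pool itself.

```java
// Hypothetical sketch (not Spark's TransportClientFactory): pooled
// clients are shared, so they must only be closed via the pool.
import java.util.ArrayList;
import java.util.List;

public class ClientPool implements AutoCloseable {
    static class Client {
        private boolean closed;
        void send(String msg) {
            // Mimics java.nio.channels.ClosedChannelException semantics.
            if (closed) throw new IllegalStateException("channel closed");
        }
        void close() { closed = true; }
    }

    private final List<Client> pool = new ArrayList<>();

    Client borrow() {
        if (pool.isEmpty()) pool.add(new Client());
        return pool.get(0); // shared instance: do NOT close it yourself
    }

    @Override
    public void close() { // the only place clients are closed
        pool.forEach(Client::close);
    }

    public static void main(String[] args) {
        ClientPool pool = new ClientPool();
        Client a = pool.borrow();
        Client b = pool.borrow();
        System.out.println(a == b); // prints true: same pooled instance
        pool.close();
    }
}
```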
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This is a trivial change that is not covered by a specific test.
Closes #29492 from attilapiros/SPARK-32663.
Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com> Signed-off-by:
Mridul Muralidharan <mridul<at>gmail.com>
(commit: 79b4dea)
The file was modified common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java (diff)