1. Revert "[SPARK-33248][SQL] Add a configuration to control the legacy (commit: 034070a)
2. [SPARK-33323][SQL] Add query resolved check before convert hive relation (commit: 1740b29)
3. [SPARK-33321][SQL] Migrate ANALYZE TABLE commands to use (commit: 0ad35ba)
4. [SPARK-33214][TEST][HIVE] Stop HiveExternalCatalogVersionsSuite from (commit: ff724d2)
5. [SPARK-33265][TEST] Rename classOf[Seq] to classOf[scala.collection.Seq] (commit: 0b557b3)
6. [SPARK-33338][SQL] GROUP BY using literal map should not fail (commit: 42c0b17)
7. [SPARK-31711][CORE] Register the executor source with the metrics system (commit: b7fff03)
8. [SPARK-33343][BUILD] Fix the build with sbt to copy (commit: d24dbe8)
9. [SPARK-33314][SQL] Avoid dropping rows in Avro reader (commit: 7e8eb04)
10. [SPARK-33316][SQL] Support user provided nullable Avro schema for (commit: 551b504)
11. [SPARK-33282] Migrate from deprecated probot autolabeler to GitHub (commit: 0535b34)
12. Revert "[SPARK-33277][PYSPARK][SQL] Use ContextAwareIterator to stop (commit: d530ed0)
13. [MINOR][SS][DOCS] Update join type in stream static joins code examples (commit: e66201b)
14. [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize (commit: 21413b7)
Commit 034070a23aa8bcecc351bb2fec413e1662dcbb75 by wenchen
Revert "[SPARK-33248][SQL] Add a configuration to control the legacy
behavior of whether need to pad null value when value size less than
schema size"
This reverts commit 0c943cd2fbc6f2d25588991613abf469ace0153e.
(commit: 034070a)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
The file was modified docs/
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
Commit 1740b29b3f006abd08bc01b0ca807c3721d4bb0e by wenchen
[SPARK-33323][SQL] Add query resolved check before convert hive relation
### What changes were proposed in this pull request?
Add a `query.resolved` check before converting the Hive relation.
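The guard can be illustrated with a toy rewrite rule. This is a minimal, hypothetical model and not the rule in `HiveStrategies.scala`. The plan classes `Query`, `HiveInsert` and `DataSourceInsert` are invented for illustration. The conversion only fires once the child query is resolved. An unresolved plan passes through unchanged, so a later analysis check can report the actual problem.

```scala
// Hypothetical, simplified plan nodes; these are not Catalyst classes.
sealed trait Plan { def resolved: Boolean }
final case class Query(resolved: Boolean) extends Plan
final case class HiveInsert(query: Plan) extends Plan {
  def resolved: Boolean = query.resolved
}
final case class DataSourceInsert(query: Plan) extends Plan {
  def resolved: Boolean = query.resolved
}

object ConvertHiveRelation {
  // Only convert once the child query is resolved. An unresolved plan is
  // returned unchanged, so a later analysis check can produce a clear error
  // instead of the conversion touching unresolved attributes.
  def apply(plan: Plan): Plan = plan match {
    case HiveInsert(query) if query.resolved => DataSourceInsert(query)
    case other => other
  }
}
```

The design choice is to skip the rewrite when the query is unresolved, rather than fail inside it. The error is then raised at the point that knows what is missing.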
### Why are the changes needed?
To produce a better error message.
SELECT c1, c2 from values(1,2) t(c1, c2)
Before this PR, we got the following error message:
```
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to toAttribute on unresolved object, tree: *
at scala.collection.immutable.List.foreach(List.scala:392)
```
### Does this PR introduce _any_ user-facing change?
Yes, the error message changed.
### How was this patch tested?
Added a test.
Closes #30230 from ulysses-you/SPARK-33323.
Authored-by: ulysses
Signed-off-by: Wenchen Fan
(commit: 1740b29)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
Commit 0ad35ba5f8bd6413669b568de659334bb9a3fb44 by wenchen
[SPARK-33321][SQL] Migrate ANALYZE TABLE commands to use
UnresolvedTableOrView to resolve the identifier
### What changes were proposed in this pull request?
This PR proposes to migrate `ANALYZE TABLE` and `ANALYZE TABLE ... FOR
COLUMNS` to use `UnresolvedTableOrView` to resolve the table/view
identifier. This allows consistent resolution rules (temp view first,
etc.) to be applied for both v1/v2 commands. More info about the
consistent resolution rule proposal can be found in
the JIRA ticket or the proposal document.
Note that `ANALYZE TABLE` is not supported for v2 tables.
### Why are the changes needed?
The changes allow consistent resolution behavior when resolving the
table/view identifier. For example, the following is the current behavior:
```scala
sql("create temporary view t as select 1")
sql("create database db")
sql("create table db.t using csv as select 1")
sql("use db")
sql("ANALYZE TABLE t compute statistics") // Succeeds
```
With this change, ANALYZE TABLE above fails with the following:
   org.apache.spark.sql.AnalysisException: t is a temp view not table or permanent view.; line 1 pos 0
This is expected, since the temporary view is resolved first and ANALYZE
TABLE doesn't support temporary views.
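The "temp view first" rule can be modelled in a few lines. This is a hypothetical, simplified sketch and not Spark's analyzer. `SessionState`, `TempView`, `Table` and `AnalyzeTable.run` are invented names. With `t` defined both as a temp view and as `db.t`, the single-part name resolves to the temp view, and the ANALYZE-like command rejects it.

```scala
// Hypothetical, simplified model of identifier resolution; not Spark's analyzer.
sealed trait Relation
final case class TempView(name: String) extends Relation
final case class Table(database: String, name: String) extends Relation

final case class SessionState(
    tempViews: Set[String],
    tables: Set[(String, String)],
    currentDatabase: String) {

  // A single-part name is looked up among temp views first, then in the
  // current database, mirroring the "temp view first" rule described above.
  def resolveTableOrView(name: String): Option[Relation] =
    if (tempViews.contains(name)) Some(TempView(name))
    else if (tables.contains((currentDatabase, name))) Some(Table(currentDatabase, name))
    else None
}

object AnalyzeTable {
  // The command rejects a temp view instead of silently falling through
  // to a table that has the same name.
  def run(session: SessionState, name: String): Either[String, Table] =
    session.resolveTableOrView(name) match {
      case Some(table: Table)  => Right(table)
      case Some(TempView(view)) => Left(s"$view is a temp view not table or permanent view.")
      case None                 => Left(s"Table or view not found: $name")
    }
}
```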
### Does this PR introduce _any_ user-facing change?
After this PR, `ANALYZE TABLE t` is resolved to a temp view `t` instead
of table `db.t`.
### How was this patch tested?
Updated existing tests.
Closes #30229 from imback82/parse_v1table.
Authored-by: Terry Kim
Signed-off-by: Wenchen Fan
(commit: 0ad35ba)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
The file was modified sql/core/src/test/resources/sql-tests/results/describe.sql.out
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
The file was modified sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
Commit ff724d23b696b2c4232be5daf31eed569779d720 by wenchen
[SPARK-33214][TEST][HIVE] Stop HiveExternalCatalogVersionsSuite from
using a hard-coded location to store localized Spark binaries
### What changes were proposed in this pull request?
This PR changes
`HiveExternalCatalogVersionsSuite` to, by default, use a standard
temporary directory to store the Spark binaries that it localizes. It
additionally adds a new System property, `spark.test.cache-dir`, which
can be used to define a static location into which the Spark binary will
be localized to allow for sharing between test executions. If the System
property is used, the downloaded binaries won't be deleted after the
test runs.
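The directory selection described above can be sketched as follows. This is a hypothetical helper and not the suite's actual code. `SparkTestingDir` and `resolve` are invented names, and only the property name `spark.test.cache-dir` comes from the description. When the property is set, the static directory is reused and kept after the run. Otherwise, a fresh temporary directory is created and marked for deletion.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical helper illustrating the behavior described above; it is not
// the suite's actual code.
final case class SparkTestingDir(dir: File, deleteAfterRun: Boolean)

object SparkTestingDir {
  val CacheDirProperty = "spark.test.cache-dir"

  // With the property set, reuse that static directory and keep it after the
  // run; otherwise use a fresh temporary directory that is deleted afterwards.
  def resolve(cacheDir: Option[String]): SparkTestingDir = cacheDir match {
    case Some(path) => SparkTestingDir(new File(path), deleteAfterRun = false)
    case None =>
      SparkTestingDir(Files.createTempDirectory("test-spark").toFile, deleteAfterRun = true)
  }

  def fromSystemProperties(): SparkTestingDir = resolve(sys.props.get(CacheDirProperty))
}
```

Taking the property value as an `Option` argument keeps the choice testable without mutating JVM system properties.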
### Why are the changes needed?
In SPARK-22356 (PR #19579), the
`sparkTestingDir` used by `HiveExternalCatalogVersionsSuite` became
hard-coded to enable re-use of the downloaded Spark tarball between test
executions:
```scala
// For local test, you can set `sparkTestingDir` to a static value like `/tmp/test-spark`, to
// avoid downloading Spark of different versions in each run.
private val sparkTestingDir = new File("/tmp/test-spark")
```
However, this doesn't work, since it gets deleted every time:
```scala
override def afterAll(): Unit = {
   try {
   } finally {
```
It's bad that we're hard-coding to a `/tmp` directory, as in some cases
this is not the proper place to store temporary files. We're not
currently making any good use of it.
### Does this PR introduce _any_ user-facing change?
Developer-facing changes only, as this is in a test.
### How was this patch tested?
The test continues to execute as expected.
Closes #30122 from
Authored-by: Erik Krogen
Signed-off-by: Wenchen Fan
(commit: ff724d2)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
Commit 0b557b329046c66ee67a8c94c5bb95ffbe50e135 by gurwls223
[SPARK-33265][TEST] Rename classOf[Seq] to classOf[scala.collection.Seq]
in PostgresIntegrationSuite for Scala 2.13
### What changes were proposed in this pull request?
This PR renames some occurrences of `Seq` in `PostgresIntegrationSuite` to
`scala.collection.Seq`. When I run `docker-integration-test`, I noticed
that `PostgresIntegrationSuite` failed due to `ClassCastException`. The
reason is the same as what is resolved in SPARK-29292.
### Why are the changes needed?
To pass `docker-integration-test` for Scala 2.13.
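The snippet below illustrates the underlying Scala 2.13 change in isolation. It is a minimal, self-contained sketch and not the suite's code. In 2.13 the default `Seq` alias means `scala.collection.immutable.Seq`, so a mutable collection that used to count as a `Seq` no longer does, and casting it to `Seq` fails with `ClassCastException`. Checking against `scala.collection.Seq` behaves the same on 2.12 and 2.13.

```scala
import scala.collection.mutable

object SeqAliasCheck {
  // A mutable buffer is a scala.collection.Seq on every Scala version.
  val buffer: Any = mutable.ArrayBuffer(1, 2, 3)

  // Portable check against the generic collection trait.
  def isCollectionSeq(x: Any): Boolean = classOf[scala.collection.Seq[_]].isInstance(x)

  // Check against the default `Seq` alias: scala.collection.Seq on 2.12,
  // scala.collection.immutable.Seq on 2.13.
  def isDefaultSeq(x: Any): Boolean = classOf[Seq[_]].isInstance(x)

  def main(args: Array[String]): Unit = {
    println(s"Scala ${scala.util.Properties.versionNumberString}")
    println(s"scala.collection.Seq: ${isCollectionSeq(buffer)}") // true on 2.12 and 2.13
    println(s"default Seq:          ${isDefaultSeq(buffer)}")    // true on 2.12, false on 2.13
  }
}
```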
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Ran the fixed `PostgresIntegrationSuite` and confirmed it passes successfully.
Closes #30166 from sarutak/fix-toseq-postgresql.
Authored-by: Kousuke Saruta
Signed-off-by: HyukjinKwon
(commit: 0b557b3)
The file was modified external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
Commit 42c0b175ce6ee4bf1104b6a8cef6bb6477693781 by dhyun
[SPARK-33338][SQL] GROUP BY using literal map should not fail
### What changes were proposed in this pull request?
This PR aims to fix `semanticEquals` so that it works correctly on `GetMapValue`
expressions having literal maps with `ArrayBasedMapData` and
### Why are the changes needed?
This is a regression from Apache Spark 1.6.x.
```scala
scala> sc.version
res1: String = 1.6.3

scala> sqlContext.sql("SELECT map('k1', 'v1')[k] FROM t GROUP BY map('k1', 'v1')[k]").show
| v1|
```
Apache Spark 2.x ~ 3.0.1 raise `RuntimeException` for the following:
```sql
CREATE TABLE t USING ORC AS SELECT map('k1', 'v1') m, 'k1' k
SELECT map('k1', 'v1')[k] FROM t GROUP BY 1
SELECT map('k1', 'v1')[k] FROM t GROUP BY map('k1', 'v1')[k]
SELECT map('k1', 'v1')[k] a FROM t GROUP BY a
```
```scala
Caused by: java.lang.RuntimeException: Couldn't find k#3 in [keys: [k1], values: [v1][k#3]#6]
at scala.sys.package$.error(package.scala:27)
```
After this PR, the queries succeed:
```sql
spark-sql> SELECT map('k1', 'v1')[k] FROM t GROUP BY 1;
v1
Time taken: 1.278 seconds, Fetched 1 row(s)
spark-sql> SELECT map('k1', 'v1')[k] FROM t GROUP BY map('k1', 'v1')[k];
v1
Time taken: 0.313 seconds, Fetched 1 row(s)
spark-sql> SELECT map('k1', 'v1')[k] a FROM t GROUP BY a;
v1
Time taken: 0.265 seconds, Fetched 1 row(s)
```
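The toy model below shows how grouping by `map('k1', 'v1')[k]` can break. If two structurally identical map literals compare as unequal, the expression in `SELECT` is not matched to the one in `GROUP BY`. The sketch assumes, for illustration only, that the map payload relies on reference equality. `MapData`, `MapLiteral` and `MapLiteralEquality` are hypothetical names and not Spark's `ArrayBasedMapData` or `Literal`.

```scala
// Hypothetical stand-in for map data that, like a plain JVM class without an
// equals override, compares by reference.
final class MapData(val keys: Array[String], val values: Array[String])

// Hypothetical literal node; case-class equality compares fields with ==,
// which for MapData falls back to reference equality.
final case class MapLiteral(value: MapData)

object MapLiteralEquality {
  // Default behavior: two separately built literals for map('k1', 'v1') differ.
  def defaultEquals(a: MapLiteral, b: MapLiteral): Boolean = a == b

  // Content-based comparison of keys and values, so that the SELECT-side and
  // GROUP BY-side expressions can be recognized as the same.
  def contentEquals(a: MapLiteral, b: MapLiteral): Boolean =
    a.value.keys.sameElements(b.value.keys) &&
      a.value.values.sameElements(b.value.values)
}
```

A real fix would also need a hash code consistent with the content-based equality if such literals are used as hash keys. The sketch omits that.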
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Pass the CIs with the newly added test case.
Closes #30246 from dongjoon-hyun/SPARK-33338.
Authored-by: Dongjoon Hyun
Signed-off-by: Dongjoon Hyun
(commit: 42c0b17)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
The file was modified sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Commit b7fff0397319efd2987d4cceff4f738f1c06409d by tgraves
[SPARK-31711][CORE] Register the executor source with the metrics system
when running in local mode
### What changes were proposed in this pull request?
This PR proposes to register the executor source with the Spark metrics
system when running in local mode.
### Why are the changes needed?
The Apache Spark metrics system provides many useful insights on the
Spark workload. In particular, the executor source metrics provide
detailed info, including the number of active tasks, I/O metrics, and
several task metrics details. Unlike other sources (for example the
ExecutorMetrics source), the executor source metrics are not available
when running in local mode. Having executor metrics in local mode can be
useful when testing and troubleshooting Spark workloads in a development
environment. The metrics can be fed to a dashboard to see the evolution
of resource usage and can be used to troubleshoot performance. Currently,
users have to deploy on a cluster to be able to collect executor source
metrics, while the possibility of having them in local mode is handy for
### Does this PR introduce _any_ user-facing change?
- This PR exposes executor source metrics data when running in local mode.
### How was this patch tested?
- Manually tested by running in local mode and inspecting the metrics
listed in http://localhost:4040/metrics/json/
- Also added a test in `SourceConfigSuite`
Closes #28528 from LucaCanali/metricsWithLocalMode.
Authored-by: Luca Canali
Signed-off-by: Thomas Graves
(commit: b7fff03)
The file was modified core/src/main/scala/org/apache/spark/SparkContext.scala
The file was modified docs/
The file was modified core/src/main/scala/org/apache/spark/executor/Executor.scala
The file was modified core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
Commit d24dbe89557c6cdbe5c7a2b190ccd4e847757428 by dhyun
[SPARK-33343][BUILD] Fix the build with sbt to copy
### What changes were proposed in this pull request?
This PR fixes the issue that spark-shell doesn't work if it's built with
`sbt package` (without any profiles specified). It's because
hadoop-client-runtime.jar isn't copied to
$ bin/spark-shell
Exception in thread "main"
   at scala.Option.getOrElse(Option.scala:189)
   at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
   at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1022)
   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException:
   at java.lang.ClassLoader.loadClass(
   at sun.misc.Launcher$AppClassLoader.loadClass(
   at java.lang.ClassLoader.loadClass(
### Why are the changes needed?
This is a bug.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Ran spark-shell and confirmed it works.
Closes #30250 from sarutak/copy-runtime-sbt.
Authored-by: Kousuke Saruta
Signed-off-by: Dongjoon Hyun
(commit: d24dbe8)
The file was modified pom.xml
Commit 7e8eb0447bfca2e38040c974dce711659e613e3c by gurwls223
[SPARK-33314][SQL] Avoid dropping rows in Avro reader
### What changes were proposed in this pull request?
This PR adds a check to RowReader#hasNextRow such that multiple calls
to RowReader#hasNextRow with no intervening call to RowReader#nextRow
will not consume more than one record.
This PR also modifies RowReader#nextRow such that consecutive calls will
return new rows (previously, consecutive calls would return the same row).
### Why are the changes needed?
SPARK-32346 slightly refactored the AvroFileFormat and
AvroPartitionReaderFactory to use a new iterator-like trait called
AvroUtils#RowReader. RowReader#hasNextRow consumes a raw input record
and stores the deserialized row for the next call to RowReader#nextRow.
Unfortunately, sometimes hasNextRow is called twice before nextRow is
called, resulting in a lost row.
For example (this assumes the V1 Avro reader):
```scala
val df = spark.range(0, 25).toDF("index")
df.write.mode("overwrite").format("avro").save("index_avro")
val loaded = spark.read.format("avro").load("index_avro")
// The following will give the expected size
loaded.collect.size
// The following will give the wrong size
```
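The contract the fix restores can be sketched as a small buffered reader. This is a hypothetical reader modelled on the description and not Spark's `AvroUtils#RowReader`. `BufferedRowReader`, `hasNextRow` and `nextRow` are illustrative names. `hasNextRow` buffers at most one record, so calling it twice in a row does not drop anything. `nextRow` hands that record out and clears the buffer.

```scala
// Hypothetical reader modeled on the contract described above; it is not
// Spark's AvroUtils#RowReader.
final class BufferedRowReader[A](records: Iterator[A]) {
  // Holds at most one record fetched by hasNextRow but not yet returned.
  private var pending: Option[A] = None

  // Idempotent: repeated calls without nextRow consume at most one record.
  def hasNextRow: Boolean = {
    if (pending.isEmpty && records.hasNext) pending = Some(records.next())
    pending.isDefined
  }

  // Returns the pending record and clears it, so consecutive calls advance
  // instead of returning the same row again.
  def nextRow: A = {
    if (!hasNextRow) throw new NoSuchElementException("no more rows")
    val row = pending.get
    pending = None
    row
  }
}
```

This follows the usual `Iterator` convention: `hasNext` may be called any number of times, and only `next` advances.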
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Added tests, which fail without the fix.
Closes #30221 from bersprockets/avro_iterator_play.
Authored-by: Bruce Robbins
Signed-off-by: HyukjinKwon
(commit: 7e8eb04)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
The file was modified external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
Commit 551b504cfe38d1ab583e617c37e49659edd65c2e by
[SPARK-33316][SQL] Support user provided nullable Avro schema for
non-nullable catalyst schema in Avro writing
### What changes were proposed in this pull request?
This change adds support for a user-provided nullable Avro schema for
data with a non-nullable catalyst schema in Avro writing.
Without this change, when users try to use a nullable Avro schema to
write data with a non-nullable catalyst schema, it will throw an
`IncompatibleSchemaException` with a message like `Cannot convert
Catalyst type StringType to Avro type ["null","string"]`. With this
change it will assume that the data is non-nullable, log a warning
message for the nullability difference and serialize the data to Avro
format with the nullable Avro schema provided.
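The acceptance rule can be sketched with a toy type model. This is a hypothetical, simplified model and not the Avro library's `Schema` API or Spark's `AvroSerializer`. `AvroType`, `AvroUnion` and `NullableWriteCheck.writeBranch` are invented names, `println` stands in for logging a warning, and the check that the Catalyst type matches the Avro type is left out. A union of `null` plus exactly one other type is accepted for non-nullable data, and values are written with the non-null branch. Other mismatches are still rejected.

```scala
// Hypothetical, simplified Avro type model; not the Avro library's Schema API.
sealed trait AvroType
case object AvroNull extends AvroType
case object AvroString extends AvroType
case object AvroLong extends AvroType
final case class AvroUnion(branches: List[AvroType]) extends AvroType

object NullableWriteCheck {
  // Decide which Avro branch values are written with. A union of "null" plus
  // exactly one other type is accepted; for non-nullable data a warning is
  // printed (standing in for a log message). Other mismatches are rejected.
  def writeBranch(avro: AvroType, catalystNullable: Boolean): Either[String, AvroType] =
    avro match {
      case AvroUnion(branches) if branches.contains(AvroNull) =>
        branches.filterNot(_ == AvroNull) match {
          case List(single) =>
            if (!catalystNullable)
              println(s"WARN: writing non-nullable data with nullable Avro type $avro")
            Right(single)
          case _ => Left(s"Unsupported union: $avro")
        }
      case AvroUnion(_) => Left(s"Unsupported union: $avro")
      case other if catalystNullable =>
        Left(s"Cannot write nullable data with non-nullable Avro type $other")
      case other => Right(other)
    }
}
```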
### Why are the changes needed?
This change is needed because sometimes
our users do not have full control over the nullability of the Avro
schemas they use, and this change provides them with the flexibility.
### Does this PR introduce _any_ user-facing change?
Yes. Users are allowed to use nullable Avro schemas for data with
non-nullable catalyst schemas in Avro writing after the change.
### How was this patch tested?
Added unit tests.
Closes #30224 from bozhang2820/avro-nullable.
Authored-by: Bo Zhang
Signed-off-by: Gengliang Wang
(commit: 551b504)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
The file was modified external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
The file was modified external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
Commit 0535b34ad47249df4806ed70471d5539b998a3b3 by gurwls223
[SPARK-33282] Migrate from deprecated probot autolabeler to GitHub
labeler action
### What changes were proposed in this pull request?
This PR removes the old Probot Autolabeler labeling configuration, as
the probot autolabeler has been deprecated. I've updated the configs in
Iceberg and in Avro, and we also need to update here. This PR adds in an
additional workflow for labeling PRs and migrates the old probot config
to the new format. Unfortunately, because certain features have not been
released upstream, we will not get the _exact_ behavior as before. I
have documented where that is and what changes are needed, and in the
associated ticket I've also discussed other options and why I think this
is the best way to go. A follow-up ticket is definitely needed to get
the original behavior back in these few cases, but PRs have not been
labeled for almost a month and so it's probably best to get it right 95%
of the time and occasionally have some UI related PRs labeled as `CORE`
while the issue is resolved upstream and/or further investigated.
### Why are the changes needed?
The probot autolabeler is dead and will not be maintained going forward.
This has been confirmed with GitHub user @mithro in an issue in their
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
To test this PR, I first merged the config into my local fork. I then
edited it several times and ran tests on that.
Unfortunately, I've overwritten my fork with the apache repo in order to
create a proper PR. However, I've also added the config for the same
thing in the Iceberg repo as well as the Avro repo.
I have now merged this PR into my local repo and will be running some
tests on edge cases there and for validating in general:
- Check that the SQL label is applied for changes directly below the repo
root's sql directory ✅
- Check that the structured streaming label is applied ✅
- Check that a wildcard at the end of a pattern will match nested
files ✅
- Check that the rule `**/*pom.xml` will match the root pom.xml
file ✅
I've also discovered that we're likely not killing github actions that
run (like large tests etc) when users push to their PR. In most cases, I
see that a user has to mark something as "OK to test", but it still
seems like we might want to discuss whether or not we should add a
cancellation step in order to save time / capacity on the runners. If so
desired, we would add an action in each workflow that cancels old runs
when a `push` action occurs on a PR. This will likely make waiting for
test runners much faster if tests are automatically rerun on push by
anybody (such as PMCs, PRs that have been marked OK to test, etc). We
could free a large number of resources potentially if a cancellation
step was added to all of the workflows in the Apache account (as github
action API limits are set at the account level).
Admittedly, the "old" workflow runs may not have been cancelled simply
because I was working in a fork. But there are explicit actions to be
added to the start of workflows to cancel old PR workflows, and we don't
have them configured, which indicates to me that this is likely the case
in this repo (and in most `apache` repos as well), at least under certain
circumstances (e.g. repos that don't have "Ok to test"-like webhooks).
This is a separate issue though, which I can bring up on the mailing
list once I'm done with this PR. Unfortunately I've been very busy the
past two weeks, but if somebody else wanted to work on that I would be
happy to support with any knowledge I have.
The last Apache repo that still has the probot autolabeler is Beam; once
it has migrated, we can have Gavin from ASF Infra remove the permissions
for the probot autolabeler entirely. See the associated JIRA ticket for
the links to other tickets, like the one for ASF Infra to remove the
dead probot autolabeler's read and write permissions to our PRs in the
Apache organization.
Closes #30244 from kbendick/begin-migration-to-github-labeler-action.
Authored-by: Kyle Bendickson
Signed-off-by: HyukjinKwon
(commit: 0535b34)
The file was added .github/labeler.yml
The file was removed .github/autolabeler.yml
The file was added .github/workflows/labeler.yml
Commit d530ed0ea8bdba09fba6dcd51f8e4f7745781c2e by gurwls223
Revert "[SPARK-33277][PYSPARK][SQL] Use ContextAwareIterator to stop
consuming after the task ends"
This reverts commit b8a440f09880c596325dd9e6caae6b470be76a8f.
(commit: d530ed0)
The file was modified python/pyspark/sql/tests/
The file was modified python/pyspark/sql/tests/
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala
The file was modified python/pyspark/sql/tests/
Commit e66201b30bc1f3da7284af14b32e5e6200768dbd by kabhwan.opensource
[MINOR][SS][DOCS] Update join type in stream static joins code examples
### What changes were proposed in this pull request?
Update the join type in the stream-static join code examples in the
structured streaming programming guide.
1) The Scala, Java and Python examples have a common issue.
   The join keyword is "right_join"; it should be "left_outer".
   a) This code snippet is an example of a "left outer join", as the
streaming df is on the left and the static df is on the right. Also, a
right outer join between stream df (left) and static df (right) is not supported.
   b) The keyword "right_join/left_join" is unsupported; it should be
"right_outer/left_outer".
So, all of these code snippets have been updated to "left_outer".
2) The R example is correct, but it is an example of "right_outer" with static
df (left) and streaming df (right). It is changed to "left_outer" to make
it consistent with the other three examples in Scala, Java and Python.
### Why are the changes needed?
To fix the mistake in the example code of the structured streaming
programming guide.
### Does this PR introduce _any_ user-facing change?
Yes, it is a user-facing change (but documentation update only).
**Screenshots 1: Scala/Java/python example (similar issue)**
**Screenshots 2: R example (Make it consistent with above change)**
### How was this patch tested?
The change was tested locally.
1) cd
   SKIP_API=1 jekyll build
2) Verify the docs/_site/structured-streaming-programming-guide.html file in a browser.
Closes #30252 from sarveshdave1/doc-update-stream-static-joins.
Authored-by: Sarvesh Dave
Signed-off-by: Jungtaek Lim (HeartSaVioR)
(commit: e66201b)
The file was modified docs/
Commit 21413b7dd4e19f725b21b92cddfbe73d1b381a05 by kabhwan.opensource
[SPARK-30294][SS] Explicitly defines read-only StateStore and optimize
for HDFSBackedStateStore
### What changes were proposed in this pull request?
There's a concept of 'read-only' and 'read+write' state store in Spark
which is defined "implicitly". Spark doesn't prevent write for
'read-only' state store; Spark just assumes read-only stateful operator
will not modify the state store. Given it's not defined explicitly, the
instance of state store has to be implemented as 'read+write' even if it's
being used as 'read-only', which sometimes brings confusion.
For example, abort() in HDFSBackedStateStore -
The comment sounds as if the statement works differently between 'read-only'
and 'read+write', but that's not true, as both state stores have their state
initialized as UPDATING (no difference). So the 'read-only' state store also
creates the temporary file, initializes output streams to write to the
temporary file, closes the output streams, and finally deletes the temporary
file. These unnecessary operations are done per batch/partition.
This patch explicitly defines 'read-only' StateStore, and enables state
store provider to create 'read-only' StateStore instance if requested.
Relevant code paths are modified, as well as 'read-only' StateStore
implementation for HDFSBackedStateStore is introduced. The new
implementation gets rid of unnecessary operations explained above.
From a backward-compatibility point of view, the only thing being changed
on the public API side is `StateStoreProvider`. The trait `StateStoreProvider`
has to be changed to allow requesting a 'read-only' StateStore; this patch
adds a default implementation which leverages the 'read+write' StateStore,
wrapping it with a 'write-protected' StateStore instance, so that custom
providers don't need to change their code to reflect the change. But if
the providers can optimize for read-only workload, they'll be happy to
make a change.
Please note that this patch makes ReadOnlyStateStore extend StateStore
and be referred to as StateStore, as StateStore is being used in so many
places and it's not easy to support both traits if we differentiate
them. So unfortunately these write methods are still exposed for
read-only state; it just throws UnsupportedOperationException.
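The backward-compatible default can be sketched in plain Scala. All names here are hypothetical simplifications and not Spark's actual traits or signatures: `SimpleStateStore`, `WriteProtectedStore`, `SimpleStateStoreProvider.getReadOnlyStore` and `InMemoryStoreProvider`. As in the description, the read-only store still extends the full store trait and rejects writes at runtime with `UnsupportedOperationException`. A provider that can do better, as the HDFS-backed one does, would override the default method.

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-in for a state store API.
trait SimpleStateStore {
  def get(key: String): Option[String]
  def put(key: String, value: String): Unit
  def remove(key: String): Unit
  def commit(): Long
  def abort(): Unit
}

// Write-protected view: delegates reads and rejects every write.
final class WriteProtectedStore(underlying: SimpleStateStore) extends SimpleStateStore {
  private def readOnly(op: String): Nothing =
    throw new UnsupportedOperationException(s"$op is not supported on a read-only state store")

  def get(key: String): Option[String] = underlying.get(key)
  def put(key: String, value: String): Unit = readOnly("put")
  def remove(key: String): Unit = readOnly("remove")
  def commit(): Long = readOnly("commit")
  // Delegate abort so the underlying store can release its resources.
  def abort(): Unit = underlying.abort()
}

trait SimpleStateStoreProvider {
  def getStore(version: Long): SimpleStateStore
  // Default: wrap the read+write store, so existing providers keep working
  // unchanged; providers that can optimize reads override this method.
  def getReadOnlyStore(version: Long): SimpleStateStore =
    new WriteProtectedStore(getStore(version))
}

// Toy in-memory provider used only to exercise the default.
final class InMemoryStoreProvider extends SimpleStateStoreProvider {
  private val data = mutable.Map("k" -> "v")

  def getStore(version: Long): SimpleStateStore = new SimpleStateStore {
    def get(key: String): Option[String] = data.get(key)
    def put(key: String, value: String): Unit = data.update(key, value)
    def remove(key: String): Unit = data.remove(key)
    def commit(): Long = version + 1
    def abort(): Unit = ()
  }
}
```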
### Why are the changes needed?
The new API opens the chance to optimize read-only state store instance
compared with read+write state store instance.
HDFSBackedStateStoreProvider is modified to provide a read-only version of
the state store which doesn't deal with temporary files as well as state
### Does this PR introduce any user-facing change?
Clearly "no" for most end users, and also "no" for custom state store
providers as it doesn't touch trait `StateStore` as well as provides
default implementation for added method in trait `StateStoreProvider`.
### How was this patch tested?
Modified UT. Existing UTs ensure the change doesn't break anything.
Closes #26935 from HeartSaVioR/SPARK-30294.
Authored-by: Jungtaek Lim (HeartSaVioR)
Signed-off-by: Jungtaek Lim (HeartSaVioR)
(commit: 21413b7)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala