Changes

Summary

  1. [SPARK-23499][MESOS] Support for priority queues in Mesos scheduler (commit: 9ab0f82) (details)
  2. [SPARK-33399][SQL] Normalize output partitioning and sortorder with (commit: f5e3302) (details)
  3. [SPARK-33209][SS] Refactor unit test of stream-stream join in (commit: 5af5aa1) (details)
Commit 9ab0f82a5983d67aa41efa86bbe4080e5fd57335 by dongjoon
[SPARK-23499][MESOS] Support for priority queues in Mesos scheduler
### What changes were proposed in this pull request?
I pushed this PR because I could not reopen the stale one,
https://github.com/apache/spark/pull/20665.
As with YARN or Kubernetes, Mesos users should be able to specify
priority queues to define a workload management policy for drivers
queued in the Mesos Cluster Dispatcher.
This ensures a scheduling order when enqueuing Spark applications for a
Mesos cluster.
### Why are the changes needed?
Currently, submitted drivers are kept in order of their submission: the
first driver added to the queue will be the first one to be executed
(FIFO), regardless of their priority.
See https://issues.apache.org/jira/projects/SPARK/issues/SPARK-23499 for
more details.
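To make the intended ordering concrete, here is a minimal,
self-contained Scala sketch of priority scheduling of queued drivers
(illustration only, not the actual MesosClusterScheduler code; the queue
names and weights below are hypothetical). Drivers are ordered by the
weight of their target queue first and by submission time second, so
FIFO is preserved within a queue.
    case class QueuedDriver(submissionId: String, queue: String, submittedAtMs: Long)

    // Hypothetical queue weights; a higher weight means a higher scheduling priority.
    val queueWeights: Map[String, Float] = Map("urgent" -> 3.0f, "default" -> 1.0f)

    val queued = Seq(
      QueuedDriver("driver-001", "default", submittedAtMs = 1L),
      QueuedDriver("driver-002", "urgent",  submittedAtMs = 2L),
      QueuedDriver("driver-003", "default", submittedAtMs = 3L))

    // Highest-weight queue first; FIFO (submission time) within the same queue.
    val schedulingOrder =
      queued.sortBy(d => (-queueWeights.getOrElse(d.queue, 1.0f), d.submittedAtMs))

    // Prints driver-002, driver-001, driver-003.
    schedulingOrder.foreach(d => println(d.submissionId))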
### Does this PR introduce _any_ user-facing change?
The MesosClusterDispatcher UI now shows Spark jobs along with the queue
to which they were submitted.
### How was this patch tested?
Unit tests. This feature has also been running in production for three
years on a modified Spark 2.4.0.
Closes #30352 from pgillet/mesos-scheduler-priority-queue.
Lead-authored-by: Pascal Gillet <pascal.gillet@stack-labs.com>
Co-authored-by: pgillet <pascalgillet@ymail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(commit: 9ab0f82)
The file was modified resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala (diff)
The file was modified docs/running-on-mesos.md (diff)
The file was modified resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala (diff)
The file was modified resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala (diff)
The file was modified resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala (diff)
The file was modified resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala (diff)
Commit f5e330284031ab8e66aa4834b947b9ca71cf28bb by yamamuro
[SPARK-33399][SQL] Normalize output partitioning and sortorder with
respect to aliases to avoid unneeded exchange/sort nodes
### What changes were proposed in this pull request?
This pull request removes unneeded exchanges/sorts by normalizing the
output partitioning and sort order information correctly with respect
to aliases.
Example: consider this join of three tables:
    SELECT t2id, t3.id as t3id
    FROM (
        SELECT t1.id as t1id, t2.id as t2id
        FROM t1, t2
        WHERE t1.id = t2.id
    ) t12, t3
    WHERE t1id = t3.id
The plan for this query looks like:
    *(9) Project [t2id#1034L, id#1004L AS t3id#1035L]
    +- *(9) SortMergeJoin [t1id#1033L], [id#1004L], Inner
       :- *(6) Sort [t1id#1033L ASC NULLS FIRST], false, 0
       :  +- Exchange hashpartitioning(t1id#1033L, 5), true, [id=#1343]   <--- unneeded exchange
       :     +- *(5) Project [id#996L AS t1id#1033L, id#1000L AS t2id#1034L]
       :        +- *(5) SortMergeJoin [id#996L], [id#1000L], Inner
       :           :- *(2) Sort [id#996L ASC NULLS FIRST], false, 0
       :           :  +- Exchange hashpartitioning(id#996L, 5), true, [id=#1329]
       :           :     +- *(1) Range (0, 10, step=1, splits=2)
       :           +- *(4) Sort [id#1000L ASC NULLS FIRST], false, 0
       :              +- Exchange hashpartitioning(id#1000L, 5), true, [id=#1335]
       :                 +- *(3) Range (0, 20, step=1, splits=2)
       +- *(8) Sort [id#1004L ASC NULLS FIRST], false, 0
          +- Exchange hashpartitioning(id#1004L, 5), true, [id=#1349]
             +- *(7) Range (0, 30, step=1, splits=2)
In this plan, the marked Exchange could have been avoided because the
data is already partitioned on "t1.id". This happens because the
AliasAwareOutputPartitioning class handles aliases only for
HashPartitioning. This change normalizes all output partitioning based
on the aliasing happening in Project.
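The underlying idea can be shown with a small, self-contained Scala
sketch (simplified stand-ins for Catalyst classes, not the actual
AliasAwareOutputExpression code): when a Project aliases an attribute,
the partitioning reported by the child is rewritten in terms of the
alias, so a parent operator that requires a distribution on the alias
sees that the data is already partitioned correctly and no Exchange is
added.
    // Simplified stand-ins for Catalyst expressions (illustration only).
    sealed trait Expr
    case class AttributeRef(name: String) extends Expr
    case class Alias(child: Expr, name: String) extends Expr

    // A minimal model of hash partitioning over a set of expressions.
    case class HashPartitioning(exprs: Seq[Expr], numPartitions: Int)

    // Map each aliased child expression to the attribute the alias produces,
    // as a Project such as `SELECT id AS t1id` exposes it.
    def aliasMap(projectList: Seq[Expr]): Map[Expr, AttributeRef] =
      projectList.collect { case Alias(child, name) => child -> AttributeRef(name) }.toMap

    // Normalize the child's output partitioning with respect to the Project's aliases.
    def normalize(p: HashPartitioning, projectList: Seq[Expr]): HashPartitioning = {
      val m = aliasMap(projectList)
      p.copy(exprs = p.exprs.map(e => m.getOrElse(e, e)))
    }

    // The child is hash-partitioned on id#996L; the Project renames it to t1id#1033L.
    val childPartitioning = HashPartitioning(Seq(AttributeRef("id#996L")), numPartitions = 5)
    val projectList = Seq(
      Alias(AttributeRef("id#996L"), "t1id#1033L"),
      Alias(AttributeRef("id#1000L"), "t2id#1034L"))

    // Reported as hashpartitioning(t1id#1033L, 5), which already satisfies the parent
    // SortMergeJoin's required distribution, so the marked Exchange is unnecessary.
    println(normalize(childPartitioning, projectList))
The actual change works on Catalyst's Expression and Partitioning
classes and also covers sort ordering, but the substitution step is the
same in spirit.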
### Why are the changes needed?
To remove unneeded exchanges.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit tests added.
On TPCDS at scale factor 1000, this change improves the performance of
query 95 from 330 seconds to 170 seconds by removing the extra Exchange.
Closes #30300 from prakharjain09/SPARK-33399-outputpartitioning.
Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
(commit: f5e3302)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q95.sf100/explain.txt (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q2.sf100/explain.txt (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q95.sf100/simplified.txt (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a.sf100/explain.txt (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q2.sf100/simplified.txt (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b.sf100/explain.txt (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b.sf100/simplified.txt (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a.sf100/simplified.txt (diff)
Commit 5af5aa146ecbff38b809127b5eb9805441627ed2 by kabhwan.opensource
[SPARK-33209][SS] Refactor unit test of stream-stream join in
UnsupportedOperationsSuite
### What changes were proposed in this pull request?
This PR is a follow-up to https://github.com/apache/spark/pull/30076
that refactors the stream-stream join unit tests in
`UnsupportedOperationsSuite`, where a lot of code was duplicated for
each join type.
### Why are the changes needed?
It reduces duplicated code and makes it easier for developers to read
and extend these tests in the future.
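The refactoring follows the common pattern of parameterizing one test
helper over the join types rather than copying the test body per type.
A generic ScalaTest-style sketch of that pattern (illustrative only,
not the actual UnsupportedOperationsSuite code):
    import org.scalatest.funsuite.AnyFunSuite

    class StreamStreamJoinSketchSuite extends AnyFunSuite {
      // Illustrative stand-ins for the join types the suite covers.
      private val joinTypes = Seq("inner", "left outer", "right outer", "full outer", "left semi")

      // One parameterized helper registers a test per join type instead of
      // duplicating the same body for each one.
      private def testStreamStreamJoin(joinType: String)(check: String => Unit): Unit =
        test(s"stream-stream $joinType join") { check(joinType) }

      joinTypes.foreach { joinType =>
        testStreamStreamJoin(joinType) { jt =>
          // Real assertions would check which conditions are (un)supported for `jt`.
          assert(joinTypes.contains(jt))
        }
      }
    }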
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests in `UnsupportedOperationsSuite.scala` (pure
refactoring).
Closes #30347 from c21/stream-test.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
(commit: 5af5aa1)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala (diff)