Changes

Summary

  1. [SPARK-32079][PYTHON] Remove namedtuple hack by replacing built-in pickle with cloudpickle
  2. Revert "[SPARK-37445][BUILD] Rename the maven profile hadoop-3.2 to hadoop-3 and change GA test name to hadoop3.3"
  3. [SPARK-37436][PYTHON] Uses Python's standard string formatter for SQL API in pandas API on Spark
  4. [SPARK-37457][PYTHON] Update cloudpickle to v2.0.0
Commit d192d96f1b170b9f9ce533d062701c86bef8d0b8 by gurwls223
[SPARK-32079][PYTHON] Remove namedtuple hack by replacing built-in pickle with cloudpickle

### What changes were proposed in this pull request?

This PR proposes to replace Python's built-in pickle with cloudpickle, whose C-pickler-based implementation requires Python 3.8+.
For Python 3.7 and below, the legacy built-in pickle is still used for performance reasons.
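
A minimal sketch of the version-gated selection just described (the names here are illustrative; the actual wiring presumably lives in `python/pyspark/serializers.py`, which this commit modifies):

```python
import sys

# Use cloudpickle's C-pickler-based implementation on Python 3.8+,
# and fall back to the legacy built-in pickle otherwise.
if sys.version_info >= (3, 8):
    from pyspark import cloudpickle as pickler
else:
    import pickle as pickler

payload = pickler.dumps([123, "abc", (1, 2, 3), 2.2])
assert pickler.loads(payload) == [123, "abc", (1, 2, 3), 2.2]
```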

I ran a few benchmarks on basic cases and saw no performance penalty (one of the benchmarks is attached below).

### Why are the changes needed?

To remove the namedtuple hack behind issues such as SPARK-32079, SPARK-22674, and SPARK-27810.
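
For context, a minimal repro of the underlying problem (an illustrative sketch, not taken from the PR): the built-in pickle serializes classes by reference, so a namedtuple class defined interactively cannot be resolved on worker processes, while cloudpickle serializes such classes by value.

```python
from collections import namedtuple
from pyspark import cloudpickle

# Defined in __main__ (e.g. a REPL session): built-in pickle would store only
# a reference to "__main__.Point", which a worker process cannot import.
Point = namedtuple("Point", ["x", "y"])

# cloudpickle embeds the class definition itself in the payload.
payload = cloudpickle.dumps(Point(1, 2))
assert cloudpickle.loads(payload) == Point(1, 2)
```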

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

#### Micro benchmark:

```python
import time
import pickle
from pyspark import cloudpickle

def measure(f):
    """Time a single call to f and print the elapsed seconds."""
    start = time.time()
    f()
    end = time.time()
    print(end - start)

# Homogeneous data: the same four values repeated 100 million times.
data = [123, "abc", (1, 2, 3), 2.2] * 100000000

measure(lambda: pickle.dumps(data))                           # built-in pickle: serialize
measure(lambda: cloudpickle.dumps(data))                      # cloudpickle: serialize
measure(lambda: pickle.loads(pickle.dumps(data)))             # built-in pickle: round trip
measure(lambda: cloudpickle.loads(cloudpickle.dumps(data)))   # cloudpickle: round trip
```

```
5.1765618324279785
5.2591071128845215
12.457043886184692
12.1910879611969
```

```python
import time
import random
import pickle
from pyspark import cloudpickle

def measure(f):
    """Time a single call to f and print the elapsed seconds."""
    start = time.time()
    f()
    end = time.time()
    print(end - start)

# Heterogeneous data: 10 million small rows of shuffled mixed types.
rand_data = []
for _ in range(10000000):
    data = [
        random.randint(1, 100),
        str(random.randint(1, 100)),
        (random.randint(1, 100), random.randint(2, 200), random.randint(3, 300)),
        random.random()
    ]
    random.shuffle(data)
    rand_data.append(data)

measure(lambda: pickle.dumps(rand_data))                             # built-in pickle: serialize
measure(lambda: cloudpickle.dumps(rand_data))                        # cloudpickle: serialize
measure(lambda: pickle.loads(pickle.dumps(rand_data)))               # built-in pickle: round trip
measure(lambda: cloudpickle.loads(cloudpickle.dumps(rand_data)))     # cloudpickle: round trip
```

```
7.736639976501465
7.8458099365234375
20.306012868881226
17.787282943725586
```

#### E2E benchmark:

```bash
./bin/pyspark --conf spark.python.profile=true
```

```python
import time

rdd = sc.parallelize([123] * 30000000)
rdd.count()  # warm-up run so JVM/worker startup is not measured

start = time.time()
rdd.map(lambda x: x).count()  # one trivial Python round trip per element
print(time.time() - start)
sc.show_profiles()
```

Before:

2.3216118812561035 (sec)

```
============================================================
Profile of RDD<id=2>
============================================================
         60264297 function calls (60264265 primitive calls) in 22.309 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
30000016   13.127    0.000   29.890    0.000 rdd.py:1291(<genexpr>)
       32    4.559    0.142   34.449    1.077 {built-in method builtins.sum}
30000000    3.723    0.000    3.723    0.000 <stdin>:1(<lambda>)
    29297    0.699    0.000    0.699    0.000 {built-in method _pickle.loads}
    29313    0.059    0.000    0.874    0.000 serializers.py:151(_read_with_length)
    58610    0.045    0.000    0.045    0.000 {method 'read' of '_io.BufferedReader' objects}
    29313    0.035    0.000    0.057    0.000 serializers.py:567(read_int)
    29313    0.025    0.000    0.899    0.000 serializers.py:135(load_stream)
    29297    0.016    0.000    0.715    0.000 serializers.py:435(loads)
    29313    0.013    0.000    0.013    0.000 {built-in method _struct.unpack}
    29329    0.006    0.000    0.006    0.000 {built-in method builtins.len}
       16    0.000    0.000    0.000    0.000 rdd.py:409(func)
       16    0.000    0.000    0.001    0.000 serializers.py:256(dump_stream)
...
```

After:

2.279919147491455 (sec)

```
============================================================
Profile of RDD<id=2>
============================================================
         90264361 function calls (90264329 primitive calls) in 34.573 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
30000016   13.204    0.000   29.982    0.000 rdd.py:1291(<genexpr>)
30000000   12.087    0.000   15.879    0.000 util.py:77(wrapper)
       32    4.588    0.143   34.571    1.080 {built-in method builtins.sum}
30000000    3.792    0.000    3.792    0.000 <stdin>:1(<lambda>)
    29297    0.694    0.000    0.694    0.000 {built-in method _pickle.loads}
    29313    0.061    0.000    0.873    0.000 serializers.py:157(_read_with_length)
    58610    0.045    0.000    0.045    0.000 {method 'read' of '_io.BufferedReader' objects}
    29313    0.036    0.000    0.059    0.000 serializers.py:585(read_int)
    29313    0.026    0.000    0.900    0.000 serializers.py:141(load_stream)
    29297    0.018    0.000    0.712    0.000 serializers.py:463(loads)
    29313    0.013    0.000    0.013    0.000 {built-in method _struct.unpack}
    29329    0.007    0.000    0.007    0.000 {built-in method builtins.len}
       16    0.000    0.000   34.573    2.161 worker.py:665(process)
       16    0.000    0.000    0.000    0.000 rdd.py:409(func)
       16    0.000    0.000    0.001    0.000 serializers.py:262(dump_stream)
       16    0.000    0.000    0.001    0.000 cloudpickle_fast.py:59(dumps)
...
```

Existing test cases should cover this change.

Closes #34688 from HyukjinKwon/SPARK-32079.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
The file was modified python/pyspark/serializers.py (diff)
The file was modified python/pyspark/shuffle.py (diff)
The file was modified python/pyspark/__init__.py (diff)
The file was modified python/pyspark/mllib/tests/test_linalg.py (diff)
The file was modified python/pyspark/tests/test_serializers.py (diff)
The file was modified python/pyspark/sql/streaming.py (diff)
The file was modified python/pyspark/tests/test_shuffle.py (diff)
The file was modified python/pyspark/__init__.pyi (diff)
The file was modified python/pyspark/ml/tests/test_linalg.py (diff)
The file was modified python/pyspark/accumulators.py (diff)
The file was modified python/pyspark/rdd.py (diff)
The file was modified python/pyspark/tests/test_rdd.py (diff)
The file was modified python/pyspark/context.py (diff)
The file was modified python/pyspark/mllib/tests/test_algorithms.py (diff)
The file was modified python/pyspark/worker.py (diff)
The file was modified python/pyspark/mllib/common.py (diff)
The file was modified python/pyspark/sql/dataframe.py (diff)
The file was modified python/pyspark/ml/common.py (diff)
Commit 444cfe66a65fbdbda53366154cf547de90309608 by gurwls223
Revert "[SPARK-37445][BUILD] Rename the maven profile hadoop-3.2 to hadoop-3 and change GA test name to hadoop3.3"

This reverts commit a98f9f2fcc08f957d021ac1d1f89c457e5fd1cc6.
The file was modified resource-managers/kubernetes/integration-tests/pom.xml (diff)
The file was modified dev/test-dependencies.sh (diff)
The file was modified resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh (diff)
The file was removed dev/deps/spark-deps-hadoop-3.3-hive-2.3
The file was modified dev/run-tests.py (diff)
The file was modified dev/create-release/release-build.sh (diff)
The file was added dev/deps/spark-deps-hadoop-3.2-hive-2.3
The file was modified pom.xml (diff)
The file was modified resource-managers/yarn/pom.xml (diff)
The file was modified dev/run-tests-jenkins.py (diff)
The file was modified hadoop-cloud/pom.xml (diff)
The file was modified .github/workflows/build_and_test.yml (diff)
Commit 69e115183c1c424bd61a9a4a4724c1aff9970ef1 by gurwls223
[SPARK-37436][PYTHON] Uses Python's standard string formatter for SQL API in pandas API on Spark

### What changes were proposed in this pull request?

This PR proposes to use [Python's standard string formatter](https://docs.python.org/3/library/string.html#custom-string-formatting) instead of the hacky custom SQL parser for the SQL API in pandas API on Spark.

### Why are the changes needed?

The current parsing implementation is very hacky and does not work reliably: it is [dependent on Python's internal module](https://github.com/apache/spark/blob/master/python/pyspark/pandas/sql_processor.py#L291), [treats a `Series` as a table](https://github.com/apache/spark/blob/master/python/pyspark/pandas/sql_processor.py#L339-L340), etc.

Python's standard string formatter gives us a standard interface with official, documented behavior.
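
For context, here is a minimal sketch of the standard mechanism this change builds on (the class name and pass-through behavior below are illustrative, not the actual implementation in `python/pyspark/pandas/sql_formatter.py`): subclassing `string.Formatter` provides a single `get_field()` hook that decides how each `{placeholder}` is resolved, including attribute access such as `{tbl.id}`.

```python
import string
from types import SimpleNamespace

class SQLStringFormatter(string.Formatter):
    """Toy formatter illustrating the string.Formatter extension point."""

    def get_field(self, field_name, args, kwargs):
        # string.Formatter calls get_field() once per replacement field; the
        # default implementation already resolves attribute access such as
        # {tbl.id}. A real SQL formatter could convert DataFrames to temp-view
        # names and Series to column references here; this sketch passes
        # values through unchanged.
        return super().get_field(field_name, args, kwargs)

fmt = SQLStringFormatter()
tbl = SimpleNamespace(id="id", name="my_table")  # stand-in for a DataFrame
print(fmt.format("SELECT {tbl.id} FROM {tbl.name}", tbl=tbl))
# SELECT id FROM my_table
```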

### Does this PR introduce _any_ user-facing change?

Yes.

**Disallowed:**

1. `Series` as a table

    ```python
    myser = ps.Series({'a': [1.0, 2.0, 3.0], 'b': [15.0, 30.0, 45.0]})
    ps.sql("SELECT * from {myser}", myser=myser)
    ```

2. `list` and `range`

    ```python
    strs = ['a', 'b']
    ps.sql("SELECT 'a' IN {strs}", strs=strs)
    ```

3. Automatic local/global variable detection:

    ```python
    strs = ['a', 'b']
    ps.sql("SELECT 'a' IN {strs}")
    ```

**Allowed:**

1. `Series` as a column

    ```python
    mydf = ps.range(10)
    ps.sql("SELECT {ser} FROM {mydf}", ser=mydf.id, mydf=mydf)
    ```

2. Reference checking (between `Series` and `DataFrame`)

    ```python
    mydf = ps.range(10)
    ps.sql("SELECT {ser} FROM tblA", ser=mydf.id)
    ```

    ```
    ValueError: The series in {ser} does not refer any dataframe specified.
    ```

3. Attribute support on a frame (standard Python formatter behavior):

    ```python
    mydf = ps.range(10)
    ps.sql("SELECT {tbl.id} FROM {tbl}", tbl=mydf)
    ```

### How was this patch tested?

Doctests were added.

Closes #34677 from HyukjinKwon/custom-formatter.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
The file was modified python/docs/source/migration_guide/pyspark_3.2_to_3.3.rst (diff)
The file was modified python/pyspark/pandas/__init__.py (diff)
The file was modified python/pyspark/pandas/tests/test_sql.py (diff)
The file was modified python/pyspark/pandas/sql_processor.py (diff)
The file was modified dev/sparktestsupport/modules.py (diff)
The file was added python/pyspark/pandas/sql_formatter.py
The file was modified python/pyspark/pandas/usage_logging/__init__.py (diff)
Commit 95fc4c56426706546601d339067ce6e3e7f4e03f by gurwls223
[SPARK-37457][PYTHON] Update cloudpickle to v2.0.0

### What changes were proposed in this pull request?

This PR proposes to upgrade cloudpickle from 1.6.0 to 2.0.0 (see also https://github.com/cloudpipe/cloudpickle/compare/v1.6.0...v2.0.0).

### Why are the changes needed?

To leverage bug fixes from cloudpickle upstream.
More importantly, 2.0.0 added official support for Python 3.8 and 3.9.
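
As a quick sanity check after the upgrade (assuming the vendored package keeps upstream cloudpickle's `__version__` attribute):

```python
from pyspark import cloudpickle

# Expected to print 2.0.0 once this change is in.
print(cloudpickle.__version__)
```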

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The Jenkins build and GitHub Actions build will test it.

Closes #34705 from HyukjinKwon/cloudpickle-2.0.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
The file was modified python/pyspark/cloudpickle/cloudpickle_fast.py (diff)
The file was modified python/pyspark/cloudpickle/__init__.py (diff)
The file was modified python/pyspark/cloudpickle/cloudpickle.py (diff)