Changes

Summary

  1. [SPARK-33407][PYTHON] Simplify the exception message from Python UDFs (commit: e2c7bfc) (details)
Commit e2c7bfce40c309299f99c50c191c178180d756e5 by gurwls223
[SPARK-33407][PYTHON] Simplify the exception message from Python UDFs (disabled by default)
### What changes were proposed in this pull request?
This PR proposes to simplify the exception messages from Python UDFs.
Currently, the exception message from Python UDFs is as below:
```python
from pyspark.sql.functions import udf
spark.range(10).select(udf(lambda x: x/0)("id")).collect()
```
```python
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../python/pyspark/sql/dataframe.py", line 427, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/.../python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../python/pyspark/sql/utils.py", line 127, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.PythonException:
An exception was thrown from Python worker in the executor:
Traceback (most recent call last):
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/.../python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/.../python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/.../python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 1, in <lambda>
ZeroDivisionError: division by zero
```
In almost all cases, users only care about `ZeroDivisionError: division by zero`. We don't really need to show the internal machinery in 99% of cases.
This PR adds a configuration, `spark.sql.execution.pyspark.udf.simplifiedException.enabled` (disabled by default), that hides the internal tracebacks related to the Python worker, (de)serialization, etc. With it enabled, the traceback above becomes:
```python
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../python/pyspark/sql/dataframe.py", line 427, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/.../python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../python/pyspark/sql/utils.py", line 127, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.PythonException:
An exception was thrown from Python worker in the executor:
Traceback (most recent call last):
  File "<stdin>", line 1, in <lambda>
ZeroDivisionError: division by zero
```
The traceback is shown starting from the first frame whose file is not part of PySpark itself.
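Conceptually, the trimming works like the following sketch (illustrative only, not the actual implementation in `python/pyspark/util.py`; the path-based check is an assumption for demonstration):

```python
from types import TracebackType
from typing import Optional

def simplify_traceback(tb: Optional[TracebackType]) -> Optional[TracebackType]:
    """Drop leading frames that come from PySpark's own files so the
    traceback starts at the first non-PySpark (user) frame.
    Illustrative sketch only; the real logic lives in pyspark/util.py."""
    while tb is not None:
        filename = tb.tb_frame.f_code.co_filename
        # Assumed heuristic: treat files under a ".../pyspark/" path as internal.
        if "/pyspark/" not in filename:
            return tb
        tb = tb.tb_next
    return tb
```

The worker would then re-raise the user's exception with only the trimmed traceback attached (e.g. via `exc.with_traceback(simplify_traceback(exc.__traceback__))`), which is what produces the short message above.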
### Why are the changes needed?
Without this configuration, such internal tracebacks are exposed directly to users, especially shell or notebook users of PySpark. In 99% of cases, people do not care about the internal Python worker, (de)serialization, and related frames; they only make the exception harder to read. For example, the single `x/0` statement above produces a very long traceback, most of which is unnecessary.
This configuration makes it possible to show the simplified traceback that users are most likely interested in.
### Does this PR introduce _any_ user-facing change?
By default, no. It adds one configuration that simplifies the exception
message. See the example above.
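Since it is a SQL configuration, it can presumably also be toggled at runtime from an existing session rather than only at launch time (a minimal sketch, assuming a running SparkSession bound to `spark`):

```python
# Assumes an existing SparkSession bound to `spark`.
spark.conf.set("spark.sql.execution.pyspark.udf.simplifiedException.enabled", "true")
```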
### How was this patch tested?
Manually tested:
```bash
$ pyspark --conf spark.sql.execution.pyspark.udf.simplifiedException.enabled=true
```
```python
from pyspark.sql.functions import udf
spark.sparkContext.setLogLevel("FATAL")
spark.range(10).select(udf(lambda x: x/0)("id")).collect()
```
Unit tests were also added.
Closes #30309 from HyukjinKwon/SPARK-33407.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(commit: e2c7bfc)
The file was modified python/pyspark/util.py (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (diff)
The file was modified python/pyspark/worker.py (diff)
The file was modified core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala (diff)