
[SPARK-34521][PYTHON][SQL] Fix spark.createDataFrame when using pandas with StringDtype #34509


Closed
wants to merge 7 commits into apache:master from nicolasazrak:SPARK-34521

Conversation

@nicolasazrak (Contributor) commented Nov 8, 2021

What changes were proposed in this pull request?

This change fixes SPARK-34521. It allows creating a Spark DataFrame from a pandas DataFrame that uses a StringDtype column when Arrow-based conversion ("spark.sql.execution.arrow.pyspark.enabled") is enabled.

Why are the changes needed?

Pandas stores string columns in two different ways: as a NumPy ndarray or as a custom StringArray. The StringArray version is used when dtype="string" is specified. When that happens, Spark cannot serialize the column to Arrow. Converting the Series beforehand fixes the problem.

However, because of the different ways string columns are handled, spark.createDataFrame(pandas_dataframe).toPandas() might not be equal to pandas_dataframe; the column dtype could be different.

More info: https://pandas.pydata.org/docs/user_guide/text.html
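For reference, a minimal sketch (not part of this patch) showing the two storage modes and why the Arrow path fails, assuming pandas >= 1.0 and pyarrow installed:

    import pandas as pd
    import pyarrow as pa

    s_obj = pd.Series(["a", "b", "c"])                  # object dtype, backed by a NumPy ndarray
    s_str = pd.Series(["a", "b", "c"], dtype="string")  # StringDtype, backed by a StringArray

    print(type(s_str.array))                            # pandas StringArray
    print(hasattr(s_str.array, "__arrow_array__"))      # True, so pyarrow rejects an explicit mask

    # This mirrors what the serializer does and reproduces the error shown below:
    # pa.Array.from_pandas(s_str, mask=s_str.isnull(), type=pa.string())
    # -> ValueError: Cannot specify a mask or a size when passing an object
    #    that is converted with the __arrow_array__ protocol.

    pa.Array.from_pandas(s_obj, mask=s_obj.isnull(), type=pa.string())  # fine for the ndarray-backed column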

Does this PR introduce any user-facing change?

Creating a Spark DataFrame from a pandas DataFrame that uses the string dtype, with "spark.sql.execution.arrow.pyspark.enabled" set to true, no longer throws an exception and returns the expected DataFrame.

Before:

spark.createDataFrame(pd.DataFrame({"A": ['a', 'b', 'c']}, dtype="string"))

Error
Traceback (most recent call last):
  File "/home/nico/projects/playground/spark/python/pyspark/sql/tests/test_arrow.py", line 415, in test_createDataFrame_with_string_dtype
    print(self.spark.createDataFrame(pd.DataFrame({"A": ['a', 'b', 'c']}, dtype="string")))
  File "/home/nico/projects/playground/spark/python/pyspark/sql/session.py", line 823, in createDataFrame
    return super(SparkSession, self).createDataFrame(  # type: ignore[call-overload]
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/conversion.py", line 358, in createDataFrame
    return self._create_from_pandas_with_arrow(data, schema, timezone)
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/conversion.py", line 550, in _create_from_pandas_with_arrow
    self._sc  # type: ignore[attr-defined]
  File "/home/nico/projects/playground/spark/python/pyspark/context.py", line 611, in _serialize_to_jvm
    serializer.dump_stream(data, tempFile)
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/serializers.py", line 221, in dump_stream
    super(ArrowStreamPandasSerializer, self).dump_stream(batches, stream)
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/serializers.py", line 81, in dump_stream
    for batch in iterator:
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/serializers.py", line 220, in <genexpr>
    batches = (self._create_batch(series) for series in iterator)
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/serializers.py", line 211, in _create_batch
    arrs.append(create_array(s, t))
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/serializers.py", line 185, in create_array
    raise e
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/serializers.py", line 175, in create_array
    array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
  File "pyarrow/array.pxi", line 904, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 252, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 107, in pyarrow.lib._handle_arrow_array_protocol
ValueError: Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol.

After:

spark.createDataFrame(pd.DataFrame({"A": ['a', 'b', 'c']}, dtype="string"))

DataFrame[A: string]

How was this patch tested?

Using the test_createDataFrame_with_string_dtype test.

@itholic (Contributor) left a comment

LGTM

@itholic (Contributor) commented Nov 18, 2021

Not a big deal, but could you add a simple example to the PR description showing what issue is resolved, with before/after?

e.g.

Before:

>>> spark.createDataFrame(pd.DataFrame({"A": ['a', 'b', 'c']}, dtype="string"))
.../spark/python/pyspark/sql/pandas/conversion.py:402: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
  Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warnings.warn(msg)
DataFrame[A: string]

After:

>>> spark.createDataFrame(pd.DataFrame({"A": ['a', 'b', 'c']}, dtype="string"))
DataFrame[A: string]

@nicolasazrak (Contributor, Author)
@itholic Thanks. I've updated the description to reflect the change.

@nicolasazrak (Contributor, Author)
Hey @itholic, can we merge this?

@itholic (Contributor) commented Nov 29, 2021

LGTM, but let's wait for other members to verify this change since I don't have permission to merge 😅

cc @HyukjinKwon @ueshin Could you take a look at this one when you find some time?

@@ -169,6 +169,8 @@ def create_array(s, t):
    elif is_categorical_dtype(s.dtype):
        # Note: This can be removed once minimum pyarrow version is >= 0.16.1
        s = s.astype(s.dtypes.categories.dtype)
    elif t is not None and pa.types.is_string(t):
        s = s.astype(str)
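For context, this first approach converts the StringDtype column back to a plain object-dtype column, which the existing NumPy-backed path already handles (a quick illustration, not taken from the patch):

    import pandas as pd

    s = pd.Series(["a", "b", "c"], dtype="string")
    print(s.dtype)              # string (extension dtype)
    print(s.astype(str).dtype)  # object, backed by a plain ndarray again, so a mask is accepted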
Member

So are you saying that the string type from pandas can produce a different type in Arrow? cc @BryanCutler

Member

While I understand that we can work around it, quoting the description:

Pandas stores string columns in two different ways: as a NumPy ndarray or as a custom StringArray. The StringArray version is used when dtype="string" is specified. When that happens, Spark cannot serialize the column to Arrow.

This sounds like an issue on the Arrow side.

@nicolasazrak (Contributor, Author) Nov 29, 2021

Now that I think about it again, adding support for StringArray in Arrow is the real solution; this change is just a workaround. I don't know much about the Arrow internals or whether they can add some metadata to support two different string types. If you feel this is better done upstream in Arrow, feel free to close the PR and I'll investigate it from the Arrow side. Otherwise, we can keep this patch with a comment added and continue investigating in Arrow.

Member

I'll leave it to @BryanCutler (who's a maintainer for both PySpark and PyArrow).

@ueshin (Member) Nov 29, 2021

I'm afraid I don't think this is the right fix.

Actually, this happens with Int64, Int32, and other extension dtypes as well:

>>> spark.createDataFrame(pd.DataFrame({"A": [1, 2, 3]}, dtype="Int64"))
.../python/pyspark/sql/pandas/conversion.py:422: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
  Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warnings.warn(msg)
DataFrame[A: bigint]

The error message says:

Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol.

so we should follow the message.

How about, at line 163:

if hasattr(s.values, '__arrow_array__'):
    mask = None
else:
    mask = s.isnull()

cc @BryanCutler
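For context on why dropping the mask is safe here: extension arrays that implement __arrow_array__ carry their own null information, so pyarrow still produces nulls without an explicit mask (a small check, assuming pandas >= 1.0):

    import pandas as pd
    import pyarrow as pa

    s = pd.Series([1, 2, None], dtype="Int64")
    arr = pa.Array.from_pandas(s, type=pa.int64())  # no mask passed
    print(arr.null_count)                           # 1 -- the IntegerArray kept the null itself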

Member

I agree with @ueshin. The correct thing to do would be to continue using pyarrow to convert the column, in this case with mask = None; that way the implementation is sure to do the conversion most efficiently. I think for this case it should then produce a standard Arrow UTF-8 array that PySpark can handle.
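A quick check of that expectation (a sketch, not taken from the PR): converting a StringDtype column through __arrow_array__ yields a plain Arrow UTF-8 array rather than an extension type:

    import pandas as pd
    import pyarrow as pa

    s = pd.Series(["a", None, "c"], dtype="string")
    arr = pa.Array.from_pandas(s)   # dispatches to StringArray.__arrow_array__
    print(arr.type)                 # string -- a standard Arrow UTF-8 array
    print(arr.null_count)           # 1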

@nicolasazrak (Contributor, Author)

@ueshin Makes a lot of sense; I wasn't aware this was a problem for other dtypes. Removing the mask works, but this quick test:

from pandas.testing import assert_frame_equal

with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": True}):
    pandas_df = pd.DataFrame({"col": [1, 2, 3, None]}, dtype="Int64")
    df = self.spark.createDataFrame(pandas_df)
    assert_frame_equal(pandas_df, df.toPandas())

fails with:

Attribute "dtype" are different
[left]:  Int64
[right]: float64

I'm going to take a look at where the conversion is failing.

@ueshin (Member) Dec 1, 2021

I think it's fine as long as the df's data type is LongType for dtype="Int64" because df.toPandas() won't keep the extension dtypes.
So the dtype of df.toPandas() will be int64, or float64 if the column contains None.
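In other words, the expected round trip under this reasoning looks roughly like the sketch below (assuming an active SparkSession named spark with Arrow conversion enabled):

    import pandas as pd

    pdf = pd.DataFrame({"A": [1, 2, 3]}, dtype="Int64")
    df = spark.createDataFrame(pdf)                          # the Int64 column becomes LongType
    print(df.toPandas().dtypes)                              # A    int64

    pdf_na = pd.DataFrame({"A": [1, 2, None]}, dtype="Int64")
    print(spark.createDataFrame(pdf_na).toPandas().dtypes)   # A    float64 (the None comes back as NaN)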

@ueshin (Member) commented Dec 6, 2021

ok to test

@ueshin (Member) left a comment

LGTM, except for the lint-python error.
@nicolasazrak Could you run ./dev/reformat-python to fix the linter error?
@BryanCutler Could you double check the fix is right? I'd leave it to you.

@SparkQA commented Dec 6, 2021

Test build #145964 has finished for PR 34509 at commit 944a515.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 6, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50438/

@SparkQA commented Dec 6, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50438/

@SparkQA commented Dec 6, 2021

Test build #145966 has finished for PR 34509 at commit 10d4bcf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 6, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50441/

@SparkQA commented Dec 6, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50441/

@@ -215,7 +215,10 @@ def _create_batch(self, series):
    series = ((s, None) if not isinstance(s, (list, tuple)) else s for s in series)

    def create_array(s, t):
        mask = s.isnull()
        if hasattr(s.values, "__arrow_array__"):
Member

This actually reminds me of #28743. How are the two related to each other?

@nicolasazrak (Contributor, Author)

Looks really similar. I've added my tests to that branch and they passed. (However, a lot of other tests failed, probably for other reasons.)

Member

I think it's preferred to use s.array instead of s.values; would you mind giving that a try?
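With that suggestion applied, the check would look roughly like the standalone sketch below (an illustration of the approach, not a verbatim copy of the merged serializer code, which also passes safe=self._safecheck):

    import pandas as pd
    import pyarrow as pa

    def create_array(s, t):
        # Extension arrays (StringArray, IntegerArray, ...) implement __arrow_array__
        # and track their own nulls, so pyarrow must not be given an explicit mask.
        if hasattr(s.array, "__arrow_array__"):
            mask = None
        else:
            mask = s.isnull()
        return pa.Array.from_pandas(s, mask=mask, type=t, safe=True)

    print(create_array(pd.Series(["a", "b", None], dtype="string"), pa.string()))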

@BryanCutler (Member)

The difference with #28743 is that it was trying to deal with pyarrow extension types. For a pandas extension type, the __arrow_array__ interface returns an Arrow array, which could be a standard Arrow type or an extension type. For this PR, we are talking about a standard string array, which PySpark can work with. Otherwise, if it's a pyarrow extension type, the storage type would need to be checked, which would be a standard Arrow type. From that, PySpark could work with the storage type, but that might not be very useful because all of the extension information would be stripped out.

This PR is a step in the right direction, so I think it's ok to merge. This will add support for any pandas extension types that are backed by a standard Arrow array, although I don't think it will be able to convert them back to pandas as the original extension type. To fully support pandas/pyarrow extension types we would need to propagate the extension type info through Spark so that when it is worked on again in Python, the extension part can be loaded back up. I'm not exactly sure how difficult that might be to do.

@BryanCutler (Member) left a comment

Looks fine to me; I just had a minor request to use s.array instead.

@SparkQA commented Dec 14, 2021

Test build #146194 has finished for PR 34509 at commit e343f68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50668/

@SparkQA commented Dec 14, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50668/

@SparkQA commented Dec 14, 2021

Test build #146197 has finished for PR 34509 at commit c1f779e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class PandasSQLStringFormatter(string.Formatter):
  • class SQLStringFormatter(string.Formatter):
  • class ExecutorRollPlugin extends SparkPlugin
  • class ExecutorRollDriverPlugin extends DriverPlugin with Logging
  • case class MapContainsKey(
  • case class TryElementAt(left: Expression, right: Expression, child: Expression)
  • case class AesEncrypt(
  • case class AesDecrypt(

@SparkQA commented Dec 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50671/

@SparkQA commented Dec 14, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50671/

@BryanCutler (Member) left a comment

LGTM

@HyukjinKwon (Member)

Thanks @BryanCutler!

@BryanCutler (Member)

Merged to master, thanks @nicolasazrak. Do you have an apache.org account so I can assign the issue to you?

@nicolasazrak (Contributor, Author)

@BryanCutler Yes, my username is nicolasazrak

@nicolasazrak deleted the SPARK-34521 branch December 16, 2021 12:51