
[SPARK-31920][PYTHON] Fix pandas conversion using Arrow with __arrow_array__ columns #28743


Closed
wants to merge 8 commits into from

Conversation

moskvax

@moskvax moskvax commented Jun 6, 2020

What changes were proposed in this pull request?

  1. Use pa.infer_type over pa.Schema.from_pandas to infer Arrow types for conversion, as it handles pandas extension types and can ignore pd.NA values,
  2. Check for the implementation of __arrow_array__ in series' backing arrays and, if present, use it to convert pandas DataFrame columns to Arrow arrays during serialisation (see the sketch below).
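
A minimal sketch of the serialisation-side check, for illustration only (the helper name below is hypothetical; the actual change is to create_array in the Arrow serializer):

import pyarrow as pa

def _series_to_arrow_array(s, arrow_type=None):
    # Hypothetical helper sketching the proposed logic: if the series' backing
    # array implements the __arrow_array__ protocol, let it build the Arrow
    # array directly; otherwise fall back to pa.Array.from_pandas, which
    # expects NumPy-backed data.
    backing_array = getattr(s, 'array', s.values)
    if hasattr(backing_array, '__arrow_array__'):
        return backing_array.__arrow_array__(type=arrow_type)
    return pa.Array.from_pandas(s, mask=s.isnull(), type=arrow_type)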

Why are the changes needed?

These changes allow pandas DataFrames containing ExtensionDtype columns backed by arrays that implement __arrow_array__ to be converted. Such DataFrames are produced when an ExtensionDtype-based pandas type is specified via the dtype parameter at construction time, and can also be created by calling convert_dtypes on an existing DataFrame, as in the example below.
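
For example (a minimal illustration, assuming pandas >= 1.0), both of the following produce extension-dtype columns backed by __arrow_array__-implementing arrays:

import pandas as pd

# Nullable extension dtype requested via the dtype parameter at construction time.
pdf1 = pd.DataFrame({'A': [1, 2, None]}, dtype='Int64')

# Convert an existing DataFrame's columns to the nullable extension dtypes.
pdf2 = pd.DataFrame({'A': [1, 2, None], 'B': ['x', 'y', None]}).convert_dtypes()

print(pdf1.dtypes)  # A is Int64
print(pdf2.dtypes)  # A is Int64, B is string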

Does this PR introduce any user-facing change?

Yes. Users will be able to convert a wider variety of pandas DataFrames into Spark DataFrames using any currently released pyarrow version > 0.15.1. Prior to this fix, neither the Arrow conversion path nor the fallback path would work with these DataFrames.

How was this patch tested?

Tests were added to cover conversion from pandas DataFrames with IntegerArray- and StringArray-backed columns. A typo was also fixed in a recently added test.

@moskvax moskvax changed the title [SPARK-31920] Fix pandas conversion using Arrow with __arrow_array__ columns [SPARK-31920][PYTHON] Fix pandas conversion using Arrow with __arrow_array__ columns Jun 6, 2020
@maropu
Member

maropu commented Jun 7, 2020

ok to test

@maropu
Member

maropu commented Jun 7, 2020

cc: @HyukjinKwon @viirya

@SparkQA

SparkQA commented Jun 7, 2020

Test build #123593 has finished for PR 28743 at commit 04a15f6.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Jun 7, 2020

Thanks for your work, @moskvax! The failures look valid, so could you fix them first?

@moskvax moskvax changed the title [SPARK-31920][PYTHON] Fix pandas conversion using Arrow with __arrow_array__ columns [SPARK-31920][PYTHON][WIP] Fix pandas conversion using Arrow with __arrow_array__ columns Jun 8, 2020
@moskvax moskvax marked this pull request as draft June 8, 2020 13:21
* Use infer_type over Schema.from_pandas for arrow type inference, as it can better handle extension types and pd.NA values
* Call __arrow_array__ directly if it is present to exit create_array early in _create_batch
* Add pandas version checks where required for tests
* Add tests covering pd.NA and BooleanDtype conversion
@moskvax moskvax marked this pull request as ready for review June 8, 2020 16:03
@moskvax moskvax changed the title [SPARK-31920][PYTHON][WIP] Fix pandas conversion using Arrow with __arrow_array__ columns [SPARK-31920][PYTHON] Fix pandas conversion using Arrow with __arrow_array__ columns Jun 8, 2020
@SparkQA

SparkQA commented Jun 8, 2020

Test build #123640 has finished for PR 28743 at commit e60e2d4.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 8, 2020

Test build #123641 has finished for PR 28743 at commit 4476771.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 8, 2020

Test build #123642 has finished for PR 28743 at commit 406347d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@moskvax
Author

moskvax commented Jun 10, 2020

@HyukjinKwon @viirya Please review when you've got a moment. Thank you.

-            for name, field in zip(schema, arrow_schema):
-                struct.add(name, from_arrow_type(field.type), nullable=field.nullable)
+            for name, t in zip(schema, inferred_types):
+                struct.add(name, from_arrow_type(t), nullable=True)
Member

Why don't we follow nullability anymore?

Author

infer_type only returns a type, not a field, which would supposedly have nullability information. But it appears that in the implementation of Schema.from_pandas (link), inferring nullability was not actually done and the default nullable=True would always be returned. So this change is just following the existing behaviour of Schema.from_pandas.

Member

Let's add a comment here to explain it?

Author

Sounds good, will update with a comment.

Alternatively, any(s.isna()) could be checked if we wanted to actively infer nullability here. This would change existing behavior as well as being inconsistent with the non-Arrow path, though, which similarly defaults to inferred types being nullable:

fields = [StructField(k, _infer_type(v), True) for k, v in items]

@HyukjinKwon
Member

cc @BryanCutler FYI

Comment on lines 157 to 162
-        elif type(s.dtype) == pd.CategoricalDtype:
+        elif is_categorical_dtype(s.dtype):
Author

By the way, this change was made because CategoricalDtype is only imported into the root pandas namespace after pandas 0.24.0, which was causing an AttributeError when testing with earlier versions.
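
For reference, a minimal illustration of the version-safe check:

import pandas as pd
from pandas.api.types import is_categorical_dtype

s = pd.Series(['a', 'b', 'a'], dtype='category')

# Works on pandas versions where CategoricalDtype is not exposed in the root
# pandas namespace, unlike the previous type(s.dtype) == pd.CategoricalDtype check.
assert is_categorical_dtype(s.dtype)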

@SparkQA

SparkQA commented Jun 10, 2020

Test build #123725 has finished for PR 28743 at commit 403f579.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -394,10 +394,11 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone):

         # Create the Spark schema from list of names passed in with Arrow types
         if isinstance(schema, (list, tuple)):
-            arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
+            inferred_types = [pa.infer_type(s, mask=s.isna(), from_pandas=True)
Member

So without this change, pa.Schema.from_pandas cannot handle pandas extension types and pd.NA values?

Author

pyarrow < 0.17.0 cannot handle either (ARROW-8159). pyarrow 0.17.x works as long as the columns that contain pd.NA values are not object-dtyped, which is the case by default as of pandas 1.0.4 (cf pandas-dev/pandas#32931). pa.infer_type can take a mask and thus avoids trying to infer the type of pd.NA values, which is what causes pa.Schema.from_pandas to fail here.

pa.Schema.from_pandas returns different types from pa.infer_type in two cases:

  1. Categorical arrays
    • pa.Schema.from_pandas returns a DictionaryType
    • pa.infer_type returns the value_type of the DictionaryType, which is what is already used to determine the Spark type of the resulting column
  2. __arrow_array__-implementing arrays which return a specialised Arrow type (IntervalArray, PeriodArray)
    • pa.Schema.from_pandas returns the type of the array returned by __arrow_array__
    • pa.infer_type does not check for __arrow_array__ and thus fails with these arrays; however, these types cannot currently be converted to Spark types anyway

Neither of these cases causes a regression, which is why I propose replacing pa.Schema.from_pandas with pa.infer_type here (see the example below).
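
For illustration, a minimal example of the masked inference (assuming pandas >= 1.0, so that the column carries pd.NA, and a pyarrow version in the supported range):

import pandas as pd
import pyarrow as pa

s = pd.Series([1, 2, None], dtype='Int64')  # the missing value becomes pd.NA

# Masking the missing values means pyarrow never has to interpret pd.NA itself.
inferred = pa.infer_type(s, mask=s.isna(), from_pandas=True)
print(inferred)  # int64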

Member

For the second case above, does pa.Schema.from_pandas return the correct types? And when pa.infer_type is applied to those specialised array types, does it throw an error or return a wrong array type?

Member

pa.Schema.from_pandas will return a type that is a subclass of pa.ExtensionType. That instance defines a storage_type, which could then be checked as a Spark-supported type. This assumes the pandas extension array implements __arrow_array__, which is recommended; see https://arrow.apache.org/docs/python/extending_types.html#controlling-conversion-to-pyarrow-array-with-the-arrow-array-protocol.
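
For illustration, a sketch of this suggestion, assuming a period-dtype column whose __arrow_array__ produces an Arrow extension type with int64 storage:

import pandas as pd
import pyarrow as pa

pdf = pd.DataFrame({'p': pd.period_range('2020-01-01', freq='M', periods=3)})
schema = pa.Schema.from_pandas(pdf, preserve_index=False)

for field in schema:
    if isinstance(field.type, pa.ExtensionType):
        # The extension type wraps a primitive storage type (int64 for periods),
        # which is what would be checked against the Spark-supported types.
        print(field.name, field.type.storage_type)  # p int64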

Author

For the second case above, does pa.Schema.from_pandas return the correct types? And when pa.infer_type is applied to those specialised array types, does it throw an error or return a wrong array type?

pa.infer_type will throw an error for these arrays.

Comment on lines 166 to 167
mask = s.isnull()
# pass _ndarray_values to avoid potential failed type checks from pandas array types
Member

Is there any test case for this?

@moskvax moskvax Jun 10, 2020

This is a workaround for IntegerArray in pre-1.0.0 pandas, which did not yet implement __arrow_array__, so pyarrow expects it to be a NumPy array:

>>> import pandas as pd
>>> import pyarrow as pa
>>> print(pd.__version__, pa.__version__)
0.25.0 0.17.1
>>> s = pd.Series(range(3), dtype=pd.Int64Dtype())
>>> pa.Array.from_pandas(s)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/array.pxi", line 805, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/types.pxi", line 76, in pyarrow.lib._datatype_to_pep3118
  File "pyarrow/array.pxi", line 64, in pyarrow.lib._ndarray_to_type
  File "pyarrow/error.pxi", line 108, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: Did not pass numpy.dtype object
>>> pa.Array.from_pandas(s, type=pa.int64())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/array.pxi", line 805, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 85, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Input object was not a NumPy array
>>> pa.Array.from_pandas(s._ndarray_values, type=pa.int64())
<pyarrow.lib.Int64Array object at 0x7fb88007a980>
[
  0,
  1,
  2
]
>>>

I'll update the comment to mention this.

@SparkQA

SparkQA commented Jun 10, 2020

Test build #123762 has finished for PR 28743 at commit 07d7f2a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler BryanCutler left a comment

Thanks @moskvax, adding support for extension types would be great! I'm not sure using pa.infer_type is the way to go, though; I think it's better to handle these cases explicitly by getting the pa.ExtensionType from pa.Schema.from_pandas and then extracting the storage_type from there. Would that be possible?

@moskvax
Author

moskvax commented Jun 11, 2020

Thanks @moskvax, adding support for extension types would be great! I'm not sure using pa.infer_type is the way to go, though; I think it's better to handle these cases explicitly by getting the pa.ExtensionType from pa.Schema.from_pandas and then extracting the storage_type from there. Would that be possible?

The goal of this PR was to allow conversion for __arrow_array__-implementing arrays of ExtensionDtype values where the underlying type can be directly converted to primitive Arrow and Spark types, so I wasn't focusing on this case at first, but I've looked into it today following the approach you described.

The storage_type of the pa.ExtensionType of PeriodArray is int64, which can be converted to a Spark column using the PeriodArray's _ndarray_values. However, without the PeriodDtype.freq, the period information cannot be reconstructed and the result in Spark is an arbitrary-looking sequence of integers:

>>> periods = pd.period_range('2020-01-01', freq='M', periods=6)
>>> pdf = pd.DataFrame({'A': pd.Series(periods)})
>>> pdf
         A
0  2020-01
1  2020-02
2  2020-03
3  2020-04
4  2020-05
5  2020-06
>>> pdf.dtypes
A    period[M]
dtype: object
>>> df = spark.createDataFrame(pdf)
>>> df.show()
+---+
|  A|
+---+
|600|
|601|
|602|
|603|
|604|
|605|
+---+

>>> df.schema
StructType(List(StructField(A,LongType,true)))

IntervalArray has an Arrow extension type with a storage_type of StructType(struct<left: timestamp[ns], right: timestamp[ns]>), which could be converted to a Spark StructType column if StructType conversion were supported by the Arrow conversion path; however, the closed information would still be missing from this schema.

So, in the cases where it is possible to convert using the storage_type, I think there should be a warning that the results may be unexpected as any type metadata that may be required to meaningfully interpret the type values is being discarded. Additionally, the round-trip back to pandas won't be possible for these types.

As for pa.Schema.from_pandas, it's most useful over pa.infer_type for the purposes of Spark conversion when the array being processed implements __arrow_array__ and can thus immediately and unambiguously return its own Arrow type. I've updated the PR to first try __arrow_array__ to determine a type and then fall back on pa.infer_type, roughly as sketched below. What do you think of this approach?
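
A simplified sketch of the updated inference (the helper name here is hypothetical; the real change lives in _create_from_pandas_with_arrow):

import pyarrow as pa

def _infer_spark_compatible_arrow_type(s):
    # Hypothetical helper illustrating the updated approach: let an
    # __arrow_array__-implementing backing array report its own Arrow type,
    # and only fall back to masked pa.infer_type otherwise.
    backing_array = getattr(s, 'array', None)
    if backing_array is not None and hasattr(backing_array, '__arrow_array__'):
        return backing_array.__arrow_array__().type
    return pa.infer_type(s, mask=s.isna(), from_pandas=True)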

@SparkQA

SparkQA commented Jun 11, 2020

Test build #123852 has finished for PR 28743 at commit 01fb6a4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait TimestampFormatterHelper extends TimeZoneAwareExpression
  • case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger
  • case class ContinuousTrigger(intervalMs: Long) extends Trigger

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Oct 29, 2020
@github-actions github-actions bot closed this Oct 30, 2020
@Pverheijen

Can this be pulled?

@careyhay

Any way this can be revived and pulled?!

@howardcornwell

Started hitting this issue today. Can this be reviewed and pulled?
