feat: Support Parquet writer options #1123

Open · wants to merge 1 commit into base: main
6 changes: 4 additions & 2 deletions python/datafusion/__init__.py
@@ -31,7 +31,7 @@
from . import functions, object_store, substrait, unparser

# The following imports are okay to remain as opaque to the user.
from ._internal import Config
from ._internal import Config, ParquetWriterOptions
from .catalog import Catalog, Database, Table
from .common import (
DFSchema,
@@ -42,7 +42,7 @@
SessionContext,
SQLOptions,
)
from .dataframe import DataFrame
from .dataframe import DataFrame, ParquetColumnOptions
from .expr import (
Expr,
WindowFrame,
@@ -66,6 +66,8 @@
"ExecutionPlan",
"Expr",
"LogicalPlan",
"ParquetColumnOptions",
"ParquetWriterOptions",
"RecordBatch",
"RecordBatchStream",
"RuntimeEnvBuilder",
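With these additions to `__all__`, both new classes become importable from the package root. A minimal import sketch, assuming this branch of datafusion-python is installed:

from datafusion import ParquetColumnOptions, ParquetWriterOptions

# Both names are re-exported at the package root by this change.
print(ParquetColumnOptions, ParquetWriterOptions)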
247 changes: 167 additions & 80 deletions python/datafusion/dataframe.py
@@ -28,7 +28,6 @@
Iterable,
Literal,
Optional,
Union,
overload,
)

@@ -51,67 +50,58 @@
from datafusion._internal import DataFrame as DataFrameInternal
from datafusion._internal import expr as expr_internal

from enum import Enum

from datafusion._internal import ParquetColumnOptions as ParquetColumnOptionsInternal
from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
from datafusion.expr import Expr, SortExpr, sort_or_default


# excerpt from deltalake
# https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163
class Compression(Enum):
"""Enum representing the available compression types for Parquet files."""

UNCOMPRESSED = "uncompressed"
SNAPPY = "snappy"
GZIP = "gzip"
BROTLI = "brotli"
LZ4 = "lz4"
# lzo is not implemented yet
# https://github.com/apache/arrow-rs/issues/6970
# LZO = "lzo"
ZSTD = "zstd"
LZ4_RAW = "lz4_raw"

@classmethod
def from_str(cls: type[Compression], value: str) -> Compression:
"""Convert a string to a Compression enum value.

Args:
value: The string representation of the compression type.

Returns:
The Compression enum lowercase value.

Raises:
ValueError: If the string does not match any Compression enum value.
"""
try:
return cls(value.lower())
except ValueError as err:
valid_values = str([item.value for item in Compression])
error_msg = f"""
{value} is not a valid Compression.
Valid values are: {valid_values}
"""
raise ValueError(error_msg) from err

def get_default_level(self) -> Optional[int]:
"""Get the default compression level for the compression type.
class ParquetColumnOptions:
"""Parquet options for individual columns.

Contains the available options that can be applied to an individual Parquet column,
overriding the corresponding options provided to `write_parquet`.

Attributes:
encoding: Sets encoding for the column path. Valid values are: `plain`,
`plain_dictionary`, `rle`, `bit_packed`, `delta_binary_packed`,
`delta_length_byte_array`, `delta_byte_array`, `rle_dictionary`, and
`byte_stream_split`. These values are not case-sensitive. If `None`, uses
the default parquet options.
dictionary_enabled: Sets if dictionary encoding is enabled for the column path.
If `None`, uses the default parquet options.
compression: Sets default parquet compression codec for the column path. Valid
values are `uncompressed`, `snappy`, `gzip(level)`, `lzo`, `brotli(level)`,
`lz4`, `zstd(level)`, and `lz4_raw`. These values are not case-sensitive. If
`None`, uses the default parquet options.
statistics_enabled: Sets if statistics are enabled for the column. Valid values
are `none`, `chunk`, and `page`. These values are not case-sensitive. If
`None`, uses the default parquet options.
bloom_filter_enabled: Sets if bloom filter is enabled for the column path. If
`None`, uses the default parquet options.
bloom_filter_fpp: Sets bloom filter false positive probability for the column
path. If `None`, uses the default parquet options.
bloom_filter_ndv: Sets bloom filter number of distinct values. If `None`, uses
the default parquet options.
"""

Returns:
The default compression level for the compression type.
"""
# GZIP, BROTLI default values from deltalake repo
# https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163
# ZSTD default value from delta-rs
# https://github.com/apache/datafusion-python/pull/981#discussion_r1904789223
if self == Compression.GZIP:
return 6
if self == Compression.BROTLI:
return 1
if self == Compression.ZSTD:
return 4
return None
def __init__(
self,
encoding: Optional[str] = None,
dictionary_enabled: Optional[bool] = None,
compression: Optional[str] = None,
statistics_enabled: Optional[str] = None,
bloom_filter_enabled: Optional[bool] = None,
bloom_filter_fpp: Optional[float] = None,
bloom_filter_ndv: Optional[int] = None,
) -> None:
"""Initialize the ParquetColumnOptions."""
self.encoding = encoding
self.dictionary_enabled = dictionary_enabled
self.compression = compression
self.statistics_enabled = statistics_enabled
self.bloom_filter_enabled = bloom_filter_enabled
self.bloom_filter_fpp = bloom_filter_fpp
self.bloom_filter_ndv = bloom_filter_ndv


class DataFrame:
@@ -704,38 +694,135 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
def write_parquet(
self,
path: str | pathlib.Path,
compression: Union[str, Compression] = Compression.ZSTD,
compression_level: int | None = None,
data_pagesize_limit: int = 1024 * 1024,
write_batch_size: int = 1024,
writer_version: str = "1.0",
skip_arrow_metadata: bool = False,
compression: Optional[str] = "zstd(3)",
dictionary_enabled: Optional[bool] = True,
dictionary_page_size_limit: int = 1024 * 1024,
statistics_enabled: Optional[str] = "page",
max_row_group_size: int = 1024 * 1024,
created_by: str = "datafusion-python",
column_index_truncate_length: Optional[int] = 64,
statistics_truncate_length: Optional[int] = None,
data_page_row_count_limit: int = 20_000,
encoding: Optional[str] = None,
bloom_filter_on_write: bool = False,
bloom_filter_fpp: Optional[float] = None,
bloom_filter_ndv: Optional[int] = None,
allow_single_file_parallelism: bool = True,
maximum_parallel_row_group_writers: int = 1,
maximum_buffered_record_batches_per_stream: int = 2,
column_specific_options: Optional[dict[str, ParquetColumnOptions]] = None,
) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a Parquet file.

Args:
path: Path of the Parquet file to write.
compression: Compression type to use. Default is "ZSTD".
Available compression types are:
data_pagesize_limit: Sets best effort maximum size of data page in bytes.
write_batch_size: Sets write_batch_size in bytes.
writer_version: Sets parquet writer version. Valid values are `1.0` and
`2.0`.
skip_arrow_metadata: Skip encoding the embedded arrow metadata in the
KV_meta.
compression: Compression type to use. Default is "zstd(3)".
Available compression types are
- "uncompressed": No compression.
- "snappy": Snappy compression.
- "gzip": Gzip compression.
- "brotli": Brotli compression.
- "gzip(n)": Gzip compression with level n.
- "brotli(n)": Brotli compression with level n.
- "lz4": LZ4 compression.
- "lz4_raw": LZ4_RAW compression.
- "zstd": Zstandard compression.
Note: LZO is not yet implemented in arrow-rs and is therefore excluded.
compression_level: Compression level to use. For ZSTD, the
recommended range is 1 to 22, with the default being 4. Higher levels
provide better compression but slower speed.
"""
# Convert string to Compression enum if necessary
if isinstance(compression, str):
compression = Compression.from_str(compression)

if (
compression in {Compression.GZIP, Compression.BROTLI, Compression.ZSTD}
and compression_level is None
):
compression_level = compression.get_default_level()
- "zstd(n)": Zstandard compression with level n.
dictionary_enabled: Sets if dictionary encoding is enabled. If None, uses
the default parquet writer setting.
dictionary_page_size_limit: Sets best effort maximum dictionary page size,
in bytes.
statistics_enabled: Sets if statistics are enabled for any column. Valid
values are `none`, `chunk`, and `page`. If None, uses the default
parquet writer setting.
max_row_group_size: Target maximum number of rows in each row group
(defaults to 1M rows). Writing larger row groups requires more memory to
write, but can get better compression and be faster to read.
created_by: Sets "created by" property.
column_index_truncate_length: Sets column index truncate length.
statistics_truncate_length: Sets statistics truncate length. If None, uses
the default parquet writer setting.
data_page_row_count_limit: Sets best effort maximum number of rows in a data
page.
encoding: Sets default encoding for any column. Valid values are `plain`,
`plain_dictionary`, `rle`, `bit_packed`, `delta_binary_packed`,
`delta_length_byte_array`, `delta_byte_array`, `rle_dictionary`, and
`byte_stream_split`. If None, uses the default parquet writer setting.
bloom_filter_on_write: Write bloom filters for all columns when creating
parquet files.
bloom_filter_fpp: Sets bloom filter false positive probability. If None,
uses the default parquet writer setting.
bloom_filter_ndv: Sets bloom filter number of distinct values. If None, uses
the default parquet writer setting.
allow_single_file_parallelism: Controls whether DataFusion will attempt to
speed up writing parquet files by serializing them in parallel. Each
column in each row group in each output file is serialized in parallel,
leveraging a maximum possible core count of n_files * n_row_groups *
n_columns.
maximum_parallel_row_group_writers: By default the parallel parquet writer is
tuned for minimum memory usage in a streaming execution plan. You may
see a performance benefit when writing large parquet files by increasing
`maximum_parallel_row_group_writers` and
`maximum_buffered_record_batches_per_stream` if your system has idle
cores and can tolerate additional memory usage. Boosting these values is
likely worthwhile when writing out already in-memory data, such as from
a cached data frame.
maximum_buffered_record_batches_per_stream: See
`maximum_parallel_row_group_writers`.
column_specific_options: Overrides options for specific columns. If a column
is not a part of this dictionary, it will use the parameters provided to
`write_parquet`.
"""
options_internal = ParquetWriterOptionsInternal(
data_pagesize_limit,
write_batch_size,
writer_version,
skip_arrow_metadata,
compression,
dictionary_enabled,
dictionary_page_size_limit,
statistics_enabled,
max_row_group_size,
created_by,
column_index_truncate_length,
statistics_truncate_length,
data_page_row_count_limit,
encoding,
bloom_filter_on_write,
bloom_filter_fpp,
bloom_filter_ndv,
allow_single_file_parallelism,
maximum_parallel_row_group_writers,
maximum_buffered_record_batches_per_stream,
)

if column_specific_options is None:
column_specific_options = {}

column_specific_options_internal = {}
for column, opts in column_specific_options.items():
column_specific_options_internal[column] = ParquetColumnOptionsInternal(
bloom_filter_enabled=opts.bloom_filter_enabled,
encoding=opts.encoding,
dictionary_enabled=opts.dictionary_enabled,
compression=opts.compression,
statistics_enabled=opts.statistics_enabled,
bloom_filter_fpp=opts.bloom_filter_fpp,
bloom_filter_ndv=opts.bloom_filter_ndv,
)

self.df.write_parquet(str(path), compression.value, compression_level)
self.df.write_parquet(
str(path),
options_internal,
column_specific_options_internal,
)

def write_json(self, path: str | pathlib.Path) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a JSON file.
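Taken together, the changes above can be exercised with the short usage sketch below. This is only a sketch based on the signatures shown in the diff; the data, output path, column name, and option values are illustrative and not taken from the PR.

from datafusion import ParquetColumnOptions, SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# Global writer settings are plain keyword arguments; per-column overrides
# are passed as a dict of column name -> ParquetColumnOptions.
df.write_parquet(
    "example.parquet",
    compression="zstd(6)",
    max_row_group_size=100_000,
    bloom_filter_on_write=False,
    column_specific_options={
        "id": ParquetColumnOptions(
            bloom_filter_enabled=True,
            statistics_enabled="page",
        ),
    },
)

Any column not listed in column_specific_options falls back to the top-level keyword arguments, per the docstring above.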