
Parquet

Bases: ReadWriteFileFormat

Parquet file format (columnar). Supports hooks.

Based on Spark Parquet Files file format.

Supports reading/writing files with .parquet extension.

Added in 0.9.0

Examples

Note

You can pass any option mentioned in the official documentation. Option names should be in camelCase!

The set of supported options depends on Spark version.

You may also set options mentioned in the parquet-hadoop documentation. Their names are prefixed with parquet. and contain dots, so instead of calling the constructor Parquet(parquet.option=True) (invalid in Python), call the method Parquet.parse({"parquet.option": True}).
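Why the constructor form fails can be checked with the standard library alone. The following is a minimal sketch that does not import onETL: it only asks Python's parser whether each call expression from the note is syntactically valid. The helper name `is_valid_python` is hypothetical and used for illustration only.

```python
import ast


def is_valid_python(source: str) -> bool:
    """Return True if `source` parses as Python code (nothing is executed)."""
    try:
        ast.parse(source)
    except SyntaxError:
        return False
    return True


# Keyword argument names must be plain identifiers, so a dotted name is rejected.
keyword_call_ok = is_valid_python("Parquet(parquet.option=True)")

# A dict key is an ordinary string, so dots in the option name are fine.
parse_call_ok = is_valid_python('Parquet.parse({"parquet.option": True})')

print(keyword_call_ok, parse_call_ok)
```

The first call is rejected at parse time, before any onETL code runs, which is why dotted option names have to travel inside a dict.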

Reading files

from onetl.file.format import Parquet

parquet = Parquet(mergeSchema=True)

Writing files

from onetl.file.format import Parquet

parquet = Parquet.parse(
    {
        "compression": "snappy",
        # Enable Bloom filter for columns 'id' and 'name'
        "parquet.bloom.filter.enabled#id": True,
        "parquet.bloom.filter.enabled#name": True,
        # Set expected number of distinct values for column 'id'
        "parquet.bloom.filter.expected.ndv#id": 10_000_000,
        # other options
    }
)
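When Bloom filters are needed for many columns, the per-column keys can be generated instead of typed by hand. This is a minimal sketch, assuming the `parquet.bloom.filter.enabled#<column>` and `parquet.bloom.filter.expected.ndv#<column>` key format shown in the example above; `bloom_filter_options` is a hypothetical helper, not part of onETL.

```python
from typing import Any, Dict, Iterable, Optional


def bloom_filter_options(
    columns: Iterable[str],
    expected_ndv: Optional[Dict[str, int]] = None,
) -> Dict[str, Any]:
    """Build per-column Bloom filter options (hypothetical helper)."""
    options: Dict[str, Any] = {}
    for column in columns:
        # Enable the Bloom filter for this column
        options[f"parquet.bloom.filter.enabled#{column}"] = True
    for column, ndv in (expected_ndv or {}).items():
        # Expected number of distinct values for this column
        options[f"parquet.bloom.filter.expected.ndv#{column}"] = ndv
    return options


options = {
    "compression": "snappy",
    **bloom_filter_options(["id", "name"], expected_ndv={"id": 10_000_000}),
}
```

The resulting `options` dict has the same content as the literal in the example above and could be passed to `Parquet.parse(options)` in the same way.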
Source code in onetl/file/format/parquet.py
@support_hooks
class Parquet(ReadWriteFileFormat):
    """
    Parquet file format (columnar). [![support hooks](https://img.shields.io/badge/%20-support%20hooks-blue)](/hooks/)

    Based on [Spark Parquet Files](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html) file format.

    Supports reading/writing files with `.parquet` extension.

    !!! success "Added in 0.9.0"

    Examples
    --------

    !!! note

        You can pass any option mentioned in the
        [official documentation](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html).
        **Option names should be in** `camelCase`!

        The set of supported options depends on Spark version.

        You may also set options mentioned in the
        [parquet-hadoop documentation](https://github.com/apache/parquet-java/blob/master/parquet-hadoop/README.md).
        Their names are prefixed with `parquet.` and contain dots,
        so instead of calling the constructor `Parquet(parquet.option=True)` (invalid in Python),
        call the method `Parquet.parse({"parquet.option": True})`.

    === "Reading files"
        ```python
        from onetl.file.format import Parquet

        parquet = Parquet(mergeSchema=True)
        ```
    === "Writing files"
        ```python
        from onetl.file.format import Parquet

        parquet = Parquet.parse(
            {
                "compression": "snappy",
                # Enable Bloom filter for columns 'id' and 'name'
                "parquet.bloom.filter.enabled#id": True,
                "parquet.bloom.filter.enabled#name": True,
                # Set expected number of distinct values for column 'id'
                "parquet.bloom.filter.expected.ndv#id": 10_000_000,
                # other options
            }
        )
        ```
    """

    name: ClassVar[str] = "parquet"

    mergeSchema: Optional[bool] = None
    """
    Merge schemas of all Parquet files being read into a single schema.
    By default, the value of the Spark config option `spark.sql.parquet.mergeSchema` is used (`false`).

    !!! note

        Used only for reading files.
    """

    compression: Union[
        str,
        Literal["uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4raw", "zstd"],
        None,
    ] = None
    """
    Compression codec of the Parquet files.
    By default, the value of the Spark config option `spark.sql.parquet.compression.codec` is used (`snappy`).

    !!! note

        Used only for writing files.
    """

    class Config:
        known_options = PARQUET_LIBRARY_OPTIONS
        prohibited_options = PROHIBITED_OPTIONS
        extra = "allow"

    @slot
    def check_if_supported(self, spark: SparkSession) -> None:
        # always available
        pass

    def __repr__(self):
        options_dict = self.dict(by_alias=True, exclude_none=True)
        options_dict = dict(sorted(options_dict.items()))
        if any("." in field for field in options_dict):
            return f"{self.__class__.__name__}.parse({options_dict})"

        options_kwargs = ", ".join(f"{k}={v!r}" for k, v in options_dict.items())
        return f"{self.__class__.__name__}({options_kwargs})"
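The branching in `__repr__` can be illustrated without onETL or pydantic. This is a minimal sketch, assuming a plain dict stands in for the result of `self.dict(by_alias=True, exclude_none=True)`; `format_repr` is a hypothetical helper that mirrors the logic in the listing above and is not part of the library.

```python
from typing import Any, Dict


def format_repr(class_name: str, options: Dict[str, Any]) -> str:
    """Mirror the repr logic above on a plain dict (hypothetical helper)."""
    # Stand-in for exclude_none=True, followed by sorting by option name
    options = {k: v for k, v in options.items() if v is not None}
    options = dict(sorted(options.items()))

    # Dotted names cannot be keyword arguments, so fall back to .parse({...})
    if any("." in name for name in options):
        return f"{class_name}.parse({options})"

    kwargs = ", ".join(f"{k}={v!r}" for k, v in options.items())
    return f"{class_name}({kwargs})"


plain = format_repr("Parquet", {"mergeSchema": True, "compression": None})
dotted = format_repr(
    "Parquet",
    {"parquet.bloom.filter.enabled#id": True, "compression": "snappy"},
)
print(plain)
print(dotted)
```

Either way the output is a string that is valid Python, so a repr of an instance with dotted options can be pasted back into code unchanged.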

mergeSchema = None class-attribute instance-attribute

Merge schemas of all Parquet files being read into a single schema. By default, the value of the Spark config option spark.sql.parquet.mergeSchema is used (false).

Note

Used only for reading files.

compression = None class-attribute instance-attribute

Compression codec of the Parquet files. By default, the value of the Spark config option spark.sql.parquet.compression.codec is used (snappy).

Note

Used only for writing files.