
Parquet

Bases: ReadWriteFileFormat

Parquet file format (columnar). |support_hooks|

Based on `Spark Parquet Files <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html>`_ file format.

Supports reading/writing files with ``.parquet`` extension.

.. versionadded:: 0.9.0

Examples
--------

.. note::

    You can pass any option mentioned in the
    `official documentation <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html>`_.
    **Option names should be in** ``camelCase``!

    The set of supported options depends on the Spark version.

    You may also set options mentioned in the `parquet-hadoop documentation <https://github.com/apache/parquet-java/blob/master/parquet-hadoop/README.md>`_.
    They are prefixed with ``parquet.`` and contain dots in their names, so instead of calling the constructor ``Parquet(parquet.option=True)`` (invalid in Python)
    you should call the method ``Parquet.parse({"parquet.option": True})``.

.. tabs::

    .. code-tab:: py Reading files

        from onetl.file.format import Parquet

        parquet = Parquet(mergeSchema=True)

    .. code-tab:: py Writing files

        from onetl.file.format import Parquet

        parquet = Parquet.parse(
            {
                "compression": "snappy",
                # Enable Bloom filter for columns 'id' and 'name'
                "parquet.bloom.filter.enabled#id": True,
                "parquet.bloom.filter.enabled#name": True,
                # Set expected number of distinct values for column 'id'
                "parquet.bloom.filter.expected.ndv#id": 10_000_000,
                # other options
            }
        )
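The per-column parquet-hadoop keys in the writing example above follow a simple pattern: the option name is suffixed with ``#<column>``. A minimal sketch of building such an options dict (``bloom_filter_options`` is a hypothetical helper, not part of onetl):

```python
# Hypothetical helper (not part of onetl): builds per-column parquet-hadoop
# Bloom filter options, keyed as "<option>#<column>".
def bloom_filter_options(columns, expected_ndv=None):
    expected_ndv = expected_ndv or {}
    options = {}
    for column in columns:
        # Enable the Bloom filter for each listed column
        options[f"parquet.bloom.filter.enabled#{column}"] = True
        # Optionally set the expected number of distinct values per column
        if column in expected_ndv:
            options[f"parquet.bloom.filter.expected.ndv#{column}"] = expected_ndv[column]
    return options

options = bloom_filter_options(["id", "name"], expected_ndv={"id": 10_000_000})
# The resulting dict can then be passed to Parquet.parse({...}) as shown above.
```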
Source code in onetl/file/format/parquet.py
@support_hooks
class Parquet(ReadWriteFileFormat):
    """
    Parquet file format (columnar). |support_hooks|

    Based on `Spark Parquet Files <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html>`_ file format.

    Supports reading/writing files with ``.parquet`` extension.

    .. versionadded:: 0.9.0

    Examples
    --------

    .. note ::

        You can pass any option mentioned in
        `official documentation <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html>`_.
        **Option names should be in** ``camelCase``!

        The set of supported options depends on Spark version.

        You may also set options mentioned `parquet-hadoop documentation <https://github.com/apache/parquet-java/blob/master/parquet-hadoop/README.md>`_.
        They are prefixed with ``parquet.`` with dots in names, so instead of calling constructor ``Parquet(parquet.option=True)`` (invalid in Python)
        you should call method ``Parquet.parse({"parquet.option": True})``.

    .. tabs::

        .. code-tab:: py Reading files

            from onetl.file.format import Parquet

            parquet = Parquet(mergeSchema=True)

        .. code-tab:: py Writing files

            from onetl.file.format import Parquet

            parquet = Parquet.parse(
                {
                    "compression": "snappy",
                    # Enable Bloom filter for columns 'id' and 'name'
                    "parquet.bloom.filter.enabled#id": True,
                    "parquet.bloom.filter.enabled#name": True,
                    # Set expected number of distinct values for column 'id'
                    "parquet.bloom.filter.expected.ndv#id": 10_000_000,
                    # other options
                }
            )

    """

    name: ClassVar[str] = "parquet"

    mergeSchema: Optional[bool] = None
    """
    Merge schemas of all Parquet files being read into a single schema.
    By default, Spark config option ``spark.sql.parquet.mergeSchema`` value is used (``false``).

    .. note::

        Used only for reading files.
    """

    compression: Union[
        str,
        Literal["uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4raw", "zstd"],
        None,
    ] = None
    """
    Compression codec of the Parquet files.
    By default, Spark config option ``spark.sql.parquet.compression.codec`` value is used (``snappy``).

    .. note::

        Used only for writing files.
    """

    class Config:
        known_options = PARQUET_LIBRARY_OPTIONS
        prohibited_options = PROHIBITED_OPTIONS
        extra = "allow"

    @slot
    def check_if_supported(self, spark: SparkSession) -> None:
        # always available
        pass

    def __repr__(self):
        options_dict = self.dict(by_alias=True, exclude_none=True)
        options_dict = dict(sorted(options_dict.items()))
        if any("." in field for field in options_dict.keys()):
            return f"{self.__class__.__name__}.parse({options_dict})"

        options_kwargs = ", ".join(f"{k}={v!r}" for k, v in options_dict.items())
        return f"{self.__class__.__name__}({options_kwargs})"
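The ``__repr__`` above picks between the constructor form and the ``.parse()`` form depending on whether any option name contains a dot (dotted names cannot be written as Python keyword arguments). A standalone sketch of that dispatch (``render_repr`` is a hypothetical stand-in, not part of onetl):

```python
# Hypothetical stand-in (not part of onetl) for the repr dispatch above:
# options containing dots cannot be rendered as keyword arguments,
# so the dict-based .parse() form is shown instead.
def render_repr(cls_name: str, options: dict) -> str:
    options = dict(sorted(options.items()))
    if any("." in key for key in options):
        return f"{cls_name}.parse({options})"
    kwargs = ", ".join(f"{k}={v!r}" for k, v in options.items())
    return f"{cls_name}({kwargs})"

print(render_repr("Parquet", {"mergeSchema": True}))
# Parquet(mergeSchema=True)
print(render_repr("Parquet", {"parquet.bloom.filter.enabled#id": True}))
# Parquet.parse({'parquet.bloom.filter.enabled#id': True})
```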

mergeSchema = None (class attribute / instance attribute)

Merge schemas of all Parquet files being read into a single schema.
By default, the Spark config option ``spark.sql.parquet.mergeSchema`` value is used (``false``).

.. note::

    Used only for reading files.
compression = None (class attribute / instance attribute)

Compression codec of the Parquet files.
By default, the Spark config option ``spark.sql.parquet.compression.codec`` value is used (``snappy``).

.. note::

    Used only for writing files.