Skip to content

Файловая система Хранилища

Bases: IcebergWarehouse, FrozenModel

Iceberg Filesystem Warehouse.

Added in 0.15.0

Note

This warehouse uses FileDFConnection classes to access data at the warehouse location. It relies on Spark's filesystem configuration and behavior.

Parameters

connection : SparkFileDFConnection File connection for data storage

path : str

Warehouse path

Examples

from onetl.connection import Iceberg, SparkLocalFS

local_fs_connection = SparkLocalFS(spark=spark)

warehouse = Iceberg.FilesystemWarehouse(
    connection=local_fs_connection,
    path="/warehouse/path",
)
from onetl.connection import Iceberg, SparkHDFS

hdfs_connection = SparkHDFS(
    host="namenode",
    cluster="my-cluster",
    spark=spark,
)

warehouse = Iceberg.FilesystemWarehouse(
    connection=hdfs_connection,
    path="/warehouse/path",
)
from onetl.connection import Iceberg, SparkS3

s3_connection = SparkS3(
    host="s3.domain.com",
    protocol="http",
    bucket="my-bucket",
    access_key="access_key",
    secret_key="secret_key",
    path_style_access=True,
    region="us-east-1",
    spark=spark,
)

warehouse = Iceberg.FilesystemWarehouse(
    connection=s3_connection,
    path="/warehouse/path"
)
Source code in onetl/connection/db_connection/iceberg/warehouse/filesystem.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@support_hooks
class IcebergFilesystemWarehouse(IcebergWarehouse, FrozenModel):
    """Iceberg warehouse stored on a filesystem.

    !!! success "Added in 0.15.0"

    !!! note

        Data at the warehouse location is accessed through **FileDFConnection** classes,
        so this warehouse depends on **Spark's filesystem configuration and behavior**.

    Parameters
    ----------
    connection : SparkFileDFConnection
        File connection used for data storage

    path : str
        Path of the warehouse

    Examples
    --------

    === "Local filesystem"
        ```python
        from onetl.connection import Iceberg, SparkLocalFS

        local_fs_connection = SparkLocalFS(spark=spark)

        warehouse = Iceberg.FilesystemWarehouse(
            connection=local_fs_connection,
            path="/warehouse/path",
        )
        ```
    === "HDFS"
        ```python
        from onetl.connection import Iceberg, SparkHDFS

        hdfs_connection = SparkHDFS(
            host="namenode",
            cluster="my-cluster",
            spark=spark,
        )

        warehouse = Iceberg.FilesystemWarehouse(
            connection=hdfs_connection,
            path="/warehouse/path",
        )
        ```
    === "S3"
        ```python
        from onetl.connection import Iceberg, SparkS3

        s3_connection = SparkS3(
            host="s3.domain.com",
            protocol="http",
            bucket="my-bucket",
            access_key="access_key",
            secret_key="secret_key",
            path_style_access=True,
            region="us-east-1",
            spark=spark,
        )

        warehouse = Iceberg.FilesystemWarehouse(
            connection=s3_connection,
            path="/warehouse/path",
        )
        ```
    """

    connection: SparkFileDFConnection
    path: PurePathProtocol

    @slot
    def get_config(self) -> dict[str, str]:
        """Build the configuration options for this warehouse.

        The result always holds the warehouse URL (``self.path`` converted by the
        connection) and the Hadoop-based ``io-impl``. For a ``SparkS3`` connection,
        the connection's expected Hadoop options are appended, each key prefixed
        with ``hadoop.``.

        Returns
        -------
        dict[str, str]
            Mapping of option name to option value.
        """
        warehouse_url = self.connection._convert_to_url(self.path)  # noqa: SLF001
        options = {
            "warehouse": warehouse_url,
            "io-impl": "org.apache.iceberg.hadoop.HadoopFileIO",
        }

        # Only S3 connections contribute extra Hadoop settings.
        if not isinstance(self.connection, SparkS3):
            return options

        hadoop_prefix = self.connection._get_hadoop_config_prefix()  # noqa: SLF001
        expected = self.connection._get_expected_hadoop_config(hadoop_prefix)  # noqa: SLF001
        for key, value in expected.items():
            options["hadoop." + key] = value

        return options

    @validator("path", pre=True)
    def _validate_path(cls, path, values):
        """Convert the raw ``path`` using the already-validated connection.

        If ``values`` holds no ``SparkFileDFConnection`` under ``"connection"``,
        the input is returned unchanged.
        """
        file_connection = values.get("connection")
        if not isinstance(file_connection, SparkFileDFConnection):
            return path
        return file_connection.path_from_string(path)