
Writing to Iceberg using DBWriter

To write data to Iceberg, use [DBWriter][DB-onetl-db-writer].

Examples

```python
from onetl.connection import Iceberg
from onetl.db import DBWriter

# the remaining connection parameters are omitted here
iceberg = Iceberg(catalog_name="my_catalog", ...)

df = ...  # Spark DataFrame with the data to write

writer = DBWriter(
    connection=iceberg,
    target="my_schema.my_table",  # catalog name is already defined in the connection
    options=Iceberg.WriteOptions(
        if_exists="append",
    ),
)

writer.run(df)
```
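The `target` above omits the catalog because the connection already carries `catalog_name`. As a rough illustration only, assuming the connection simply prefixes the target with `catalog_name` (an assumption, not taken from onETL's source), the fully qualified Spark table name could be built by a hypothetical helper like this:

```python
def qualified_table_name(catalog_name: str, target: str) -> str:
    """Hypothetical helper (not part of onETL): prefix "namespace.table" with the catalog name."""
    parts = target.split(".")
    # Expect at least a namespace and a table name, with no empty segments
    if len(parts) < 2 or not all(parts):
        raise ValueError(f"target must look like 'namespace.table', got {target!r}")
    return ".".join([catalog_name, *parts])


print(qualified_table_name("my_catalog", "my_schema.my_table"))
# my_catalog.my_schema.my_table
```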

Options

Bases: GenericOptions

Iceberg source writing options.

Source code in onetl/connection/db_connection/iceberg/options.py
```python
# Excerpt: imports (including typing's Dict and Any) and module-level names
# (GenericOptions, Field, avoid_alias, PROHIBITED_OPTIONS, IcebergTableExistBehavior)
# are defined elsewhere and not shown here.
class IcebergWriteOptions(GenericOptions):
    """Iceberg source writing options."""

    class Config:
        extra = "allow"
        known_options: frozenset = frozenset()
        prohibited_options = PROHIBITED_OPTIONS

    if_exists: IcebergTableExistBehavior = Field(  # type: ignore[literal-required]
        default=IcebergTableExistBehavior.APPEND,
        alias=avoid_alias("mode"),
    )
    """Behavior of writing data into existing table.

    Possible values:
        * `append` (default)
            Appends data into existing table, or create table if it does not exist.

            Same as Spark's `df.writeTo(table).using("iceberg").append()`.

            ??? note "Behavior in details"

                * Table does not exist
                    Table is created.

                * Table exists and not partitioned
                    Data is appended to a table. Table DDL (including partition spec) is unchanged.

                * Table exists and partitioned
                    If a partition is present only in dataframe
                        Partition is created.
                    If a partition is present in both dataframe and table
                        Data is appended to existing partition.

                    !!! warning

                        This mode does not check whether table already contains
                        rows from dataframe, so duplicated rows can be created.

                        To implement deduplication, write data to staging table first,
                        and then perform some deduplication logic using [sql][].

                * Table exists and partitioned, but some partitions are present only in table, not dataframe
                    Existing partitions are left intact.

        * `replace_overlapping_partitions`
            Overwrites data in the existing partitions, or create table if it does not exist.

            Same as Spark's `df.writeTo(table).using("iceberg").overwritePartitions()`

            !!! danger

                This mode does make sense **ONLY** if the table is partitioned.
                **IF NOT, YOU'LL LOSE YOUR DATA!**

            ??? note "Behavior in details"

                * Table does not exist
                    Table is created.

                * Table exists and not partitioned
                    Data is **overwritten in all the table**. Table DDL (including partition spec) is unchanged.

                * Table exists and partitioned
                    If a partition is present only in dataframe
                        Partition is created.
                    If a partition is present in both dataframe and table
                        Existing partition **replaced** with data from dataframe.
                    If a partition is present only in table, not dataframe
                        Existing partition is left intact.

        * `replace_entire_table`
            **Recreates table** (via `DROP + CREATE`), **deleting all existing data**.
            **All existing partitions are dropped.**

            Same as Spark's `df.writeTo(table).createOrReplace()`

            !!! warning

                Table is recreated
                **instead of using original table options**. Be careful

        * `ignore`
            Ignores the write operation if the table already exists.

            ??? note "Behavior in details"

                * Table does not exist
                    Table is created.

                * Table exists
                    If the table exists, **no further action is taken**.

        * `error`
            Raises an error if the table already exists.

            ??? note "Behavior in details"

                * Table does not exist
                    Table is created.

                * Table exists
                    If the table exists, **raises an error**.
    """

    table_properties: Dict[str, Any] = Field(default_factory=dict)
    """TBLPROPERTIES to add to freshly created table.

    Examples: `{"location": "/path"}`

    !!! warning

        Used **only** while **creating new table**, or in case of `if_exists=replace_entire_table`
    """
```

if_exists = Field(default=IcebergTableExistBehavior.APPEND, alias=avoid_alias('mode'))

Behavior when writing data into an existing table.

Possible values:
  • `append` (default): appends data to the existing table, or creates the table if it does not exist.

    Same as Spark's `df.writeTo(table).using("iceberg").append()`.

    Behavior in details:
    • Table does not exist: the table is created.

    • Table exists and is not partitioned: data is appended to the table. Table DDL (including the partition spec) is unchanged.

    • Table exists and is partitioned:
      • a partition present only in the dataframe is created;
      • data for a partition present in both the dataframe and the table is appended to the existing partition.

      Warning

      This mode does not check whether the table already contains rows from the dataframe, so duplicate rows can be created.

      To implement deduplication, write data to a staging table first, and then apply deduplication logic using `sql`.

    • Table exists and is partitioned, but some partitions are present only in the table, not in the dataframe: those existing partitions are left intact.

  • `replace_overlapping_partitions`: overwrites existing partitions that are also present in the dataframe, or creates the table if it does not exist.

    Same as Spark's `df.writeTo(table).using("iceberg").overwritePartitions()`.

    Danger

    This mode makes sense ONLY if the table is partitioned. IF NOT, YOU'LL LOSE YOUR DATA!

    Behavior in details:
    • Table does not exist: the table is created.

    • Table exists and is not partitioned: data in the entire table is overwritten. Table DDL (including the partition spec) is unchanged.

    • Table exists and is partitioned:
      • a partition present only in the dataframe is created;
      • a partition present in both the dataframe and the table is replaced with data from the dataframe;
      • a partition present only in the table, not in the dataframe, is left intact.

  • `replace_entire_table`: recreates the table (via `DROP + CREATE`), deleting all existing data. All existing partitions are dropped.

    Same as Spark's `df.writeTo(table).createOrReplace()`.

    Warning

    The table is recreated, and the original table options are not preserved. Be careful.

  • `ignore`: skips the write operation if the table already exists.

    Behavior in details:
    • Table does not exist: the table is created.

    • Table exists: no further action is taken.

  • `error`: raises an error if the table already exists.

    Behavior in details:
    • Table does not exist: the table is created.

    • Table exists: an error is raised.
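To make the rules above concrete, here is a toy in-memory model; it is not how onETL or Spark implements writes. A table is a dict mapping a partition value to a list of rows, `None` means the table does not exist, and a non-partitioned table is modelled as a single partition with the key `""`. The names `write` and `TableAlreadyExistsError` are hypothetical.

```python
from typing import Dict, List, Optional

# Toy model: partition value -> rows. A non-partitioned table uses the single key "".
Table = Dict[str, List[str]]


class TableAlreadyExistsError(Exception):
    """Hypothetical error raised by the toy model for if_exists="error"."""


def _copy(table: Table) -> Table:
    return {part: list(rows) for part, rows in table.items()}


def write(existing: Optional[Table], df: Table, if_exists: str = "append") -> Table:
    """Return the table contents after writing df, following the documented rules."""
    if existing is None:
        # Table does not exist: every mode creates it from the dataframe
        return _copy(df)
    if if_exists == "append":
        result = _copy(existing)
        for part, rows in df.items():
            # New partitions are created; existing ones get rows appended (no deduplication)
            result.setdefault(part, []).extend(rows)
        return result
    if if_exists == "replace_overlapping_partitions":
        result = _copy(existing)
        # Partitions present in df replace existing ones; the others are left intact
        result.update(_copy(df))
        return result
    if if_exists == "replace_entire_table":
        # All existing data and partitions are dropped
        return _copy(df)
    if if_exists == "ignore":
        return _copy(existing)
    if if_exists == "error":
        raise TableAlreadyExistsError("table already exists")
    raise ValueError(f"unknown if_exists value: {if_exists!r}")


existing = {"2024-01-01": ["a"], "2024-01-02": ["b"]}
df = {"2024-01-02": ["b", "c"], "2024-01-03": ["d"]}
print(write(existing, df, "append"))
# {'2024-01-01': ['a'], '2024-01-02': ['b', 'b', 'c'], '2024-01-03': ['d']}
```

In this model, `write({"": ["x", "y"]}, {"": ["z"]}, "replace_overlapping_partitions")` returns `{"": ["z"]}`: for a non-partitioned table the whole content is overwritten, which is the danger described for that mode.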

table_properties = Field(default_factory=dict)

TBLPROPERTIES to add to a newly created table.

Example: `{"location": "/path"}`

Warning

Used only when creating a new table, or when `if_exists="replace_entire_table"`.
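The rule for `table_properties` can be summarised with another hypothetical helper. It assumes that `table_properties` fully define the properties of a newly created or recreated table (ignoring any defaults the catalog itself may add), and that an existing table otherwise keeps its properties:

```python
from typing import Any, Dict, Optional


def properties_after_write(
    existing_properties: Optional[Dict[str, Any]],
    table_properties: Dict[str, Any],
    if_exists: str = "append",
) -> Dict[str, Any]:
    """Hypothetical helper; existing_properties is None when the table does not exist yet."""
    if existing_properties is None or if_exists == "replace_entire_table":
        # New or recreated table: table_properties are applied, old options are not kept
        return dict(table_properties)
    # Existing table is kept: table_properties are ignored
    # (with if_exists="error" the write fails anyway)
    return dict(existing_properties)


print(properties_after_write({"owner": "etl"}, {"location": "/path"}, "append"))
# {'owner': 'etl'}
```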