Writing to Postgres using DBWriter
To write data to Postgres, use DBWriter.
Warning
Please take Postgres data types into account.
Warning
It is always recommended to create the table explicitly using Postgres.execute instead of relying on Spark's automatic DDL generation.
This is because Spark's DDL generator can create columns with precision and types different from what you expect, which leads to precision loss or other issues.
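For illustration, a minimal sketch of creating the target table explicitly, assuming a ``postgres`` connection object like the one constructed in the example below (the schema, table, and column definitions here are illustrative assumptions, not part of the original example):

```python
# Hypothetical DDL: adjust names and types to your data.
# Creating the table yourself keeps column types and precision
# under your control instead of Spark's DDL generator.
postgres.execute(
    """
    CREATE TABLE IF NOT EXISTS schema.table (
        id bigint,
        business_dt timestamp(6),
        value numeric(38, 18)
    )
    """
)
```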
Examples
```python
from onetl.connection import Postgres
from onetl.db import DBWriter

postgres = Postgres(...)

df = ...  # your data goes here

writer = DBWriter(
    connection=postgres,
    target="schema.table",
    options=Postgres.WriteOptions(if_exists="append"),
)
writer.run(df)
```
Options
The method above accepts Postgres.WriteOptions
onetl.connection.db_connection.postgres.options.PostgresWriteOptions
Bases: JDBCWriteOptions
batchsize = 20000
How many rows can be inserted per round trip.
Tuning this option can influence the performance of writing.
Warning
The default value is different from Spark's. Spark uses the quite small value ``1000``, which is absolutely not usable in the BigData world. Thus we've overridden the default with ``20_000``, which should increase writing performance. You can increase it even more, up to ``50_000``, but it depends on your database load and the number of columns in a row. Higher values do not increase performance.
Changed in version 0.4.0: Changed default value from 1000 to 20_000
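A minimal sketch of tuning this option (the ``50_000`` value is just the upper bound suggested above, not a recommendation):

```python
# Sketch: larger batches mean fewer round trips; tune against your database load
options = Postgres.WriteOptions(if_exists="append", batchsize=50_000)
```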
if_exists = Field(default=(JDBCTableExistBehavior.APPEND), alias=(avoid_alias('mode')))
Behavior of writing data into existing table.
Possible values:

- ``append`` (default): adds new rows into the existing table.
  - Table does not exist: table is created using options provided by user (``createTableOptions``, ``createTableColumnTypes``, etc).
  - Table exists: data is appended to the table. The table has the same DDL as before writing data.
    Warning: this mode does not check whether the table already contains rows from the dataframe, so duplicated rows can be created. Also, Spark does not support passing custom options to the insert statement, like ``ON CONFLICT``, so don't try to implement deduplication using unique indexes or constraints. Instead, write to a staging table and perform deduplication using the ``execute`` method (see the sketch after this list).
- ``replace_entire_table``: table is dropped and then created, or truncated.
  - Table does not exist: table is created using options provided by user (``createTableOptions``, ``createTableColumnTypes``, etc).
  - Table exists: table content is replaced with dataframe content. After writing is complete, the target table could either have the same DDL as before writing data (``truncate=True``), or be recreated (``truncate=False`` or source does not support truncation).
- ``ignore``: ignores the write operation if the table already exists.
  - Table does not exist: table is created using options provided by user (``createTableOptions``, ``createTableColumnTypes``, etc).
  - Table exists: the write operation is ignored, and no data is written to the table.
- ``error``: raises an error if the table already exists.
  - Table does not exist: table is created using options provided by user (``createTableOptions``, ``createTableColumnTypes``, etc).
  - Table exists: an error is raised, and no data is written to the table.

Changed in version 0.9.0: Renamed mode → if_exists
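As referenced in the ``append`` warning above, a minimal sketch of the staging-table deduplication pattern (the staging table name and the ``id`` conflict key are illustrative assumptions, not part of the original example):

```python
# Sketch: write the dataframe to a staging table first,
# replacing its previous content
writer = DBWriter(
    connection=postgres,
    target="schema.table_staging",  # hypothetical staging table
    options=Postgres.WriteOptions(if_exists="replace_entire_table"),
)
writer.run(df)

# Then move rows into the target table, skipping duplicates;
# "id" is an assumed unique key, adjust to your schema
postgres.execute(
    """
    INSERT INTO schema.table
    SELECT * FROM schema.table_staging
    ON CONFLICT (id) DO NOTHING
    """
)
```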
isolation_level = Field(default='READ_UNCOMMITTED', alias='isolationLevel')
The transaction isolation level, which applies to the current connection.
Possible values:

- ``NONE`` (as a string, not Python's ``None``)
- ``READ_COMMITTED``
- ``READ_UNCOMMITTED``
- ``REPEATABLE_READ``
- ``SERIALIZABLE``

Values correspond to transaction isolation levels defined by the JDBC standard. Please refer to the documentation for `java.sql.Connection <https://docs.oracle.com/javase/8/docs/api/java/sql/Connection.html>`_.
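For example (a sketch; ``READ_COMMITTED`` is just one of the values listed above):

```python
# Sketch: use a stricter isolation level for the writing transaction
options = Postgres.WriteOptions(isolation_level="READ_COMMITTED")
```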
query_timeout = Field(default=None, alias='queryTimeout')
The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.
This option depends on the driver implementation: some drivers check the timeout of each query instead of the entire JDBC batch.
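A sketch of setting the timeout (the 300-second value is an arbitrary assumption):

```python
# Sketch: abort statements that run longer than 5 minutes
options = Postgres.WriteOptions(query_timeout=300)
```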
parse(options)
classmethod
If a parameter inherited from the ReadOptions class was passed, it will be returned unchanged. If a Dict object was passed, it will be converted to ReadOptions. Otherwise, an exception will be raised.
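A sketch of passing a plain dict to ``parse`` (the option values are illustrative):

```python
# Sketch: a dict is converted into a WriteOptions instance
options = Postgres.WriteOptions.parse({"if_exists": "append", "batchsize": 20_000})
```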