Чтение из Postgres с использованием DBReader
DBReader поддерживает стратегии для инкрементального чтения данных, но не поддерживает пользовательские запросы, такие как JOIN.
Warning
Пожалуйста, учитывайте типы данных Postgres
Поддерживаемые функции DBReader
- ✅︎
columns - ✅︎
where - ✅︎
hwm, поддерживаемые стратегии: - ✅︎ Snapshot
- ✅︎ Incremental
- ✅︎ Snapshot batch
- ✅︎ Incremental batch
- ❌
hint(не поддерживается Postgres) - ❌
df_schema - ✅︎
options(см. Postgres.ReadOptions)
Примеры
Стратегия Snapshot:
from onetl.connection import Postgres
from onetl.db import DBReader
postgres = Postgres(...)
reader = DBReader(
connection=postgres,
source="schema.table",
columns=["id", "key", "CAST(value AS text) value", "updated_dt"],
where="key = 'something'",
options=Postgres.ReadOptions(partitionColumn="id", numPartitions=10),
)
df = reader.run()
Стратегия Incremental:
from onetl.connection import Postgres
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy
postgres = Postgres(...)
reader = DBReader(
connection=postgres,
source="schema.table",
columns=["id", "key", "CAST(value AS text) value", "updated_dt"],
where="key = 'something'",
hwm=DBReader.AutoDetectHWM(name="postgres_hwm", expression="updated_dt"),
options=Postgres.ReadOptions(partitionColumn="id", numPartitions=10),
)
with IncrementalStrategy():
df = reader.run()
Рекомендации
Выбирайте только требуемые столбцы
Вместо передачи "*" в DBReader(columns=[...]) предпочтительнее передавать точные имена столбцов. Это уменьшает объем данных, передаваемых из Postgres в Spark.
Обратите внимание на значение where
Вместо фильтрации данных на стороне Spark с помощью df.filter(df.column == 'value') передавайте правильное условие DBReader(where="column = 'value'").
Это не только уменьшает объем данных, отправляемых из Postgres в Spark, но и может улучшить производительность запроса.
Особенно если существуют индексы или секции для столбцов, используемых в условии where.
Опции
onetl.connection.db_connection.postgres.options.PostgresReadOptions
Bases: JDBCReadOptions
Source code in onetl/connection/db_connection/postgres/options.py
15 16 | |
fetchsize = 100000
class-attribute
instance-attribute
Fetch N rows from an opened cursor per one read round.
Tuning this option can influence performance of reading.
Warning
Default value is different from Spark.
Spark uses driver's own value, and it may be different in different drivers,
and even versions of the same driver. For example, Oracle has
default fetchsize=10, which is absolutely not usable.
Thus we've overridden default value with 100_000, which should increase reading performance.
Changed in 0.2.0
Set explicit default value to 100_000
lower_bound = Field(default=None, alias='lowerBound')
class-attribute
instance-attribute
See documentation for [partitioning_mode][] for more details
num_partitions = Field(default=1, alias='numPartitions')
class-attribute
instance-attribute
Number of jobs created by Spark to read the table content in parallel. See documentation for [partitioning_mode][] for more details
partition_column = Field(default=None, alias='partitionColumn')
class-attribute
instance-attribute
Column used to parallelize reading from a table.
Warning
It is highly recommended to use primary key, or column with an index to avoid performance issues.
Note
Column type depends on [partitioning_mode][].
partitioning_mode="range"requires column to be an integer, date or timestamp (can be NULL, but not recommended).partitioning_mode="hash"accepts any column type (NOT NULL).partitioning_mode="mod"requires column to be an integer (NOT NULL).
See documentation for [partitioning_mode][] for more details
partitioning_mode = JDBCPartitioningMode.RANGE
class-attribute
instance-attribute
Defines how Spark will parallelize reading from table.
Possible values:
-
range(default) Allocate each executor a range of values from column passed into partition_column.Spark generates for each executor an SQL query
Executor 1:
Executor 2:SELECT ... FROM table WHERE (partition_column >= lowerBound OR partition_column IS NULL) AND partition_column < (lowerBound + stride)...SELECT ... FROM table WHERE partition_column >= (lowerBound + stride) AND partition_column < (lowerBound + 2 * stride)Executor N:
WhereSELECT ... FROM table WHERE partition_column >= (lowerBound + (N-1) * stride) AND partition_column <= upperBoundstride=(upperBound - lowerBound) / numPartitions.Column type must be integer, date or timestamp.
Note
[lower_bound][], [upper_bound][] and [num_partitions][] are used just to calculate the partition stride, NOT for filtering the rows in table. So all rows in the table will be returned (unlike Incremental strategy).
Note
All queries are executed in parallel. To execute them sequentially, use Batch strategy.
-
hashAllocate each executor a set of values based on hash of the partition_column column.Spark generates for each executor an SQL query
Executor 1:
Executor 2:SELECT ... FROM table WHERE (some_hash(partition_column) mod num_partitions) = 0 -- lower_bound...SELECT ... FROM table WHERE (some_hash(partition_column) mod num_partitions) = 1 -- lower_bound + 1Executor N:
SELECT ... FROM table WHERE (some_hash(partition_column) mod num_partitions) = num_partitions-1 -- upper_boundNote
The hash function implementation depends on RDBMS. It can be
MD5or any other fast hash function, or expression based on this function call. Usually such functions accepts any column type as an input. -
modAllocate each executor a set of values based on modulus of the partition_column column.Spark generates for each executor an SQL query
Executor 1:
Executor 2:SELECT ... FROM table WHERE (partition_column mod num_partitions) = 0 -- lower_boundExecutor N:SELECT ... FROM table WHERE (partition_column mod num_partitions) = 1 -- lower_bound + 1SELECT ... FROM table WHERE (partition_column mod num_partitions) = num_partitions-1 -- upper_boundNote
Can be used only with columns of integer type.
Added in 0.5.0
Examples
Read data in 10 parallel jobs by range of values in id_column column:
ReadOptions(
partitioning_mode="range", # default mode, can be omitted
partitionColumn="id_column",
numPartitions=10,
# Options below can be discarded because they are
# calculated automatically as MIN and MAX values of `partitionColumn`
lowerBound=0,
upperBound=100_000,
)
some_column column:
ReadOptions(
partitioning_mode="hash",
partitionColumn="some_column",
numPartitions=10,
# lowerBound and upperBound are automatically set to `0` and `9`
)
id_column column:
ReadOptions(
partitioning_mode="mod",
partitionColumn="id_column",
numPartitions=10,
# lowerBound and upperBound are automatically set to `0` and `9`
)
query_timeout = Field(default=None, alias='queryTimeout')
class-attribute
instance-attribute
The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.
This option depends on driver implementation, some drivers can check the timeout of each query instead of an entire JDBC batch.
session_init_statement = Field(default=None, alias='sessionInitStatement')
class-attribute
instance-attribute
After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block).
Use this to implement session initialization code.
Example:
sessionInitStatement = """
BEGIN
execute immediate
'alter session set "_serial_direct_read"=true';
END;
"""
upper_bound = Field(default=None, alias='upperBound')
class-attribute
instance-attribute
See documentation for [partitioning_mode][] for more details
parse(options)
classmethod
If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.
Otherwise, an exception will be raised
Source code in onetl/impl/generic_options.py
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | |