Чтение из Postgres с использованием Postgres.sql
Postgres.sql позволяет передавать пользовательский SQL-запрос, но не поддерживает инкрементальные стратегии.
Warning
Пожалуйста, учитывайте типы данных Postgres
Warning
Запрос выполняется в соединении с режимом чтение-запись, поэтому если вы вызываете функции/процедуры с DDL/DML-операторами внутри, они могут изменить данные в вашей базе данных.
Поддержка синтаксиса
Поддерживаются только запросы со следующим синтаксисом:
- ✅︎
SELECT ... FROM ... - ✅︎
WITH alias AS (...) SELECT ... - ❌
SET ...; SELECT ...;- несколько операторов не поддерживаются
Примеры
```python
from onetl.connection import Postgres
postgres = Postgres(...)
df = postgres.sql(
"""
SELECT
id,
key,
CAST(value AS text) value,
updated_at
FROM
some.mytable
WHERE
key = 'something'
""",
options=Postgres.SQLOptions(
partitionColumn="id",
numPartitions=10,
lowerBound=0,
upperBound=1000,
),
)
```
Рекомендации
Выбирайте только необходимые столбцы
Вместо использования SELECT * FROM ... предпочтительнее указывать точные имена столбцов SELECT col1, col2, ....
Это уменьшает объем данных, передаваемых из Postgres в Spark.
Обращайте внимание на значения в условии where
Вместо фильтрации данных на стороне Spark с помощью df.filter(df.column == 'value') передавайте соответствующее условие WHERE column = 'value'.
Это не только уменьшает объем данных, передаваемых из Postgres в Spark, но также может улучшить производительность запроса.
Особенно если для столбцов, используемых в условии where, есть индексы или секционирование.
Опции
onetl.connection.db_connection.postgres.options.PostgresSQLOptions
Bases: JDBCSQLOptions
Source code in onetl/connection/db_connection/postgres/options.py
23 24 | |
fetchsize = 100000
class-attribute
instance-attribute
Fetch N rows from an opened cursor per one read round.
Tuning this option can influence performance of reading.
.. warning::
Default value is different from Spark.
Spark uses driver's own value, and it may be different in different drivers,
and even versions of the same driver. For example, Oracle has
default ``fetchsize=10``, which is absolutely not usable.
Thus we've overridden default value with ``100_000``, which should increase reading performance.
.. versionchanged:: 0.2.0
Set explicit default value to 100_000
lower_bound = Field(default=None, alias='lowerBound')
class-attribute
instance-attribute
Defines the starting boundary for partitioning the query's data. Mandatory if :obj:~partition_column is set
num_partitions = Field(default=None, alias='numPartitions')
class-attribute
instance-attribute
Number of jobs created by Spark to read the table content in parallel.
partition_column = Field(default=None, alias='partitionColumn')
class-attribute
instance-attribute
Column used to partition data across multiple executors for parallel query processing.
.. warning:: It is highly recommended to use primary key, or column with an index to avoid performance issues.
.. dropdown:: Example of using partitionColumn="id" with partitioning_mode="range"
.. code-block:: sql
-- If partition_column is 'id', with numPartitions=4, lowerBound=1, and upperBound=100:
-- Executor 1 processes IDs from 1 to 25
SELECT ... FROM table WHERE id >= 1 AND id < 26
-- Executor 2 processes IDs from 26 to 50
SELECT ... FROM table WHERE id >= 26 AND id < 51
-- Executor 3 processes IDs from 51 to 75
SELECT ... FROM table WHERE id >= 51 AND id < 76
-- Executor 4 processes IDs from 76 to 100
SELECT ... FROM table WHERE id >= 76 AND id <= 100
-- General case for Executor N
SELECT ... FROM table
WHERE partition_column >= (lowerBound + (N-1) * stride)
AND partition_column <= upperBound
-- Where ``stride`` is calculated as ``(upperBound - lowerBound) / numPartitions``.
query_timeout = Field(default=None, alias='queryTimeout')
class-attribute
instance-attribute
The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.
This option depends on driver implementation, some drivers can check the timeout of each query instead of an entire JDBC batch.
session_init_statement = Field(default=None, alias='sessionInitStatement')
class-attribute
instance-attribute
After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block).
Use this to implement session initialization code.
Example:
.. code:: python
sessionInitStatement = """
BEGIN
execute immediate
'alter session set "_serial_direct_read"=true';
END;
"""
upper_bound = Field(default=None, alias='upperBound')
class-attribute
instance-attribute
Sets the ending boundary for data partitioning. Mandatory if :obj:~partition_column is set
parse(options)
classmethod
If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.
Otherwise, an exception will be raised
Source code in onetl/impl/generic_options.py
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | |