Концепции
Здесь вы найдете подробную информацию о каждом из концептов onETL и о том, как их использовать.
Подключение (Connection)
Основы Connection
onETL предназначена для извлечения и загрузки данных в хранилища (БД и файловые), поэтому в ней есть концепт первого класса Connection для хранения учетных данных, используемых для взаимодействия с внешними системами.
Connection - это, по сути, набор параметров, таких как имя пользователя, пароль, имя хоста.
Чтобы создать подключение к определенному типу хранилища, необходимо использовать класс, соответствующий типу хранилища. Имя класса совпадает с именем типа хранилища (Oracle, MSSQL, SFTP и т. д.):
from onetl.connection import SFTP
sftp = SFTP(
host="sftp.test.com",
user="onetl",
password="onetl",
)
Все типы подключений наследуются от родительского класса BaseConnection.
Диаграмма классов
classDiagram
BaseConnection <|-- DBConnection
DBConnection <|-- Hive
DBConnection <|-- Greenplum
DBConnection <|-- MongoDB
DBConnection <|-- Kafka
DBConnection <|-- JDBCConnection
JDBCConnection <|-- Clickhouse
JDBCConnection <|-- MSSQL
JDBCConnection <|-- MySQL
JDBCConnection <|-- Postgres
JDBCConnection <|-- Oracle
JDBCConnection <|-- Teradata
BaseConnection <|-- FileConnection
FileConnection <|-- FTP
FileConnection <|-- FTPS
FileConnection <|-- HDFS
FileConnection <|-- WebDAV
FileConnection <|-- Samba
FileConnection <|-- SFTP
FileConnection <|-- S3
BaseConnection <|-- FileDFConnection
FileDFConnection <|-- SparkHDFS
FileDFConnection <|-- SparkLocalFS
FileDFConnection <|-- SparkS3
Подключения к БД (DBConnection)
Классы, унаследованные от DBConnection, можно использовать для доступа к базам данных.
DBConnection можно создать следующим образом:
from onetl.connection import MSSQL
mssql = MSSQL(
host="mssqldb.demo.com",
user="onetl",
password="onetl",
database="Telecom",
spark=spark,
)
где spark - это текущая сессия Apache Spark (SparkSession). onETL "под капотом" использует Spark и специальные Java-коннекторы для работы с базами данных.
Описание других параметров см. в документации для доступных DBConnections.
Подключения к файловым хранилищам (FileConnection)
Классы, унаследованные от FileConnection, можно использовать для доступа к файлам, хранящимся в различных файловых системах/файловых серверах.
FileConnection можно создать следующим образом:
from onetl.connection import SFTP
sftp = SFTP(
host="sftp.test.com",
user="onetl",
password="onetl",
)
Описание других параметров см. в документации для доступных FileConnections.
FileDFConnection
Классы, унаследованные от FileDFConnection, можно использовать для доступа к файлам в виде Spark DataFrames.
FileDFConnection можно создать следующим образом:
from onetl.connection import SparkHDFS
spark_hdfs = SparkHDFS(
host="namenode1.domain.com",
cluster="mycluster",
spark=spark,
)
где spark - это текущая SparkSession.
onETL использует Spark и специальные Java-коннекторы под капотом для работы с DataFrames.
Описание других параметров см. в документации для доступных FileDFConnections.
Проверка доступности соединения
После создания соединения вы можете проверить доступность базы данных/файловой системы с помощью метода check():
mssql.check()
sftp.check()
spark_hdfs.check()
Он вызовет исключение, если база данных/файловая система недоступна.
Этот метод возвращает само соединение, поэтому вы можете создать соединение и сразу же проверить его доступность:
mssql = MSSQL(
host="mssqldb.demo.com",
user="onetl",
password="onetl",
database="Telecom",
spark=spark,
).check() # <--
Извлечение/Загрузка данных
Основы
Как мы говорили выше, onETL используется для извлечения данных из удаленных систем и загрузки данных в них.
onETL предоставляет несколько классов для этого:
Все эти классы имеют метод run(), который запускает извлечение/загрузку данных:
from onetl.db import DBReader, DBWriter
reader = DBReader(
connection=mssql,
source="dbo.demo_table",
columns=["column_1", "column_2"],
)
# Read data as Spark DataFrame
df = reader.run()
db_writer = DBWriter(
connection=hive,
target="dl_sb.demo_table",
)
# Save Spark DataFrame to Hive table
writer.run(df)
Извлечение данных
Для извлечения данных используйте классы:
| Вариант использования | Connection | run() получает |
run() возвращает |
|
|---|---|---|---|---|
DBReader |
Чтение данных из базы данных | Любое DBConnection |
- | Spark DataFrame |
FileDFReader |
Чтение данных из файла или набора файлов | Любое FileDFConnection |
Нет входных данных или List[File path on FileSystem] | Spark DataFrame |
FileDownloader |
Загрузка файлов из удаленной ФС в локальную ФС | Любое FileConnection |
Нет входных данных или List[File path on remote FileSystem] | DownloadResult |
Загрузка данных
Для загрузки данных используйте классы:
| Вариант использования | Connection | run() получает |
run() возвращает |
|
|---|---|---|---|---|
DBWriter |
Запись данных из DataFrame в базу данных | Любое DBConnection |
Spark DataFrame | None |
FileDFWriter |
Запись данных из DataFrame в папку | Любое [FileDFConnection]DB-ONETL-doc-file-dataframe-connections |
Spark DataFrame | None |
FileUploader |
Загрузка файлов из локальной ФС в удаленную ФС | Любое FileConnection |
List[File path on local FileSystem] | UploadResult |
Манипулирование данными
Для манипулирования данными используйте классы:
| Вариант использования | Connection | run() получает |
run() возвращает |
|
|---|---|---|---|---|
FileMover |
Перемещение файлов между каталогами в удаленной ФС | Любое FileConnection |
List[File path on remote FileSystem] | MoveResult |
Опции
Классы извлечения и загрузки имеют параметр options, который имеет особое значение:
- все остальные параметры - ЧТО мы извлекаем / КУДА мы загружаем
- параметр
options- КАК мы извлекаем/загружаем данные
db_reader = DBReader(
# ЧТО мы читаем:
connection=mssql,
source="dbo.demo_table", # некоторая таблица из MSSQL
columns=["column_1", "column_2"], # но только определенный набор столбцов
where="column_2 > 1000", # только строки, соответствующие условию
# КАК мы читаем:
options=MSSQL.ReadOptions(
numPartitions=10, # чтение в 10 параллельных задачах
partitionColumn="id", # балансировка чтения данных путем назначения каждой задаче части данных с использованием выражения `hash(id) mod N`
partitioningMode="hash",
fetchsize=1000, # каждая задача будет получать блок из 1000 строк при каждой попытке чтения
),
)
db_writer = DBWriter(
# КУДА мы пишем - в некоторую таблицу в Hive
connection=hive,
target="dl_sb.demo_table",
# КАК мы пишем - перезаписываем все данные в существующей таблице
options=Hive.WriteOptions(if_exists="replace_entire_table"),
)
file_downloader = FileDownloader(
# ЧТО мы загружаем - файлы из некоторого каталога в SFTP
connection=sftp,
source_path="/source",
filters=[Glob("*.csv")], # только CSV файлы
limits=[MaxFilesCount(1000)], # максимум 1000 файлов
# КУДА мы загружаем - в определенный каталог в локальной ФС
local_path="/some",
# КАК мы загружаем:
options=FileDownloader.Options(
delete_source=True, # после загрузки каждого файла удалите его из source_path
if_exists="replace_file", # заменить существующие файлы в local_path
),
)
file_uploader = FileUploader(
# ЧТО мы загружаем - файлы из некоторого локального каталога
local_path="/source",
# КУДА мы загружаем - определенный удаленный каталог в HDFS
connection=hdfs,
target_path="/some",
# КАК мы загружаем:
options=FileUploader.Options(
delete_local=True, # после загрузки каждого файла удалите его из local_path
if_exists="replace_file", # заменить существующие файлы в target_path
),
)
file_mover = FileMover(
# ЧТО мы перемещаем - файлы в некотором удаленном каталоге в HDFS
source_path="/source",
connection=hdfs,
# КУДА мы перемещаем файлы
target_path="/some", # определенный удаленный каталог в том же соединении HDFS
# КАК мы загружаем - заменить существующие файлы в target_path
options=FileMover.Options(if_exists="replace_file"),
)
file_df_reader = FileDFReader(
# ЧТО мы читаем - *.csv файлы из некоторого каталога в S3
connection=s3,
source_path="/source",
file_format=CSV(),
# КАК мы читаем - загружать файлы из /source/*.csv, а не из /source/nested/*.csv
options=FileDFReader.Options(recursive=False),
)
file_df_writer = FileDFWriter(
# КУДА мы пишем - в виде .csv файлов в некотором каталоге в S3
connection=s3,
target_path="/target",
file_format=CSV(),
# КАК мы пишем - заменить все существующие файлы в /target, если они есть
options=FileDFWriter.Options(if_exists="replace_entire_directory"),
)
Более подробную информацию об options можно найти в документации к основным классам: DBConnection и FileDownloader / FileUploader / FileMover / FileDFReader / FileDFWriter
Стратегии чтения
onETL имеет несколько встроенных стратегий для чтения данных:
- Стратегия моментального снимка (стратегия по умолчанию)
- Инкрементная стратегия
- Пакетная стратегия моментального снимка
- Инкрементная пакетная стратегия
Например, инкрементная стратегия позволяет получать только новые данные из таблицы:
from onetl.strategy import IncrementalStrategy
reader = DBReader(
connection=mssql,
source="dbo.demo_table",
hwm_column="id", # обнаруживать новые данные на основе значения столбца "id"
)
# первый запуск
with IncrementalStrategy():
df = reader.run()
sleep(3600)
# второй запуск
with IncrementalStrategy():
# только строки, которые появились в источнике с момента предыдущего запуска
df = reader.run()
или получать только файлы, которые не были загружены ранее:
from onetl.strategy import IncrementalStrategy
file_downloader = FileDownloader(
connection=sftp,
source_path="/remote",
local_path="/local",
hwm_type="file_list", # сохранить все загруженные файлы в список и исключить файлы, уже присутствующие в этом списке
)
# первый запуск
with IncrementalStrategy():
files = file_downloader.run()
sleep(3600)
# второй запуск
with IncrementalStrategy():
# только файлы, которые появились в источнике с момента предыдущего запуска
files = file_downloader.run()
Большинство стратегий основаны на HWM, пожалуйста, ознакомьтесь с документацией по каждой стратегии для получения более подробной информации.
Почему просто не использовать класс Connection для извлечения/загрузки?
Соединения очень просты, у них есть только набор некоторых основных операций, например, mkdir, remove_file, get_table_schema и так далее.
Высокоуровневые операции, такие как
- Поддержка
strategy - Обработка отправки/получения метаданных
- Обработка различных опций, таких как
if_exists="replace_file"в случае загрузки/выгрузки файлов
перенесена в отдельный класс, который вызывает методы объекта соединения для выполнения некоторой сложной логики.