Snapshot Batch Strategy

Bases: BatchHWMStrategy

Snapshot batch strategy for :ref:`db-reader`.

.. note::

    Cannot be used with :ref:`file-downloader`

Same as :obj:`SnapshotStrategy <onetl.strategy.snapshot_strategy.SnapshotStrategy>`, but reads data from the source in sequential batches (1..N) like:

.. code:: sql

    1:  SELECT id, data
        FROM public.mydata
        WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)

    2:  WHERE id > 1100 AND id <= 1200; -- + step
    3:  WHERE id > 1200 AND id <= 1300; -- + step
    N:  WHERE id > 1300 AND id <= 1400; -- until stop

This uses less CPU and RAM on the Spark cluster than reading all the data in parallel, but takes proportionally more time.
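
The batch boundaries shown above can be reproduced with a few lines of plain Python. This is only a sketch of the arithmetic, not the library's implementation; ``batch_bounds`` and ``where_clause`` are hypothetical helper names introduced for illustration.

```python
from typing import Iterator, Tuple


def batch_bounds(start: int, stop: int, step: int) -> Iterator[Tuple[int, int, bool]]:
    """Yield (lower, upper, lower_inclusive) for each sequential batch."""
    if step <= 0:
        raise ValueError("step must be positive")
    lower = start
    first = True
    # The first batch always runs; later batches run while values remain below stop.
    while first or lower < stop:
        upper = min(lower + step, stop)  # the last batch is clipped to stop
        yield lower, upper, first
        lower = upper
        first = False


def where_clause(column: str, lower: int, upper: int, lower_inclusive: bool) -> str:
    """Build the filter for one batch; only the first batch includes its lower bound."""
    op = ">=" if lower_inclusive else ">"
    return f"{column} {op} {lower} AND {column} <= {upper}"


clauses = [where_clause("id", *bounds) for bounds in batch_bounds(1000, 1400, 100)]
for number, clause in enumerate(clauses, start=1):
    print(f"{number}: WHERE {clause}")
```

Only the first batch uses ``>=``, so a row sitting exactly on a boundary is read once.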

.. note::

    This strategy uses the HWM column value to filter data for each batch,
    but does **NOT** save it into the :ref:`HWM Store <hwm>`.
    So every run starts from the beginning, not from the previous HWM value.

.. note::

    If you only need to reduce the number of rows read by Spark from an opened cursor,
    use :obj:`onetl.connection.db_connection.postgres.Postgres.ReadOptions.fetchsize` instead.

.. warning::

    Not every :ref:`DB connection <db-connections>`
    supports the batch strategy. For example, the Kafka connection doesn't support it.
    Make sure the connection you use is compatible with the SnapshotBatchStrategy.

.. versionadded:: 0.1.0

Parameters
----------

step : Any

Step size used for generating batch SQL queries like:

.. code:: sql

    SELECT id, data
    FROM public.mydata
    WHERE id >= 1000 AND id <= 1100; -- 1000 is start value, step is 100

.. note::

    Step defines the range of values fetched by each batch. This is **not**
    the number of rows; that depends on the table content and the distribution of values across rows.
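
To make the distinction concrete, here is a minimal sketch in plain Python (no onETL involved). The ``ids`` list is made-up sample data, dense near the start and sparse afterwards; the loop applies the same range filters as the batches described above and counts the matching rows.

```python
# Hypothetical id values: dense near 1000, sparse afterwards.
ids = [1000, 1001, 1002, 1003, 1004, 1005, 1150, 1290, 1300]

start, step, stop = 1000, 100, 1300

rows_per_batch = []
lower = start
first = True
while first or lower < stop:
    upper = min(lower + step, stop)
    if first:
        # First batch includes its lower bound: lower <= id <= upper
        matched = [i for i in ids if lower <= i <= upper]
    else:
        # Later batches exclude it: lower < id <= upper
        matched = [i for i in ids if lower < i <= upper]
    rows_per_batch.append(len(matched))
    lower, first = upper, False

print(rows_per_batch)  # [6, 1, 2]
```

Every batch covers the same range width of 100, yet the row counts differ because the values are unevenly distributed.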

.. note::

    ``step`` value will be added to the HWM, so it should have a proper type.

    For example, for ``TIMESTAMP`` column ``step`` type should be :obj:`datetime.timedelta`, not :obj:`int`
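
The type requirement follows from ordinary Python arithmetic. The sketch below uses only the standard library. It shows what happens when a step is added to a ``datetime`` boundary, and does not show how the library itself validates the value.

```python
from datetime import datetime, timedelta

hwm_value = datetime(2021, 1, 1, 0, 0, 0)

# A timedelta step can be added to a datetime boundary.
next_boundary = hwm_value + timedelta(hours=6)
print(next_boundary)

# An int step cannot: Python raises TypeError for datetime + int.
try:
    hwm_value + 100
except TypeError as exc:
    int_step_error = type(exc).__name__
else:
    int_step_error = None
print(int_step_error)
```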

start : Any, default: None

If passed, the value will be used for generating WHERE clauses with ``hwm.expression`` filter,
as a start value for the first batch.

If not set, the value is determined by a separate query:

.. code:: sql

    SELECT MIN(id) as start
    FROM public.mydata
    WHERE id <= 1400; -- 1400 here is stop value (if set)

.. note::

    ``start`` should be the same type as ``hwm.expression`` value,
    e.g. :obj:`datetime.datetime` for ``TIMESTAMP`` column, :obj:`datetime.date` for ``DATE``, and so on

stop : Any, default: None

If passed, the value will be used for generating WHERE clauses with ``hwm.expression`` filter,
as a stop value for the last batch.

If not set, the value is determined by a separate query:

.. code:: sql

    SELECT MAX(id) as stop
    FROM public.mydata
    WHERE id >= 1000; -- 1000 here is start value (if set)

.. note::

    ``stop`` should be the same type as ``hwm.expression`` value,
    e.g. :obj:`datetime.datetime` for ``TIMESTAMP`` column, :obj:`datetime.date` for ``DATE``, and so on
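
The way missing boundaries are filled in can be tried out with the standard-library ``sqlite3`` module. This is a sketch under stated assumptions: ``resolve_bounds`` is a hypothetical helper, the table contents are made up, and the table is named ``mydata`` without the ``public`` schema because SQLite has no such schema. The queries mirror the ``MIN``/``MAX`` statements shown for ``start`` and ``stop``.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mydata (id INTEGER, data TEXT)")
conn.executemany(
    "INSERT INTO mydata (id, data) VALUES (?, ?)",
    [(i, f"row {i}") for i in (900, 1000, 1250, 1400, 2345)],
)


def resolve_bounds(conn, start=None, stop=None):
    """Fill in whichever boundary is missing using MIN/MAX over the column."""
    if start is None and stop is None:
        # Neither is set: fetch both in one query.
        start, stop = conn.execute("SELECT MIN(id), MAX(id) FROM mydata").fetchone()
    elif start is None:
        # Only stop is set: the smallest value not exceeding stop.
        (start,) = conn.execute(
            "SELECT MIN(id) FROM mydata WHERE id <= ?", (stop,)
        ).fetchone()
    elif stop is None:
        # Only start is set: the largest value not below start.
        (stop,) = conn.execute(
            "SELECT MAX(id) FROM mydata WHERE id >= ?", (start,)
        ).fetchone()
    return start, stop


print(resolve_bounds(conn))
print(resolve_bounds(conn, stop=1400))
print(resolve_bounds(conn, start=1000))
```

When both values are passed explicitly, no boundary query is needed at all.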

Examples
--------

.. tabs::

.. tab:: SnapshotBatch run

    .. code:: python

        from onetl.db import DBReader, DBWriter
        from onetl.strategy import SnapshotBatchStrategy

        reader = DBReader(
            connection=postgres,
            source="public.mydata",
            columns=["id", "data"],
            hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="id"),
        )

        writer = DBWriter(connection=hive, target="db.newtable")

        with SnapshotBatchStrategy(step=100) as batches:
            for _ in batches:
                df = reader.run()
                writer.run(df)

    .. code:: sql

        -- get start and stop values

            SELECT MIN(id) as start, MAX(id) as stop
            FROM public.mydata;

        -- for example, start=1000 and stop=2345

        -- then each batch (1..N) performs a query which returns some part of the input data

        1:  SELECT id, data
            FROM public.mydata
            WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)

        2:  WHERE id > 1100 AND id <= 1200; -- + step
        3:  WHERE id > 1200 AND id <= 1300; -- + step
        N:  WHERE id > 2300 AND id <= 2345; -- until stop

.. tab:: SnapshotBatch run with ``stop`` value

    .. code:: python

        ...

        with SnapshotBatchStrategy(step=100, stop=1234) as batches:
            for _ in batches:
                df = reader.run()
                writer.run(df)

    .. code:: sql

        -- stop value is set, so there is no need to fetch it from DB
        -- get start value

            SELECT MIN(id) as start
            FROM public.mydata
            WHERE id <= 1234; -- until stop

        -- for example, start=1000.
        -- then each batch (1..N) performs a query which returns some part of the input data

        1:  SELECT id, data
            FROM public.mydata
            WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)

        2:  WHERE id >  1100 AND id <= 1200; -- + step
        N:  WHERE id >  1200 AND id <= 1234; -- until stop

.. tab:: SnapshotBatch run with ``start`` value

    .. code:: python

        ...

        with SnapshotBatchStrategy(step=100, start=500) as batches:
            for _ in batches:
                df = reader.run()
                writer.run(df)

    .. code:: sql

        -- start value is set, so there is no need to fetch it from DB
        -- get only stop value

            SELECT MAX(id) as stop
            FROM public.mydata
            WHERE id >= 500; -- from start

        -- for example, stop=2345.
        -- then each batch (1..N) performs a query which returns some part of the input data

        1:  SELECT id, data
            FROM public.mydata
            WHERE id >= 500 AND id <=  600; -- from start to start+step (INCLUDING first row)

        2:  WHERE id >  600 AND id <=  700; -- + step
        3:  WHERE id >  700 AND id <=  800; -- + step
        ...
        N:  WHERE id > 2300 AND id <= 2345; -- until stop

.. tab:: SnapshotBatch run with all options

    .. code:: python

        ...

        with SnapshotBatchStrategy(
            start=1000,
            step=100,
            stop=2000,
        ) as batches:
            for _ in batches:
                df = reader.run()
                writer.run(df)

    .. code:: sql

        -- start and stop values are set, so no need to fetch boundaries from DB
        -- each batch (1..N) performs a query which returns some part of the input data

        1:  SELECT id, data
            FROM public.mydata
            WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)

        2:  WHERE id >  1100 AND id <= 1200; -- + step
        3:  WHERE id >  1200 AND id <= 1300; -- + step
        ...
        N:  WHERE id >  1900 AND id <= 2000; -- until stop

.. tab:: SnapshotBatch run over non-integer column

    ``hwm.expression``, ``start`` and ``stop`` can be a date or datetime, not only integer:

    .. code:: python

        from datetime import date, timedelta

        reader = DBReader(
            connection=postgres,
            source="public.mydata",
            columns=["business_dt", "data"],
            hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="business_dt"),
        )

        with SnapshotBatchStrategy(
            start=date(2021, 1, 1),
            step=timedelta(days=5),
            stop=date(2021, 1, 31),
        ) as batches:
            for _ in batches:
                df = reader.run()
                writer.run(df)

    .. code:: sql

        -- start and stop values are set, so no need to fetch boundaries from DB
        -- each batch performs a query which returns some part of the input data
        -- HWM value will be cast to match the column type


        1:  SELECT business_dt, data
            FROM public.mydata
            WHERE business_dt >= CAST('2021-01-01' AS DATE) -- from start to start+step (INCLUDING first row)
            AND   business_dt <= CAST('2021-01-06' AS DATE);

        2:  WHERE business_dt >  CAST('2021-01-06' AS DATE) -- + step
            AND   business_dt <= CAST('2021-01-11' AS DATE);

        3:  WHERE business_dt >  CAST('2021-01-11' AS DATE) -- + step
            AND   business_dt <= CAST('2021-01-16' AS DATE);

        ...

        N:  WHERE business_dt >  CAST('2021-01-26' AS DATE)
            AND   business_dt <= CAST('2021-01-31' AS DATE); -- until stop
Source code in onetl/strategy/snapshot_strategy.py
class SnapshotBatchStrategy(BatchHWMStrategy):
    """Snapshot batch strategy for :ref:`db-reader`.

    .. note::

        Cannot be used with :ref:`file-downloader`

    Same as :obj:`SnapshotStrategy <onetl.strategy.snapshot_strategy.SnapshotStrategy>`,
    but reads data from the source in sequential batches (1..N) like:

    .. code:: sql

        1:  SELECT id, data
            FROM public.mydata
            WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)

        2:  WHERE id > 1100 AND id <= 1200; -- + step
        3:  WHERE id > 1200 AND id <= 1300; -- + step
        N:  WHERE id > 1300 AND id <= 1400; -- until stop

    This uses less CPU and RAM on the Spark cluster than reading all the data in parallel,
    but takes proportionally more time.

    .. note::

        This strategy uses the HWM column value to filter data for each batch,
        but does **NOT** save it into the :ref:`HWM Store <hwm>`.
        So every run starts from the beginning, not from the previous HWM value.

    .. note::

        If you only need to reduce the number of rows read by Spark from an opened cursor,
        use :obj:`onetl.connection.db_connection.postgres.Postgres.ReadOptions.fetchsize` instead.

    .. warning::

        Not every :ref:`DB connection <db-connections>`
        supports the batch strategy. For example, the Kafka connection doesn't support it.
        Make sure the connection you use is compatible with the SnapshotBatchStrategy.

    .. versionadded:: 0.1.0

    Parameters
    ----------
    step : Any

        Step size used for generating batch SQL queries like:

        .. code:: sql

            SELECT id, data
            FROM public.mydata
            WHERE id >= 1000 AND id <= 1100; -- 1000 is start value, step is 100

        .. note::

            Step defines the range of values fetched by each batch. This is **not**
            the number of rows; that depends on the table content and the distribution of values across rows.

        .. note::

            ``step`` value will be added to the HWM, so it should have a proper type.

            For example, for ``TIMESTAMP`` column ``step`` type should be :obj:`datetime.timedelta`, not :obj:`int`

    start : Any, default: ``None``

        If passed, the value will be used for generating WHERE clauses with ``hwm.expression`` filter,
        as a start value for the first batch.

        If not set, the value is determined by a separate query:

        .. code:: sql

            SELECT MIN(id) as start
            FROM public.mydata
            WHERE id <= 1400; -- 1400 here is stop value (if set)

        .. note::

            ``start`` should be the same type as ``hwm.expression`` value,
            e.g. :obj:`datetime.datetime` for ``TIMESTAMP`` column, :obj:`datetime.date` for ``DATE``, and so on

    stop : Any, default: ``None``

        If passed, the value will be used for generating WHERE clauses with ``hwm.expression`` filter,
        as a stop value for the last batch.

        If not set, the value is determined by a separate query:

        .. code:: sql

            SELECT MAX(id) as stop
            FROM public.mydata
            WHERE id >= 1000; -- 1000 here is start value (if set)

        .. note::

            ``stop`` should be the same type as ``hwm.expression`` value,
            e.g. :obj:`datetime.datetime` for ``TIMESTAMP`` column, :obj:`datetime.date` for ``DATE``, and so on

    Examples
    --------

    .. tabs::

        .. tab:: SnapshotBatch run

            .. code:: python

                from onetl.db import DBReader, DBWriter
                from onetl.strategy import SnapshotBatchStrategy

                reader = DBReader(
                    connection=postgres,
                    source="public.mydata",
                    columns=["id", "data"],
                    hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="id"),
                )

                writer = DBWriter(connection=hive, target="db.newtable")

                with SnapshotBatchStrategy(step=100) as batches:
                    for _ in batches:
                        df = reader.run()
                        writer.run(df)

            .. code:: sql

                -- get start and stop values

                    SELECT MIN(id) as start, MAX(id) as stop
                    FROM public.mydata;

                -- for example, start=1000 and stop=2345

                -- then each batch (1..N) performs a query which returns some part of the input data

                1:  SELECT id, data
                    FROM public.mydata
                    WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)

                2:  WHERE id > 1100 AND id <= 1200; -- + step
                3:  WHERE id > 1200 AND id <= 1300; -- + step
                N:  WHERE id > 2300 AND id <= 2345; -- until stop

        .. tab:: SnapshotBatch run with ``stop`` value

            .. code:: python

                ...

                with SnapshotBatchStrategy(step=100, stop=1234) as batches:
                    for _ in batches:
                        df = reader.run()
                        writer.run(df)

            .. code:: sql

                -- stop value is set, so there is no need to fetch it from DB
                -- get start value

                    SELECT MIN(id) as start
                    FROM public.mydata
                    WHERE id <= 1234; -- until stop

                -- for example, start=1000.
                -- then each batch (1..N) performs a query which returns some part of the input data

                1:  SELECT id, data
                    FROM public.mydata
                    WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)

                2:  WHERE id >  1100 AND id <= 1200; -- + step
                N:  WHERE id >  1200 AND id <= 1234; -- until stop

        .. tab:: SnapshotBatch run with ``start`` value

            .. code:: python

                ...

                with SnapshotBatchStrategy(step=100, start=500) as batches:
                    for _ in batches:
                        df = reader.run()
                        writer.run(df)

            .. code:: sql

                -- start value is set, so there is no need to fetch it from DB
                -- get only stop value

                    SELECT MAX(id) as stop
                    FROM public.mydata
                    WHERE id >= 500; -- from start

                -- for example, stop=2345.
                -- then each batch (1..N) performs a query which returns some part of the input data

                1:  SELECT id, data
                    FROM public.mydata
                    WHERE id >= 500 AND id <=  600; -- from start to start+step (INCLUDING first row)

                2:  WHERE id >  600 AND id <=  700; -- + step
                3:  WHERE id >  700 AND id <=  800; -- + step
                ...
                N:  WHERE id > 2300 AND id <= 2345; -- until stop

        .. tab:: SnapshotBatch run with all options

            .. code:: python

                ...

                with SnapshotBatchStrategy(
                    start=1000,
                    step=100,
                    stop=2000,
                ) as batches:
                    for _ in batches:
                        df = reader.run()
                        writer.run(df)

            .. code:: sql

                -- start and stop values are set, so no need to fetch boundaries from DB
                -- each batch (1..N) performs a query which returns some part of the input data

                1:  SELECT id, data
                    FROM public.mydata
                    WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)

                2:  WHERE id >  1100 AND id <= 1200; -- + step
                3:  WHERE id >  1200 AND id <= 1300; -- + step
                ...
                N:  WHERE id >  1900 AND id <= 2000; -- until stop

        .. tab:: SnapshotBatch run over non-integer column

            ``hwm.expression``, ``start`` and ``stop`` can be a date or datetime, not only integer:

            .. code:: python

                from datetime import date, timedelta

                reader = DBReader(
                    connection=postgres,
                    source="public.mydata",
                    columns=["business_dt", "data"],
                    hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="business_dt"),
                )

                with SnapshotBatchStrategy(
                    start=date(2021, 1, 1),
                    step=timedelta(days=5),
                    stop=date(2021, 1, 31),
                ) as batches:
                    for _ in batches:
                        df = reader.run()
                        writer.run(df)

            .. code:: sql

                -- start and stop values are set, so no need to fetch boundaries from DB
                -- each batch performs a query which returns some part of the input data
                -- HWM value will be cast to match the column type


                1:  SELECT business_dt, data
                    FROM public.mydata
                    WHERE business_dt >= CAST('2021-01-01' AS DATE) -- from start to start+step (INCLUDING first row)
                    AND   business_dt <= CAST('2021-01-06' AS DATE);

                2:  WHERE business_dt >  CAST('2021-01-06' AS DATE) -- + step
                    AND   business_dt <= CAST('2021-01-11' AS DATE);

                3:  WHERE business_dt >  CAST('2021-01-11' AS DATE) -- + step
                    AND   business_dt <= CAST('2021-01-16' AS DATE);

                ...

                N:  WHERE business_dt >  CAST('2021-01-26' AS DATE)
                    AND   business_dt <= CAST('2021-01-31' AS DATE); -- until stop

    """

    def fetch_hwm(self) -> None:
        pass

    def save_hwm(self) -> None:
        pass