Reset limits

reset_limits(limits)

Reset limits state.

Parameters

limits : Iterable of onetl.base.base_file_limit.BaseFileLimit
    Limits to reset.

Returns

list of BaseFileLimit
    List with the same limits, but with reset state.

    The list may contain the original limits with reset state, or new copies.
    Which of the two you get is an implementation detail of each limit's [reset][onetl.base.base_file_limit.BaseFileLimit.reset] method.
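
Because the returned list may hold either the original objects or copies, it is safest to keep working with the return value and not assume the input was reset in place. The sketch below illustrates this with two hypothetical stand-in classes, `InPlaceLimit` and `CopyingLimit`, and a helper `reset_all` that mirrors the list comprehension used by `reset_limits`. None of these names are part of onetl; they only mimic the `reset()` contract described above.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Iterable


@dataclass
class InPlaceLimit:
    """Hypothetical limit: reset() clears the counter and returns the same object."""

    max_files: int
    seen: int = 0

    def stops_at(self, path: str) -> bool:
        # Count the file, then report whether the limit has been exceeded.
        self.seen += 1
        return self.seen > self.max_files

    def reset(self) -> InPlaceLimit:
        self.seen = 0
        return self


@dataclass
class CopyingLimit(InPlaceLimit):
    """Hypothetical limit: reset() leaves the original untouched and returns a copy."""

    def reset(self) -> CopyingLimit:
        return replace(self, seen=0)


def reset_all(limits: Iterable[InPlaceLimit]) -> list[InPlaceLimit]:
    # Same shape as reset_limits: call reset() on each item, collect the results.
    return [limit.reset() for limit in limits]


limits = [InPlaceLimit(1), CopyingLimit(1)]
for limit in limits:
    limit.stops_at("/path/to/file1.csv")
    limit.stops_at("/path/to/file2.csv")

fresh = reset_all(limits)

# The first limit was reset in place, the second one was copied.
same_object = fresh[0] is limits[0]
copied = fresh[1] is not limits[1]
# The original CopyingLimit still carries its old state.
stale_seen = limits[1].seen
```

In this sketch only `fresh` is guaranteed to hold clean state for every limit, which is why the return value is the one to pass on to the next run.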

Examples

>>> from onetl.file.limit import MaxFilesCount, limits_reached, limits_stop_at, reset_limits
>>> from onetl.impl import LocalPath
>>> limits = [MaxFilesCount(1)]
>>> limits_reached(limits)
False
>>> # do something
>>> limits_stop_at(LocalPath("/path/to/file1.csv"), limits)
False
>>> limits_stop_at(LocalPath("/path/to/file2.csv"), limits)
True
>>> limits_reached(limits)
True
>>> new_limits = reset_limits(limits)
>>> limits_reached(new_limits)
False
Source code in onetl/file/limit/reset_limits.py
def reset_limits(limits: Iterable[BaseFileLimit]) -> list[BaseFileLimit]:
    """
    Reset limits state.

    Parameters
    ----------
    limits : Iterable of [onetl.base.base_file_limit.BaseFileLimit][]
        Limits to reset.

    Returns
    -------
    list of BaseFileLimit
        List with limits, but with reset state.

        The list may contain the original limits with reset state, or new copies.
        Which of the two you get is an implementation detail of each limit's [reset][onetl.base.base_file_limit.BaseFileLimit.reset] method.

    Examples
    --------

    ```python
    >>> from onetl.file.limit import MaxFilesCount, limits_reached, limits_stop_at, reset_limits
    >>> from onetl.impl import LocalPath
    >>> limits = [MaxFilesCount(1)]
    >>> limits_reached(limits)
    False
    >>> # do something
    >>> limits_stop_at(LocalPath("/path/to/file1.csv"), limits)
    False
    >>> limits_stop_at(LocalPath("/path/to/file2.csv"), limits)
    True
    >>> limits_reached(limits)
    True
    >>> new_limits = reset_limits(limits)
    >>> limits_reached(new_limits)
    False

    ```
    """
    return [limit.reset() for limit in limits]