
Checking whether a limit is reached at a given path

limits_stop_at(path, limits)

Check whether any of the limits stops at the given path.

.. versionadded:: 0.8.0

Parameters

path : :obj:`onetl.base.path_protocol.PathProtocol`
    Path to check.

limits : Iterable of :obj:`onetl.base.base_file_limit.BaseFileLimit`
    Limits to test the path against.

Returns

True if any limit is reached while handling the path, False otherwise.

If no limits are passed, returns False.

Examples

>>> from onetl.file.limit import MaxFilesCount, limits_stop_at
>>> from onetl.impl import LocalPath
>>> limits = [MaxFilesCount(2)]
>>> limits_stop_at(LocalPath("/path/to/file1.csv"), limits)
False
>>> limits_stop_at(LocalPath("/path/to/file2.csv"), limits)
False
>>> limits_stop_at(LocalPath("/path/to/file3.csv"), limits)
True
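
The example above suggests a common usage pattern: walk over candidate paths and stop as soon as any limit reports a stop. The following is a minimal sketch of that pattern that runs without onETL installed. `CountingLimit`, `stops_at_any` and `take_until_limit` are hypothetical stand-ins written for illustration; they only assume the behavior visible in the example (a limit of 2 lets two paths through and stops at the third) and are not the library's implementation.

```python
from pathlib import PurePosixPath
from typing import Iterable, List


class CountingLimit:
    """Hypothetical stand-in for a stateful limit such as MaxFilesCount."""

    def __init__(self, max_files: int) -> None:
        self.max_files = max_files
        self.seen = 0

    def stops_at(self, path: PurePosixPath) -> bool:
        # Count every handled path; report a stop once the count exceeds the maximum.
        self.seen += 1
        return self.seen > self.max_files


def stops_at_any(path: PurePosixPath, limits: Iterable[CountingLimit]) -> bool:
    # Same contract as documented above: True if any limit reports a stop,
    # False otherwise (including when no limits are passed).
    reached = [limit for limit in limits if limit.stops_at(path)]
    return bool(reached)


def take_until_limit(
    paths: Iterable[PurePosixPath],
    limits: List[CountingLimit],
) -> List[PurePosixPath]:
    # Collect paths until the first one at which some limit stops.
    accepted = []
    for path in paths:
        if stops_at_any(path, limits):
            break
        accepted.append(path)
    return accepted


candidates = [PurePosixPath(f"/path/to/file{i}.csv") for i in range(1, 6)]

accepted = take_until_limit(candidates, [CountingLimit(2)])
print([p.name for p in accepted])  # ['file1.csv', 'file2.csv']

# With no limits nothing ever stops, so every candidate is accepted.
print(take_until_limit(candidates, []) == candidates)  # True
```

Because limits of this kind keep state between calls, a list of limits should probably not be reused across unrelated runs without creating fresh instances.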

Source code in onetl/file/limit/limits_stop_at.py
def limits_stop_at(path: PathProtocol, limits: Iterable[BaseFileLimit]) -> bool:
    """
    Check whether any of the limits stops at the given path.

    .. versionadded:: 0.8.0

    Parameters
    ----------
    path : :obj:`onetl.base.path_protocol.PathProtocol`
        Path to check.

    limits : Iterable of :obj:`onetl.base.base_file_limit.BaseFileLimit`
        Limits to test path against.

    Returns
    -------
    ``True`` if any limit is reached while handling the path, ``False`` otherwise.

    If no limits are passed, returns ``False``.

    Examples
    --------

    >>> from onetl.file.limit import MaxFilesCount, limits_stop_at
    >>> from onetl.impl import LocalPath
    >>> limits = [MaxFilesCount(2)]
    >>> limits_stop_at(LocalPath("/path/to/file1.csv"), limits)
    False
    >>> limits_stop_at(LocalPath("/path/to/file2.csv"), limits)
    False
    >>> limits_stop_at(LocalPath("/path/to/file3.csv"), limits)
    True
    """
    reached = []
    for limit in limits:
        if limit.stops_at(path):
            reached.append(limit)

    if reached:
        log.debug("|FileLimit| Limits %r are reached", reached)
        return True

    return False
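
One detail of the loop above is that it calls `stops_at` on every limit rather than returning at the first hit, so each limit sees the path and the debug message can list all limits that were reached. The sketch below contrasts this with a short-circuiting `any()` variant. `CountingLimit`, `check_all` and `check_short_circuit` are hypothetical names used only for illustration, and the assumption that limits update internal state on each `stops_at` call is inferred from the docstring example, not confirmed from the library's code.

```python
class CountingLimit:
    """Hypothetical stateful limit: stops once more than `max_files` paths were seen."""

    def __init__(self, max_files: int) -> None:
        self.max_files = max_files
        self.seen = 0

    def stops_at(self, path: str) -> bool:
        self.seen += 1
        return self.seen > self.max_files


def check_all(path: str, limits) -> bool:
    # Mirrors the loop above: every limit is consulted, even after one has stopped.
    reached = [limit for limit in limits if limit.stops_at(path)]
    return bool(reached)


def check_short_circuit(path: str, limits) -> bool:
    # any() returns at the first True, so later limits never see the path.
    return any(limit.stops_at(path) for limit in limits)


paths = ["a.csv", "b.csv", "c.csv"]

eager = [CountingLimit(0), CountingLimit(5)]
for p in paths:
    check_all(p, eager)

lazy = [CountingLimit(0), CountingLimit(5)]
for p in paths:
    check_short_circuit(p, lazy)

print([limit.seen for limit in eager])  # [3, 3]
print([limit.seen for limit in lazy])   # [3, 0]
```

Both variants return the same boolean for each call; they differ only in which limits get to observe the path, which matters when limits are stateful.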