Skip to content

Spark HDFS Slots

Spark HDFS slots that could be implemented by third-party plugins.

.. versionadded:: 0.9.0

Source code in onetl/connection/file_df_connection/spark_hdfs/slots.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
@support_hooks
class SparkHDFSSlots:
    """Spark HDFS slots that could be implemented by third-party plugins.

    .. versionadded:: 0.9.0
    """

    @slot
    @staticmethod
    def normalize_cluster_name(cluster: str) -> str | None:
        """
        Normalize cluster name passed into SparkHDFS constructor.

        If hooks didn't return anything, cluster name is left intact.

        .. versionadded:: 0.9.0

        Parameters
        ----------
        cluster : :obj:`str`
            Cluster name

        Returns
        -------
        str | None
            Normalized cluster name.

            If hook cannot be applied to a specific cluster, it should return ``None``.

        Examples
        --------

        .. code:: python

            from onetl.connection import SparkHDFS
            from onetl.hooks import hook


            @SparkHDFS.Slots.normalize_cluster_name.bind
            @hook
            def normalize_cluster_name(cluster: str) -> str:
                return cluster.lower()
        """

    @slot
    @staticmethod
    def normalize_namenode_host(host: str, cluster: str) -> str | None:
        """
        Normalize namenode host passed into SparkHDFS constructor.

        If hooks didn't return anything, host is left intact.

        .. versionadded:: 0.9.0

        Parameters
        ----------
        host : :obj:`str`
            Namenode host (raw)

        cluster : :obj:`str`
            Cluster name (normalized)

        Returns
        -------
        str | None
            Normalized namenode host name.

            If hook cannot be applied to a specific host name, it should return ``None``.

        Examples
        --------

        .. code:: python

            from onetl.connection import SparkHDFS
            from onetl.hooks import hook


            @SparkHDFS.Slots.normalize_namenode_host.bind
            @hook
            def normalize_namenode_host(host: str, cluster: str) -> str | None:
                if cluster == "rnd-dwh":
                    if not host.endswith(".domain.com"):
                        # fix missing domain name
                        host += ".domain.com"
                    return host

                return None
        """

    @slot
    @staticmethod
    def get_known_clusters() -> set[str] | None:
        """
        Return collection of known clusters.

        Cluster passed into SparkHDFS constructor should be present in this list.
        If hooks didn't return anything, no validation will be performed.

        .. versionadded:: 0.9.0

        Returns
        -------
        set[str] | None
            Collection of cluster names (in normalized form).

            If hook cannot be applied, it should return ``None``.

        Examples
        --------

        .. code:: python

            from onetl.connection import SparkHDFS
            from onetl.hooks import hook


            @SparkHDFS.Slots.get_known_clusters.bind
            @hook
            def get_known_clusters() -> set[str]:
                return {"rnd-dwh", "rnd-prod"}
        """

    @slot
    @staticmethod
    def get_cluster_namenodes(cluster: str) -> set[str] | None:
        """
        Return collection of known namenodes for the cluster.

        Namenode host passed into SparkHDFS constructor should be present in this list.
        If hooks didn't return anything, no validation will be performed.

        .. versionadded:: 0.9.0

        Parameters
        ----------
        cluster : :obj:`str`
            Cluster name (normalized)

        Returns
        -------
        set[str] | None
            Collection of host names (in normalized form).

            If hook cannot be applied, it should return ``None``.

        Examples
        --------

        .. code:: python

            from onetl.connection import SparkHDFS
            from onetl.hooks import hook


            @SparkHDFS.Slots.get_cluster_namenodes.bind
            @hook
            def get_cluster_namenodes(cluster: str) -> set[str] | None:
                if cluster == "rnd-dwh":
                    return {"namenode1.domain.com", "namenode2.domain.com"}
                return None
        """

    @slot
    @staticmethod
    def get_current_cluster() -> str | None:
        """
        Get current cluster name.

        Used in :obj:`~get_current_cluster` to automatically fill up ``cluster`` attribute of a connection.
        If hooks didn't return anything, calling the method above will raise an exception.

        .. versionadded:: 0.9.0

        Returns
        -------
        str | None
            Current cluster name (in normalized form).

            If hook cannot be applied, it should return ``None``.

        Examples
        --------

        .. code:: python

            from onetl.connection import SparkHDFS
            from onetl.hooks import hook


            @SparkHDFS.Slots.get_current_cluster.bind
            @hook
            def get_current_cluster() -> str:
                # some magic here
                return "rnd-dwh"
        """

    @slot
    @staticmethod
    def get_ipc_port(cluster: str) -> int | None:
        """
        Get IPC port number for a specific cluster.

        Used by constructor to automatically set port number if omitted.

        .. versionadded:: 0.9.0

        Parameters
        ----------
        cluster : :obj:`str`
            Cluster name (normalized)

        Returns
        -------
        int | None
            IPC port number.

            If hook cannot be applied, it should return ``None``.

        Examples
        --------

        .. code:: python

            from onetl.connection import SparkHDFS
            from onetl.hooks import hook


            @SparkHDFS.Slots.get_ipc_port.bind
            @hook
            def get_ipc_port(cluster: str) -> int | None:
                if cluster == "rnd-dwh":
                    return 8020  # Cloudera
                return None
        """

    @slot
    @staticmethod
    def is_namenode_active(host: str, cluster: str) -> bool | None:
        """
        Check whether a namenode of a specified cluster is active (=not standby) or not.

        Used for:
            * If SparkHDFS connection is created without ``host``

                Connector will iterate over :obj:`~get_cluster_namenodes` of a cluster to get active namenode,
                and then use it instead of ``host`` attribute.

            * If SparkHDFS connection is created with ``host``

                :obj:`~check` will determine whether this host is active.

        .. versionadded:: 0.9.0

        Parameters
        ----------
        host : :obj:`str`
            Namenode host (normalized)

        cluster : :obj:`str`
            Cluster name (normalized)

        Returns
        -------
        bool | None
            ``True`` if namenode is active, ``False`` if not.

            If hook cannot be applied, it should return ``None``.

        Examples
        --------

        .. code:: python

            from onetl.connection import SparkHDFS
            from onetl.hooks import hook


            @SparkHDFS.Slots.is_namenode_active.bind
            @hook
            def is_namenode_active(host: str, cluster: str) -> bool:
                # some magic here
                return True
        """

normalize_cluster_name(cluster) staticmethod

Normalize cluster name passed into SparkHDFS constructor.

If hooks didn't return anything, cluster name is left intact.

.. versionadded:: 0.9.0

Parameters

cluster : :obj:str Cluster name

Returns

str | None Normalized cluster name.

If hook cannot be applied to a specific cluster, it should return ``None``.

Examples

.. code:: python

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.normalize_cluster_name.bind
@hook
def normalize_cluster_name(cluster: str) -> str:
    return cluster.lower()
Source code in onetl/connection/file_df_connection/spark_hdfs/slots.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@slot
@staticmethod
def normalize_cluster_name(cluster: str) -> str | None:
    """
    Bring a cluster name passed into the SparkHDFS constructor to normalized form.

    When no hook returns a value, the original cluster name is used unchanged.

    .. versionadded:: 0.9.0

    Parameters
    ----------
    cluster : :obj:`str`
        Raw cluster name

    Returns
    -------
    str | None
        Cluster name in normalized form.

        A hook that is not applicable to the given cluster should return ``None``.

    Examples
    --------

    .. code:: python

        from onetl.connection import SparkHDFS
        from onetl.hooks import hook


        @SparkHDFS.Slots.normalize_cluster_name.bind
        @hook
        def normalize_cluster_name(cluster: str) -> str:
            return cluster.lower()
    """

normalize_namenode_host(host, cluster) staticmethod

Normalize namenode host passed into SparkHDFS constructor.

If hooks didn't return anything, host is left intact.

.. versionadded:: 0.9.0

Parameters

host : :obj:str Namenode host (raw)

:obj:str

Cluster name (normalized)

Returns

str | None Normalized namenode host name.

If hook cannot be applied to a specific host name, it should return ``None``.

Examples

.. code:: python

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.normalize_namenode_host.bind
@hook
def normalize_namenode_host(host: str, cluster: str) -> str | None:
    if cluster == "rnd-dwh":
        if not host.endswith(".domain.com"):
            # fix missing domain name
            host += ".domain.com"
        return host

    return None
Source code in onetl/connection/file_df_connection/spark_hdfs/slots.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@slot
@staticmethod
def normalize_namenode_host(host: str, cluster: str) -> str | None:
    """
    Bring a namenode host passed into the SparkHDFS constructor to normalized form.

    When no hook returns a value, the original host is used unchanged.

    .. versionadded:: 0.9.0

    Parameters
    ----------
    host : :obj:`str`
        Raw namenode host

    cluster : :obj:`str`
        Cluster name (already normalized)

    Returns
    -------
    str | None
        Namenode host name in normalized form.

        A hook that is not applicable to the given host name should return ``None``.

    Examples
    --------

    .. code:: python

        from onetl.connection import SparkHDFS
        from onetl.hooks import hook


        @SparkHDFS.Slots.normalize_namenode_host.bind
        @hook
        def normalize_namenode_host(host: str, cluster: str) -> str | None:
            if cluster == "rnd-dwh":
                if not host.endswith(".domain.com"):
                    # fix missing domain name
                    host += ".domain.com"
                return host

            return None
    """

get_known_clusters() staticmethod

Return collection of known clusters.

Cluster passed into SparkHDFS constructor should be present in this list. If hooks didn't return anything, no validation will be performed.

.. versionadded:: 0.9.0

Returns

set[str] | None Collection of cluster names (in normalized form).

If hook cannot be applied, it should return ``None``.

Examples

.. code:: python

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.get_known_clusters.bind
@hook
def get_known_clusters() -> set[str]:
    return {"rnd-dwh", "rnd-prod"}
Source code in onetl/connection/file_df_connection/spark_hdfs/slots.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
@slot
@staticmethod
def get_known_clusters() -> set[str] | None:
    """
    Return collection of known clusters.

    Cluster passed into SparkHDFS constructor should be present in this list.
    If hooks didn't return anything, no validation will be performed.

    .. versionadded:: 0.9.0

    Returns
    -------
    set[str] | None
        Collection of cluster names (in normalized form).

        If hook cannot be applied, it should return ``None``.

    Examples
    --------

    .. code:: python

        from onetl.connection import SparkHDFS
        from onetl.hooks import hook


        @SparkHDFS.Slots.get_known_clusters.bind
        @hook
        def get_known_clusters() -> set[str]:
            return {"rnd-dwh", "rnd-prod"}
    """

get_cluster_namenodes(cluster) staticmethod

Return collection of known namenodes for the cluster.

Namenode host passed into SparkHDFS constructor should be present in this list. If hooks didn't return anything, no validation will be performed.

.. versionadded:: 0.9.0

Parameters

cluster : :obj:str Cluster name (normalized)

Returns

set[str] | None Collection of host names (in normalized form).

If hook cannot be applied, it should return ``None``.

Examples

.. code:: python

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.get_cluster_namenodes.bind
@hook
def get_cluster_namenodes(cluster: str) -> set[str] | None:
    if cluster == "rnd-dwh":
        return {"namenode1.domain.com", "namenode2.domain.com"}
    return None
Source code in onetl/connection/file_df_connection/spark_hdfs/slots.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
@slot
@staticmethod
def get_cluster_namenodes(cluster: str) -> set[str] | None:
    """
    Return collection of known namenodes for the cluster.

    Namenode host passed into SparkHDFS constructor should be present in this list.
    If hooks didn't return anything, no validation will be performed.

    .. versionadded:: 0.9.0

    Parameters
    ----------
    cluster : :obj:`str`
        Cluster name (normalized)

    Returns
    -------
    set[str] | None
        Collection of host names (in normalized form).

        If hook cannot be applied, it should return ``None``.

    Examples
    --------

    .. code:: python

        from onetl.connection import SparkHDFS
        from onetl.hooks import hook


        @SparkHDFS.Slots.get_cluster_namenodes.bind
        @hook
        def get_cluster_namenodes(cluster: str) -> set[str] | None:
            if cluster == "rnd-dwh":
                return {"namenode1.domain.com", "namenode2.domain.com"}
            return None
    """

get_current_cluster() staticmethod

Get current cluster name.

Used in :obj:~get_current_cluster to automatically fill up cluster attribute of a connection. If hooks didn't return anything, calling the method above will raise an exception.

.. versionadded:: 0.9.0

Returns

str | None Current cluster name (in normalized form).

If hook cannot be applied, it should return ``None``.

Examples

.. code:: python

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.get_current_cluster.bind
@hook
def get_current_cluster() -> str:
    # some magic here
    return "rnd-dwh"
Source code in onetl/connection/file_df_connection/spark_hdfs/slots.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
@slot
@staticmethod
def get_current_cluster() -> str | None:
    """
    Get current cluster name.

    Used in :obj:`~get_current_cluster` to automatically fill up ``cluster`` attribute of a connection.
    If hooks didn't return anything, calling the method above will raise an exception.

    .. versionadded:: 0.9.0

    Returns
    -------
    str | None
        Current cluster name (in normalized form).

        If hook cannot be applied, it should return ``None``.

    Examples
    --------

    .. code:: python

        from onetl.connection import SparkHDFS
        from onetl.hooks import hook


        @SparkHDFS.Slots.get_current_cluster.bind
        @hook
        def get_current_cluster() -> str:
            # some magic here
            return "rnd-dwh"
    """

get_ipc_port(cluster) staticmethod

Get IPC port number for a specific cluster.

Used by constructor to automatically set port number if omitted.

.. versionadded:: 0.9.0

Parameters

cluster : :obj:str Cluster name (normalized)

Returns

int | None IPC port number.

If hook cannot be applied, it should return ``None``.

Examples

.. code:: python

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.get_ipc_port.bind
@hook
def get_ipc_port(cluster: str) -> int | None:
    if cluster == "rnd-dwh":
        return 8020  # Cloudera
    return None
Source code in onetl/connection/file_df_connection/spark_hdfs/slots.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
@slot
@staticmethod
def get_ipc_port(cluster: str) -> int | None:
    """
    Return the IPC port number for a given cluster.

    The constructor uses this slot to fill in the port number when it was omitted.

    .. versionadded:: 0.9.0

    Parameters
    ----------
    cluster : :obj:`str`
        Cluster name (already normalized)

    Returns
    -------
    int | None
        IPC port number.

        A hook that is not applicable should return ``None``.

    Examples
    --------

    .. code:: python

        from onetl.connection import SparkHDFS
        from onetl.hooks import hook


        @SparkHDFS.Slots.get_ipc_port.bind
        @hook
        def get_ipc_port(cluster: str) -> int | None:
            if cluster == "rnd-dwh":
                return 8020  # Cloudera
            return None
    """

is_namenode_active(host, cluster) staticmethod

Check whether a namenode of a specified cluster is active (=not standby) or not.

Used for
  • If SparkHDFS connection is created without host

    Connector will iterate over :obj:~get_cluster_namenodes of a cluster to get active namenode, and then use it instead of host attribute.

  • If SparkHDFS connection is created with host

    :obj:~check will determine whether this host is active.

.. versionadded:: 0.9.0

Parameters

host : :obj:str Namenode host (normalized)

:obj:str

Cluster name (normalized)

Returns

bool | None True if namenode is active, False if not.

If hook cannot be applied, it should return ``None``.

Examples

.. code:: python

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.is_namenode_active.bind
@hook
def is_namenode_active(host: str, cluster: str) -> bool:
    # some magic here
    return True
Source code in onetl/connection/file_df_connection/spark_hdfs/slots.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
@slot
@staticmethod
def is_namenode_active(host: str, cluster: str) -> bool | None:
    """
    Report whether a namenode of the given cluster is active (i.e. not in standby mode).

    This slot is used in two situations:
        * If SparkHDFS connection is created without ``host``

            Connector will iterate over :obj:`~get_cluster_namenodes` of a cluster to get active namenode,
            and then use it instead of ``host`` attribute.

        * If SparkHDFS connection is created with ``host``

            :obj:`~check` will determine whether this host is active.

    .. versionadded:: 0.9.0

    Parameters
    ----------
    host : :obj:`str`
        Namenode host (already normalized)

    cluster : :obj:`str`
        Cluster name (already normalized)

    Returns
    -------
    bool | None
        ``True`` when the namenode is active, ``False`` when it is not.

        A hook that is not applicable should return ``None``.

    Examples
    --------

    .. code:: python

        from onetl.connection import SparkHDFS
        from onetl.hooks import hook


        @SparkHDFS.Slots.is_namenode_active.bind
        @hook
        def is_namenode_active(host: str, cluster: str) -> bool:
            # some magic here
            return True
    """