MySQL connection

Bases: JDBCConnection

MySQL JDBC connection. |support_hooks|

Based on Maven package `com.mysql:mysql-connector-j:9.5.0 <https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/9.5.0>`_ (`official MySQL JDBC driver <https://dev.mysql.com/doc/connector-j/en/>`_).

.. seealso::

Before using this connector please take into account :ref:`mysql-prerequisites`

.. versionadded:: 0.1.0

Parameters

host : str
    Host of MySQL database. For example: mysql0012.domain.com or 192.168.1.11

port : int, default: 3306
    Port of MySQL database.

user : str
    User with proper access to the database. For example: some_user

password : str
    Password for database connection.

database : str
    Database in RDBMS, NOT schema.

    See `this page <https://www.educba.com/postgresql-database-vs-schema/>`_ for more details.

spark : :obj:`pyspark.sql.SparkSession`
    Spark session.

extra : dict, default: None
    Specifies one or more extra parameters by which clients can connect to the instance.

    For example: {"useSSL": "false", "allowPublicKeyRetrieval": "true"}

    See `MySQL JDBC driver properties documentation <https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html>`_ for more details.
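
To illustrate how ``host``, ``port`` and ``database`` combine into a JDBC URL, here is a minimal standalone sketch that mirrors the ``jdbc_url`` property from the source listing on this page. The function name ``build_jdbc_url`` is hypothetical and not part of onETL; it only reproduces the string formatting and does not open any connection.

```python
from typing import Optional


def build_jdbc_url(host: str, port: int = 3306, database: Optional[str] = None) -> str:
    """Hypothetical helper (not an onETL API) mirroring the ``jdbc_url`` property."""
    base = f"jdbc:mysql://{host}:{port}"
    # The database name is appended only when it is set
    return f"{base}/{database}" if database else base


print(build_jdbc_url("mysql0012.domain.com"))
print(build_jdbc_url("192.168.1.11", 3307, "some_database"))
```

When ``database`` is omitted, the URL points at the server only, so queries are expected to reference tables with a database prefix.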

Examples

Create and check MySQL connection:

.. code:: python

from onetl.connection import MySQL
from pyspark.sql import SparkSession

# Create Spark session with MySQL driver loaded
maven_packages = MySQL.get_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

# Create connection
mysql = MySQL(
    host="database.host.or.ip",
    user="user",
    password="*****",
    extra={"useSSL": "false", "allowPublicKeyRetrieval": "true"},
    spark=spark,
).check()
Source code in onetl/connection/db_connection/mysql/connection.py
@support_hooks
class MySQL(JDBCConnection):
    """MySQL JDBC connection. |support_hooks|

    Based on Maven package `com.mysql:mysql-connector-j:9.5.0 <https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/9.5.0>`_
    (`official MySQL JDBC driver <https://dev.mysql.com/doc/connector-j/en/>`_).

    .. seealso::

        Before using this connector please take into account :ref:`mysql-prerequisites`

    .. versionadded:: 0.1.0

    Parameters
    ----------
    host : str
        Host of MySQL database. For example: ``mysql0012.domain.com`` or ``192.168.1.11``

    port : int, default: ``3306``
        Port of MySQL database

    user : str
        User with proper access to the database. For example: ``some_user``

    password : str
        Password for database connection

    database : str
        Database in RDBMS, NOT schema.

        See `this page <https://www.educba.com/postgresql-database-vs-schema/>`_ for more details

    spark : :obj:`pyspark.sql.SparkSession`
        Spark session.

    extra : dict, default: ``None``
        Specifies one or more extra parameters by which clients can connect to the instance.

        For example: ``{"useSSL": "false", "allowPublicKeyRetrieval": "true"}``

        See `MySQL JDBC driver properties documentation
        <https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html>`_
        for more details

    Examples
    --------

    Create and check MySQL connection:

    .. code:: python

        from onetl.connection import MySQL
        from pyspark.sql import SparkSession

        # Create Spark session with MySQL driver loaded
        maven_packages = MySQL.get_packages()
        spark = (
            SparkSession.builder.appName("spark-app-name")
            .config("spark.jars.packages", ",".join(maven_packages))
            .getOrCreate()
        )

        # Create connection
        mysql = MySQL(
            host="database.host.or.ip",
            user="user",
            password="*****",
            extra={"useSSL": "false", "allowPublicKeyRetrieval": "true"},
            spark=spark,
        ).check()

    """

    host: Host
    port: int = 3306
    database: Optional[str] = None
    extra: MySQLExtra = MySQLExtra()

    ReadOptions = MySQLReadOptions
    WriteOptions = MySQLWriteOptions
    SQLOptions = MySQLSQLOptions
    FetchOptions = MySQLFetchOptions
    ExecuteOptions = MySQLExecuteOptions

    Extra = MySQLExtra
    Dialect = MySQLDialect

    DRIVER: ClassVar[str] = "com.mysql.cj.jdbc.Driver"

    @slot
    @classmethod
    def get_packages(cls, package_version: str | None = None) -> list[str]:
        """
        Get package names to be downloaded by Spark. Allows specifying a custom JDBC driver version for MySQL. |support_hooks|

        .. versionadded:: 0.9.0

        Parameters
        ----------
        package_version : str, optional
            Specifies the version of the MySQL JDBC driver to use. Defaults to ``9.5.0``.

            .. versionadded:: 0.11.0

        Examples
        --------
        .. code:: python

            from onetl.connection import MySQL

            MySQL.get_packages()

            # specify a custom package version
            MySQL.get_packages(package_version="8.2.0")
        """
        default_version = "9.5.0"
        version = Version(package_version or default_version).min_digits(3)

        return [f"com.mysql:mysql-connector-j:{version}"]

    @classproperty
    def package(cls) -> str:
        """Get package name to be downloaded by Spark."""
        msg = "`MySQL.package` will be removed in 1.0.0, use `MySQL.get_packages()` instead"
        warnings.warn(msg, UserWarning, stacklevel=3)
        return "com.mysql:mysql-connector-j:9.5.0"

    @property
    def jdbc_url(self) -> str:
        if self.database:
            return f"jdbc:mysql://{self.host}:{self.port}/{self.database}"

        return f"jdbc:mysql://{self.host}:{self.port}"

    @property
    def jdbc_params(self) -> dict:
        result = super().jdbc_params
        result.update(self.extra.dict(by_alias=True))
        # https://dev.mysql.com/doc/connector-j/en/connector-j-connp-props-connection.html
        # https://stackoverflow.com/questions/31722323/mysql-connection-with-advanced-attributes-such-as-program-name
        client_info = f"program_name:{get_client_info(self.spark, unsupported=':,')}"
        connection_attributes = result.get("connectionAttributes")
        if connection_attributes and "program_name:" not in connection_attributes:
            result["connectionAttributes"] = f"{connection_attributes},{client_info}"
        elif not connection_attributes:
            result["connectionAttributes"] = client_info
        return result

    @property
    def instance_url(self) -> str:
        return f"{self.__class__.__name__.lower()}://{self.host}:{self.port}"

    def __str__(self):
        return f"{self.__class__.__name__}[{self.host}:{self.port}]"

    def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions, read_only: bool):
        connection = super()._get_jdbc_connection(options, read_only)

        # connection.setReadOnly() is no-op in MySQL JDBC driver. Session type can be changed by statement:
        # https://stackoverflow.com/questions/10240890/sql-open-connection-in-read-only-mode#comment123789248_48959180
        # https://dev.mysql.com/doc/refman/8.4/en/set-transaction.html
        transaction = "READ ONLY" if read_only else "READ WRITE"
        statement = connection.prepareStatement(f"SET SESSION TRANSACTION {transaction};")
        with closing(statement):
            statement.execute()

        return connection
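
The ``jdbc_params`` property above adds a ``program_name`` entry to ``connectionAttributes`` unless the user has already supplied one. The following is a simplified sketch of that merge rule using plain dictionaries. The function name ``merge_connection_attributes`` and the sample values are hypothetical; in the real class the program name comes from ``get_client_info(self.spark, ...)``.

```python
from typing import Dict


def merge_connection_attributes(params: Dict[str, str], program_name: str) -> Dict[str, str]:
    """Hypothetical helper (not an onETL API) reproducing the merge rule of ``jdbc_params``."""
    result = dict(params)  # copy, so the caller's dict is not modified
    client_info = f"program_name:{program_name}"
    existing = result.get("connectionAttributes")
    if existing and "program_name:" not in existing:
        # User passed other attributes: append program_name to them
        result["connectionAttributes"] = f"{existing},{client_info}"
    elif not existing:
        # No attributes at all: program_name becomes the only one
        result["connectionAttributes"] = client_info
    # Otherwise the user already set program_name, so it is left untouched
    return result


print(merge_connection_attributes({}, "my_app"))
print(merge_connection_attributes({"connectionAttributes": "team:etl"}, "my_app"))
print(merge_connection_attributes({"connectionAttributes": "program_name:custom"}, "my_app"))
```

The third call shows that an explicit ``program_name`` passed through ``extra`` takes precedence over the generated one.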

get_packages(package_version=None) classmethod

Get package names to be downloaded by Spark. Allows specifying a custom JDBC driver version for MySQL. |support_hooks|

.. versionadded:: 0.9.0

Parameters

package_version : str, optional
    Specifies the version of the MySQL JDBC driver to use. Defaults to 9.5.0.

.. versionadded:: 0.11.0

Examples

.. code:: python

from onetl.connection import MySQL

MySQL.get_packages()

# specify a custom package version
MySQL.get_packages(package_version="8.2.0")
Source code in onetl/connection/db_connection/mysql/connection.py
@slot
@classmethod
def get_packages(cls, package_version: str | None = None) -> list[str]:
    """
    Get package names to be downloaded by Spark. Allows specifying a custom JDBC driver version for MySQL. |support_hooks|

    .. versionadded:: 0.9.0

    Parameters
    ----------
    package_version : str, optional
        Specifies the version of the MySQL JDBC driver to use. Defaults to ``9.5.0``.

        .. versionadded:: 0.11.0

    Examples
    --------
    .. code:: python

        from onetl.connection import MySQL

        MySQL.get_packages()

        # specify a custom package version
        MySQL.get_packages(package_version="8.2.0")
    """
    default_version = "9.5.0"
    version = Version(package_version or default_version).min_digits(3)

    return [f"com.mysql:mysql-connector-j:{version}"]
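
As a rough standalone illustration of what ``get_packages`` returns, the sketch below builds the same Maven coordinates without importing onETL. The function name ``mysql_packages`` is hypothetical, and the regular expression check is only an assumption about what ``Version(...).min_digits(3)`` enforces (at least three numeric components); the actual onETL ``Version`` class may behave differently.

```python
import re
from typing import List, Optional

DEFAULT_VERSION = "9.5.0"  # default driver version named in the docstring above


def mysql_packages(package_version: Optional[str] = None) -> List[str]:
    """Hypothetical stand-in for ``MySQL.get_packages`` (not the onETL implementation)."""
    version = package_version or DEFAULT_VERSION
    # Assumption: a version needs at least three numeric components, e.g. "8.2.0"
    if not re.match(r"^\d+\.\d+\.\d+", version):
        raise ValueError(f"Version '{version}' must have at least 3 numeric components")
    return [f"com.mysql:mysql-connector-j:{version}"]


print(mysql_packages())
print(mysql_packages("8.2.0"))
```

The returned list is meant to be joined with commas and passed to ``spark.jars.packages``, as in the connection example earlier on this page.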