Bases: JDBCConnection
MySQL JDBC connection. |support_hooks|
Based on Maven package com.mysql:mysql-connector-j:9.5.0 <https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/9.5.0>
(official MySQL JDBC driver <https://dev.mysql.com/doc/connector-j/en/>).
.. seealso::
Before using this connector please take into account :ref:`mysql-prerequisites`
.. versionadded:: 0.1.0
Parameters
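host : str
    Host of MySQL database. For example: mysql0012.domain.com or 192.168.1.11
port : int, default: 3306
    Port of MySQL database
user : str
    User with proper access to the database. For example: some_user
password : str
    Password for database connection
database : str
    Database in RDBMS, NOT schema.
    See this page <https://www.educba.com/postgresql-database-vs-schema/> for more details
spark : :obj:pyspark.sql.SparkSession
    Spark session.
extra : dict, default: None
    Specifies one or more extra parameters by which clients can connect to the instance.
    For example: {"useSSL": "false", "allowPublicKeyRetrieval": "true"}
    See MySQL JDBC driver properties documentation <https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html> for more details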
Examples
Create and check MySQL connection:
.. code:: python

    from onetl.connection import MySQL
    from pyspark.sql import SparkSession

    # Create Spark session with MySQL driver loaded
    maven_packages = MySQL.get_packages()
    spark = (
        SparkSession.builder.appName("spark-app-name")
        .config("spark.jars.packages", ",".join(maven_packages))
        .getOrCreate()
    )

    # Create connection
    mysql = MySQL(
        host="database.host.or.ip",
        user="user",
        password="*****",
        extra={"useSSL": "false", "allowPublicKeyRetrieval": "true"},
        spark=spark,
    ).check()
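The host, port and database parameters end up in a JDBC URL. As a minimal sketch, assuming the format used by the `jdbc_url` property in this class's source listing (`jdbc:mysql://host:port`, with `/database` appended only when a database is set), the standalone helper below reproduces that string without Spark. The name `build_mysql_jdbc_url` is hypothetical and is not part of onETL.

```python
from typing import Optional


def build_mysql_jdbc_url(host: str, port: int = 3306, database: Optional[str] = None) -> str:
    """Hypothetical helper mirroring the URL format of MySQL.jdbc_url."""
    base = f"jdbc:mysql://{host}:{port}"
    # The database segment is appended only when a database name is given
    return f"{base}/{database}" if database else base


print(build_mysql_jdbc_url("mysql0012.domain.com"))
print(build_mysql_jdbc_url("192.168.1.11", 3307, "sales"))
```

Values passed through `extra` are not part of this URL; per the source listing they are sent as separate JDBC connection properties.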
Source code in onetl/connection/db_connection/mysql/connection.py
@support_hooks
class MySQL(JDBCConnection):
    """MySQL JDBC connection. |support_hooks|

    Based on Maven package `com.mysql:mysql-connector-j:9.5.0 <https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/9.5.0>`_
    (`official MySQL JDBC driver <https://dev.mysql.com/doc/connector-j/en/>`_).

    .. seealso::

        Before using this connector please take into account :ref:`mysql-prerequisites`

    .. versionadded:: 0.1.0

    Parameters
    ----------
    host : str
        Host of MySQL database. For example: ``mysql0012.domain.com`` or ``192.168.1.11``

    port : int, default: ``3306``
        Port of MySQL database

    user : str
        User with proper access to the database. For example: ``some_user``

    password : str
        Password for database connection

    database : str
        Database in RDBMS, NOT schema.

        See `this page <https://www.educba.com/postgresql-database-vs-schema/>`_ for more details

    spark : :obj:`pyspark.sql.SparkSession`
        Spark session.

    extra : dict, default: ``None``
        Specifies one or more extra parameters by which clients can connect to the instance.

        For example: ``{"useSSL": "false", "allowPublicKeyRetrieval": "true"}``

        See `MySQL JDBC driver properties documentation
        <https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html>`_
        for more details

    Examples
    --------

    Create and check MySQL connection:

    .. code:: python

        from onetl.connection import MySQL
        from pyspark.sql import SparkSession

        # Create Spark session with MySQL driver loaded
        maven_packages = MySQL.get_packages()
        spark = (
            SparkSession.builder.appName("spark-app-name")
            .config("spark.jars.packages", ",".join(maven_packages))
            .getOrCreate()
        )

        # Create connection
        mysql = MySQL(
            host="database.host.or.ip",
            user="user",
            password="*****",
            extra={"useSSL": "false", "allowPublicKeyRetrieval": "true"},
            spark=spark,
        ).check()

    """

    host: Host
    port: int = 3306
    database: Optional[str] = None
    extra: MySQLExtra = MySQLExtra()

    ReadOptions = MySQLReadOptions
    WriteOptions = MySQLWriteOptions
    SQLOptions = MySQLSQLOptions
    FetchOptions = MySQLFetchOptions
    ExecuteOptions = MySQLExecuteOptions

    Extra = MySQLExtra
    Dialect = MySQLDialect

    DRIVER: ClassVar[str] = "com.mysql.cj.jdbc.Driver"

    @slot
    @classmethod
    def get_packages(cls, package_version: str | None = None) -> list[str]:
        """
        Get package names to be downloaded by Spark. Allows specifying a custom JDBC driver version for MySQL. |support_hooks|

        .. versionadded:: 0.9.0

        Parameters
        ----------
        package_version : str, optional
            Specifies the version of the MySQL JDBC driver to use. Defaults to ``9.5.0``.

            .. versionadded:: 0.11.0

        Examples
        --------
        .. code:: python

            from onetl.connection import MySQL

            MySQL.get_packages()

            # specify a custom package version
            MySQL.get_packages(package_version="8.2.0")
        """
        default_version = "9.5.0"
        version = Version(package_version or default_version).min_digits(3)

        return [f"com.mysql:mysql-connector-j:{version}"]

    @classproperty
    def package(cls) -> str:
        """Get package name to be downloaded by Spark."""
        msg = "`MySQL.package` will be removed in 1.0.0, use `MySQL.get_packages()` instead"
        warnings.warn(msg, UserWarning, stacklevel=3)
        return "com.mysql:mysql-connector-j:9.5.0"

    @property
    def jdbc_url(self) -> str:
        if self.database:
            return f"jdbc:mysql://{self.host}:{self.port}/{self.database}"

        return f"jdbc:mysql://{self.host}:{self.port}"

    @property
    def jdbc_params(self) -> dict:
        result = super().jdbc_params
        result.update(self.extra.dict(by_alias=True))

        # https://dev.mysql.com/doc/connector-j/en/connector-j-connp-props-connection.html
        # https://stackoverflow.com/questions/31722323/mysql-connection-with-advanced-attributes-such-as-program-name
        client_info = f"program_name:{get_client_info(self.spark, unsupported=':,')}"
        connection_attributes = result.get("connectionAttributes")
        if connection_attributes and "program_name:" not in connection_attributes:
            result["connectionAttributes"] = f"{connection_attributes},{client_info}"
        elif not connection_attributes:
            result["connectionAttributes"] = client_info
        return result

    @property
    def instance_url(self) -> str:
        return f"{self.__class__.__name__.lower()}://{self.host}:{self.port}"

    def __str__(self):
        return f"{self.__class__.__name__}[{self.host}:{self.port}]"

    def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions, read_only: bool):
        connection = super()._get_jdbc_connection(options, read_only)

        # connection.setReadOnly() is no-op in MySQL JDBC driver. Session type can be changed by statement:
        # https://stackoverflow.com/questions/10240890/sql-open-connection-in-read-only-mode#comment123789248_48959180
        # https://dev.mysql.com/doc/refman/8.4/en/set-transaction.html
        transaction = "READ ONLY" if read_only else "READ WRITE"
        statement = connection.prepareStatement(f"SET SESSION TRANSACTION {transaction};")
        with closing(statement):
            statement.execute()

        return connection
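The `jdbc_params` property in the listing above merges a `program_name` entry into the `connectionAttributes` driver property without overriding a value the user already supplied. The sketch below restates that branching as a standalone function so the three cases are easier to see. It is an illustration only: `merge_connection_attributes` is a hypothetical name, and the `client_info` argument stands in for the string that `get_client_info` would return in onETL.

```python
from typing import Optional


def merge_connection_attributes(existing: Optional[str], client_info: str) -> str:
    """Hypothetical standalone restatement of the merge done in MySQL.jdbc_params."""
    program_name = f"program_name:{client_info}"
    if not existing:
        # No user-supplied attributes: send only the program name
        return program_name
    if "program_name:" in existing:
        # The user already set a program name: leave their value untouched
        return existing
    # Otherwise append the program name to the user-supplied attributes
    return f"{existing},{program_name}"


print(merge_connection_attributes(None, "my-app"))
print(merge_connection_attributes("team:etl", "my-app"))
print(merge_connection_attributes("program_name:custom", "my-app"))
```

Attributes are joined with `,` and each key is separated from its value by `:`, which is presumably why the listing passes `unsupported=':,'` to `get_client_info`.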
get_packages(package_version=None)
classmethod
Get package names to be downloaded by Spark. Allows specifying a custom JDBC driver version for MySQL. |support_hooks|
.. versionadded:: 0.9.0
Parameters
package_version : str, optional
Specifies the version of the MySQL JDBC driver to use. Defaults to 9.5.0.
.. versionadded:: 0.11.0
Examples
.. code:: python

    from onetl.connection import MySQL

    MySQL.get_packages()

    # specify a custom package version
    MySQL.get_packages(package_version="8.2.0")
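To show what the returned list looks like and how it feeds `spark.jars.packages`, here is a minimal sketch that runs without onETL or Spark. It assumes only the Maven coordinate format visible in the source listing (`com.mysql:mysql-connector-j:<version>`). The helper name `mysql_maven_coordinate` is hypothetical, and the sketch skips the version check that the real method performs through `Version(...).min_digits(3)`.

```python
def mysql_maven_coordinate(package_version: str = "9.5.0") -> str:
    """Hypothetical helper: format the Maven coordinate for a driver version."""
    # Group and artifact IDs are the ones named in this document
    return f"com.mysql:mysql-connector-j:{package_version}"


packages = [mysql_maven_coordinate("8.2.0")]

# spark.jars.packages takes a comma-separated string of Maven coordinates
spark_jars_packages = ",".join(packages)
print(spark_jars_packages)
```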
Source code in onetl/connection/db_connection/mysql/connection.py
@slot
@classmethod
def get_packages(cls, package_version: str | None = None) -> list[str]:
    """
    Get package names to be downloaded by Spark. Allows specifying a custom JDBC driver version for MySQL. |support_hooks|

    .. versionadded:: 0.9.0

    Parameters
    ----------
    package_version : str, optional
        Specifies the version of the MySQL JDBC driver to use. Defaults to ``9.5.0``.

        .. versionadded:: 0.11.0

    Examples
    --------
    .. code:: python

        from onetl.connection import MySQL

        MySQL.get_packages()

        # specify a custom package version
        MySQL.get_packages(package_version="8.2.0")
    """
    default_version = "9.5.0"
    version = Version(package_version or default_version).min_digits(3)

    return [f"com.mysql:mysql-connector-j:{version}"]