MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python通过PyMySQL连接MySQL数据库

2023-08-292.1k 阅读

安装 PyMySQL

在开始使用 PyMySQL 连接 MySQL 数据库之前,我们需要先安装它。如果你使用的是 pip 包管理器(大多数 Python 环境默认安装),可以在命令行中运行以下命令来安装 PyMySQL:

pip install pymysql

如果你的系统中同时安装了 Python 2 和 Python 3,并且 pip 指向 Python 2,你可能需要使用 pip3 来确保安装到 Python 3 的环境中:

pip3 install pymysql

安装完成后,就可以在 Python 代码中导入并使用 PyMySQL 了。

基本连接

连接参数

要连接到 MySQL 数据库,我们需要提供一些必要的参数。主要的参数包括:

  • 主机(host):MySQL 服务器所在的主机地址,通常是 localhost,如果是远程服务器,则填写服务器的 IP 地址或域名。
  • 端口(port):MySQL 服务器监听的端口,默认是 3306
  • 用户名(user):用于登录 MySQL 数据库的用户名。
  • 密码(password):对应用户名的密码。
  • 数据库名(database):要连接的具体数据库名称。

简单连接示例

下面是一个简单的 Python 代码示例,展示如何使用 PyMySQL 连接到 MySQL 数据库:

import pymysql

# 建立数据库连接
connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database'
)

try:
    # 使用 cursor() 方法创建一个游标对象 cursor
    with connection.cursor() as cursor:
        # SQL 查询语句
        sql = "SELECT VERSION()"
        # 执行 SQL 语句
        cursor.execute(sql)
        # 获取单条数据
        result = cursor.fetchone()
        print("Database version : %s " % result)
finally:
    # 关闭连接
    connection.close()

在上述代码中:

  1. 我们首先导入了 pymysql 模块。
  2. 使用 pymysql.connect() 方法建立与 MySQL 数据库的连接,传入了主机、端口、用户名、密码和数据库名等参数。
  3. 通过 connection.cursor() 创建一个游标对象 cursor,游标用于执行 SQL 语句并获取结果。
  4. 定义了一个简单的 SQL 查询语句 SELECT VERSION(),用于获取 MySQL 数据库的版本信息。
  5. 使用 cursor.execute() 方法执行 SQL 语句。
  6. 使用 cursor.fetchone() 方法获取查询结果的第一行数据。
  7. 最后,在 finally 块中关闭数据库连接,以确保资源被正确释放。

执行 SQL 语句

执行查询语句

查询单条记录

除了获取数据库版本信息,我们通常会执行更复杂的查询语句,例如从表中检索数据。下面是一个从名为 users 的表中查询单条记录的示例:

import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database'
)

try:
    with connection.cursor() as cursor:
        sql = "SELECT id, name, age FROM users WHERE id = %s"
        cursor.execute(sql, (1,))
        result = cursor.fetchone()
        if result:
            print(f"ID: {result[0]}, Name: {result[1]}, Age: {result[2]}")
finally:
    connection.close()

在这个示例中:

  1. 定义了一个 SQL 查询语句 SELECT id, name, age FROM users WHERE id = %s,其中 %s 是一个占位符。
  2. 使用 cursor.execute(sql, (1,)) 方法执行查询语句,并传入占位符的值 (1,)。这样做可以防止 SQL 注入攻击。
  3. 使用 cursor.fetchone() 获取查询结果的第一行数据,如果有结果,则打印出用户的 ID、姓名和年龄。

查询多条记录

如果要查询多条记录,可以使用 cursor.fetchall() 方法。示例如下:

import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database'
)

try:
    with connection.cursor() as cursor:
        sql = "SELECT id, name, age FROM users WHERE age > %s"
        cursor.execute(sql, (18,))
        results = cursor.fetchall()
        for row in results:
            print(f"ID: {row[0]}, Name: {row[1]}, Age: {row[2]}")
finally:
    connection.close()

在此代码中:

  1. SQL 查询语句 SELECT id, name, age FROM users WHERE age > %s 用于查询年龄大于 18 岁的用户。
  2. 使用 cursor.execute() 执行查询,并传入参数 (18,)
  3. 使用 cursor.fetchall() 获取所有查询结果,然后通过循环遍历打印每一行数据。

执行插入语句

要向数据库中插入数据,可以使用 INSERT 语句。以下是一个示例:

import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database'
)

try:
    with connection.cursor() as cursor:
        sql = "INSERT INTO users (name, age) VALUES (%s, %s)"
        cursor.execute(sql, ('John Doe', 25))
        # 提交事务,使插入操作生效
        connection.commit()
        print("Data inserted successfully.")
except pymysql.Error as e:
    print(f"Error inserting data: {e}")
    # 发生错误时回滚事务
    connection.rollback()
finally:
    connection.close()

在这个例子中:

  1. 定义了 INSERT INTO users (name, age) VALUES (%s, %s) 这样的插入语句,其中 %s 是占位符。
  2. 使用 cursor.execute() 方法执行插入语句,并传入要插入的数据 ('John Doe', 25)
  3. 使用 connection.commit() 方法提交事务,使插入操作永久保存到数据库中。如果不提交事务,数据不会真正插入。
  4. 使用 try - except 块捕获可能发生的错误,如果发生错误,打印错误信息并使用 connection.rollback() 回滚事务,撤销插入操作。

执行更新语句

更新数据库中的数据可以使用 UPDATE 语句。以下是一个示例:

import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database'
)

try:
    with connection.cursor() as cursor:
        sql = "UPDATE users SET age = %s WHERE name = %s"
        cursor.execute(sql, (26, 'John Doe'))
        connection.commit()
        print("Data updated successfully.")
except pymysql.Error as e:
    print(f"Error updating data: {e}")
    connection.rollback()
finally:
    connection.close()

在此代码中:

  1. SQL 语句 UPDATE users SET age = %s WHERE name = %s 用于更新 users 表中名为 John Doe 的用户的年龄。
  2. 使用 cursor.execute() 方法执行更新语句,并传入新的年龄值和用户名。
  3. 同样使用 connection.commit() 提交事务使更新生效,在发生错误时使用 connection.rollback() 回滚事务。

执行删除语句

删除数据库中的数据可以使用 DELETE 语句。示例如下:

import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database'
)

try:
    with connection.cursor() as cursor:
        sql = "DELETE FROM users WHERE name = %s"
        cursor.execute(sql, ('John Doe',))
        connection.commit()
        print("Data deleted successfully.")
except pymysql.Error as e:
    print(f"Error deleting data: {e}")
    connection.rollback()
finally:
    connection.close()

在这个示例中:

  1. DELETE FROM users WHERE name = %s 语句用于删除 users 表中名为 John Doe 的用户记录。
  2. 使用 cursor.execute() 执行删除语句,并传入用户名。
  3. 通过 connection.commit() 提交事务使删除生效,错误时通过 connection.rollback() 回滚事务。

事务处理

事务的概念

事务是数据库操作的一个逻辑单元,它由一组相关的数据库操作组成,这些操作要么全部成功执行,要么全部不执行。事务具有 ACID 属性:

  • 原子性(Atomicity):事务中的所有操作要么全部完成,要么全部不完成,不会出现部分成功部分失败的情况。
  • 一致性(Consistency):事务执行前后,数据库的完整性约束不会被破坏。
  • 隔离性(Isolation):多个事务并发执行时,一个事务的执行不会被其他事务干扰,各个事务之间相互隔离。
  • 持久性(Durability):一旦事务提交,它对数据库所做的修改就会永久保存,即使系统崩溃也不会丢失。

使用 PyMySQL 进行事务处理

在前面的插入、更新和删除示例中,我们已经看到了基本的事务处理方式。下面通过一个更复杂的示例来深入理解事务处理。假设我们有两个表 accountstransactionsaccounts 表存储用户账户信息,transactions 表记录账户之间的转账记录。我们要实现一个转账功能,从一个账户向另一个账户转账,这涉及到两个表的操作,必须在一个事务中完成以保证数据的一致性。

import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database'
)

try:
    with connection.cursor() as cursor:
        # 开启事务
        connection.begin()
        # 从账户 A 减少金额
        sql = "UPDATE accounts SET balance = balance - %s WHERE account_number = %s"
        cursor.execute(sql, (100, 'A123'))
        # 向账户 B 增加金额
        cursor.execute("UPDATE accounts SET balance = balance + %s WHERE account_number = %s", (100, 'B456'))
        # 记录转账记录到 transactions 表
        cursor.execute("INSERT INTO transactions (from_account, to_account, amount) VALUES (%s, %s, %s)", ('A123', 'B456', 100))
        # 提交事务
        connection.commit()
        print("Transfer successful.")
except pymysql.Error as e:
    print(f"Error during transfer: {e}")
    # 发生错误时回滚事务
    connection.rollback()
finally:
    connection.close()

在这个示例中:

  1. 使用 connection.begin() 方法显式开启一个事务。
  2. 依次执行从账户 A 减少金额、向账户 B 增加金额以及记录转账记录到 transactions 表的 SQL 语句。
  3. 如果所有操作都成功,使用 connection.commit() 提交事务,使所有修改生效。
  4. 如果在执行过程中发生任何错误,try - except 块捕获异常,打印错误信息,并使用 connection.rollback() 回滚事务,撤销之前的所有操作,确保数据库状态不会出现不一致的情况。

处理不同的数据类型

日期和时间类型

MySQL 中有多种日期和时间类型,如 DATETIMEDATETIMETIMESTAMP。在 Python 中,我们可以使用 datetime 模块来处理这些类型的数据。

插入日期和时间数据

假设我们有一个表 events,包含 event_dateDATE 类型)和 event_timeDATETIME 类型)字段,以下是插入数据的示例:

import pymysql
from datetime import date, datetime

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database'
)

try:
    with connection.cursor() as cursor:
        event_date = date(2023, 10, 1)
        event_datetime = datetime(2023, 10, 1, 10, 30, 0)
        sql = "INSERT INTO events (event_date, event_time) VALUES (%s, %s)"
        cursor.execute(sql, (event_date, event_datetime))
        connection.commit()
        print("Data inserted successfully.")
except pymysql.Error as e:
    print(f"Error inserting data: {e}")
    connection.rollback()
finally:
    connection.close()

在这个例子中,我们使用 datedatetime 类创建日期和日期时间对象,并将它们作为参数传递给 cursor.execute() 方法来插入数据。

查询日期和时间数据

查询日期和时间数据与查询其他类型数据类似,只是在处理结果时需要注意数据类型。

import pymysql
from datetime import date, datetime

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database'
)

try:
    with connection.cursor() as cursor:
        sql = "SELECT event_date, event_time FROM events WHERE event_date >= %s"
        start_date = date(2023, 10, 1)
        cursor.execute(sql, (start_date,))
        results = cursor.fetchall()
        for row in results:
            event_date = row[0]
            event_datetime = row[1]
            print(f"Event date: {event_date}, Event time: {event_datetime}")
finally:
    connection.close()

在这个查询示例中,我们获取符合条件的日期和日期时间数据,并在 Python 中以相应的 datedatetime 对象形式处理。

二进制数据类型

MySQL 支持二进制数据类型,如 BLOB(Binary Large Object)和 LONGBLOB。在 Python 中,可以使用字节串(bytes)来处理二进制数据。

插入二进制数据

假设我们有一个表 files,包含 file_nameVARCHAR 类型)和 file_contentBLOB 类型)字段,用于存储文件的名称和内容。以下是插入二进制数据的示例:

import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database'
)

try:
    with connection.cursor() as cursor:
        file_name = 'example.txt'
        with open(file_name, 'rb') as file:
            file_content = file.read()
        sql = "INSERT INTO files (file_name, file_content) VALUES (%s, %s)"
        cursor.execute(sql, (file_name, file_content))
        connection.commit()
        print("Binary data inserted successfully.")
except pymysql.Error as e:
    print(f"Error inserting binary data: {e}")
    connection.rollback()
finally:
    connection.close()

在这个示例中,我们打开一个文件并以二进制模式读取其内容,然后将文件名和文件内容作为参数传递给 cursor.execute() 方法来插入二进制数据。

查询二进制数据

查询二进制数据后,我们可以将其保存为文件。

import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database'
)

try:
    with connection.cursor() as cursor:
        sql = "SELECT file_name, file_content FROM files WHERE file_name = %s"
        file_name = 'example.txt'
        cursor.execute(sql, (file_name,))
        result = cursor.fetchone()
        if result:
            file_name = result[0]
            file_content = result[1]
            with open(file_name, 'wb') as file:
                file.write(file_content)
            print("Binary data retrieved and saved successfully.")
finally:
    connection.close()

在这个查询示例中,我们根据文件名查询二进制数据,并将其写入一个新的文件中。

高级主题

连接池

在实际应用中,频繁地创建和关闭数据库连接会带来性能开销。连接池可以解决这个问题,它预先创建一组数据库连接并管理它们,应用程序需要连接时从连接池中获取,使用完毕后再归还到连接池中,而不是每次都创建和销毁连接。

使用 DBUtils 实现连接池

DBUtils 是一个流行的 Python 连接池库,它支持多种数据库,包括 MySQL。以下是使用 DBUtils 和 PyMySQL 实现连接池的示例:

from dbutils.pooled_db import PooledDB
import pymysql

# 创建连接池
pool = PooledDB(
    creator=pymysql,
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database',
    autocommit=True,
    maxconnections=10
)

try:
    # 从连接池中获取连接
    connection = pool.connection()
    with connection.cursor() as cursor:
        sql = "SELECT VERSION()"
        cursor.execute(sql)
        result = cursor.fetchone()
        print("Database version : %s " % result)
finally:
    # 归还连接到连接池
    connection.close()

在上述代码中:

  1. 使用 PooledDB 创建一个连接池,creator 参数指定使用 pymysql 作为数据库驱动,还设置了主机、端口、用户名、密码、数据库名等连接参数,autocommit 设置为 True 表示自动提交事务,maxconnections 设置最大连接数为 10。
  2. 通过 pool.connection() 从连接池中获取一个连接。
  3. 使用完毕后,调用 connection.close() 将连接归还到连接池,而不是真正关闭连接。

处理大结果集

当查询返回的结果集非常大时,一次性将所有数据加载到内存中可能会导致内存不足的问题。PyMySQL 提供了一种逐行读取大结果集的方法,以避免这种情况。

使用 SSCursor 逐行读取

SSCursor 是 PyMySQL 中的一种游标类型,它允许逐行读取结果集,而不是一次性获取所有数据。

import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database'
)

try:
    with connection.cursor(pymysql.cursors.SSCursor) as cursor:
        sql = "SELECT * FROM large_table"
        cursor.execute(sql)
        for row in cursor:
            print(row)
finally:
    connection.close()

在这个示例中,我们使用 pymysql.cursors.SSCursor 创建了一个 SSCursor 游标。在执行查询后,通过循环逐行读取结果集,这样每次只在内存中保留一行数据,大大减少了内存占用。

处理数据库异常

在与数据库交互过程中,可能会发生各种异常,如连接错误、SQL 语法错误、数据类型不匹配等。正确处理这些异常可以使程序更加健壮。

常见异常类型

  • pymysql.OperationalError:通常表示数据库操作相关的错误,如连接失败、数据库不存在等。
  • pymysql.ProgrammingError:一般是由于 SQL 语法错误、使用未定义的表或列等编程错误引起的。
  • pymysql.DataError:当数据类型不匹配、数据溢出等与数据相关的错误发生时会抛出此异常。

异常处理示例

import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='your_username',
    password='your_password',
    database='your_database'
)

try:
    with connection.cursor() as cursor:
        sql = "SELECT * FROM non_existent_table"
        cursor.execute(sql)
except pymysql.OperationalError as e:
    print(f"Operational error: {e}")
except pymysql.ProgrammingError as e:
    print(f"Programming error: {e}")
except pymysql.DataError as e:
    print(f"Data error: {e}")
finally:
    connection.close()

在这个示例中,我们故意尝试查询一个不存在的表,通过 try - except 块捕获可能出现的不同类型异常,并打印相应的错误信息,确保程序在遇到错误时不会崩溃,而是能够优雅地处理并提供有用的错误提示。

通过以上详细内容,你应该对使用 PyMySQL 在 Python 中连接和操作 MySQL 数据库有了全面深入的了解。无论是简单的查询插入,还是复杂的事务处理、连接池使用等高级主题,都能在实际项目中灵活运用。