MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python使用SQLAlchemy连接MySQL数据库

2024-10-167.9k 阅读

一、SQLAlchemy简介

SQLAlchemy 是 Python 中最受欢迎的数据库抽象层库之一。它提供了一种高级的数据库抽象层,允许开发者使用 Python 代码与各种数据库进行交互,而无需编写特定于数据库的 SQL 语句。这种抽象层的好处在于,开发者可以在不改变太多代码的情况下,轻松切换数据库后端,比如从 MySQL 切换到 PostgreSQL。

SQLAlchemy 有两个主要的组件:SQL 表达式语言(SQL Expression Language)和 ORM(Object Relational Mapping,对象关系映射)。

  1. SQL 表达式语言:这是 SQLAlchemy 的核心部分,它允许开发者以 Pythonic 的方式构建 SQL 语句。通过这种方式,开发者可以利用 Python 的语法和数据结构来生成复杂的 SQL 查询,而不需要手动编写字符串形式的 SQL 语句。这不仅提高了代码的可读性和可维护性,还减少了 SQL 注入的风险。例如,以下代码使用 SQL 表达式语言查询数据库中的所有用户:
from sqlalchemy import create_engine, MetaData, Table, select

# 创建数据库引擎
engine = create_engine('mysql+pymysql://user:password@localhost/mydb')

# 定义元数据
metadata = MetaData()

# 反射 users 表
users = Table('users', metadata, autoload_with=engine)

# 创建查询
stmt = select([users])

# 执行查询
with engine.connect() as conn:
    result = conn.execute(stmt)
    for row in result:
        print(row)
  1. ORM:ORM 是 SQLAlchemy 提供的一种强大的功能,它允许开发者将数据库表映射为 Python 类。通过这种映射,开发者可以使用面向对象的方式与数据库进行交互,而不是编写 SQL 语句。例如,假设我们有一个 users 表,我们可以定义一个对应的 Python 类 User
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# 创建数据库引擎
engine = create_engine('mysql+pymysql://user:password@localhost/mydb')

# 创建基类
Base = declarative_base()

# 定义 User 类
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(120))

    def __repr__(self):
        return f"<User(name='{self.name}', email='{self.email}')>"

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 查询所有用户
users = session.query(User).all()
for user in users:
    print(user)

二、安装必要的库

在开始使用 SQLAlchemy 连接 MySQL 数据库之前,我们需要安装两个主要的库:SQLAlchemy 和一个 MySQL 驱动。在 Python 中,常用的 MySQL 驱动有 mysql - connector - pythonpymysql。这里我们以 pymysql 为例。

  1. 安装 SQLAlchemy: 可以使用 pip 命令来安装 SQLAlchemy:
pip install sqlalchemy
  1. 安装 pymysql: 同样使用 pip 安装 pymysql
pip install pymysql

三、连接 MySQL 数据库

  1. 创建数据库引擎: 在 SQLAlchemy 中,连接数据库的第一步是创建一个数据库引擎。数据库引擎是 SQLAlchemy 与数据库之间的桥梁,它负责管理数据库连接池、执行 SQL 语句等操作。创建数据库引擎的语法如下:
from sqlalchemy import create_engine

engine = create_engine('mysql+pymysql://username:password@host:port/database_name')

其中:

  • mysql+pymysql:表示使用 pymysql 作为 MySQL 的驱动。如果使用 mysql - connector - python,则应写成 mysql+mysqlconnector
  • usernamepassword:是 MySQL 数据库的用户名和密码。
  • hostport:分别是数据库服务器的主机地址和端口号。默认情况下,MySQL 的端口号是 3306。
  • database_name:是要连接的数据库名称。

例如,如果你的 MySQL 数据库运行在本地,用户名是 root,密码是 password,数据库名称是 test,则创建引擎的代码如下:

engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')
  1. 测试连接: 创建引擎后,我们可以测试连接是否成功。一种简单的方法是使用 engine.connect() 方法来获取一个数据库连接对象,并尝试执行一个简单的 SQL 语句,比如 SELECT 1
from sqlalchemy import create_engine

engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')

try:
    with engine.connect() as conn:
        result = conn.execute('SELECT 1')
        print(result.fetchone())
except Exception as e:
    print(f"连接数据库时出错: {e}")

如果连接成功,上述代码将输出 (1,),表示数据库连接正常。

四、使用 SQL 表达式语言操作数据库

  1. 定义表结构: 在使用 SQL 表达式语言之前,我们需要定义数据库表的结构。这可以通过 MetaDataTable 类来完成。MetaData 类用于存储数据库模式的元数据,而 Table 类用于定义单个表的结构。
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String

# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')

# 创建元数据对象
metadata = MetaData()

# 定义 users 表
users = Table('users', metadata,
              Column('id', Integer, primary_key=True),
              Column('name', String(50)),
              Column('email', String(120))
              )

# 创建表(如果不存在)
metadata.create_all(engine)

上述代码定义了一个名为 users 的表,包含 idnameemail 三个列。id 列是主键,nameemail 列分别存储字符串类型的数据。最后,使用 metadata.create_all(engine) 方法在数据库中创建该表(如果表不存在)。

  1. 插入数据: 插入数据可以使用 insert() 方法。以下代码演示了如何向 users 表中插入一条记录:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy import insert

# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')

# 创建元数据对象
metadata = MetaData()

# 定义 users 表
users = Table('users', metadata,
              Column('id', Integer, primary_key=True),
              Column('name', String(50)),
              Column('email', String(120))
              )

# 创建插入语句
stmt = insert(users).values(name='John Doe', email='johndoe@example.com')

# 执行插入语句
with engine.connect() as conn:
    result = conn.execute(stmt)
    print(f"插入的记录 ID: {result.lastrowid}")
  1. 查询数据: 查询数据使用 select() 方法。以下代码演示了如何查询 users 表中的所有记录:
from sqlalchemy import create_engine, MetaData, Table, select

# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')

# 创建元数据对象
metadata = MetaData()

# 定义 users 表
users = Table('users', metadata, autoload_with=engine)

# 创建查询语句
stmt = select([users])

# 执行查询语句
with engine.connect() as conn:
    result = conn.execute(stmt)
    for row in result:
        print(row)
  1. 更新数据: 更新数据使用 update() 方法。以下代码演示了如何将 users 表中 nameJohn Doe 的记录的 email 更新为 newemail@example.com
from sqlalchemy import create_engine, MetaData, Table, update

# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')

# 创建元数据对象
metadata = MetaData()

# 定义 users 表
users = Table('users', metadata, autoload_with=engine)

# 创建更新语句
stmt = update(users).where(users.c.name == 'John Doe').values(email='newemail@example.com')

# 执行更新语句
with engine.connect() as conn:
    result = conn.execute(stmt)
    print(f"更新的记录数: {result.rowcount}")
  1. 删除数据: 删除数据使用 delete() 方法。以下代码演示了如何删除 users 表中 nameJohn Doe 的记录:
from sqlalchemy import create_engine, MetaData, Table, delete

# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')

# 创建元数据对象
metadata = MetaData()

# 定义 users 表
users = Table('users', metadata, autoload_with=engine)

# 创建删除语句
stmt = delete(users).where(users.c.name == 'John Doe')

# 执行删除语句
with engine.connect() as conn:
    result = conn.execute(stmt)
    print(f"删除的记录数: {result.rowcount}")

五、使用 ORM 操作数据库

  1. 定义映射类: 在使用 ORM 时,我们需要定义 Python 类来映射数据库表。这些类继承自 declarative_base() 创建的基类。
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')

# 创建基类
Base = declarative_base()

# 定义 User 类
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(120))

    def __repr__(self):
        return f"<User(name='{self.name}', email='{self.email}')>"

上述代码定义了一个 User 类,映射到 users 表。__tablename__ 变量指定了映射的表名,类的属性对应表的列。

  1. 创建会话: 在使用 ORM 操作数据库之前,我们需要创建一个会话。会话是 SQLAlchemy ORM 中与数据库交互的核心对象,它管理事务、对象状态等。
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
  1. 插入数据: 插入数据可以通过创建映射类的实例,并使用会话的 add() 方法将其添加到会话中,最后使用 commit() 方法提交事务。
# 创建 User 实例
new_user = User(name='Jane Smith', email='janesmith@example.com')

# 添加到会话
session.add(new_user)

# 提交事务
session.commit()
  1. 查询数据: 查询数据使用会话的 query() 方法。以下代码演示了如何查询所有用户:
# 查询所有用户
users = session.query(User).all()
for user in users:
    print(user)

还可以进行更复杂的查询,比如过滤查询:

# 查询名字为 Jane Smith 的用户
user = session.query(User).filter(User.name == 'Jane Smith').first()
print(user)
  1. 更新数据: 更新数据可以先查询到要更新的对象,然后修改其属性,最后提交事务。
# 查询名字为 Jane Smith 的用户
user = session.query(User).filter(User.name == 'Jane Smith').first()

# 修改 email 属性
user.email = 'newemail@example.com'

# 提交事务
session.commit()
  1. 删除数据: 删除数据可以先查询到要删除的对象,然后使用会话的 delete() 方法将其删除,最后提交事务。
# 查询名字为 Jane Smith 的用户
user = session.query(User).filter(User.name == 'Jane Smith').first()

# 删除用户
session.delete(user)

# 提交事务
session.commit()

六、事务处理

在数据库操作中,事务是一组数据库操作的集合,这些操作要么全部成功,要么全部失败。SQLAlchemy 提供了多种方式来处理事务。

  1. 自动提交模式: 默认情况下,SQLAlchemy 使用自动提交模式。在这种模式下,每个数据库操作(如插入、更新、删除)都会立即提交到数据库。例如:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')
Session = sessionmaker(bind=engine)
session = Session()

new_user = User(name='AutoCommit User', email='autocommit@example.com')
session.add(new_user)
# 这里不需要手动调用 session.commit(),操作会自动提交
  1. 显式事务: 在某些情况下,我们可能需要将多个数据库操作组合成一个事务,以确保它们的原子性。可以使用 begin() 方法开始一个事务,使用 commit() 方法提交事务,使用 rollback() 方法回滚事务。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')
Session = sessionmaker(bind=engine)
session = Session()

try:
    session.begin()
    new_user1 = User(name='User1', email='user1@example.com')
    new_user2 = User(name='User2', email='user2@example.com')
    session.add(new_user1)
    session.add(new_user2)
    session.commit()
except Exception as e:
    session.rollback()
    print(f"事务出错: {e}")
finally:
    session.close()

在上述代码中,我们尝试向数据库中插入两个用户。如果在插入过程中出现任何错误,整个事务将回滚,不会有任何数据插入到数据库中。

七、连接池管理

SQLAlchemy 内置了连接池管理功能,以提高数据库连接的效率和性能。连接池可以重用现有的数据库连接,而不是每次都创建新的连接,从而减少了连接创建和销毁的开销。

  1. 连接池类型: SQLAlchemy 支持多种连接池类型,包括:
  • NullPool:不使用连接池,每次请求都会创建一个新的连接。适用于调试和测试环境。
  • QueuePool:默认的连接池类型,使用队列来管理连接。它会维护一定数量的活动连接,并在需要时从队列中获取连接。
  • SingletonThreadPool:使用单例线程池来管理连接。适用于多线程应用程序,每个线程共享同一个连接。
  • AsyncAdaptedQueuePool:用于异步应用程序的连接池,基于 QueuePool 进行异步适配。
  1. 配置连接池: 可以在创建数据库引擎时配置连接池的参数。例如,以下代码演示了如何配置 QueuePool 的最大连接数和超时时间:
from sqlalchemy import create_engine

engine = create_engine('mysql+pymysql://root:password@localhost:3306/test',
                       pool_size=10,
                       max_overflow=20,
                       pool_timeout=30)

其中:

  • pool_size:指定连接池中保持的活动连接数。
  • max_overflow:指定连接池可以超出 pool_size 的最大连接数。
  • pool_timeout:指定从连接池中获取连接的超时时间(秒)。

八、常见问题及解决方法

  1. 连接错误

    • 错误信息sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1045, "Access denied for user '...'@'...' (using password: YES)")
    • 原因:用户名或密码错误,或者用户没有访问指定数据库的权限。
    • 解决方法:检查用户名和密码是否正确,确保用户具有访问数据库的适当权限。可以使用 MySQL 命令行工具登录并检查用户权限。
  2. 表不存在错误

    • 错误信息sqlalchemy.exc.NoSuchTableError: users
    • 原因:在代码中引用的表在数据库中不存在。
    • 解决方法:确保数据库中存在对应的表。如果使用 ORM,确保在创建会话之前调用 Base.metadata.create_all(engine) 来创建表。如果使用 SQL 表达式语言,确保在执行涉及表的操作之前创建了表。
  3. 数据类型不匹配错误

    • 错误信息sqlalchemy.exc.StatementError: (builtins.TypeError) SQLite DateTime type only accepts Python datetime and date objects as input.
    • 原因:在插入或更新数据时,提供的数据类型与表中列的数据类型不匹配。
    • 解决方法:检查插入或更新的数据类型是否与表定义中的数据类型一致。例如,如果表列是 DateTime 类型,确保插入的数据是 datetimedate 对象。

通过以上内容,我们详细介绍了如何使用 SQLAlchemy 在 Python 中连接 MySQL 数据库,并进行各种数据库操作,包括使用 SQL 表达式语言和 ORM。同时,还讨论了事务处理、连接池管理以及常见问题的解决方法,希望能帮助开发者在实际项目中顺利使用 SQLAlchemy 与 MySQL 进行交互。