Python使用SQLAlchemy连接MySQL数据库
一、SQLAlchemy简介
SQLAlchemy 是 Python 中最受欢迎的数据库抽象层库之一。它提供了一种高级的数据库抽象层,允许开发者使用 Python 代码与各种数据库进行交互,而无需编写特定于数据库的 SQL 语句。这种抽象层的好处在于,开发者可以在不改变太多代码的情况下,轻松切换数据库后端,比如从 MySQL 切换到 PostgreSQL。
SQLAlchemy 有两个主要的组件:SQL 表达式语言(SQL Expression Language)和 ORM(Object Relational Mapping,对象关系映射)。
- SQL 表达式语言:这是 SQLAlchemy 的核心部分,它允许开发者以 Pythonic 的方式构建 SQL 语句。通过这种方式,开发者可以利用 Python 的语法和数据结构来生成复杂的 SQL 查询,而不需要手动编写字符串形式的 SQL 语句。这不仅提高了代码的可读性和可维护性,还减少了 SQL 注入的风险。例如,以下代码使用 SQL 表达式语言查询数据库中的所有用户:
from sqlalchemy import create_engine, MetaData, Table, select
# 创建数据库引擎
engine = create_engine('mysql+pymysql://user:password@localhost/mydb')
# 定义元数据
metadata = MetaData()
# 反射 users 表
users = Table('users', metadata, autoload_with=engine)
# 创建查询
stmt = select([users])
# 执行查询
with engine.connect() as conn:
result = conn.execute(stmt)
for row in result:
print(row)
- ORM:ORM 是 SQLAlchemy 提供的一种强大的功能,它允许开发者将数据库表映射为 Python 类。通过这种映射,开发者可以使用面向对象的方式与数据库进行交互,而不是编写 SQL 语句。例如,假设我们有一个
users
表,我们可以定义一个对应的 Python 类User
:
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# 创建数据库引擎
engine = create_engine('mysql+pymysql://user:password@localhost/mydb')
# 创建基类
Base = declarative_base()
# 定义 User 类
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50))
email = Column(String(120))
def __repr__(self):
return f"<User(name='{self.name}', email='{self.email}')>"
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 查询所有用户
users = session.query(User).all()
for user in users:
print(user)
二、安装必要的库
在开始使用 SQLAlchemy 连接 MySQL 数据库之前,我们需要安装两个主要的库:SQLAlchemy 和一个 MySQL 驱动。在 Python 中,常用的 MySQL 驱动有 mysql - connector - python
和 pymysql
。这里我们以 pymysql
为例。
- 安装 SQLAlchemy:
可以使用
pip
命令来安装 SQLAlchemy:
pip install sqlalchemy
- 安装 pymysql:
同样使用
pip
安装pymysql
:
pip install pymysql
三、连接 MySQL 数据库
- 创建数据库引擎: 在 SQLAlchemy 中,连接数据库的第一步是创建一个数据库引擎。数据库引擎是 SQLAlchemy 与数据库之间的桥梁,它负责管理数据库连接池、执行 SQL 语句等操作。创建数据库引擎的语法如下:
from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://username:password@host:port/database_name')
其中:
mysql+pymysql
:表示使用pymysql
作为 MySQL 的驱动。如果使用mysql - connector - python
,则应写成mysql+mysqlconnector
。username
和password
:是 MySQL 数据库的用户名和密码。host
和port
:分别是数据库服务器的主机地址和端口号。默认情况下,MySQL 的端口号是 3306。database_name
:是要连接的数据库名称。
例如,如果你的 MySQL 数据库运行在本地,用户名是 root
,密码是 password
,数据库名称是 test
,则创建引擎的代码如下:
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')
- 测试连接:
创建引擎后,我们可以测试连接是否成功。一种简单的方法是使用
engine.connect()
方法来获取一个数据库连接对象,并尝试执行一个简单的 SQL 语句,比如SELECT 1
。
from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')
try:
with engine.connect() as conn:
result = conn.execute('SELECT 1')
print(result.fetchone())
except Exception as e:
print(f"连接数据库时出错: {e}")
如果连接成功,上述代码将输出 (1,)
,表示数据库连接正常。
四、使用 SQL 表达式语言操作数据库
- 定义表结构:
在使用 SQL 表达式语言之前,我们需要定义数据库表的结构。这可以通过
MetaData
和Table
类来完成。MetaData
类用于存储数据库模式的元数据,而Table
类用于定义单个表的结构。
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')
# 创建元数据对象
metadata = MetaData()
# 定义 users 表
users = Table('users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)),
Column('email', String(120))
)
# 创建表(如果不存在)
metadata.create_all(engine)
上述代码定义了一个名为 users
的表,包含 id
、name
和 email
三个列。id
列是主键,name
和 email
列分别存储字符串类型的数据。最后,使用 metadata.create_all(engine)
方法在数据库中创建该表(如果表不存在)。
- 插入数据:
插入数据可以使用
insert()
方法。以下代码演示了如何向users
表中插入一条记录:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy import insert
# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')
# 创建元数据对象
metadata = MetaData()
# 定义 users 表
users = Table('users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)),
Column('email', String(120))
)
# 创建插入语句
stmt = insert(users).values(name='John Doe', email='johndoe@example.com')
# 执行插入语句
with engine.connect() as conn:
result = conn.execute(stmt)
print(f"插入的记录 ID: {result.lastrowid}")
- 查询数据:
查询数据使用
select()
方法。以下代码演示了如何查询users
表中的所有记录:
from sqlalchemy import create_engine, MetaData, Table, select
# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')
# 创建元数据对象
metadata = MetaData()
# 定义 users 表
users = Table('users', metadata, autoload_with=engine)
# 创建查询语句
stmt = select([users])
# 执行查询语句
with engine.connect() as conn:
result = conn.execute(stmt)
for row in result:
print(row)
- 更新数据:
更新数据使用
update()
方法。以下代码演示了如何将users
表中name
为John Doe
的记录的email
更新为newemail@example.com
:
from sqlalchemy import create_engine, MetaData, Table, update
# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')
# 创建元数据对象
metadata = MetaData()
# 定义 users 表
users = Table('users', metadata, autoload_with=engine)
# 创建更新语句
stmt = update(users).where(users.c.name == 'John Doe').values(email='newemail@example.com')
# 执行更新语句
with engine.connect() as conn:
result = conn.execute(stmt)
print(f"更新的记录数: {result.rowcount}")
- 删除数据:
删除数据使用
delete()
方法。以下代码演示了如何删除users
表中name
为John Doe
的记录:
from sqlalchemy import create_engine, MetaData, Table, delete
# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')
# 创建元数据对象
metadata = MetaData()
# 定义 users 表
users = Table('users', metadata, autoload_with=engine)
# 创建删除语句
stmt = delete(users).where(users.c.name == 'John Doe')
# 执行删除语句
with engine.connect() as conn:
result = conn.execute(stmt)
print(f"删除的记录数: {result.rowcount}")
五、使用 ORM 操作数据库
- 定义映射类:
在使用 ORM 时,我们需要定义 Python 类来映射数据库表。这些类继承自
declarative_base()
创建的基类。
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')
# 创建基类
Base = declarative_base()
# 定义 User 类
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50))
email = Column(String(120))
def __repr__(self):
return f"<User(name='{self.name}', email='{self.email}')>"
上述代码定义了一个 User
类,映射到 users
表。__tablename__
变量指定了映射的表名,类的属性对应表的列。
- 创建会话: 在使用 ORM 操作数据库之前,我们需要创建一个会话。会话是 SQLAlchemy ORM 中与数据库交互的核心对象,它管理事务、对象状态等。
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
- 插入数据:
插入数据可以通过创建映射类的实例,并使用会话的
add()
方法将其添加到会话中,最后使用commit()
方法提交事务。
# 创建 User 实例
new_user = User(name='Jane Smith', email='janesmith@example.com')
# 添加到会话
session.add(new_user)
# 提交事务
session.commit()
- 查询数据:
查询数据使用会话的
query()
方法。以下代码演示了如何查询所有用户:
# 查询所有用户
users = session.query(User).all()
for user in users:
print(user)
还可以进行更复杂的查询,比如过滤查询:
# 查询名字为 Jane Smith 的用户
user = session.query(User).filter(User.name == 'Jane Smith').first()
print(user)
- 更新数据: 更新数据可以先查询到要更新的对象,然后修改其属性,最后提交事务。
# 查询名字为 Jane Smith 的用户
user = session.query(User).filter(User.name == 'Jane Smith').first()
# 修改 email 属性
user.email = 'newemail@example.com'
# 提交事务
session.commit()
- 删除数据:
删除数据可以先查询到要删除的对象,然后使用会话的
delete()
方法将其删除,最后提交事务。
# 查询名字为 Jane Smith 的用户
user = session.query(User).filter(User.name == 'Jane Smith').first()
# 删除用户
session.delete(user)
# 提交事务
session.commit()
六、事务处理
在数据库操作中,事务是一组数据库操作的集合,这些操作要么全部成功,要么全部失败。SQLAlchemy 提供了多种方式来处理事务。
- 自动提交模式: 默认情况下,SQLAlchemy 使用自动提交模式。在这种模式下,每个数据库操作(如插入、更新、删除)都会立即提交到数据库。例如:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')
Session = sessionmaker(bind=engine)
session = Session()
new_user = User(name='AutoCommit User', email='autocommit@example.com')
session.add(new_user)
# 这里不需要手动调用 session.commit(),操作会自动提交
- 显式事务:
在某些情况下,我们可能需要将多个数据库操作组合成一个事务,以确保它们的原子性。可以使用
begin()
方法开始一个事务,使用commit()
方法提交事务,使用rollback()
方法回滚事务。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test')
Session = sessionmaker(bind=engine)
session = Session()
try:
session.begin()
new_user1 = User(name='User1', email='user1@example.com')
new_user2 = User(name='User2', email='user2@example.com')
session.add(new_user1)
session.add(new_user2)
session.commit()
except Exception as e:
session.rollback()
print(f"事务出错: {e}")
finally:
session.close()
在上述代码中,我们尝试向数据库中插入两个用户。如果在插入过程中出现任何错误,整个事务将回滚,不会有任何数据插入到数据库中。
七、连接池管理
SQLAlchemy 内置了连接池管理功能,以提高数据库连接的效率和性能。连接池可以重用现有的数据库连接,而不是每次都创建新的连接,从而减少了连接创建和销毁的开销。
- 连接池类型: SQLAlchemy 支持多种连接池类型,包括:
- NullPool:不使用连接池,每次请求都会创建一个新的连接。适用于调试和测试环境。
- QueuePool:默认的连接池类型,使用队列来管理连接。它会维护一定数量的活动连接,并在需要时从队列中获取连接。
- SingletonThreadPool:使用单例线程池来管理连接。适用于多线程应用程序,每个线程共享同一个连接。
- AsyncAdaptedQueuePool:用于异步应用程序的连接池,基于
QueuePool
进行异步适配。
- 配置连接池:
可以在创建数据库引擎时配置连接池的参数。例如,以下代码演示了如何配置
QueuePool
的最大连接数和超时时间:
from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test',
pool_size=10,
max_overflow=20,
pool_timeout=30)
其中:
pool_size
:指定连接池中保持的活动连接数。max_overflow
:指定连接池可以超出pool_size
的最大连接数。pool_timeout
:指定从连接池中获取连接的超时时间(秒)。
八、常见问题及解决方法
-
连接错误:
- 错误信息:
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1045, "Access denied for user '...'@'...' (using password: YES)")
- 原因:用户名或密码错误,或者用户没有访问指定数据库的权限。
- 解决方法:检查用户名和密码是否正确,确保用户具有访问数据库的适当权限。可以使用 MySQL 命令行工具登录并检查用户权限。
- 错误信息:
-
表不存在错误:
- 错误信息:
sqlalchemy.exc.NoSuchTableError: users
- 原因:在代码中引用的表在数据库中不存在。
- 解决方法:确保数据库中存在对应的表。如果使用 ORM,确保在创建会话之前调用
Base.metadata.create_all(engine)
来创建表。如果使用 SQL 表达式语言,确保在执行涉及表的操作之前创建了表。
- 错误信息:
-
数据类型不匹配错误:
- 错误信息:
sqlalchemy.exc.StatementError: (builtins.TypeError) SQLite DateTime type only accepts Python datetime and date objects as input.
- 原因:在插入或更新数据时,提供的数据类型与表中列的数据类型不匹配。
- 解决方法:检查插入或更新的数据类型是否与表定义中的数据类型一致。例如,如果表列是
DateTime
类型,确保插入的数据是datetime
或date
对象。
- 错误信息:
通过以上内容,我们详细介绍了如何使用 SQLAlchemy 在 Python 中连接 MySQL 数据库,并进行各种数据库操作,包括使用 SQL 表达式语言和 ORM。同时,还讨论了事务处理、连接池管理以及常见问题的解决方法,希望能帮助开发者在实际项目中顺利使用 SQLAlchemy 与 MySQL 进行交互。