MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

使用Python与PostgreSQL数据库的连接与操作

2021-01-262.9k 阅读

安装必要的库

在开始使用 Python 与 PostgreSQL 数据库进行连接与操作之前,我们需要安装 psycopg2 库。psycopg2 是一个流行的 PostgreSQL 适配器,它允许 Python 代码与 PostgreSQL 数据库进行交互。

使用 pip 安装

如果你使用的是 Python 3 并且已经安装了 pip(Python 的包管理工具),可以通过以下命令来安装 psycopg2

pip install psycopg2

如果在安装过程中遇到问题,尤其是在 Windows 系统上,可能需要安装 Microsoft Visual C++ Redistributable 等依赖项。

安装 psycopg2 - binary

在某些情况下,特别是在 Windows 系统上,安装 psycopg2 - binary 可能会更容易。这个包已经包含了预编译的二进制文件,无需手动安装额外的编译工具。

pip install psycopg2 - binary

连接到 PostgreSQL 数据库

安装好 psycopg2 库之后,我们就可以在 Python 代码中连接到 PostgreSQL 数据库了。

基本连接示例

import psycopg2

try:
    # 建立数据库连接
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()
    print("成功连接到数据库")

    # 这里可以执行 SQL 查询

    cursor.close()
    connection.close()
    print("数据库连接已关闭")

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

在上述代码中:

  1. 我们首先导入 psycopg2 库。
  2. 使用 psycopg2.connect() 方法来建立与 PostgreSQL 数据库的连接。需要提供用户名、密码、主机、端口和数据库名称等参数。
  3. 通过 connection.cursor() 方法创建一个游标对象。游标用于执行 SQL 语句并获取结果。
  4. 最后,记得关闭游标和数据库连接以释放资源。

连接参数详解

  • user:数据库用户名。
  • password:用户对应的密码。
  • host:数据库服务器的主机地址。如果是本地开发,通常是 127.0.0.1localhost
  • port:PostgreSQL 数据库监听的端口,默认是 5432
  • database:要连接的具体数据库名称。

执行 SQL 查询

连接到数据库之后,我们可以使用游标对象来执行各种 SQL 查询。

执行简单的 SELECT 查询

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()

    # 执行 SELECT 查询
    select_query = "SELECT version();"
    cursor.execute(select_query)
    record = cursor.fetchone()
    print("你已连接到 - ", record)

    cursor.close()
    connection.close()

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

在上述代码中,我们执行了一个简单的 SELECT version(); 查询,该查询返回 PostgreSQL 数据库的版本信息。cursor.execute() 方法用于执行 SQL 语句,cursor.fetchone() 方法用于获取查询结果的第一行。

获取多条记录

如果查询返回多条记录,可以使用 fetchall() 方法。

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()

    # 假设存在一个名为 employees 的表
    select_query = "SELECT * FROM employees;"
    cursor.execute(select_query)
    records = cursor.fetchall()

    print("员工表中的记录数: ", cursor.rowcount)
    print("打印员工详细信息")
    for row in records:
        print("ID = ", row[0], )
        print("姓名 = ", row[1])
        print("部门 = ", row[2], "\n")

    cursor.close()
    connection.close()

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

这里,fetchall() 方法将查询结果的所有行作为一个列表返回,每一行是一个元组。cursor.rowcount 用于获取查询返回的行数。

带参数的查询

在实际应用中,经常需要执行带有参数的查询,以防止 SQL 注入攻击。

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()

    # 带参数的查询
    employee_id = 1
    select_query = "SELECT * FROM employees WHERE id = %s;"
    cursor.execute(select_query, (employee_id,))
    record = cursor.fetchone()
    print("找到的员工记录: ", record)

    cursor.close()
    connection.close()

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

在上述代码中,我们使用 %s 作为参数占位符,并将实际参数作为一个元组传递给 cursor.execute() 方法。

插入数据

除了查询数据,我们还需要能够向数据库中插入新的数据。

插入单条记录

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()

    # 插入单条记录
    insert_query = "INSERT INTO employees (name, department) VALUES (%s, %s);"
    record_to_insert = ('John Doe', 'HR')
    cursor.execute(insert_query, record_to_insert)
    connection.commit()
    print(cursor.rowcount, "记录插入成功到员工表")

    cursor.close()
    connection.close()

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

在上述代码中,我们使用 INSERT INTO 语句插入一条新记录。注意,在执行插入操作后,需要调用 connection.commit() 方法来提交事务,使插入操作生效。

插入多条记录

可以一次插入多条记录,提高效率。

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()

    # 插入多条记录
    insert_query = "INSERT INTO employees (name, department) VALUES (%s, %s);"
    records_to_insert = [
        ('Jane Smith', 'IT'),
        ('Bob Johnson', 'Finance')
    ]
    cursor.executemany(insert_query, records_to_insert)
    connection.commit()
    print(cursor.rowcount, "记录插入成功到员工表")

    cursor.close()
    connection.close()

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

这里使用 cursor.executemany() 方法,它接受 SQL 语句和一个包含多个参数元组的列表,一次性插入多条记录。

更新数据

有时候我们需要更新数据库中已有的数据。

更新单条记录

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()

    # 更新单条记录
    update_query = "UPDATE employees SET department = %s WHERE id = %s;"
    record_to_update = ('Marketing', 1)
    cursor.execute(update_query, record_to_update)
    connection.commit()
    print(cursor.rowcount, "记录更新成功")

    cursor.close()
    connection.close()

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

在上述代码中,我们使用 UPDATE 语句更新了 employees 表中 id 为 1 的记录的 department 字段。同样,更新操作后需要调用 connection.commit() 提交事务。

更新多条记录

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()

    # 更新多条记录
    update_query = "UPDATE employees SET department = %s WHERE department = %s;"
    record_to_update = ('New Department', 'Old Department')
    cursor.execute(update_query, record_to_update)
    connection.commit()
    print(cursor.rowcount, "记录更新成功")

    cursor.close()
    connection.close()

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

这个例子中,我们更新了所有 departmentOld Department 的记录,将其 department 更新为 New Department

删除数据

删除数据库中的数据也是常见的操作。

删除单条记录

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()

    # 删除单条记录
    delete_query = "DELETE FROM employees WHERE id = %s;"
    record_to_delete = (1,)
    cursor.execute(delete_query, record_to_delete)
    connection.commit()
    print(cursor.rowcount, "记录删除成功")

    cursor.close()
    connection.close()

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

在上述代码中,我们使用 DELETE FROM 语句删除了 employees 表中 id 为 1 的记录。

删除多条记录

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()

    # 删除多条记录
    delete_query = "DELETE FROM employees WHERE department = %s;"
    record_to_delete = ('Old Department',)
    cursor.execute(delete_query, record_to_delete)
    connection.commit()
    print(cursor.rowcount, "记录删除成功")

    cursor.close()
    connection.close()

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

这里我们删除了 departmentOld Department 的所有记录。

事务处理

在数据库操作中,事务是一组不可分割的操作,要么全部成功执行,要么全部回滚。psycopg2 提供了对事务的支持。

简单事务示例

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()

    try:
        # 开始事务
        connection.autocommit = False

        # 插入第一条记录
        insert_query1 = "INSERT INTO employees (name, department) VALUES (%s, %s);"
        record1 = ('Alice', 'Engineering')
        cursor.execute(insert_query1, record1)

        # 插入第二条记录,故意出错(假设部门不存在约束)
        insert_query2 = "INSERT INTO employees (name, department) VALUES (%s, %s);"
        record2 = ('Bob', 'NonExistentDepartment')
        cursor.execute(insert_query2, record2)

        # 提交事务
        connection.commit()
        print("事务成功提交")

    except (Exception, psycopg2.Error) as error:
        # 回滚事务
        connection.rollback()
        print("事务回滚,出错原因: ", error)

    finally:
        cursor.close()
        connection.close()
        connection.autocommit = True

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

在上述代码中:

  1. 我们通过设置 connection.autocommit = False 来手动管理事务。
  2. 尝试执行两个插入操作,其中第二个操作故意会出错。
  3. 如果发生错误,使用 connection.rollback() 回滚事务,撤销所有已执行的操作。
  4. 无论是否发生错误,最后都要关闭游标和数据库连接,并恢复 autocommitTrue

处理大型结果集

当查询返回非常大的结果集时,一次性获取所有数据可能会导致内存问题。psycopg2 提供了一些方法来处理这种情况。

使用游标的迭代器

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor('named_cursor', withhold=True)

    # 执行查询
    select_query = "SELECT * FROM large_table;"
    cursor.execute(select_query)

    for row in cursor:
        print(row)

    cursor.close()
    connection.close()

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

在上述代码中,我们使用 cursor('named_cursor', withhold=True) 创建了一个命名游标,并设置 withhold=True。这样可以逐行处理结果集,而不是一次性将所有结果加载到内存中。通过迭代游标对象,我们可以逐行处理数据。

处理二进制数据

PostgreSQL 支持存储二进制数据,如图片、文件等。在 Python 中,我们可以使用 psycopg2 来处理这些二进制数据。

插入二进制数据

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()

    # 读取二进制文件
    with open('example.jpg', 'rb') as file:
        binary_data = file.read()

    # 插入二进制数据
    insert_query = "INSERT INTO binary_data_table (data) VALUES (%s);"
    cursor.execute(insert_query, (binary_data,))
    connection.commit()
    print(cursor.rowcount, "二进制数据插入成功")

    cursor.close()
    connection.close()

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

在上述代码中,我们首先使用 open() 函数以二进制读取模式打开一个文件,然后将读取的二进制数据插入到 binary_data_table 表的 data 列中。

获取二进制数据

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()

    # 获取二进制数据
    select_query = "SELECT data FROM binary_data_table WHERE id = %s;"
    cursor.execute(select_query, (1,))
    binary_data = cursor.fetchone()[0]

    # 将二进制数据写入文件
    with open('retrieved_example.jpg', 'wb') as file:
        file.write(binary_data)

    print("二进制数据已成功获取并写入文件")

    cursor.close()
    connection.close()

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

这里我们从 binary_data_table 表中获取 id 为 1 的记录的二进制数据,并将其写入一个新的文件。

数据库连接池

在高并发的应用场景中,频繁地创建和销毁数据库连接会带来性能开销。数据库连接池可以解决这个问题,它预先创建一定数量的数据库连接,并在需要时重复使用这些连接。

使用 psycopg2 - pool

首先需要安装 psycopg2 - pool 库:

pip install psycopg2 - pool

以下是使用连接池的示例代码:

import psycopg2
from psycopg2 import pool

try:
    # 创建连接池
    postgreSQL_pool = pool.SimpleConnectionPool(
        1,  # 最小连接数
        20,  # 最大连接数
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    # 从连接池获取连接
    connection = postgreSQL_pool.getconn()

    cursor = connection.cursor()

    # 执行查询
    select_query = "SELECT version();"
    cursor.execute(select_query)
    record = cursor.fetchone()
    print("你已连接到 - ", record)

    cursor.close()

    # 将连接放回连接池
    postgreSQL_pool.putconn(connection)

    # 关闭连接池
    postgreSQL_pool.closeall()

except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

在上述代码中:

  1. 我们使用 pool.SimpleConnectionPool() 创建了一个连接池,设置了最小连接数和最大连接数。
  2. 通过 postgreSQL_pool.getconn() 从连接池获取一个连接。
  3. 使用完连接后,通过 postgreSQL_pool.putconn(connection) 将连接放回连接池。
  4. 最后使用 postgreSQL_pool.closeall() 关闭连接池。

通过合理使用连接池,可以显著提高应用程序在高并发场景下与 PostgreSQL 数据库交互的性能和效率。

错误处理与调试

在与 PostgreSQL 数据库交互时,可能会遇到各种错误。psycopg2 提供了详细的错误信息,帮助我们调试和解决问题。

捕获常见错误

import psycopg2

try:
    connection = psycopg2.connect(
        user="your_username",
        password="your_password",
        host="127.0.0.1",
        port="5432",
        database="your_database"
    )

    cursor = connection.cursor()

    # 故意执行错误的查询
    select_query = "SELECT * FROM non_existent_table;"
    cursor.execute(select_query)

    cursor.close()
    connection.close()

except psycopg2.ProgrammingError as e:
    print("编程错误: ", e)
except psycopg2.OperationalError as e:
    print("操作错误: ", e)
except (Exception, psycopg2.Error) as error:
    print("连接 PostgreSQL 数据库时出错", error)

在上述代码中,我们故意执行了一个查询不存在表的语句。通过捕获 psycopg2.ProgrammingErrorpsycopg2.OperationalError 等特定类型的错误,我们可以更准确地定位和解决问题。

调试技巧

  • 查看错误信息psycopg2 抛出的异常包含详细的错误信息,仔细查看这些信息可以帮助确定问题所在。
  • 打印 SQL 语句:在执行 SQL 语句之前,打印出完整的 SQL 语句(包括参数值),以确保语句的正确性。
  • 使用日志:使用 Python 的日志模块,记录数据库操作的详细信息,便于跟踪和调试。

通过合理的错误处理和调试技巧,可以更快地发现和解决与 PostgreSQL 数据库连接和操作相关的问题。

以上就是使用 Python 与 PostgreSQL 数据库进行连接与操作的详细内容,涵盖了从基本连接到高级特性的各个方面。通过掌握这些知识,你可以在 Python 应用程序中高效地与 PostgreSQL 数据库进行交互。