MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python自动化脚本与数据库操作

2023-05-057.0k 阅读

Python自动化脚本基础

自动化脚本概念

自动化脚本是一段预先编写好的程序代码,它能够按照设定的规则和步骤自动执行一系列任务,无需人工手动干预每个操作环节。在Python的生态环境中,自动化脚本因其简洁的语法和丰富的库支持,广泛应用于系统管理、网络运维、数据处理等众多领域。

例如,在系统管理方面,自动化脚本可以实现定期备份文件、清理临时文件等任务;在网络运维中,可用于自动化配置网络设备;在数据处理领域,能自动从各种数据源获取数据、进行清洗和分析。

Python自动化脚本优势

  1. 简单易学:Python以其简洁明了的语法,新手开发者能够快速上手。相比其他编程语言,Python的代码可读性高,例如定义一个简单的函数在Python中只需寥寥几行代码:
def add_numbers(a, b):
    return a + b
  1. 丰富的库:Python拥有庞大的标准库以及众多第三方库。例如,os库用于操作系统相关操作,shutil库可进行文件和目录的高级操作。在自动化脚本中,os库常被用于文件路径处理、进程管理等任务。比如,要获取当前工作目录,可以使用以下代码:
import os
current_dir = os.getcwd()
print(current_dir)
  1. 跨平台性:Python脚本可以在Windows、Linux、macOS等多种操作系统上运行,无需对代码进行大规模修改。这使得开发者编写的自动化脚本能够在不同的系统环境中复用,提高了开发效率。

基本脚本结构与语法

  1. 注释:在Python中,注释用于对代码进行解释说明,提高代码的可读性。单行注释使用#符号,例如:
# 这是一个单行注释,用于说明下面代码的作用
print("Hello, World!")

多行注释可以使用三个单引号或三个双引号,如下所示:

'''
这是一个多行注释块
可以跨越多行
用于更详细的说明
'''
print("This is a test.")
  1. 变量与数据类型:Python是一种动态类型语言,变量在使用前无需声明类型。常见的数据类型有整数(int)、浮点数(float)、字符串(str)、列表(list)、元组(tuple)、字典(dict)等。
# 整数变量
number = 10
# 浮点数变量
pi = 3.14
# 字符串变量
name = "John"
# 列表
fruits = ["apple", "banana", "cherry"]
# 元组
coordinates = (10, 20)
# 字典
person = {"name": "Alice", "age": 30}
  1. 控制结构:Python提供了if - elif - else条件语句、for循环、while循环等控制结构。
# if - elif - else 示例
score = 85
if score >= 90:
    print("A")
elif score >= 80:
    print("B")
else:
    print("C")

# for 循环示例
for i in range(5):
    print(i)

# while 循环示例
count = 0
while count < 3:
    print(count)
    count += 1
  1. 函数:函数是组织好的、可重复使用的代码块。
def greet(name):
    return f"Hello, {name}!"
result = greet("Bob")
print(result)

数据库基础概念

数据库类型

  1. 关系型数据库:关系型数据库以二维表格的形式组织数据,数据之间通过关系(如外键)相互关联。常见的关系型数据库有MySQL、Oracle、SQL Server、PostgreSQL等。
    • MySQL:是一款开源的关系型数据库,广泛应用于Web开发领域。它具有高性能、可扩展性强等特点。
    • PostgreSQL:同样是开源的关系型数据库,以其强大的功能和对SQL标准的良好支持而闻名,尤其适用于复杂查询和数据完整性要求较高的场景。
  2. 非关系型数据库:非关系型数据库(NoSQL)则以不同的方式存储数据,如键值对、文档、图形等。常见的非关系型数据库有Redis、MongoDB、Cassandra等。
    • Redis:是一个基于键值对的内存数据库,读写速度极快,常用于缓存、消息队列等场景。
    • MongoDB:以文档形式存储数据,采用BSON(二进制JSON)格式,适合存储半结构化和非结构化数据,在大数据和敏捷开发中应用广泛。

数据库操作基本术语

  1. 数据库:是数据的集合,可看作是一个容器,包含多个表以及相关的对象(如视图、存储过程等)。例如,一个电商系统的数据库可能包含用户表、商品表、订单表等。
  2. :是数据库中存储数据的基本结构,由行(记录)和列(字段)组成。以用户表为例,每一行代表一个用户的信息,每一列则代表用户的某个属性,如姓名、年龄、邮箱等。
  3. 字段:表中的列即为字段,每个字段都有特定的数据类型,如整数、字符串、日期等。例如,用户表中的“年龄”字段可能是整数类型,“姓名”字段是字符串类型。
  4. 记录:表中的行被称为记录,它包含了一组完整的相关数据。比如在商品表中,每一条记录代表一种商品的详细信息。
  5. SQL语句:结构化查询语言(SQL)用于与数据库进行交互,执行诸如查询、插入、更新、删除等操作。常见的SQL语句有:
    • SELECT:用于查询数据,例如SELECT * FROM users;表示从users表中选择所有列的数据。
    • INSERT:用于插入新数据,如INSERT INTO users (name, age) VALUES ('Tom', 25);表示向users表的nameage字段插入数据。
    • UPDATE:用于更新现有数据,例如UPDATE users SET age = 26 WHERE name = 'Tom';表示将nameTom的用户的年龄更新为26。
    • DELETE:用于删除数据,如DELETE FROM users WHERE name = 'Tom';表示删除nameTom的用户记录。

Python与关系型数据库操作

使用sqlite3操作SQLite数据库

SQLite是一个轻量级的嵌入式数据库,它不需要独立的服务器进程,适合于小型应用和快速原型开发。Python标准库中的sqlite3模块提供了操作SQLite数据库的接口。

  1. 连接数据库:使用sqlite3.connect()方法连接到SQLite数据库文件,如果文件不存在则会创建一个新的数据库文件。
import sqlite3

# 连接到数据库
conn = sqlite3.connect('example.db')
  1. 创建表:通过创建游标对象,使用execute()方法执行SQL语句来创建表。
# 创建游标
cursor = conn.cursor()

# 创建表
create_table_sql = '''
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT,
    age INTEGER
)
'''
cursor.execute(create_table_sql)
  1. 插入数据:可以使用execute()方法执行INSERT语句插入单条数据,也可以使用executemany()方法插入多条数据。
# 插入单条数据
insert_single_sql = "INSERT INTO users (name, age) VALUES ('Alice', 30)"
cursor.execute(insert_single_sql)

# 插入多条数据
data = [
    ('Bob', 25),
    ('Charlie', 28)
]
insert_many_sql = "INSERT INTO users (name, age) VALUES (?,?)"
cursor.executemany(insert_many_sql, data)

# 提交事务
conn.commit()
  1. 查询数据:使用SELECT语句查询数据,并通过fetchone()fetchmany()fetchall()方法获取查询结果。
# 查询所有数据
select_all_sql = "SELECT * FROM users"
cursor.execute(select_all_sql)
results = cursor.fetchall()
for row in results:
    print(row)

# 查询单条数据
select_one_sql = "SELECT * FROM users WHERE name = 'Alice'"
cursor.execute(select_one_sql)
result = cursor.fetchone()
print(result)
  1. 更新数据:执行UPDATE语句更新数据。
update_sql = "UPDATE users SET age = 31 WHERE name = 'Alice'"
cursor.execute(update_sql)
conn.commit()
  1. 删除数据:执行DELETE语句删除数据。
delete_sql = "DELETE FROM users WHERE name = 'Bob'"
cursor.execute(delete_sql)
conn.commit()
  1. 关闭连接:操作完成后,关闭数据库连接。
conn.close()

使用pymysql操作MySQL数据库

pymysql是一个纯Python实现的MySQL数据库连接库,用于在Python程序中操作MySQL数据库。

  1. 安装pymysql:可以使用pip install pymysql命令进行安装。
  2. 连接数据库
import pymysql

# 连接到MySQL数据库
conn = pymysql.connect(
    host='localhost',
    user='root',
    password='password',
    database='test_db',
    charset='utf8mb4'
)
  1. 创建表
# 创建游标
cursor = conn.cursor()

# 创建表
create_table_sql = '''
CREATE TABLE IF NOT EXISTS products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255),
    price DECIMAL(10, 2)
)
'''
cursor.execute(create_table_sql)
  1. 插入数据
# 插入单条数据
insert_single_sql = "INSERT INTO products (name, price) VALUES ('Product 1', 19.99)"
cursor.execute(insert_single_sql)

# 插入多条数据
data = [
    ('Product 2', 29.99),
    ('Product 3', 39.99)
]
insert_many_sql = "INSERT INTO products (name, price) VALUES (%s, %s)"
cursor.executemany(insert_many_sql, data)

# 提交事务
conn.commit()
  1. 查询数据
# 查询所有数据
select_all_sql = "SELECT * FROM products"
cursor.execute(select_all_sql)
results = cursor.fetchall()
for row in results:
    print(row)

# 查询单条数据
select_one_sql = "SELECT * FROM products WHERE name = 'Product 1'"
cursor.execute(select_one_sql)
result = cursor.fetchone()
print(result)
  1. 更新数据
update_sql = "UPDATE products SET price = 20.99 WHERE name = 'Product 1'"
cursor.execute(update_sql)
conn.commit()
  1. 删除数据
delete_sql = "DELETE FROM products WHERE name = 'Product 2'"
cursor.execute(delete_sql)
conn.commit()
  1. 关闭连接
conn.close()

使用psycopg2操作PostgreSQL数据库

psycopg2是Python连接PostgreSQL数据库的常用库,它提供了高效的接口来执行SQL语句。

  1. 安装psycopg2:使用pip install psycopg2(对于psycopg2 - binary可在某些安装场景下使用,它包含了预编译的二进制文件)。
  2. 连接数据库
import psycopg2

# 连接到PostgreSQL数据库
conn = psycopg2.connect(
    dbname='test_db',
    user='user',
    password='password',
    host='localhost',
    port='5432'
)
  1. 创建表
# 创建游标
cursor = conn.cursor()

# 创建表
create_table_sql = '''
CREATE TABLE IF NOT EXISTS employees (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255),
    salary NUMERIC(10, 2)
)
'''
cursor.execute(create_table_sql)
  1. 插入数据
# 插入单条数据
insert_single_sql = "INSERT INTO employees (name, salary) VALUES ('Employee 1', 5000.00)"
cursor.execute(insert_single_sql)

# 插入多条数据
data = [
    ('Employee 2', 6000.00),
    ('Employee 3', 7000.00)
]
insert_many_sql = "INSERT INTO employees (name, salary) VALUES (%s, %s)"
cursor.executemany(insert_many_sql, data)

# 提交事务
conn.commit()
  1. 查询数据
# 查询所有数据
select_all_sql = "SELECT * FROM employees"
cursor.execute(select_all_sql)
results = cursor.fetchall()
for row in results:
    print(row)

# 查询单条数据
select_one_sql = "SELECT * FROM employees WHERE name = 'Employee 1'"
cursor.execute(select_one_sql)
result = cursor.fetchone()
print(result)
  1. 更新数据
update_sql = "UPDATE employees SET salary = 5500.00 WHERE name = 'Employee 1'"
cursor.execute(update_sql)
conn.commit()
  1. 删除数据
delete_sql = "DELETE FROM employees WHERE name = 'Employee 2'"
cursor.execute(delete_sql)
conn.commit()
  1. 关闭连接
conn.close()

Python与非关系型数据库操作

使用redis - py操作Redis数据库

redis - py是Python操作Redis数据库的常用库,它提供了简单直观的接口来执行Redis命令。

  1. 安装redis - py:使用pip install redis进行安装。
  2. 连接数据库
import redis

# 连接到Redis数据库
r = redis.Redis(host='localhost', port=6379, db = 0)
  1. 设置键值对:使用set()方法设置键值对。
r.set('name', 'John')
  1. 获取值:使用get()方法获取键对应的值。
value = r.get('name')
print(value.decode('utf - 8'))
  1. 删除键值对:使用delete()方法删除键值对。
r.delete('name')
  1. 其他操作:Redis支持多种数据结构,如列表、哈希、集合等。
    • 列表操作
# 向列表右侧添加元素
r.rpush('mylist', 'element1')
r.rpush('mylist', 'element2')

# 获取列表所有元素
list_values = r.lrange('mylist', 0, -1)
for value in list_values:
    print(value.decode('utf - 8'))
- **哈希操作**:
# 设置哈希字段值
r.hset('myhash', 'field1', 'value1')
r.hset('myhash', 'field2', 'value2')

# 获取哈希所有字段值
hash_values = r.hgetall('myhash')
for key, value in hash_values.items():
    print(key.decode('utf - 8'), value.decode('utf - 8'))

使用pymongo操作MongoDB数据库

pymongo是Python连接MongoDB数据库的官方驱动,提供了丰富的操作方法。

  1. 安装pymongo:使用pip install pymongo进行安装。
  2. 连接数据库
from pymongo import MongoClient

# 连接到MongoDB数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']
  1. 插入文档
# 获取集合
collection = db['users']

# 插入单条文档
document = {
    'name': 'Alice',
    'age': 30
}
collection.insert_one(document)

# 插入多条文档
documents = [
    {'name': 'Bob', 'age': 25},
    {'name': 'Charlie', 'age': 28}
]
collection.insert_many(documents)
  1. 查询文档
# 查询所有文档
results = collection.find()
for result in results:
    print(result)

# 查询单条文档
query = {'name': 'Alice'}
result = collection.find_one(query)
print(result)
  1. 更新文档
query = {'name': 'Alice'}
update = {'$set': {'age': 31}}
collection.update_one(query, update)
  1. 删除文档
query = {'name': 'Bob'}
collection.delete_one(query)

Python自动化脚本与数据库结合应用案例

数据备份自动化脚本

假设我们有一个MySQL数据库,需要定期备份其中的某个表的数据到CSV文件中。

  1. 安装依赖库:除了pymysql,还需要安装pandas库用于处理CSV文件,使用pip install pandas安装。
  2. 编写自动化脚本
import pymysql
import pandas as pd
from datetime import datetime


def backup_table_to_csv():
    # 连接MySQL数据库
    conn = pymysql.connect(
        host='localhost',
        user='root',
        password='password',
        database='test_db',
        charset='utf8mb4'
    )

    try:
        # 查询数据
        query = "SELECT * FROM products"
        df = pd.read_sql(query, conn)

        # 生成备份文件名
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        backup_file = f'products_backup_{timestamp}.csv'

        # 保存为CSV文件
        df.to_csv(backup_file, index=False)
        print(f"Table backed up to {backup_file} successfully.")
    except Exception as e:
        print(f"Error occurred: {e}")
    finally:
        # 关闭连接
        conn.close()


if __name__ == "__main__":
    backup_table_to_csv()

此脚本连接到MySQL数据库,查询products表的数据,将其转换为pandasDataFrame对象,然后保存为带有时间戳的CSV文件。可以通过设置系统的定时任务(如在Linux系统中使用cron,在Windows系统中使用任务计划程序)来定期执行该脚本,实现数据备份的自动化。

数据同步自动化脚本

假设我们有一个MySQL数据库和一个MongoDB数据库,需要将MySQL数据库中users表的新增数据同步到MongoDB的users集合中。

  1. 安装依赖库:安装pymysqlpymongo库。
  2. 编写自动化脚本
import pymysql
from pymongo import MongoClient


def sync_data():
    # 连接MySQL数据库
    mysql_conn = pymysql.connect(
        host='localhost',
        user='root',
        password='password',
        database='test_db',
        charset='utf8mb4'
    )

    # 连接MongoDB数据库
    mongo_client = MongoClient('mongodb://localhost:27017/')
    mongo_db = mongo_client['test_db']
    mongo_collection = mongo_db['users']

    try:
        # 获取MySQL中users表的最大ID(假设ID自增)
        mysql_cursor = mysql_conn.cursor()
        mysql_cursor.execute("SELECT MAX(id) FROM users")
        max_id_result = mysql_cursor.fetchone()
        max_id = max_id_result[0] if max_id_result[0] else 0

        # 查询MySQL中新增的用户数据
        query = f"SELECT * FROM users WHERE id > {max_id}"
        mysql_cursor.execute(query)
        new_users = mysql_cursor.fetchall()

        # 将新增数据插入到MongoDB
        for user in new_users:
            user_dict = {
                'id': user[0],
                'name': user[1],
                'age': user[2]
            }
            mongo_collection.insert_one(user_dict)
        print(f"{len(new_users)} new users synced to MongoDB.")
    except Exception as e:
        print(f"Error occurred: {e}")
    finally:
        # 关闭连接
        mysql_conn.close()
        mongo_client.close()


if __name__ == "__main__":
    sync_data()

此脚本首先获取MySQL中users表的最大ID,然后查询ID大于该值的新增数据,将这些数据插入到MongoDB的users集合中。同样,可以通过定时任务来定期执行该脚本,实现两个数据库之间的数据同步自动化。

基于数据库数据的自动化报告生成

假设我们有一个SQLite数据库存储销售数据,需要根据这些数据生成销售报告。

  1. 安装依赖库:安装sqlite3(已包含在Python标准库中)和matplotlib库用于生成图表(pip install matplotlib)。
  2. 编写自动化脚本
import sqlite3
import matplotlib.pyplot as plt
import pandas as pd


def generate_sales_report():
    # 连接SQLite数据库
    conn = sqlite3.connect('sales.db')

    try:
        # 查询销售数据
        query = "SELECT product, quantity, price FROM sales"
        df = pd.read_sql(query, conn)

        # 计算销售额
        df['revenue'] = df['quantity'] * df['price']

        # 按产品统计销售额
        product_revenue = df.groupby('product')['revenue'].sum().reset_index()

        # 绘制柱状图
        plt.bar(product_revenue['product'], product_revenue['revenue'])
        plt.xlabel('Product')
        plt.ylabel('Revenue')
        plt.title('Sales Revenue by Product')
        plt.xticks(rotation = 45)

        # 保存图表为图片
        plt.savefig('sales_report.png')
        print("Sales report generated as sales_report.png.")
    except Exception as e:
        print(f"Error occurred: {e}")
    finally:
        # 关闭连接
        conn.close()


if __name__ == "__main__":
    generate_sales_report()

此脚本从SQLite数据库中读取销售数据,计算每个产品的销售额,然后使用matplotlib库生成柱状图展示各产品的销售额,并保存为图片文件,实现了基于数据库数据的自动化报告生成。可以进一步扩展,如将生成的报告发送到指定邮箱等功能。