Python自动化脚本与数据库操作
Python自动化脚本基础
自动化脚本概念
自动化脚本是一段预先编写好的程序代码,它能够按照设定的规则和步骤自动执行一系列任务,无需人工手动干预每个操作环节。在Python的生态环境中,自动化脚本因其简洁的语法和丰富的库支持,广泛应用于系统管理、网络运维、数据处理等众多领域。
例如,在系统管理方面,自动化脚本可以实现定期备份文件、清理临时文件等任务;在网络运维中,可用于自动化配置网络设备;在数据处理领域,能自动从各种数据源获取数据、进行清洗和分析。
Python自动化脚本优势
- 简单易学:Python以其简洁明了的语法,新手开发者能够快速上手。相比其他编程语言,Python的代码可读性高,例如定义一个简单的函数在Python中只需寥寥几行代码:
def add_numbers(a, b):
return a + b
- 丰富的库:Python拥有庞大的标准库以及众多第三方库。例如,
os
库用于操作系统相关操作,shutil
库可进行文件和目录的高级操作。在自动化脚本中,os
库常被用于文件路径处理、进程管理等任务。比如,要获取当前工作目录,可以使用以下代码:
import os
current_dir = os.getcwd()
print(current_dir)
- 跨平台性:Python脚本可以在Windows、Linux、macOS等多种操作系统上运行,无需对代码进行大规模修改。这使得开发者编写的自动化脚本能够在不同的系统环境中复用,提高了开发效率。
基本脚本结构与语法
- 注释:在Python中,注释用于对代码进行解释说明,提高代码的可读性。单行注释使用
#
符号,例如:
# 这是一个单行注释,用于说明下面代码的作用
print("Hello, World!")
多行注释可以使用三个单引号或三个双引号,如下所示:
'''
这是一个多行注释块
可以跨越多行
用于更详细的说明
'''
print("This is a test.")
- 变量与数据类型:Python是一种动态类型语言,变量在使用前无需声明类型。常见的数据类型有整数(
int
)、浮点数(float
)、字符串(str
)、列表(list
)、元组(tuple
)、字典(dict
)等。
# 整数变量
number = 10
# 浮点数变量
pi = 3.14
# 字符串变量
name = "John"
# 列表
fruits = ["apple", "banana", "cherry"]
# 元组
coordinates = (10, 20)
# 字典
person = {"name": "Alice", "age": 30}
- 控制结构:Python提供了
if - elif - else
条件语句、for
循环、while
循环等控制结构。
# if - elif - else 示例
score = 85
if score >= 90:
print("A")
elif score >= 80:
print("B")
else:
print("C")
# for 循环示例
for i in range(5):
print(i)
# while 循环示例
count = 0
while count < 3:
print(count)
count += 1
- 函数:函数是组织好的、可重复使用的代码块。
def greet(name):
return f"Hello, {name}!"
result = greet("Bob")
print(result)
数据库基础概念
数据库类型
- 关系型数据库:关系型数据库以二维表格的形式组织数据,数据之间通过关系(如外键)相互关联。常见的关系型数据库有MySQL、Oracle、SQL Server、PostgreSQL等。
- MySQL:是一款开源的关系型数据库,广泛应用于Web开发领域。它具有高性能、可扩展性强等特点。
- PostgreSQL:同样是开源的关系型数据库,以其强大的功能和对SQL标准的良好支持而闻名,尤其适用于复杂查询和数据完整性要求较高的场景。
- 非关系型数据库:非关系型数据库(NoSQL)则以不同的方式存储数据,如键值对、文档、图形等。常见的非关系型数据库有Redis、MongoDB、Cassandra等。
- Redis:是一个基于键值对的内存数据库,读写速度极快,常用于缓存、消息队列等场景。
- MongoDB:以文档形式存储数据,采用BSON(二进制JSON)格式,适合存储半结构化和非结构化数据,在大数据和敏捷开发中应用广泛。
数据库操作基本术语
- 数据库:是数据的集合,可看作是一个容器,包含多个表以及相关的对象(如视图、存储过程等)。例如,一个电商系统的数据库可能包含用户表、商品表、订单表等。
- 表:是数据库中存储数据的基本结构,由行(记录)和列(字段)组成。以用户表为例,每一行代表一个用户的信息,每一列则代表用户的某个属性,如姓名、年龄、邮箱等。
- 字段:表中的列即为字段,每个字段都有特定的数据类型,如整数、字符串、日期等。例如,用户表中的“年龄”字段可能是整数类型,“姓名”字段是字符串类型。
- 记录:表中的行被称为记录,它包含了一组完整的相关数据。比如在商品表中,每一条记录代表一种商品的详细信息。
- SQL语句:结构化查询语言(SQL)用于与数据库进行交互,执行诸如查询、插入、更新、删除等操作。常见的SQL语句有:
- SELECT:用于查询数据,例如
SELECT * FROM users;
表示从users
表中选择所有列的数据。 - INSERT:用于插入新数据,如
INSERT INTO users (name, age) VALUES ('Tom', 25);
表示向users
表的name
和age
字段插入数据。 - UPDATE:用于更新现有数据,例如
UPDATE users SET age = 26 WHERE name = 'Tom';
表示将name
为Tom
的用户的年龄更新为26。 - DELETE:用于删除数据,如
DELETE FROM users WHERE name = 'Tom';
表示删除name
为Tom
的用户记录。
- SELECT:用于查询数据,例如
Python与关系型数据库操作
使用sqlite3
操作SQLite数据库
SQLite是一个轻量级的嵌入式数据库,它不需要独立的服务器进程,适合于小型应用和快速原型开发。Python标准库中的sqlite3
模块提供了操作SQLite数据库的接口。
- 连接数据库:使用
sqlite3.connect()
方法连接到SQLite数据库文件,如果文件不存在则会创建一个新的数据库文件。
import sqlite3
# 连接到数据库
conn = sqlite3.connect('example.db')
- 创建表:通过创建游标对象,使用
execute()
方法执行SQL语句来创建表。
# 创建游标
cursor = conn.cursor()
# 创建表
create_table_sql = '''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
age INTEGER
)
'''
cursor.execute(create_table_sql)
- 插入数据:可以使用
execute()
方法执行INSERT
语句插入单条数据,也可以使用executemany()
方法插入多条数据。
# 插入单条数据
insert_single_sql = "INSERT INTO users (name, age) VALUES ('Alice', 30)"
cursor.execute(insert_single_sql)
# 插入多条数据
data = [
('Bob', 25),
('Charlie', 28)
]
insert_many_sql = "INSERT INTO users (name, age) VALUES (?,?)"
cursor.executemany(insert_many_sql, data)
# 提交事务
conn.commit()
- 查询数据:使用
SELECT
语句查询数据,并通过fetchone()
、fetchmany()
或fetchall()
方法获取查询结果。
# 查询所有数据
select_all_sql = "SELECT * FROM users"
cursor.execute(select_all_sql)
results = cursor.fetchall()
for row in results:
print(row)
# 查询单条数据
select_one_sql = "SELECT * FROM users WHERE name = 'Alice'"
cursor.execute(select_one_sql)
result = cursor.fetchone()
print(result)
- 更新数据:执行
UPDATE
语句更新数据。
update_sql = "UPDATE users SET age = 31 WHERE name = 'Alice'"
cursor.execute(update_sql)
conn.commit()
- 删除数据:执行
DELETE
语句删除数据。
delete_sql = "DELETE FROM users WHERE name = 'Bob'"
cursor.execute(delete_sql)
conn.commit()
- 关闭连接:操作完成后,关闭数据库连接。
conn.close()
使用pymysql
操作MySQL数据库
pymysql
是一个纯Python实现的MySQL数据库连接库,用于在Python程序中操作MySQL数据库。
- 安装
pymysql
:可以使用pip install pymysql
命令进行安装。 - 连接数据库:
import pymysql
# 连接到MySQL数据库
conn = pymysql.connect(
host='localhost',
user='root',
password='password',
database='test_db',
charset='utf8mb4'
)
- 创建表:
# 创建游标
cursor = conn.cursor()
# 创建表
create_table_sql = '''
CREATE TABLE IF NOT EXISTS products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255),
price DECIMAL(10, 2)
)
'''
cursor.execute(create_table_sql)
- 插入数据:
# 插入单条数据
insert_single_sql = "INSERT INTO products (name, price) VALUES ('Product 1', 19.99)"
cursor.execute(insert_single_sql)
# 插入多条数据
data = [
('Product 2', 29.99),
('Product 3', 39.99)
]
insert_many_sql = "INSERT INTO products (name, price) VALUES (%s, %s)"
cursor.executemany(insert_many_sql, data)
# 提交事务
conn.commit()
- 查询数据:
# 查询所有数据
select_all_sql = "SELECT * FROM products"
cursor.execute(select_all_sql)
results = cursor.fetchall()
for row in results:
print(row)
# 查询单条数据
select_one_sql = "SELECT * FROM products WHERE name = 'Product 1'"
cursor.execute(select_one_sql)
result = cursor.fetchone()
print(result)
- 更新数据:
update_sql = "UPDATE products SET price = 20.99 WHERE name = 'Product 1'"
cursor.execute(update_sql)
conn.commit()
- 删除数据:
delete_sql = "DELETE FROM products WHERE name = 'Product 2'"
cursor.execute(delete_sql)
conn.commit()
- 关闭连接:
conn.close()
使用psycopg2
操作PostgreSQL数据库
psycopg2
是Python连接PostgreSQL数据库的常用库,它提供了高效的接口来执行SQL语句。
- 安装
psycopg2
:使用pip install psycopg2
(对于psycopg2 - binary
可在某些安装场景下使用,它包含了预编译的二进制文件)。 - 连接数据库:
import psycopg2
# 连接到PostgreSQL数据库
conn = psycopg2.connect(
dbname='test_db',
user='user',
password='password',
host='localhost',
port='5432'
)
- 创建表:
# 创建游标
cursor = conn.cursor()
# 创建表
create_table_sql = '''
CREATE TABLE IF NOT EXISTS employees (
id SERIAL PRIMARY KEY,
name VARCHAR(255),
salary NUMERIC(10, 2)
)
'''
cursor.execute(create_table_sql)
- 插入数据:
# 插入单条数据
insert_single_sql = "INSERT INTO employees (name, salary) VALUES ('Employee 1', 5000.00)"
cursor.execute(insert_single_sql)
# 插入多条数据
data = [
('Employee 2', 6000.00),
('Employee 3', 7000.00)
]
insert_many_sql = "INSERT INTO employees (name, salary) VALUES (%s, %s)"
cursor.executemany(insert_many_sql, data)
# 提交事务
conn.commit()
- 查询数据:
# 查询所有数据
select_all_sql = "SELECT * FROM employees"
cursor.execute(select_all_sql)
results = cursor.fetchall()
for row in results:
print(row)
# 查询单条数据
select_one_sql = "SELECT * FROM employees WHERE name = 'Employee 1'"
cursor.execute(select_one_sql)
result = cursor.fetchone()
print(result)
- 更新数据:
update_sql = "UPDATE employees SET salary = 5500.00 WHERE name = 'Employee 1'"
cursor.execute(update_sql)
conn.commit()
- 删除数据:
delete_sql = "DELETE FROM employees WHERE name = 'Employee 2'"
cursor.execute(delete_sql)
conn.commit()
- 关闭连接:
conn.close()
Python与非关系型数据库操作
使用redis - py
操作Redis数据库
redis - py
是Python操作Redis数据库的常用库,它提供了简单直观的接口来执行Redis命令。
- 安装
redis - py
:使用pip install redis
进行安装。 - 连接数据库:
import redis
# 连接到Redis数据库
r = redis.Redis(host='localhost', port=6379, db = 0)
- 设置键值对:使用
set()
方法设置键值对。
r.set('name', 'John')
- 获取值:使用
get()
方法获取键对应的值。
value = r.get('name')
print(value.decode('utf - 8'))
- 删除键值对:使用
delete()
方法删除键值对。
r.delete('name')
- 其他操作:Redis支持多种数据结构,如列表、哈希、集合等。
- 列表操作:
# 向列表右侧添加元素
r.rpush('mylist', 'element1')
r.rpush('mylist', 'element2')
# 获取列表所有元素
list_values = r.lrange('mylist', 0, -1)
for value in list_values:
print(value.decode('utf - 8'))
- **哈希操作**:
# 设置哈希字段值
r.hset('myhash', 'field1', 'value1')
r.hset('myhash', 'field2', 'value2')
# 获取哈希所有字段值
hash_values = r.hgetall('myhash')
for key, value in hash_values.items():
print(key.decode('utf - 8'), value.decode('utf - 8'))
使用pymongo
操作MongoDB数据库
pymongo
是Python连接MongoDB数据库的官方驱动,提供了丰富的操作方法。
- 安装
pymongo
:使用pip install pymongo
进行安装。 - 连接数据库:
from pymongo import MongoClient
# 连接到MongoDB数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']
- 插入文档:
# 获取集合
collection = db['users']
# 插入单条文档
document = {
'name': 'Alice',
'age': 30
}
collection.insert_one(document)
# 插入多条文档
documents = [
{'name': 'Bob', 'age': 25},
{'name': 'Charlie', 'age': 28}
]
collection.insert_many(documents)
- 查询文档:
# 查询所有文档
results = collection.find()
for result in results:
print(result)
# 查询单条文档
query = {'name': 'Alice'}
result = collection.find_one(query)
print(result)
- 更新文档:
query = {'name': 'Alice'}
update = {'$set': {'age': 31}}
collection.update_one(query, update)
- 删除文档:
query = {'name': 'Bob'}
collection.delete_one(query)
Python自动化脚本与数据库结合应用案例
数据备份自动化脚本
假设我们有一个MySQL数据库,需要定期备份其中的某个表的数据到CSV文件中。
- 安装依赖库:除了
pymysql
,还需要安装pandas
库用于处理CSV文件,使用pip install pandas
安装。 - 编写自动化脚本:
import pymysql
import pandas as pd
from datetime import datetime
def backup_table_to_csv():
# 连接MySQL数据库
conn = pymysql.connect(
host='localhost',
user='root',
password='password',
database='test_db',
charset='utf8mb4'
)
try:
# 查询数据
query = "SELECT * FROM products"
df = pd.read_sql(query, conn)
# 生成备份文件名
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
backup_file = f'products_backup_{timestamp}.csv'
# 保存为CSV文件
df.to_csv(backup_file, index=False)
print(f"Table backed up to {backup_file} successfully.")
except Exception as e:
print(f"Error occurred: {e}")
finally:
# 关闭连接
conn.close()
if __name__ == "__main__":
backup_table_to_csv()
此脚本连接到MySQL数据库,查询products
表的数据,将其转换为pandas
的DataFrame
对象,然后保存为带有时间戳的CSV文件。可以通过设置系统的定时任务(如在Linux系统中使用cron
,在Windows系统中使用任务计划程序)来定期执行该脚本,实现数据备份的自动化。
数据同步自动化脚本
假设我们有一个MySQL数据库和一个MongoDB数据库,需要将MySQL数据库中users
表的新增数据同步到MongoDB的users
集合中。
- 安装依赖库:安装
pymysql
和pymongo
库。 - 编写自动化脚本:
import pymysql
from pymongo import MongoClient
def sync_data():
# 连接MySQL数据库
mysql_conn = pymysql.connect(
host='localhost',
user='root',
password='password',
database='test_db',
charset='utf8mb4'
)
# 连接MongoDB数据库
mongo_client = MongoClient('mongodb://localhost:27017/')
mongo_db = mongo_client['test_db']
mongo_collection = mongo_db['users']
try:
# 获取MySQL中users表的最大ID(假设ID自增)
mysql_cursor = mysql_conn.cursor()
mysql_cursor.execute("SELECT MAX(id) FROM users")
max_id_result = mysql_cursor.fetchone()
max_id = max_id_result[0] if max_id_result[0] else 0
# 查询MySQL中新增的用户数据
query = f"SELECT * FROM users WHERE id > {max_id}"
mysql_cursor.execute(query)
new_users = mysql_cursor.fetchall()
# 将新增数据插入到MongoDB
for user in new_users:
user_dict = {
'id': user[0],
'name': user[1],
'age': user[2]
}
mongo_collection.insert_one(user_dict)
print(f"{len(new_users)} new users synced to MongoDB.")
except Exception as e:
print(f"Error occurred: {e}")
finally:
# 关闭连接
mysql_conn.close()
mongo_client.close()
if __name__ == "__main__":
sync_data()
此脚本首先获取MySQL中users
表的最大ID,然后查询ID大于该值的新增数据,将这些数据插入到MongoDB的users
集合中。同样,可以通过定时任务来定期执行该脚本,实现两个数据库之间的数据同步自动化。
基于数据库数据的自动化报告生成
假设我们有一个SQLite数据库存储销售数据,需要根据这些数据生成销售报告。
- 安装依赖库:安装
sqlite3
(已包含在Python标准库中)和matplotlib
库用于生成图表(pip install matplotlib
)。 - 编写自动化脚本:
import sqlite3
import matplotlib.pyplot as plt
import pandas as pd
def generate_sales_report():
# 连接SQLite数据库
conn = sqlite3.connect('sales.db')
try:
# 查询销售数据
query = "SELECT product, quantity, price FROM sales"
df = pd.read_sql(query, conn)
# 计算销售额
df['revenue'] = df['quantity'] * df['price']
# 按产品统计销售额
product_revenue = df.groupby('product')['revenue'].sum().reset_index()
# 绘制柱状图
plt.bar(product_revenue['product'], product_revenue['revenue'])
plt.xlabel('Product')
plt.ylabel('Revenue')
plt.title('Sales Revenue by Product')
plt.xticks(rotation = 45)
# 保存图表为图片
plt.savefig('sales_report.png')
print("Sales report generated as sales_report.png.")
except Exception as e:
print(f"Error occurred: {e}")
finally:
# 关闭连接
conn.close()
if __name__ == "__main__":
generate_sales_report()
此脚本从SQLite数据库中读取销售数据,计算每个产品的销售额,然后使用matplotlib
库生成柱状图展示各产品的销售额,并保存为图片文件,实现了基于数据库数据的自动化报告生成。可以进一步扩展,如将生成的报告发送到指定邮箱等功能。