Python网络安全基础
2024-06-126.5k 阅读
网络安全基础概念
- 网络攻击类型
- 注入攻击:这是一种常见的攻击方式,攻击者通过将恶意代码插入到应用程序输入字段中,从而获取数据库或其他敏感信息的访问权限。例如SQL注入,在使用Python的Web开发中,如果对用户输入没有进行严格验证,可能会遭受此类攻击。在一个简单的基于Flask框架连接MySQL数据库的示例中,如果代码这样写:
import mysql.connector
from flask import Flask, request
app = Flask(__name__)
@app.route('/query')
def query():
username = request.args.get('username')
password = request.args.get('password')
conn = mysql.connector.connect(user='root', password='password', host='127.0.0.1', database='test')
cursor = conn.cursor()
query = "SELECT * FROM users WHERE username = '%s' AND password = '%s'" % (username, password)
cursor.execute(query)
result = cursor.fetchall()
conn.close()
return str(result)
if __name__ == '__main__':
app.run(debug=True)
在上述代码中,如果攻击者在浏览器地址栏输入/query?username=admin' OR '1'='1&password=123
,由于SQL语句拼接方式,就会执行一个恶意的查询,绕过了密码验证,因为'1'='1'
恒成立。
- 跨站脚本攻击(XSS):XSS攻击是指攻击者往Web页面里插入恶意JavaScript代码,当用户浏览该页之时,嵌入其中的JavaScript代码会被执行,从而达到恶意攻击用户的目的。例如在一个简单的Python Web应用中,如果对用户输入的评论没有进行安全过滤就直接显示在页面上:
from flask import Flask, request, render_template_string
app = Flask(__name__)
@app.route('/comment', methods=['GET', 'POST'])
def comment():
if request.method == 'POST':
comment_text = request.form.get('comment')
template = """
<html>
<body>
<h1>Comments</h1>
<p>{{ comment }}</p>
</body>
</html>
"""
return render_template_string(template, comment=comment_text)
else:
return """
<html>
<body>
<form method="post">
<label for="comment">Comment:</label><br>
<input type="text" id="comment" name="comment"><br>
<input type="submit" value="Submit">
</form>
</body>
</html>
"""
if __name__ == '__main__':
app.run(debug=True)
攻击者可以输入<script>alert('XSS')</script>
,当其他用户访问该评论页面时,这段恶意脚本就会被执行。
2. 网络安全防御机制
- 输入验证:在Python代码中,对于所有用户输入都应该进行严格验证。以处理整数输入为例,在一个简单的命令行程序中,需要用户输入一个整数:
try:
num = int(input("请输入一个整数: "))
print(f"你输入的整数是: {num}")
except ValueError:
print("输入无效,请输入一个整数。")
在Web开发中,Flask-WTF扩展可以帮助进行表单验证。首先安装flask - wtf
库,然后可以这样使用:
from flask import Flask, render_template_string, request
from wtforms import IntegerField, Form
from wtforms.validators import DataRequired
app = Flask(__name__)
class NumberForm(Form):
number = IntegerField('Number', validators=[DataRequired()])
@app.route('/input_number', methods=['GET', 'POST'])
def input_number():
form = NumberForm(request.form)
if request.method == 'POST' and form.validate():
number = form.number.data
return f"你输入的有效整数是: {number}"
template = """
<html>
<body>
<form method="post">
{{ form.csrf_token }}
<label for="number">输入一个整数:</label><br>
{{ form.number }}
<input type="submit" value="提交">
</form>
</body>
</html>
"""
return render_template_string(template, form = form)
if __name__ == '__main__':
app.run(debug=True)
- **安全编码实践**:遵循安全编码规范,例如避免使用不安全的函数。在处理文件操作时,使用`with`语句来确保文件正确关闭,防止资源泄露。
try:
with open('test.txt', 'r') as f:
content = f.read()
print(content)
except FileNotFoundError:
print("文件未找到")
Python网络编程基础
- 套接字(Socket)编程
- TCP套接字:TCP是一种面向连接的、可靠的传输协议。在Python中,可以使用
socket
模块来创建TCP套接字。以下是一个简单的TCP服务器和客户端示例。 - TCP服务器代码:
- TCP套接字:TCP是一种面向连接的、可靠的传输协议。在Python中,可以使用
import socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('127.0.0.1', 8888))
server_socket.listen(5)
print('服务器正在监听127.0.0.1:8888')
while True:
client_socket, client_address = server_socket.accept()
print(f'与{client_address}建立连接')
data = client_socket.recv(1024)
print(f'接收到数据: {data.decode()}')
response = '数据已收到'.encode()
client_socket.send(response)
client_socket.close()
- **TCP客户端代码**:
import socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(('127.0.0.1', 8888))
message = '你好,服务器'.encode()
client_socket.send(message)
data = client_socket.recv(1024)
print(f'从服务器接收到: {data.decode()}')
client_socket.close()
- **UDP套接字**:UDP是一种无连接的、不可靠的传输协议。同样使用`socket`模块来创建UDP套接字。
- **UDP服务器代码**:
import socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('127.0.0.1', 9999))
print('UDP服务器正在监听127.0.0.1:9999')
while True:
data, client_address = server_socket.recvfrom(1024)
print(f'从{client_address}接收到数据: {data.decode()}')
response = 'UDP数据已收到'.encode()
server_socket.sendto(response, client_address)
- **UDP客户端代码**:
import socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DUDP)
server_address = ('127.0.0.1', 9999)
message = '你好,UDP服务器'.encode()
client_socket.sendto(message, server_address)
data, server_address = client_socket.recvfrom(1024)
print(f'从服务器接收到: {data.decode()}')
client_socket.close()
- HTTP协议与Web请求
- 使用
requests
库发送HTTP请求:requests
库是Python中非常流行的用于发送HTTP请求的库。例如,发送一个GET请求获取网页内容:
- 使用
import requests
response = requests.get('https://www.example.com')
if response.status_code == 200:
print(response.text)
else:
print(f'请求失败,状态码: {response.status_code}')
发送POST请求,例如向一个登录接口发送用户名和密码:
import requests
url = 'https://example.com/login'
data = {
'username': 'user',
'password': 'pass'
}
response = requests.post(url, data=data)
if response.status_code == 200:
print('登录成功')
else:
print(f'登录失败,状态码: {response.status_code}')
- **使用`http.server`模块创建简单HTTP服务器**:`http.server`模块可以用于创建简单的HTTP服务器。以下是一个基本的示例:
import http.server
import socketserver
PORT = 8000
class MyHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content - type', 'text/html')
self.end_headers()
self.wfile.write(b'Hello, this is a simple HTTP server')
with socketserver.TCPServer(("", PORT), MyHandler) as httpd:
print(f'Serving at port {PORT}')
httpd.serve_forever()
Python与网络扫描
- 端口扫描
- 使用
socket
模块进行简单端口扫描:可以编写一个脚本来扫描指定主机的特定端口是否开放。
- 使用
import socket
def port_scan(host, port):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((host, port))
if result == 0:
print(f'端口 {port} 开放')
else:
print(f'端口 {port} 关闭')
sock.close()
except socket.error:
print(f'无法连接到 {host}:{port}')
host = '127.0.0.1'
ports = [22, 80, 443]
for port in ports:
port_scan(host, port)
- **使用`nmap`模块进行更强大的端口扫描**:`nmap`模块是Python对著名的Nmap网络扫描工具的封装。首先需要安装`python - nmap`库。以下是一个示例:
import nmap
nm = nmap.PortScanner()
nm.scan('127.0.0.1', '1 - 1024')
for host in nm.all_hosts():
print(f'主机 : {host}')
print(f'状态 : {nm[host].state()}')
for proto in nm[host].all_protocols():
print(f'协议 : {proto}')
lport = nm[host][proto].keys()
for port in lport:
print(f'端口 : {port}\t状态 : {nm[host][proto][port]["state"]}')
- 网络发现
- 使用ARP协议进行局域网内主机发现:可以使用
scapy
库来实现基于ARP协议的局域网主机发现。首先安装scapy
库。
- 使用ARP协议进行局域网内主机发现:可以使用
from scapy.all import ARP, Ether, srp
ip_range = '192.168.1.0/24'
arp = ARP(pdst=ip_range)
ether = Ether(dst='ff:ff:ff:ff:ff:ff')
packet = ether / arp
result = srp(packet, timeout = 3, verbose = 0)[0]
clients = []
for sent, received in result:
clients.append({'ip': received.psrc,'mac': received.hwsrc})
for client in clients:
print(f"IP: {client['ip']}\tMAC: {client['mac']}")
Python与Web安全
- Web应用漏洞检测
- SQL注入检测:可以编写一个简单的函数来检测SQL注入风险。对于一个SQL查询字符串,检查是否存在常见的SQL注入关键字。
def detect_sql_injection(query):
keywords = ['OR', 'AND', ';', '--', 'DELETE', 'DROP', 'TRUNCATE']
for keyword in keywords:
if keyword.lower() in query.lower():
return True
return False
query1 = "SELECT * FROM users WHERE username = 'admin' AND password = '123'"
query2 = "SELECT * FROM users WHERE username = 'admin' OR '1'='1'"
print(detect_sql_injection(query1))
print(detect_sql_injection(query2))
- **XSS漏洞检测**:检测用户输入中是否存在HTML或JavaScript标签。
import re
def detect_xss(input_str):
pattern = re.compile(r'<(.*?)>')
if pattern.search(input_str):
return True
return False
input1 = "这是一个正常的评论"
input2 = "<script>alert('XSS')</script>"
print(detect_xss(input1))
print(detect_xss(input2))
- Web应用安全加固
- 防止SQL注入:在使用数据库时,使用参数化查询。以
sqlite3
为例:
- 防止SQL注入:在使用数据库时,使用参数化查询。以
import sqlite3
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
username = 'admin'
password = '123'
query = "SELECT * FROM users WHERE username =? AND password =?"
cursor.execute(query, (username, password))
result = cursor.fetchall()
conn.close()
- **防止XSS**:在Web应用中,对用户输入进行HTML转义。在Flask框架中,可以使用`MarkupSafe`库中的`escape`函数:
from flask import Flask, render_template_string, request
from markupsafe import escape
app = Flask(__name__)
@app.route('/safe_comment', methods=['GET', 'POST'])
def safe_comment():
if request.method == 'POST':
comment_text = request.form.get('comment')
safe_comment = escape(comment_text)
template = """
<html>
<body>
<h1>Comments</h1>
<p>{{ comment }}</p>
</body>
</html>
"""
return render_template_string(template, comment = safe_comment)
else:
return """
<html>
<body>
<form method="post">
<label for="comment">Comment:</label><br>
<input type="text" id="comment" name="comment"><br>
<input type="submit" value="Submit">
</form>
</body>
</html>
"""
if __name__ == '__main__':
app.run(debug=True)
Python与密码学
- 对称加密
- 使用
pycryptodome
库进行AES加密:AES(高级加密标准)是一种常用的对称加密算法。首先安装pycryptodome
库。
- 使用
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import binascii
key = b'1234567890123456'
iv = b'1234567890123456'
plaintext = b'这是一段需要加密的文本'
cipher = AES.new(key, AES.MODE_CBC, iv)
padded_plaintext = pad(plaintext, AES.block_size)
ciphertext = cipher.encrypt(padded_plaintext)
print(binascii.hexlify(ciphertext))
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted_padded = cipher.decrypt(ciphertext)
decrypted = unpad(decrypted_padded, AES.block_size)
print(decrypted.decode())
- 非对称加密
- 使用
rsa
库进行RSA加密:RSA是一种广泛使用的非对称加密算法。安装rsa
库后,以下是示例代码:
- 使用
import rsa
# 生成密钥对
(pubkey, privkey) = rsa.newkeys(512)
message = '这是需要加密的消息'.encode()
# 加密
crypto = rsa.encrypt(message, pubkey)
print(crypto)
# 解密
decrypt = rsa.decrypt(crypto, privkey)
print(decrypt.decode())
- 哈希函数
- 使用
hashlib
库进行MD5和SHA - 256哈希计算:
- 使用
import hashlib
data = '这是需要计算哈希值的数据'.encode()
md5_hash = hashlib.md5(data)
print('MD5哈希值:', md5_hash.hexdigest())
sha256_hash = hashlib.sha256(data)
print('SHA - 256哈希值:', sha256_hash.hexdigest())
Python与恶意软件分析
- 恶意软件行为模拟
- 模拟简单的后门程序:以下代码模拟了一个简单的后门程序,在特定条件下执行系统命令。
import socket
import subprocess
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('127.0.0.1', 12345))
server_socket.listen(1)
while True:
client_socket, client_address = server_socket.accept()
command = client_socket.recv(1024).decode()
if command:
try:
result = subprocess.check_output(command, shell=True)
client_socket.send(result)
except subprocess.CalledProcessError as e:
client_socket.send(str(e).encode())
client_socket.close()
- 恶意软件检测
- 检测进程是否为恶意进程:可以通过监控进程的行为,例如CPU和内存使用情况,结合已知的恶意进程特征来判断。以下是一个简单示例,监控进程名称是否为已知恶意进程名称。
import psutil
def detect_malicious_process():
malicious_process_names = ['malware.exe', 'virus.exe']
for proc in psutil.process_iter(['name']):
if proc.info['name'] in malicious_process_names:
print(f'检测到恶意进程: {proc.info["name"]}')
detect_malicious_process()
Python与网络安全自动化
- 安全任务自动化脚本
- 定期进行端口扫描并记录结果:可以编写一个脚本,定期对指定主机进行端口扫描,并将结果记录到文件中。
import socket
import time
def port_scan(host, port):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((host, port))
if result == 0:
return True
else:
return False
sock.close()
except socket.error:
return False
host = '127.0.0.1'
ports = [22, 80, 443]
while True:
with open('port_scan_results.txt', 'a') as f:
f.write(f'扫描时间: {time.ctime()}\n')
for port in ports:
if port_scan(host, port):
f.write(f'端口 {port} 开放\n')
else:
f.write(f'端口 {port} 关闭\n')
f.write('\n')
time.sleep(3600) # 每小时扫描一次
- 安全配置自动化
- 使用
paramiko
库自动化配置远程服务器:paramiko
库可以实现通过SSH协议远程连接服务器并执行命令。例如,远程修改服务器的防火墙规则。
- 使用
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('remote_server_ip', username='user', password='pass')
command = 'iptables -A INPUT -p tcp --dport 80 -j ACCEPT'
stdin, stdout, stderr = ssh.exec_command(command)
if stderr.read():
print(f'执行命令出错: {stderr.read().decode()}')
else:
print('命令执行成功')
ssh.close()