MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python网络编程高级技巧

2021-08-272.2k 阅读

网络协议基础与 Python 实现

在深入探讨 Python 网络编程的高级技巧之前,让我们先回顾一下网络协议的基础知识。网络协议是网络中计算机之间进行通信的规则集合,常见的协议包括 TCP(传输控制协议)和 UDP(用户数据报协议)。

TCP 协议

TCP 是一种面向连接的、可靠的传输层协议。它通过三次握手建立连接,保证数据的有序传输和完整性。在 Python 中,可以使用 socket 模块来实现基于 TCP 的网络编程。

以下是一个简单的 TCP 服务器示例代码:

import socket

# 创建一个 TCP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 绑定 IP 地址和端口
server_address = ('localhost', 10000)
server_socket.bind(server_address)

# 监听连接
server_socket.listen(1)
print('等待客户端连接...')

while True:
    # 接受客户端连接
    client_socket, client_address = server_socket.accept()
    print('连接来自:', client_address)

    try:
        # 接收数据
        data = client_socket.recv(1024)
        print('收到数据:', data.decode('utf-8'))

        # 发送响应
        response = '你好,客户端!'.encode('utf-8')
        client_socket.sendall(response)
    finally:
        # 关闭连接
        client_socket.close()

对应的 TCP 客户端示例代码如下:

import socket

# 创建一个 TCP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 连接服务器
server_address = ('localhost', 10000)
client_socket.connect(server_address)

try:
    # 发送数据
    message = '你好,服务器!'.encode('utf-8')
    client_socket.sendall(message)

    # 接收响应
    data = client_socket.recv(1024)
    print('收到响应:', data.decode('utf-8'))
finally:
    # 关闭连接
    client_socket.close()

UDP 协议

UDP 是一种无连接的、不可靠的传输层协议。它不需要建立连接,直接将数据报发送出去,适合对实时性要求高但对数据准确性要求相对较低的场景,如视频流、音频流传输。

以下是一个简单的 UDP 服务器示例代码:

import socket

# 创建一个 UDP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 绑定 IP 地址和端口
server_address = ('localhost', 10000)
server_socket.bind(server_address)

print('等待接收数据...')
while True:
    # 接收数据
    data, client_address = server_socket.recvfrom(1024)
    print('收到数据:', data.decode('utf-8'), '来自:', client_address)

    # 发送响应
    response = '你好,UDP 客户端!'.encode('utf-8')
    server_socket.sendto(response, client_address)

UDP 客户端示例代码如下:

import socket

# 创建一个 UDP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DUDP)

# 服务器地址
server_address = ('localhost', 10000)

# 发送数据
message = '你好,UDP 服务器!'.encode('utf-8')
client_socket.sendto(message, server_address)

# 接收响应
data, server_address = client_socket.recvfrom(1024)
print('收到响应:', data.decode('utf-8'))

# 关闭 socket
client_socket.close()

异步网络编程

随着网络应用规模的扩大和对性能要求的提高,传统的同步网络编程方式可能会成为瓶颈。异步网络编程允许程序在等待 I/O 操作完成时继续执行其他任务,从而提高整体的效率。

基于 select 模块的异步编程

select 模块是 Python 标准库中用于实现异步 I/O 多路复用的模块。它可以同时监听多个 socket 的状态变化,如可读、可写或错误状态。

以下是一个简单的基于 select 的服务器示例代码,它可以同时处理多个客户端的连接:

import socket
import select

# 创建一个 TCP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 绑定 IP 地址和端口
server_address = ('localhost', 10000)
server_socket.bind(server_address)

# 监听连接
server_socket.listen(10)

# 用于存储所有 socket 的列表
sockets_list = [server_socket]
# 用于存储客户端连接和对应的地址
clients = {}

print('等待客户端连接...')

while True:
    # 使用 select 监听可读的 socket
    read_sockets, _, exception_sockets = select.select(sockets_list, [], sockets_list)

    for notified_socket in read_sockets:
        if notified_socket == server_socket:
            # 有新的客户端连接
            client_socket, client_address = server_socket.accept()
            sockets_list.append(client_socket)
            clients[client_socket] = client_address
            print('新连接来自:', client_address)
        else:
            # 已有客户端发送数据
            try:
                data = notified_socket.recv(1024)
                if data:
                    print('收到来自 {} 的数据: {}'.format(clients[notified_socket], data.decode('utf-8')))
                    notified_socket.sendall('消息已收到'.encode('utf-8'))
                else:
                    # 客户端关闭连接
                    print('客户端 {} 已关闭连接'.format(clients[notified_socket]))
                    sockets_list.remove(notified_socket)
                    del clients[notified_socket]
            except:
                # 处理异常情况
                print('客户端 {} 出现异常,关闭连接'.format(clients[notified_socket]))
                sockets_list.remove(notified_socket)
                del clients[notified_socket]

    for notified_socket in exception_sockets:
        # 处理异常 socket
        print('客户端 {} 出现异常,关闭连接'.format(clients[notified_socket]))
        sockets_list.remove(notified_socket)
        del clients[notified_socket]

基于 asyncio 的异步编程

asyncio 是 Python 3.4 引入的标准库,用于编写异步 I/O 代码。它基于事件循环,使用 asyncawait 关键字来定义异步函数。

以下是一个基于 asyncio 的简单 TCP 服务器示例:

import asyncio

async def handle_connection(reader, writer):
    # 接收数据
    data = await reader.read(1024)
    message = data.decode('utf-8')
    addr = writer.get_extra_info('peername')
    print('收到来自 {} 的数据: {}'.format(addr, message))

    # 发送响应
    response = '你好,客户端!'.encode('utf-8')
    writer.write(response)
    await writer.drain()

    print('关闭连接')
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_connection, 'localhost', 10000)

    addr = server.sockets[0].getsockname()
    print(f'在 {addr} 启动服务器')

    async with server:
        await server.serve_forever()

if __name__ == "__main__":
    asyncio.run(main())

对应的客户端示例代码如下:

import asyncio

async def send_request():
    reader, writer = await asyncio.open_connection('localhost', 10000)

    # 发送数据
    message = '你好,服务器!'.encode('utf-8')
    writer.write(message)
    await writer.drain()

    # 接收响应
    data = await reader.read(1024)
    print('收到响应:', data.decode('utf-8'))

    print('关闭连接')
    writer.close()
    await writer.wait_closed()

if __name__ == "__main__":
    asyncio.run(send_request())

高级 socket 选项

在网络编程中,socket 提供了许多高级选项,可以对 socket 的行为进行更精细的控制。

套接字选项设置

  1. SO_REUSEADDR:该选项允许在程序关闭后,立即重用绑定的地址和端口,而不必等待操作系统释放资源。在服务器端代码中,通常可以这样设置:
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  1. SO_KEEPALIVE:启用该选项后,TCP 会定期发送心跳包,以检测连接是否仍然有效。如果一段时间内没有收到响应,TCP 会尝试重新建立连接或关闭连接。
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
  1. TCP_NODELAY:禁用 Nagle 算法。Nagle 算法会将小的数据包合并成大的数据包发送,以减少网络开销,但在一些实时性要求较高的场景下,可能会导致延迟。禁用该算法可以确保数据包立即发送。
server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

多播(Multicast)

多播是一种允许一台主机向一组特定主机发送数据的通信方式。在 Python 中,可以使用 socket 模块来实现多播。

以下是一个简单的多播发送端示例代码:

import socket
import struct

# 多播组地址和端口
MCAST_GRP = '224.1.1.1'
MCAST_PORT = 5007

# 创建 UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 设置多播 TTL(Time to Live)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, struct.pack('b', 1))

# 发送多播消息
message = '这是一条多播消息'.encode('utf-8')
sock.sendto(message, (MCAST_GRP, MCAST_PORT))

多播接收端示例代码如下:

import socket
import struct
import time

# 多播组地址和端口
MCAST_GRP = '224.1.1.1'
MCAST_PORT = 5007

# 创建 UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DUDP, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 绑定到指定端口
sock.bind(('', MCAST_PORT))

# 加入多播组
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

while True:
    # 接收多播数据
    data, addr = sock.recvfrom(1024)
    print('收到多播数据:', data.decode('utf-8'), '来自:', addr)
    time.sleep(1)

网络安全与加密

在网络通信中,数据的安全性至关重要。Python 提供了多种库来实现网络安全和加密功能。

SSL/TLS 加密

ssl 模块是 Python 标准库中用于实现 SSL/TLS 加密的模块。它可以在 TCP 连接的基础上添加加密层,保护数据的传输安全。

以下是一个使用 ssl 模块的简单加密服务器示例:

import socket
import ssl

# 创建一个 TCP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('localhost', 10000))
server_socket.listen(1)

# 创建 SSL 上下文
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
context.load_cert_chain(certfile='server.crt', keyfile='server.key')

while True:
    # 接受客户端连接
    client_socket, client_address = server_socket.accept()
    print('连接来自:', client_address)

    # 使用 SSL 包装 socket
    ssl_socket = context.wrap_socket(client_socket, server_side=True)

    try:
        # 接收数据
        data = ssl_socket.recv(1024)
        print('收到数据:', data.decode('utf-8'))

        # 发送响应
        response = '你好,加密客户端!'.encode('utf-8')
        ssl_socket.sendall(response)
    finally:
        # 关闭 SSL socket
        ssl_socket.close()

对应的加密客户端示例代码如下:

import socket
import ssl

# 创建一个 TCP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 创建 SSL 上下文
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
context.load_verify_locations(cafile='server.crt')

# 连接服务器并使用 SSL 包装 socket
ssl_socket = context.wrap_socket(client_socket, server_hostname='localhost')
ssl_socket.connect(('localhost', 10000))

try:
    # 发送数据
    message = '你好,加密服务器!'.encode('utf-8')
    ssl_socket.sendall(message)

    # 接收响应
    data = ssl_socket.recv(1024)
    print('收到响应:', data.decode('utf-8'))
finally:
    # 关闭 SSL socket
    ssl_socket.close()

认证与授权

在网络应用中,认证(Authentication)用于验证用户的身份,而授权(Authorization)用于确定用户是否有权执行特定的操作。常见的认证方式包括用户名/密码认证、令牌认证等。

  1. 基本认证(Basic Authentication):基本认证是一种简单的认证方式,它将用户名和密码以 Base64 编码的形式发送到服务器。以下是一个简单的示例,展示如何在 HTTP 服务器中实现基本认证:
import http.server
import base64
import hashlib

# 用户名和密码
USERNAME = 'admin'
PASSWORD = 'password'

class BasicAuthHandler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        if 'Authorization' not in self.headers:
            self.send_response(401)
            self.send_header('WWW-Authenticate', 'Basic realm="Login Required"')
            self.end_headers()
            return

        auth_header = self.headers['Authorization']
        auth_method, auth_string = auth_header.split(' ', 1)
        if auth_method.lower() != 'basic':
            self.send_response(400)
            self.end_headers()
            return

        decoded_auth = base64.b64decode(auth_string).decode('utf-8')
        username, password = decoded_auth.split(':', 1)
        if username != USERNAME or password != PASSWORD:
            self.send_response(401)
            self.send_header('WWW-Authenticate', 'Basic realm="Login Required"')
            self.end_headers()
            return

        self.send_response(200)
        self.end_headers()
        self.wfile.write(b'认证成功')

if __name__ == '__main__':
    server_address = ('', 8000)
    httpd = http.server.HTTPServer(server_address, BasicAuthHandler)
    print('在端口 8000 启动服务器')
    httpd.serve_forever()
  1. 令牌认证(Token Authentication):令牌认证通常使用 JWT(JSON Web Token)等技术。JWT 是一种用于在网络应用中安全传输信息的开放标准。以下是一个简单的示例,展示如何使用 PyJWT 库进行令牌生成和验证:
import jwt
import time

# 密钥
SECRET_KEY = 'your_secret_key'

# 生成令牌
def generate_token(username):
    payload = {
        'username': username,
        'exp': time.time() + 3600  # 令牌有效期 1 小时
    }
    token = jwt.encode(payload, SECRET_KEY, algorithm='HS256')
    return token

# 验证令牌
def verify_token(token):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
        return payload
    except jwt.ExpiredSignatureError:
        return None
    except jwt.InvalidTokenError:
        return None

# 示例使用
if __name__ == '__main__':
    username = 'user1'
    token = generate_token(username)
    print('生成的令牌:', token)

    payload = verify_token(token)
    if payload:
        print('验证成功,用户名:', payload['username'])
    else:
        print('验证失败')

网络爬虫高级技巧

网络爬虫是一种自动从网页中提取信息的程序。在实际应用中,需要掌握一些高级技巧来处理复杂的网页结构和反爬虫机制。

处理动态网页

许多现代网页使用 JavaScript 来动态加载内容。传统的基于 requests 库的爬虫可能无法获取这些动态内容。可以使用 Selenium 库结合浏览器驱动(如 ChromeDriver)来模拟浏览器行为,获取完整的网页内容。

以下是一个使用 Selenium 和 ChromeDriver 爬取动态网页的示例:

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# 启动 Chrome 浏览器
driver = webdriver.Chrome()

# 打开网页
driver.get('https://example.com')

try:
    # 等待特定元素加载完成
    element = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, 'element_id'))
    )
    # 获取元素文本
    text = element.text
    print('获取到的文本:', text)
finally:
    # 关闭浏览器
    driver.quit()

处理反爬虫机制

网站通常会采取一些反爬虫措施,如检测异常的请求频率、检查 User - Agent 等。为了应对这些措施,可以采取以下方法:

  1. 伪装 User - Agent:在请求头中设置合理的 User - Agent,模拟真实浏览器的请求。例如,使用 requests 库时,可以这样设置:
import requests

headers = {
    'User - Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get('https://example.com', headers = headers)
  1. 控制请求频率:避免过于频繁地发送请求,可以使用 time 模块在请求之间添加适当的延迟。
import requests
import time

url_list = ['https://example1.com', 'https://example2.com']
for url in url_list:
    response = requests.get(url)
    print('获取到网页:', url)
    time.sleep(5)  # 每次请求后等待 5 秒
  1. 使用代理:通过代理服务器发送请求,可以隐藏真实的 IP 地址,防止被封禁。可以使用 requests - proxy 库来设置代理。
import requests

proxies = {
    'http': 'http://your_proxy_ip:port',
    'https': 'https://your_proxy_ip:port'
}
response = requests.get('https://example.com', proxies = proxies)

分布式网络编程

随着数据量和计算需求的不断增长,分布式网络编程变得越来越重要。Python 提供了一些库来实现分布式系统。

分布式任务队列

Celery 是一个基于 Python 的分布式任务队列,它允许将任务异步执行,并且可以在多个 worker 节点上进行分布式处理。

以下是一个简单的 Celery 示例,展示如何定义和执行任务:

  1. 安装 Celery 和消息代理(如 Redis)
pip install celery redis
  1. 定义任务
from celery import Celery

# 创建 Celery 实例
app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y
  1. 启动 worker
celery -A tasks worker --loglevel=info
  1. 调用任务
from tasks import add

result = add.delay(4, 5)
print('任务结果:', result.get())

分布式计算框架

Dask 是一个用于分布式计算的 Python 库,它提供了类似于 NumPyPandas 的接口,可以处理比内存更大的数据。

以下是一个简单的 Dask 示例,展示如何进行分布式数组计算:

import dask.array as da

# 创建一个分布式数组
x = da.ones((10000, 10000), chunks=(1000, 1000))
y = da.ones((10000, 10000), chunks=(1000, 1000))

# 进行数组计算
z = x + y

# 计算结果
result = z.sum().compute()
print('计算结果:', result)

通过掌握以上 Python 网络编程的高级技巧,开发者可以构建出更加高效、安全和可扩展的网络应用程序,满足日益增长的业务需求。无论是开发高性能的服务器、处理复杂的网络协议,还是应对网络安全和分布式计算的挑战,这些技巧都将成为宝贵的工具。在实际应用中,还需要根据具体的场景和需求,灵活选择和组合这些技术,以达到最佳的效果。