MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python网络编程最佳实践

2024-11-153.9k 阅读

Python 网络编程基础

网络编程概述

网络编程旨在实现不同设备(节点)之间通过网络进行数据交换和通信。在现代互联网应用中,从简单的网页浏览到复杂的分布式系统,网络编程都扮演着关键角色。Python 凭借其简洁的语法和丰富的库,成为网络编程的热门选择。

套接字(Socket)基础

套接字是网络编程的核心概念,它是不同主机间进程通信的端点。在 Python 中,通过 socket 模块来操作套接字。

  1. 创建套接字

    import socket
    
    # 创建一个 TCP 套接字
    tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 创建一个 UDP 套接字
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    

    在上述代码中,socket.socket() 函数用于创建套接字。第一个参数 socket.AF_INET 表示使用 IPv4 地址族,若要使用 IPv6 则为 socket.AF_INET6。第二个参数 socket.SOCK_STREAM 代表 TCP 协议,socket.SOCK_DUDP 代表 UDP 协议。

  2. 绑定地址和端口 套接字创建后,需要绑定到特定的地址和端口,以便接收或发送数据。

    import socket
    
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_address = ('127.0.0.1', 8888)
    server_socket.bind(server_address)
    

    这里将服务器套接字绑定到本地回环地址 127.0.0.1(即 localhost)的 8888 端口。

  3. 监听连接(仅适用于 TCP) 对于 TCP 服务器,绑定后需要开始监听传入的连接。

    import socket
    
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_address = ('127.0.0.1', 8888)
    server_socket.bind(server_address)
    server_socket.listen(5)  # 最大连接数为 5
    

    listen() 方法中的参数指定了等待连接队列的最大长度。

  4. 接受连接(仅适用于 TCP) 服务器监听后,使用 accept() 方法接受客户端的连接。

    import socket
    
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_address = ('127.0.0.1', 8888)
    server_socket.bind(server_address)
    server_socket.listen(5)
    
    while True:
        client_socket, client_address = server_socket.accept()
        print(f"Accepted connection from {client_address}")
        client_socket.close()
    

    accept() 方法是阻塞的,直到有客户端连接进来,它返回一个新的套接字对象(用于与客户端通信)和客户端的地址。

  5. 发送和接收数据

    • TCP 发送和接收
      import socket
      
      # 客户端
      client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      server_address = ('127.0.0.1', 8888)
      client_socket.connect(server_address)
      message = "Hello, server!"
      client_socket.sendall(message.encode('utf - 8'))
      data = client_socket.recv(1024)
      print(f"Received: {data.decode('utf - 8')}")
      client_socket.close()
      
      # 服务器
      server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      server_address = ('127.0.0.1', 8888)
      server_socket.bind(server_address)
      server_socket.listen(5)
      
      while True:
          client_socket, client_address = server_socket.accept()
          data = client_socket.recv(1024)
          print(f"Received from {client_address}: {data.decode('utf - 8')}")
          response = "Message received successfully!"
          client_socket.sendall(response.encode('utf - 8'))
          client_socket.close()
      
      在 TCP 中,sendall() 方法用于发送数据,recv() 方法用于接收数据。recv() 方法的参数指定了一次最多接收的字节数。
    • UDP 发送和接收
      import socket
      
      # 客户端
      client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      server_address = ('127.0.0.1', 8888)
      message = "Hello, UDP server!"
      client_socket.sendto(message.encode('utf - 8'), server_address)
      data, server = client_socket.recvfrom(1024)
      print(f"Received from {server}: {data.decode('utf - 8')}")
      client_socket.close()
      
      # 服务器
      server_socket = socket.socket(socket.AF_INET, socket.SOCK_DUDP)
      server_address = ('127.0.0.1', 8888)
      server_socket.bind(server_address)
      
      while True:
          data, client_address = server_socket.recvfrom(1024)
          print(f"Received from {client_address}: {data.decode('utf - 8')}")
          response = "UDP message received successfully!"
          server_socket.sendto(response.encode('utf - 8'), client_address)
      
      在 UDP 中,使用 sendto() 方法发送数据,recvfrom() 方法接收数据,这两个方法都需要指定目标地址或返回源地址。

基于 HTTP 的网络编程

HTTP 协议简介

HTTP(Hyper - Text Transfer Protocol)是用于在万维网上传输超文本的应用层协议。它基于请求 - 响应模型,客户端发送请求,服务器返回响应。HTTP 是无状态协议,即服务器不会在不同请求间记住客户端的状态。

使用 requests 库发送 HTTP 请求

requests 库是 Python 中最常用的 HTTP 客户端库,它提供了简洁易用的接口。

  1. 发送 GET 请求
    import requests
    
    response = requests.get('https://www.example.com')
    print(f"Status code: {response.status_code}")
    print(f"Content: {response.text}")
    
    requests.get() 方法发送一个 GET 请求,response.status_code 返回 HTTP 状态码,response.text 返回响应的文本内容。
  2. 发送 POST 请求
    import requests
    
    data = {'key1': 'value1', 'key2': 'value2'}
    response = requests.post('https://www.example.com/api', data = data)
    print(f"Status code: {response.status_code}")
    print(f"Content: {response.json()}")
    
    这里使用 requests.post() 方法发送 POST 请求,data 参数用于传递表单数据。如果响应是 JSON 格式,可使用 response.json() 方法将其解析为 Python 字典。
  3. 设置请求头
    import requests
    
    headers = {'User - Agent': 'MyApp/1.0'}
    response = requests.get('https://www.example.com', headers = headers)
    
    通过 headers 参数可以设置自定义的请求头。

使用 Flask 框架搭建 HTTP 服务器

Flask 是一个轻量级的 Python Web 框架,用于快速搭建 HTTP 服务器。

  1. 安装 Flask 可通过 pip install flask 命令安装。
  2. 基本示例
    from flask import Flask
    
    app = Flask(__name__)
    
    @app.route('/')
    def hello_world():
        return 'Hello, World!'
    
    if __name__ == '__main__':
        app.run()
    
    上述代码创建了一个简单的 Flask 应用,@app.route('/') 装饰器定义了根路径的处理函数。app.run() 启动服务器,默认在 127.0.0.1:5000 运行。
  3. 动态路由
    from flask import Flask
    
    app = Flask(__name__)
    
    @app.route('/user/<username>')
    def show_user_profile(username):
        return f'User {username}'
    
    if __name__ == '__main__':
        app.run()
    
    这里定义了一个动态路由,<username> 是动态部分,可根据不同的用户名返回不同内容。
  4. 处理请求方法
    from flask import Flask, request
    
    app = Flask(__name__)
    
    @app.route('/login', methods = ['GET', 'POST'])
    def login():
        if request.method == 'POST':
            return 'Handling POST request'
        else:
            return 'Handling GET request'
    
    if __name__ == '__main__':
        app.run()
    
    methods 参数指定了该路由可以接受的请求方法,通过 request.method 可判断当前请求的方法。

高级网络编程技术

异步网络编程

随着网络应用对性能和并发处理能力要求的提高,异步编程变得越来越重要。Python 提供了 asyncio 库来支持异步 I/O 操作。

  1. 基本概念
    • 协程(Coroutine):是一种可以暂停和恢复执行的函数,使用 async def 定义。
    • 事件循环(Event Loop):负责调度协程的执行,在 asyncio 中通过 asyncio.get_event_loop() 获取。
    • 任务(Task):是对协程的进一步封装,用于并发执行协程。
  2. 简单异步示例
    import asyncio
    
    async def fetch_data():
        await asyncio.sleep(2)
        return {'data': 'Some fetched data'}
    
    async def main():
        task = asyncio.create_task(fetch_data())
        result = await task
        print(result)
    
    if __name__ == '__main__':
        asyncio.run(main())
    
    在这个示例中,fetch_data() 是一个协程,使用 await asyncio.sleep(2) 模拟异步 I/O 操作(等待 2 秒)。main() 也是一个协程,通过 asyncio.create_task() 创建任务并执行 fetch_data() 协程,最后使用 await 获取结果。asyncio.run() 用于运行主协程。
  3. 异步网络请求 使用 aiohttp 库可以进行异步 HTTP 请求。
    import asyncio
    import aiohttp
    
    async def fetch(session, url):
        async with session.get(url) as response:
            return await response.json()
    
    async def main():
        async with aiohttp.ClientSession() as session:
            tasks = []
            urls = ['https://www.example1.com', 'https://www.example2.com']
            for url in urls:
                task = asyncio.create_task(fetch(session, url))
                tasks.append(task)
            results = await asyncio.gather(*tasks)
            print(results)
    
    if __name__ == '__main__':
        asyncio.run(main())
    
    这里 fetch() 协程使用 aiohttpsession.get() 发送异步 GET 请求并返回 JSON 响应。main() 协程创建多个任务并发执行多个请求,asyncio.gather() 用于等待所有任务完成并返回结果。

网络安全相关编程

在网络编程中,安全至关重要。以下介绍一些常见的网络安全相关编程要点。

  1. SSL/TLS 加密
    • 使用 ssl 模块(基于套接字)
      import socket
      import ssl
      
      context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
      context.load_cert_chain(certfile='server.crt', keyfile='server.key')
      
      server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      server_socket.bind(('127.0.0.1', 8888))
      server_socket.listen(5)
      
      ssl_socket = context.wrap_socket(server_socket, server_side = True)
      
      while True:
          client_socket, client_address = ssl_socket.accept()
          data = client_socket.recv(1024)
          print(f"Received: {data.decode('utf - 8')}")
          response = "Message received securely!"
          client_socket.sendall(response.encode('utf - 8'))
          client_socket.close()
      
      上述代码创建了一个基于 SSL/TLS 的 TCP 服务器。ssl.SSLContext 用于配置 SSL/TLS 协议版本等,load_cert_chain() 方法加载服务器的证书和私钥。wrap_socket() 方法将普通套接字包装为 SSL 套接字。
    • Flask 中启用 SSL/TLS 可以使用 gunicorn 等服务器,并结合 gunicorn 的 SSL 配置选项来为 Flask 应用启用 SSL/TLS。例如,安装 gunicorn 后,通过以下命令启动 Flask 应用并启用 SSL/TLS:
      gunicorn -b 127.0.0.1:8888 --keyfile server.key --certfile server.crt app:app
      
  2. 输入验证和防止注入攻击
    • SQL 注入防范 在使用数据库时,如 SQLite 结合 Python 的 sqlite3 模块,使用参数化查询来防止 SQL 注入。
      import sqlite3
      
      username = 'test'
      password = 'password'
      conn = sqlite3.connect('example.db')
      cursor = conn.cursor()
      query = "SELECT * FROM users WHERE username =? AND password =?"
      cursor.execute(query, (username, password))
      result = cursor.fetchone()
      conn.close()
      
      这里使用 ? 作为占位符,将实际参数通过第二个参数传递给 execute() 方法,而不是直接拼接 SQL 语句。
    • HTTP 参数验证 在 Flask 应用中,可以使用 wtforms 等库来验证 HTTP 请求参数。
      from flask import Flask, request
      from wtforms import Form, StringField, validators
      
      app = Flask(__name__)
      
      class LoginForm(Form):
          username = StringField('Username', [validators.Length(min = 4, max = 25)])
          password = StringField('Password', [validators.Length(min = 6, max = 35)])
      
      @app.route('/login', methods = ['GET', 'POST'])
      def login():
          form = LoginForm(request.form)
          if request.method == 'POST' and form.validate():
              return 'Login successful'
          else:
              return 'Invalid input'
      
      if __name__ == '__main__':
          app.run()
      
      wtforms 定义了表单字段和验证规则,form.validate() 方法用于验证表单数据是否符合规则。

网络编程中的性能优化

连接管理优化

  1. TCP 连接复用 在 HTTP 1.1 中,默认支持持久连接(TCP 连接复用)。在 requests 库中,它会自动处理连接复用。例如:
    import requests
    
    session = requests.Session()
    response1 = session.get('https://www.example.com')
    response2 = session.get('https://www.example.com/another - page')
    
    使用 requests.Session() 创建会话对象,会话会自动复用 TCP 连接,减少连接建立和关闭的开销。
  2. 连接池
    • urllib3 的连接池 urllib3requests 库背后使用的连接池库。可以直接使用 urllib3 来管理连接池。
      import urllib3
      
      http = urllib3.PoolManager()
      response = http.request('GET', 'https://www.example.com')
      
      PoolManager 会自动管理连接池,根据需要创建和复用连接。
    • 数据库连接池(以 psycopg2 为例) 对于数据库连接,如 PostgreSQL 的 psycopg2 库,可以使用 psycopg2 - pool 来实现连接池。
      from psycopg2 import pool
      
      postgreSQL_pool = pool.SimpleConnectionPool(
          1,  # 最小连接数
          20, # 最大连接数
          user="user",
          password="password",
          host="127.0.0.1",
          port="5432",
          database="example"
      )
      
      connection = postgreSQL_pool.getconn()
      cursor = connection.cursor()
      cursor.execute("SELECT * FROM some_table")
      result = cursor.fetchall()
      postgreSQL_pool.putconn(connection)
      postgreSQL_pool.closeall()
      
      这里使用 SimpleConnectionPool 创建连接池,getconn() 获取连接,putconn() 归还连接,closeall() 关闭所有连接。

数据传输优化

  1. 数据压缩
    • HTTP 压缩 在 Flask 应用中,可以使用 Flask - Compress 扩展来启用 HTTP 压缩。 首先安装 Flask - Compresspip install Flask - Compress。 然后在应用中启用:
      from flask import Flask
      from flask_compress import Compress
      
      app = Flask(__name__)
      Compress(app)
      
      @app.route('/')
      def index():
          return 'Some long content'
      
      Flask - Compress 会自动对响应数据进行压缩(如 gzip),减少数据传输量。
    • 自定义数据压缩 在套接字编程中,可以使用 zlib 库进行自定义数据压缩。
      import socket
      import zlib
      
      server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      server_address = ('127.0.0.1', 8888)
      server_socket.bind(server_address)
      server_socket.listen(5)
      
      while True:
          client_socket, client_address = server_socket.accept()
          data = client_socket.recv(1024)
          decompressed_data = zlib.decompress(data)
          print(f"Received: {decompressed_data.decode('utf - 8')}")
          response = "Message received successfully!"
          compressed_response = zlib.compress(response.encode('utf - 8'))
          client_socket.sendall(compressed_response)
          client_socket.close()
      
      这里服务器接收压缩后的数据,使用 zlib.decompress() 解压,然后将响应数据压缩后发送回去。
  2. 批量数据传输 在网络传输中,尽量批量传输数据而不是多次小数据传输。例如,在数据库操作中,使用 executemany() 方法批量插入数据。
    import sqlite3
    
    conn = sqlite3.connect('example.db')
    cursor = conn.cursor()
    data = [('user1', 'password1'), ('user2', 'password2')]
    query = "INSERT INTO users (username, password) VALUES (?,?)"
    cursor.executemany(query, data)
    conn.commit()
    conn.close()
    
    相比多次单个插入,批量插入减少了数据库交互次数,提高了性能。

网络编程实战案例

简单文件传输服务器

  1. 基于 TCP 的文件传输
    • 服务器端
      import socket
      import os
      
      server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      server_address = ('127.0.0.1', 8888)
      server_socket.bind(server_address)
      server_socket.listen(5)
      
      while True:
          client_socket, client_address = server_socket.accept()
          file_name = client_socket.recv(1024).decode('utf - 8')
          with open(file_name, 'wb') as file:
              while True:
                  data = client_socket.recv(1024)
                  if not data:
                      break
                  file.write(data)
          print(f"File {file_name} received successfully.")
          client_socket.close()
      
    • 客户端
      import socket
      import os
      
      client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      server_address = ('127.0.0.1', 8888)
      client_socket.connect(server_address)
      
      file_name = 'example.txt'
      client_socket.sendall(file_name.encode('utf - 8'))
      with open(file_name, 'rb') as file:
          while True:
              data = file.read(1024)
              if not data:
                  break
              client_socket.sendall(data)
      print(f"File {file_name} sent successfully.")
      client_socket.close()
      
    服务器首先接收客户端发送的文件名,然后接收文件内容并保存。客户端发送文件名后,逐块读取文件内容并发送给服务器。
  2. 基于 UDP 的文件传输(简单实现)
    • 服务器端
      import socket
      import os
      
      server_socket = socket.socket(socket.AF_INET, socket.SOCK_DUDP)
      server_address = ('127.0.0.1', 8888)
      server_socket.bind(server_address)
      
      file_name = None
      file = None
      while True:
          data, client_address = server_socket.recvfrom(1024)
          if not file_name:
              file_name = data.decode('utf - 8')
              file = open(file_name, 'wb')
          else:
              if not data:
                  break
              file.write(data)
      if file:
          file.close()
      print(f"File {file_name} received successfully.")
      
    • 客户端
      import socket
      import os
      
      client_socket = socket.socket(socket.AF_INET, socket.SOCK_DUDP)
      server_address = ('127.0.0.1', 8888)
      
      file_name = 'example.txt'
      client_socket.sendto(file_name.encode('utf - 8'), server_address)
      with open(file_name, 'rb') as file:
          while True:
              data = file.read(1024)
              if not data:
                  break
              client_socket.sendto(data, server_address)
      print(f"File {file_name} sent successfully.")
      client_socket.close()
      
    UDP 文件传输实现相对简单,但由于 UDP 无连接和不可靠特性,实际应用中可能需要增加更多机制(如校验和、重传等)来确保文件完整传输。

分布式爬虫系统(部分实现)

  1. 任务分配节点 使用 Flask 搭建一个简单的任务分配节点,负责接收爬虫任务请求并分配给爬虫节点。
    from flask import Flask, request
    
    app = Flask(__name__)
    
    @app.route('/assign_task', methods = ['POST'])
    def assign_task():
        task = request.json
        # 这里简单模拟将任务分配给一个固定的爬虫节点
        # 实际应用中可根据负载等进行动态分配
        crawler_node = 'http://crawler - node - 1:8000'
        import requests
        response = requests.post(crawler_node + '/receive_task', json = task)
        return response.text
    
    if __name__ == '__main__':
        app.run(host='0.0.0.0', port = 8888)
    
  2. 爬虫节点 爬虫节点接收任务分配节点的任务并执行爬虫操作。
    from flask import Flask, request
    import requests
    
    app = Flask(__name__)
    
    @app.route('/receive_task', methods = ['POST'])
    def receive_task():
        task = request.json
        url = task.get('url')
        response = requests.get(url)
        # 这里简单返回响应内容,实际可进行数据解析等操作
        return response.text
    
    if __name__ == '__main__':
        app.run(host='0.0.0.0', port = 8000)
    
    这个简单的分布式爬虫系统通过 HTTP 请求进行任务分配和数据交互,实际应用中可扩展为更复杂的分布式架构,如使用消息队列来管理任务分配等。