MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python RESTful API的身份认证与授权机制

2022-08-044.1k 阅读

1. 身份认证与授权的基本概念

在探讨Python RESTful API的身份认证与授权机制之前,我们先来明确一下这两个关键概念。

1.1 身份认证(Authentication)

身份认证主要是确认用户是谁,验证请求者的身份。例如,当用户登录一个网站时,输入用户名和密码,系统会根据存储的用户信息来验证这些凭据,以确定该用户是否就是其所声称的那个人。在RESTful API的场景中,客户端发送请求到服务器,服务器需要验证客户端的身份,以确保请求来自合法的实体。常见的身份认证方式有:

  • 基本认证(Basic Authentication):这是一种简单的认证方案,客户端在请求头中发送包含用户名和密码的Base64编码字符串。服务器接收到请求后,解码字符串并验证用户名和密码。虽然简单,但由于用户名和密码在每次请求中都发送,存在安全风险,不适用于敏感数据传输。
  • Bearer Token认证:客户端获取一个令牌(token),通常是JWT(JSON Web Token),并在请求头中携带这个令牌。服务器验证令牌的有效性,若有效则确认客户端身份。这种方式避免了在每次请求中发送用户名和密码,安全性更高。

1.2 授权(Authorization)

授权是在身份认证通过后,确定已认证的用户或实体被允许执行哪些操作。例如,一个系统中可能有普通用户和管理员用户,普通用户只能查看自己的信息,而管理员用户可以查看和修改所有用户的信息。授权决定了用户对资源的访问权限。常见的授权模型有:

  • 基于角色的访问控制(RBAC - Role - Based Access Control):将权限分配给角色,用户被分配到不同的角色,从而间接获得相应的权限。例如,系统中有“用户”和“管理员”角色,“用户”角色可能只有查看个人资料的权限,“管理员”角色则有管理所有用户资料的权限。
  • 基于资源的访问控制(RBAC - Resource - Based Access Control):直接将权限与资源关联,根据用户的身份和请求的资源来决定是否授权访问。例如,某个文件只有文件所有者和特定群组的用户可以访问。

2. Python中实现身份认证

2.1 使用Flask框架实现基本认证

Flask是Python中流行的Web框架,我们可以利用它来实现基本认证。首先,安装Flask:

pip install flask

以下是一个简单的Flask应用实现基本认证的示例代码:

from flask import Flask, request, Response
from werkzeug.security import check_password_hash, generate_password_hash
import base64

app = Flask(__name__)

# 模拟用户数据库
users = {
    "user1": generate_password_hash("password1")
}

@app.route('/protected', methods=['GET'])
def protected():
    auth = request.authorization
    if not auth or not auth.username or not auth.password:
        return Response('Could not verify your access level for that URL.\nYou have to login with proper credentials', 401, {'WWW - Authenticate': 'Basic realm="Login required!"'})

    if auth.username in users and check_password_hash(users[auth.username], auth.password):
        return "Access granted"
    else:
        return Response('Could not verify your access level for that URL.\nYou have to login with proper credentials', 401, {'WWW - Authenticate': 'Basic realm="Login required!"'})


if __name__ == '__main__':
    app.run(debug=True)

在上述代码中:

  • 我们创建了一个简单的Flask应用。
  • users字典模拟了用户数据库,使用generate_password_hash对密码进行哈希存储。
  • /protected路由中,通过request.authorization获取请求头中的认证信息。如果认证信息缺失或用户名密码不正确,返回401未授权响应,并在响应头中设置WWW - Authenticate字段,提示客户端需要进行认证。如果认证成功,则返回“Access granted”。

2.2 使用Django框架实现基本认证

Django也是Python常用的Web框架,它有内置的用户认证系统。首先,创建一个Django项目和应用:

django - admin startproject myproject
cd myproject
python manage.py startapp myapp

配置settings.py文件,启用用户认证系统:

INSTALLED_APPS = [
   ...
    'django.contrib.auth',
    'django.contrib.contenttypes',
   ...
]

myapp/views.py中实现一个简单的基于基本认证的视图:

from django.http import HttpResponse
from django.contrib.auth import authenticate
from django.views.decorators.csrf import csrf_exempt
import base64


@csrf_exempt
def protected_view(request):
    if 'Authorization' in request.headers:
        auth_header = request.headers['Authorization']
        auth_type, auth_token = auth_header.split(' ', 1)
        if auth_type.lower() == 'basic':
            decoded = base64.b64decode(auth_token).decode('utf - 8')
            username, password = decoded.split(':', 1)
            user = authenticate(request, username=username, password=password)
            if user is not None:
                return HttpResponse("Access granted")
    return HttpResponse("Unauthorized", status=401)


myproject/urls.py中添加路由:

from django.contrib import admin
from django.urls import path
from myapp.views import protected_view

urlpatterns = [
    path('admin/', admin.site.urls),
    path('protected/', protected_view),
]

在上述Django示例中:

  • 我们利用django.contrib.auth中的authenticate函数来验证用户。
  • 通过request.headers获取Authorization头信息,解析出用户名和密码进行认证。如果认证成功,返回“Access granted”,否则返回401未授权响应。

3. Python中实现Bearer Token认证

3.1 使用PyJWT库实现JWT令牌认证

PyJWT是Python中用于处理JSON Web Tokens的库。安装它:

pip install PyJWT

以下是一个使用Flask和PyJWT实现Bearer Token认证的示例:

from flask import Flask, request, Response
import jwt
from datetime import datetime, timedelta
from werkzeug.security import check_password_hash, generate_password_hash


app = Flask(__name__)
app.config['SECRET_KEY'] = 'your - secret - key'

# 模拟用户数据库
users = {
    "user1": generate_password_hash("password1")
}


@app.route('/login', methods=['POST'])
def login():
    auth = request.authorization
    if not auth or not auth.username or not auth.password:
        return Response('Could not verify', 401, {'WWW - Authenticate': 'Basic realm="Login required!"'})

    if auth.username in users and check_password_hash(users[auth.username], auth.password):
        expiration = datetime.utcnow() + timedelta(minutes=30)
        payload = {
            'username': auth.username,
            'exp': expiration
        }
        token = jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')
        return {'token': token}
    else:
        return Response('Could not verify', 401, {'WWW - Authenticate': 'Basic realm="Login required!"'})


@app.route('/protected', methods=['GET'])
def protected():
    token = None
    if 'Authorization' in request.headers:
        token = request.headers['Authorization'].split(' ')[1]

    if not token:
        return Response('Token is missing!', 401)

    try:
        data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
        current_user = data['username']
        return f"Hello, {current_user}! This is a protected route."
    except jwt.ExpiredSignatureError:
        return Response('Token has expired!', 401)
    except jwt.InvalidTokenError:
        return Response('Invalid token!', 401)


if __name__ == '__main__':
    app.run(debug=True)

在这个示例中:

  • /login路由用于用户登录,验证用户名和密码后,生成一个包含用户名和过期时间的JWT令牌,并返回给客户端。
  • /protected路由用于保护资源,从请求头中获取令牌,验证令牌的有效性。如果令牌有效,返回受保护资源的内容;如果令牌缺失、过期或无效,则返回相应的错误响应。

3.2 使用Django Rest Framework实现JWT认证

Django Rest Framework(DRF)是一个强大的用于构建Web APIs的框架,它对JWT认证有很好的支持。首先,安装必要的库:

pip install djangorestframework
pip install djangorestframework - simplejwt

settings.py中配置DRF和JWT:

INSTALLED_APPS = [
   ...
   'rest_framework',
   'rest_framework_simplejwt',
   ...
]

REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': (
       'rest_framework_simplejwt.authentication.JWTAuthentication',
    )
}

myapp/views.py中创建一些视图:

from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.contrib.auth.models import User
from rest_framework_simplejwt.tokens import RefreshToken


class LoginView(APIView):
    def post(self, request):
        username = request.data.get('username')
        password = request.data.get('password')
        user = User.objects.filter(username=username).first()
        if user and user.check_password(password):
            refresh = RefreshToken.for_user(user)
            return Response({
               'refresh': str(refresh),
                'access': str(refresh.access_token),
            })
        return Response({'detail': 'Invalid credentials'}, status=status.HTTP_401_UNAUTHORIZED)


class ProtectedView(APIView):
    def get(self, request):
        return Response({'message': 'This is a protected view'})


myproject/urls.py中添加路由:

from django.contrib import admin
from django.urls import path
from myapp.views import LoginView, ProtectedView
from rest_framework_simplejwt.views import TokenRefreshView

urlpatterns = [
    path('admin/', admin.site.urls),
    path('login/', LoginView.as_view()),
    path('protected/', ProtectedView.as_view()),
    path('api/token/refresh/', TokenRefreshView.as_view(), name='token_refresh'),
]

在这个DRF示例中:

  • LoginView处理用户登录,验证成功后返回刷新令牌(refresh token)和访问令牌(access token)。
  • ProtectedView是一个受保护的视图,由于配置了JWT认证,只有携带有效JWT令牌的请求才能访问该视图。

4. 授权机制的实现

4.1 基于角色的访问控制(RBAC)在Python中的实现

以Flask为例,我们可以通过在用户模型中添加角色信息,并在视图函数中根据角色进行授权。首先,定义角色和用户模型:

from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import check_password_hash, generate_password_hash

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] ='sqlite:///example.db'
db = SQLAlchemy(app)


class Role(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True)


class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(50), unique=True)
    password = db.Column(db.String(100))
    role_id = db.Column(db.Integer, db.ForeignKey('role.id'))
    role = db.relationship('Role', backref=db.backref('users', lazy='dynamic'))


def create_user(username, password, role_name):
    role = Role.query.filter_by(name=role_name).first()
    if not role:
        role = Role(name=role_name)
        db.session.add(role)
        db.session.commit()
    hashed_password = generate_password_hash(password)
    user = User(username=username, password=hashed_password, role=role)
    db.session.add(user)
    db.session.commit()
    return user


然后,在视图函数中进行基于角色的授权:

from flask import request, Response
from functools import wraps


def role_required(role_name):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            token = None
            if 'Authorization' in request.headers:
                token = request.headers['Authorization'].split(' ')[1]
            if not token:
                return Response('Token is missing!', 401)
            try:
                data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
                current_user = User.query.filter_by(username=data['username']).first()
                if current_user.role.name!= role_name:
                    return Response('Access denied', 403)
                return f(*args, **kwargs)
            except jwt.ExpiredSignatureError:
                return Response('Token has expired!', 401)
            except jwt.InvalidTokenError:
                return Response('Invalid token!', 401)

        return decorated_function

    return decorator


@app.route('/admin - only', methods=['GET'])
@role_required('admin')
def admin_only():
    return "This is an admin - only route"


在上述代码中:

  • 我们定义了RoleUser模型,通过外键关联用户和角色。
  • role_required装饰器用于检查用户的角色是否符合要求。如果用户角色不是指定的角色,返回403禁止访问响应。

4.2 基于资源的访问控制(RBAC)在Python中的实现

以Django为例,假设我们有一个文章资源,只有文章作者或管理员可以编辑。首先,定义文章模型:

from django.db import models
from django.contrib.auth.models import User


class Article(models.Model):
    title = models.CharField(max_length=100)
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE)


然后,在视图函数中进行基于资源的授权:

from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from myapp.models import Article


class ArticleEditView(APIView):
    def put(self, request, article_id):
        article = Article.objects.filter(id=article_id).first()
        if not article:
            return Response({'detail': 'Article not found'}, status=status.HTTP_404_NOT_FOUND)
        if request.user!= article.author and not request.user.is_staff:
            return Response({'detail': 'Access denied'}, status=status.HTTP_403_FORBIDDEN)
        article.title = request.data.get('title', article.title)
        article.content = request.data.get('content', article.content)
        article.save()
        return Response({'detail': 'Article updated successfully'})


在这个Django示例中:

  • ArticleEditView处理文章编辑请求。
  • 首先检查文章是否存在,然后验证当前用户是否是文章作者或管理员。如果不是,则返回403禁止访问响应;如果是,则允许更新文章。

5. 安全考虑

5.1 防止中间人攻击

在使用基本认证时,由于用户名和密码在网络上传输,容易受到中间人攻击。为了防止这种情况,应该始终使用HTTPS协议。HTTPS通过加密传输数据,确保中间人无法窃取或篡改数据。对于Bearer Token认证,虽然令牌比用户名密码更安全,但同样建议在HTTPS环境下使用,以防止令牌被截取。

5.2 令牌的安全存储与传输

对于Bearer Token,客户端应该安全地存储令牌,例如在HTTP - only的Cookie中或本地存储(但要注意本地存储存在一定风险)。在传输过程中,确保令牌在请求头中正确携带,并且服务器对令牌进行严格的验证,包括验证签名、过期时间等。

5.3 密码安全

在处理用户密码时,如在基本认证或用户注册场景中,永远不要明文存储密码。使用强大的密码哈希算法,如bcrypt、argon2等。在Python中,werkzeug.security提供了generate_password_hashcheck_password_hash函数来处理密码哈希和验证。

5.4 防止暴力破解

为了防止暴力破解用户名和密码,应该实施一些策略,如限制登录尝试次数、使用验证码、增加密码复杂度要求等。对于令牌认证,要确保令牌的生成算法足够强大,难以被猜测或破解。

通过深入理解并合理实现Python RESTful API中的身份认证与授权机制,并注重安全方面的考虑,可以构建出安全可靠的Web服务。在实际应用中,应根据具体的业务需求和安全要求,选择合适的认证和授权方式,并不断优化和完善安全策略。