两种方法实现django审计,可查看API调用情况、登录情况等
日志审计
第三方插件
使用插件:django-easy-audit
安装
pip install django-easy-audit
Add ‘easyaudit’ to your
INSTALLED_APPS
like this:1
2
3
4INSTALLED_APPS = [
...
'easyaudit',
]Add Easy Audit’s middleware to your
MIDDLEWARE
(orMIDDLEWARE_CLASSES
) setting like this:1
2
3
4MIDDLEWARE = (
...
'easyaudit.middleware.easyaudit.EasyAuditMiddleware',
)Run
python manage.py migrate easyaudit
to create the app’s models.
效果
访问http://127.0.0.1:8000/admin/
自建app,注册到middleware
配置
创建app。
python manager.py startapp audit
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47from django.db import models
from base.models import BaseModel
# Create your models here.
class AuditLog(BaseModel):
url = models.CharField(
max_length=254,
db_index=True,
verbose_name="URL",
help_text='URL')
method = models.CharField(
max_length=20,
db_index=True,
verbose_name="请求方式",
help_text='请求方式')
query_string = models.CharField(
max_length=254,
null=True,
verbose_name="URL请求参数",
help_text='URL请求参数')
body = models.TextField(
verbose_name="请求Body数据",
help_text="请求Body数据")
remote_ip = models.CharField(
max_length=50,
verbose_name="远程主机IP",
help_text="远程主机IP")
username = models.CharField(
max_length=50,
verbose_name="请求用户",
help_text="请求用户")
status_code = models.IntegerField(
null=True,
blank=True,
verbose_name="请求状态码",
help_text="请求状态码")
def __str__(self):
return self.url
class Meta:
db_table = "audit_log"
verbose_name = 'API 审计'
verbose_name_plural = verbose_nameserlalizers.py
1
2
3
4
5
6
7
8
9
10from .models import AuditLog
from rest_framework.serializers import ModelSerializer
class AuditLogModelSerializer(ModelSerializer):
class Meta:
model = AuditLog
fields = "__all__"views.py
1
2
3
4
5
6
7
8
9
10
11from .models import AuditLog
from .serializers import AuditLogModelSerializer
from base.views import BaseModelViewSet
# Create your views here.
class AuditLogViewSet(BaseModelViewSet):
queryset = AuditLog.objects.all()
serializer_class = AuditLogModelSerializer
search_fields = ["url", "username", "status_code"]admin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20from .models import AuditLog
from django.contrib import admin
# Register your models here.
class AuditLogAdmin(admin.ModelAdmin):
list_display = (
"url",
"method",
"query_string",
"body",
"remote_ip",
"username",
"status_code",
"create_time",
"update_time",
)同步数据库
1
python manage.py migrate audit
middleware/audit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79import json
from apps.audit.models import AuditLog
from django.utils.deprecation import MiddlewareMixin
from django.contrib.auth.models import AnonymousUser
from django.contrib.auth import load_backend
from django.contrib.auth.middleware import get_user
from django.utils.functional import SimpleLazyObject
from rest_framework.request import Request
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
def get_user_jwt(request):
user = get_user(request)
print(user)
# if user.is_authenticated():
# return user
try:
user_jwt = JSONWebTokenAuthentication().authenticate(Request(request))
if user_jwt is not None:
return user_jwt[0]
except:
pass
return user
class EventAuditMiddleware(MiddlewareMixin):
"""
HTTP请求审计
"""
RequestId = None
def process_request(self, request):
"""接收请求"""
request.user = SimpleLazyObject(lambda: get_user_jwt(request))
print(request.user)
data = {
'url': request.META['PATH_INFO'],
'method': request.META['REQUEST_METHOD'],
'query_string': request.META['QUERY_STRING'],
'remote_ip': request.environ['REMOTE_ADDR'],
"username": request.user
}
if request.META['REQUEST_METHOD'] in ["POST", "PUT"]:
try:
body = json.loads(request.body)
except Exception as e:
print("RequestEventAuditLog process_request", e.args)
body = ""
else:
body = ""
data['body'] = body
request = AuditLog.objects.create(**data)
self.RequestId = request.pk
def process_view(self, request, view_func, view_args, view_kwargs):
pass
def process_template_response(self, request, response):
request = AuditLog.objects.get(pk=self.RequestId)
request.status_code = response.status_code
request.save()
return response
def process_exception(self, request, exception):
"""异常"""
request = AuditLog.objects.get(pk=self.RequestId)
request.status_code = 500
request.save()
def process_response(self, request, response):
"""处理完成"""
request = AuditLog.objects.get(pk=self.RequestId)
request.status_code = response.status_code
request.save()
return responsesettings.py
1
2
3
4MIDDLEWARE = (
...
"middleware.audit.EventAuditMiddleware",
)