mirror of
https://github.com/quantum5/django-csp-advanced.git
synced 2025-04-24 11:22:00 -04:00
65 lines
2.6 KiB
Python
65 lines
2.6 KiB
Python
import logging
|
|
from django.conf import settings
|
|
from django.core.exceptions import MiddlewareNotUsed
|
|
from django.utils import six
|
|
|
|
from csp_advanced.csp import CSPCompiler, InvalidCSPError
|
|
from csp_advanced.utils import is_callable_csp_dict, call_csp_dict, merge_csp_dict
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class AdvancedCSPMiddleware(object):
|
|
def __init__(self, get_response=None):
|
|
self.get_response = get_response
|
|
self.enforced_csp = getattr(settings, 'ADVANCED_CSP', None) or {}
|
|
self.enforced_csp_is_str = isinstance(self.enforced_csp, six.string_types)
|
|
self.enforced_csp_callable = is_callable_csp_dict(self.enforced_csp)
|
|
self.report_csp = getattr(settings, 'ADVANCED_CSP_REPORT_ONLY', None) or {}
|
|
self.report_csp_callable = is_callable_csp_dict(self.report_csp)
|
|
self.report_csp_is_str = isinstance(self.enforced_csp, six.string_types)
|
|
self.report_only_csp = not self.enforced_csp
|
|
|
|
if not self.enforced_csp and not self.report_csp:
|
|
raise MiddlewareNotUsed()
|
|
|
|
def add_csp_header(self, request, response, header, base, can_call, is_str, attrs):
|
|
if header in response:
|
|
return
|
|
if is_str:
|
|
response[header] = base
|
|
return
|
|
csp = call_csp_dict(base, request, response) if can_call else base
|
|
|
|
for attr in attrs:
|
|
update = getattr(response, attr, None)
|
|
if update is not None:
|
|
if update.pop('override', False):
|
|
csp = update
|
|
else:
|
|
csp = merge_csp_dict(csp, update)
|
|
break
|
|
|
|
if not csp:
|
|
return
|
|
|
|
try:
|
|
policy = CSPCompiler(csp).compile()
|
|
except InvalidCSPError:
|
|
log.exception('Invalid CSP on page: %s', request.get_full_path())
|
|
return
|
|
response[header] = policy
|
|
|
|
def process_response(self, request, response):
|
|
if self.enforced_csp:
|
|
self.add_csp_header(request, response, 'Content-Security-Policy', self.enforced_csp,
|
|
self.enforced_csp_callable, self.enforced_csp_is_str, ('csp',))
|
|
if self.report_csp:
|
|
self.add_csp_header(request, response, 'Content-Security-Policy-Report-Only',
|
|
self.report_csp, self.report_csp_callable, self.report_csp_is_str,
|
|
('csp_report',) if self.enforced_csp else ('csp_report', 'csp'))
|
|
return response
|
|
|
|
def __call__(self, request):
|
|
return self.process_response(request, self.get_response(request))
|