mirror of
https://github.com/quantum5/django-csp-advanced.git
synced 2025-04-24 11:22:00 -04:00
Add CSP middleware and tests.
This commit is contained in:
parent
e4ac7c7509
commit
d5b3e2c100
|
@ -1,130 +1,130 @@
|
||||||
from itertools import chain
|
from itertools import chain
|
||||||
|
|
||||||
|
|
||||||
class InvalidCSPError(ValueError):
|
class InvalidCSPError(ValueError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class CSPCompiler(object):
|
class CSPCompiler(object):
|
||||||
CSP_LISTS = {
|
CSP_LISTS = {
|
||||||
# Fetch directives:
|
# Fetch directives:
|
||||||
'connect-src',
|
'connect-src',
|
||||||
'child-src',
|
'child-src',
|
||||||
'default-src',
|
'default-src',
|
||||||
'font-src',
|
'font-src',
|
||||||
'frame-src',
|
'frame-src',
|
||||||
'img-src',
|
'img-src',
|
||||||
'manifest-src',
|
'manifest-src',
|
||||||
'media-src',
|
'media-src',
|
||||||
'object-src',
|
'object-src',
|
||||||
'script-src',
|
'script-src',
|
||||||
'style-src',
|
'style-src',
|
||||||
'worker-src',
|
'worker-src',
|
||||||
|
|
||||||
# Navigation directives:
|
# Navigation directives:
|
||||||
'form-action',
|
'form-action',
|
||||||
'frame-ancestors',
|
'frame-ancestors',
|
||||||
|
|
||||||
# Document directives:
|
# Document directives:
|
||||||
'base-uri',
|
'base-uri',
|
||||||
'plugin-types',
|
'plugin-types',
|
||||||
}
|
}
|
||||||
|
|
||||||
CSP_BOOLEAN = {
|
CSP_BOOLEAN = {
|
||||||
'upgrade-insecure-requests',
|
'upgrade-insecure-requests',
|
||||||
'block-all-mixed-content',
|
'block-all-mixed-content',
|
||||||
}
|
}
|
||||||
|
|
||||||
CSP_FETCH_SPECIAL = {
|
CSP_FETCH_SPECIAL = {
|
||||||
'self',
|
'self',
|
||||||
'none',
|
'none',
|
||||||
'unsafe-inline',
|
'unsafe-inline',
|
||||||
'unsafe-eval',
|
'unsafe-eval',
|
||||||
'strict-dynamic',
|
'strict-dynamic',
|
||||||
}
|
}
|
||||||
|
|
||||||
CSP_PREFIX_SPECIAL = (
|
CSP_PREFIX_SPECIAL = (
|
||||||
'nonce-',
|
'nonce-',
|
||||||
'sha256-',
|
'sha256-',
|
||||||
'sha384-',
|
'sha384-',
|
||||||
'sha512-'
|
'sha512-'
|
||||||
)
|
)
|
||||||
|
|
||||||
CSP_SANDBOX_VALID = {
|
CSP_SANDBOX_VALID = {
|
||||||
'allow-forms',
|
'allow-forms',
|
||||||
'allow-modals',
|
'allow-modals',
|
||||||
'allow-orientation-lock',
|
'allow-orientation-lock',
|
||||||
'allow-pointer-lock',
|
'allow-pointer-lock',
|
||||||
'allow-popups',
|
'allow-popups',
|
||||||
'allow-popups-to-escape-sandbox',
|
'allow-popups-to-escape-sandbox',
|
||||||
'allow-presentation',
|
'allow-presentation',
|
||||||
'allow-same-origin',
|
'allow-same-origin',
|
||||||
'allow-scripts',
|
'allow-scripts',
|
||||||
'allow-top-navigation',
|
'allow-top-navigation',
|
||||||
}
|
}
|
||||||
|
|
||||||
CSP_REQUIRE_SRI_VALID = {
|
CSP_REQUIRE_SRI_VALID = {
|
||||||
'script',
|
'script',
|
||||||
'style',
|
'style',
|
||||||
'script style',
|
'script style',
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, csp_dict):
|
def __init__(self, csp_dict):
|
||||||
self.csp = csp_dict
|
self.csp = csp_dict
|
||||||
|
|
||||||
def compile(self):
|
def compile(self):
|
||||||
pieces = []
|
pieces = []
|
||||||
for name, value in self.csp.iteritems():
|
for name, value in self.csp.iteritems():
|
||||||
if name in self.CSP_LISTS:
|
if name in self.CSP_LISTS:
|
||||||
if value:
|
if value:
|
||||||
pieces.append(self.compile_list(name, value))
|
pieces.append(self.compile_list(name, value))
|
||||||
elif name in self.CSP_BOOLEAN:
|
elif name in self.CSP_BOOLEAN:
|
||||||
if value:
|
if value:
|
||||||
pieces.append(name)
|
pieces.append(name)
|
||||||
elif name == 'sandbox':
|
elif name == 'sandbox':
|
||||||
if value:
|
if value:
|
||||||
pieces.append(self.compile_sandbox(value))
|
pieces.append(self.compile_sandbox(value))
|
||||||
elif name == 'report-uri':
|
elif name == 'report-uri':
|
||||||
pieces.append(self.compile_report_uri(value))
|
pieces.append(self.compile_report_uri(value))
|
||||||
elif name == 'require-sri-for':
|
elif name == 'require-sri-for':
|
||||||
pieces.append(self.compile_require_sri_for(value))
|
pieces.append(self.compile_require_sri_for(value))
|
||||||
else:
|
else:
|
||||||
raise InvalidCSPError('Unknown directive: %s' % (name,))
|
raise InvalidCSPError('Unknown directive: %s' % (name,))
|
||||||
return '; '.join(pieces)
|
return '; '.join(pieces)
|
||||||
|
|
||||||
def compile_list(self, name, value_list):
|
def compile_list(self, name, value_list):
|
||||||
self.ensure_list(name, value_list)
|
self.ensure_list(name, value_list)
|
||||||
values = [name]
|
values = [name]
|
||||||
for value in value_list:
|
for value in value_list:
|
||||||
if value in self.CSP_FETCH_SPECIAL or value.startswith(self.CSP_PREFIX_SPECIAL):
|
if value in self.CSP_FETCH_SPECIAL or value.startswith(self.CSP_PREFIX_SPECIAL):
|
||||||
values.append("'%s'" % value)
|
values.append("'%s'" % value)
|
||||||
else:
|
else:
|
||||||
values.append(value)
|
values.append(value)
|
||||||
return ' '.join(values)
|
return ' '.join(values)
|
||||||
|
|
||||||
def compile_sandbox(self, values):
|
def compile_sandbox(self, values):
|
||||||
self.ensure_list('sandbox', values)
|
self.ensure_list('sandbox', values)
|
||||||
for value in values:
|
for value in values:
|
||||||
if value not in self.CSP_SANDBOX_VALID:
|
if value not in self.CSP_SANDBOX_VALID:
|
||||||
raise InvalidCSPError('Unknown sandbox value: %s' % (value,))
|
raise InvalidCSPError('Unknown sandbox value: %s' % (value,))
|
||||||
return ' '.join(chain(['sandbox'], values))
|
return ' '.join(chain(['sandbox'], values))
|
||||||
|
|
||||||
def compile_report_uri(self, value):
|
def compile_report_uri(self, value):
|
||||||
self.ensure_str('report-uri', value)
|
self.ensure_str('report-uri', value)
|
||||||
return 'report-uri %s' % value
|
return 'report-uri %s' % value
|
||||||
|
|
||||||
def compile_require_sri_for(self, value):
|
def compile_require_sri_for(self, value):
|
||||||
self.ensure_str('require-sri-for', value)
|
self.ensure_str('require-sri-for', value)
|
||||||
if value not in self.CSP_REQUIRE_SRI_VALID:
|
if value not in self.CSP_REQUIRE_SRI_VALID:
|
||||||
raise InvalidCSPError('Unknown require-sri-for value: %s' % (value,))
|
raise InvalidCSPError('Unknown require-sri-for value: %s' % (value,))
|
||||||
return 'require-sri-for %s' % value
|
return 'require-sri-for %s' % value
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def ensure_list(name, value):
|
def ensure_list(name, value):
|
||||||
if not isinstance(value, (list, tuple, set)):
|
if not isinstance(value, (list, tuple, set)):
|
||||||
raise InvalidCSPError('Values for %s must be list-like type, not %s', (name, type(value)))
|
raise InvalidCSPError('Values for %s must be list-like type, not %s', (name, type(value)))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def ensure_str(name, value):
|
def ensure_str(name, value):
|
||||||
if not isinstance(value, basestring):
|
if not isinstance(value, basestring):
|
||||||
raise InvalidCSPError('Values for %s must be a string type, not %s', (name, type(value)))
|
raise InvalidCSPError('Values for %s must be a string type, not %s', (name, type(value)))
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
from django.conf import settings
|
||||||
|
from django.core.exceptions import MiddlewareNotUsed
|
||||||
|
|
||||||
|
from csp_advanced.csp import CSPCompiler
|
||||||
|
from csp_advanced.utils import is_callable_csp_dict, call_csp_dict, merge_csp_dict
|
||||||
|
|
||||||
|
|
||||||
|
class AdvancedCSPMiddleware(object):
|
||||||
|
def __init__(self, get_response=None):
|
||||||
|
self.get_response = get_response
|
||||||
|
self.enforced_csp = getattr(settings, 'ADVANCED_CSP', None) or {}
|
||||||
|
self.enforced_csp_is_str = isinstance(self.enforced_csp, basestring)
|
||||||
|
self.enforced_csp_callable = is_callable_csp_dict(self.enforced_csp)
|
||||||
|
self.report_csp = getattr(settings, 'ADVANCED_CSP_REPORT_ONLY', None) or {}
|
||||||
|
self.report_csp_callable = is_callable_csp_dict(self.report_csp)
|
||||||
|
self.report_csp_is_str = isinstance(self.enforced_csp, basestring)
|
||||||
|
self.report_only_csp = not self.enforced_csp
|
||||||
|
|
||||||
|
if not self.enforced_csp and not self.report_csp:
|
||||||
|
raise MiddlewareNotUsed()
|
||||||
|
|
||||||
|
def add_csp_header(self, request, response, header, base, can_call, is_str, attrs):
|
||||||
|
if header in response:
|
||||||
|
return
|
||||||
|
if is_str:
|
||||||
|
response[header] = base
|
||||||
|
return
|
||||||
|
csp = call_csp_dict(base, request, response) if can_call else base
|
||||||
|
|
||||||
|
for attr in attrs:
|
||||||
|
update = getattr(response, attr, None)
|
||||||
|
if update is not None:
|
||||||
|
if update.pop('override', False):
|
||||||
|
csp = update
|
||||||
|
else:
|
||||||
|
csp = merge_csp_dict(csp, update)
|
||||||
|
break
|
||||||
|
|
||||||
|
if csp:
|
||||||
|
response[header] = CSPCompiler(csp).compile()
|
||||||
|
|
||||||
|
def process_response(self, request, response):
|
||||||
|
if self.enforced_csp:
|
||||||
|
self.add_csp_header(request, response, 'Content-Security-Policy', self.enforced_csp,
|
||||||
|
self.enforced_csp_callable, self.enforced_csp_is_str, ('csp',))
|
||||||
|
if self.report_csp:
|
||||||
|
self.add_csp_header(request, response, 'Content-Security-Policy-Report-Only',
|
||||||
|
self.report_csp, self.report_csp_callable, self.report_csp_is_str,
|
||||||
|
('csp_report',) if self.enforced_csp else ('csp_report', 'csp'))
|
||||||
|
return response
|
||||||
|
|
||||||
|
def __call__(self, request):
|
||||||
|
return self.process_response(request, self.get_response(request))
|
|
@ -1,9 +1,13 @@
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
|
||||||
from django.test import SimpleTestCase
|
from django.core.exceptions import MiddlewareNotUsed
|
||||||
|
from django.http import HttpResponse
|
||||||
|
from django.test import SimpleTestCase, RequestFactory, override_settings
|
||||||
|
from django.utils.decorators import decorator_from_middleware_with_args
|
||||||
|
|
||||||
from csp import CSPCompiler, InvalidCSPError
|
from csp import CSPCompiler, InvalidCSPError
|
||||||
from utils import callable_csp_dict, merge_csp_dict
|
from csp_advanced.middleware import AdvancedCSPMiddleware
|
||||||
|
from utils import call_csp_dict, merge_csp_dict, is_callable_csp_dict
|
||||||
|
|
||||||
|
|
||||||
class CSPCompileTest(SimpleTestCase):
|
class CSPCompileTest(SimpleTestCase):
|
||||||
|
@ -88,20 +92,20 @@ class CallableCSPDictTest(SimpleTestCase):
|
||||||
return func
|
return func
|
||||||
|
|
||||||
def test_callable(self):
|
def test_callable(self):
|
||||||
self.assertEqual(callable_csp_dict(
|
self.assertEqual(call_csp_dict(
|
||||||
self.make_request_taker({'key': 'value'}), self.request, self.response
|
self.make_request_taker({'key': 'value'}), self.request, self.response
|
||||||
), {'key': 'value'})
|
), {'key': 'value'})
|
||||||
|
|
||||||
def test_normal_dict(self):
|
def test_normal_dict(self):
|
||||||
self.assertEqual(callable_csp_dict({'key': 'value'}, None, None), {'key': 'value'})
|
self.assertEqual(call_csp_dict({'key': 'value'}, None, None), {'key': 'value'})
|
||||||
|
|
||||||
def test_callable_entry(self):
|
def test_callable_entry(self):
|
||||||
self.assertEqual(callable_csp_dict(
|
self.assertEqual(call_csp_dict(
|
||||||
{'key': self.make_request_taker('value')}, self.request, self.response
|
{'key': self.make_request_taker('value')}, self.request, self.response
|
||||||
), {'key': 'value'})
|
), {'key': 'value'})
|
||||||
|
|
||||||
def test_mixed_entry(self):
|
def test_mixed_entry(self):
|
||||||
self.assertEqual(callable_csp_dict({
|
self.assertEqual(call_csp_dict({
|
||||||
'key': self.make_request_taker('value'),
|
'key': self.make_request_taker('value'),
|
||||||
'name': 'mixed',
|
'name': 'mixed',
|
||||||
}, self.request, self.response), {
|
}, self.request, self.response), {
|
||||||
|
@ -109,6 +113,13 @@ class CallableCSPDictTest(SimpleTestCase):
|
||||||
'name': 'mixed'
|
'name': 'mixed'
|
||||||
})
|
})
|
||||||
|
|
||||||
|
def test_is_callable(self):
|
||||||
|
self.assertTrue(is_callable_csp_dict(self.make_request_taker({})))
|
||||||
|
self.assertTrue(is_callable_csp_dict({'key': self.make_request_taker('value')}))
|
||||||
|
self.assertFalse(is_callable_csp_dict({}))
|
||||||
|
self.assertFalse(is_callable_csp_dict({'key': 'value'}))
|
||||||
|
self.assertFalse(is_callable_csp_dict(None))
|
||||||
|
|
||||||
|
|
||||||
class MergeCSPDictTest(SimpleTestCase):
|
class MergeCSPDictTest(SimpleTestCase):
|
||||||
def test_null(self):
|
def test_null(self):
|
||||||
|
@ -133,3 +144,100 @@ class MergeCSPDictTest(SimpleTestCase):
|
||||||
|
|
||||||
def test_tuple_override(self):
|
def test_tuple_override(self):
|
||||||
self.assertEqual(merge_csp_dict({'spam': (1,)}, {'spam': (2,)}), {'spam': (1, 2)})
|
self.assertEqual(merge_csp_dict({'spam': (1,)}, {'spam': (2,)}), {'spam': (1, 2)})
|
||||||
|
|
||||||
|
|
||||||
|
class TestMiddleware(SimpleTestCase):
|
||||||
|
decorator_factory = decorator_from_middleware_with_args(AdvancedCSPMiddleware)
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.factory = RequestFactory()
|
||||||
|
|
||||||
|
def make_ok_view(self):
|
||||||
|
@self.decorator_factory()
|
||||||
|
def view(request):
|
||||||
|
return HttpResponse('ok')
|
||||||
|
return view
|
||||||
|
|
||||||
|
def get_request(self):
|
||||||
|
return self.factory.get('/')
|
||||||
|
|
||||||
|
def test_no_csp(self):
|
||||||
|
self.assertRaises(MiddlewareNotUsed, self.decorator_factory)
|
||||||
|
|
||||||
|
@override_settings(ADVANCED_CSP={'script-src': ['self']})
|
||||||
|
def test_setting_csp(self):
|
||||||
|
self.assertEqual(self.make_ok_view()(self.get_request())['Content-Security-Policy'], "script-src 'self'")
|
||||||
|
|
||||||
|
@override_settings(ADVANCED_CSP_REPORT_ONLY={'default-src': ['http://dmoj.ca']})
|
||||||
|
def test_setting_csp_report(self):
|
||||||
|
self.assertEqual(self.make_ok_view()(self.get_request())['Content-Security-Policy-Report-Only'],
|
||||||
|
"default-src http://dmoj.ca")
|
||||||
|
|
||||||
|
@override_settings(ADVANCED_CSP={'script-src': ['self']},
|
||||||
|
ADVANCED_CSP_REPORT_ONLY={'default-src': ['http://dmoj.ca']})
|
||||||
|
def test_setting_both(self):
|
||||||
|
response = self.make_ok_view()(self.get_request())
|
||||||
|
self.assertEqual(response['Content-Security-Policy'], "script-src 'self'")
|
||||||
|
self.assertEqual(response['Content-Security-Policy-Report-Only'], 'default-src http://dmoj.ca')
|
||||||
|
|
||||||
|
@override_settings(ADVANCED_CSP={'script-src': ['self']})
|
||||||
|
def test_merge_csp_same(self):
|
||||||
|
@self.decorator_factory()
|
||||||
|
def view(request):
|
||||||
|
response = HttpResponse()
|
||||||
|
response.csp = {'script-src': ['https://dmoj.ca']}
|
||||||
|
return response
|
||||||
|
self.assertEqual(view(self.get_request())['Content-Security-Policy'], "script-src 'self' https://dmoj.ca")
|
||||||
|
|
||||||
|
@override_settings(ADVANCED_CSP={'script-src': ['self']})
|
||||||
|
def test_merge_csp_different(self):
|
||||||
|
@self.decorator_factory()
|
||||||
|
def view(request):
|
||||||
|
response = HttpResponse()
|
||||||
|
response.csp = {'style-src': ['https://dmoj.ca']}
|
||||||
|
return response
|
||||||
|
self.assertEqual(view(self.get_request())['Content-Security-Policy'],
|
||||||
|
"script-src 'self'; style-src https://dmoj.ca")
|
||||||
|
|
||||||
|
@override_settings(ADVANCED_CSP={'script-src': ['self']})
|
||||||
|
def test_override_csp_explicit(self):
|
||||||
|
@self.decorator_factory()
|
||||||
|
def view(request):
|
||||||
|
response = HttpResponse()
|
||||||
|
response.csp = {'style-src': ['none'], 'override': True}
|
||||||
|
return response
|
||||||
|
self.assertEqual(view(self.get_request())['Content-Security-Policy'], "style-src 'none'")
|
||||||
|
|
||||||
|
@override_settings(ADVANCED_CSP_REPORT_ONLY={'script-src': ['self']})
|
||||||
|
def test_override_csp_to_report_explicit(self):
|
||||||
|
@self.decorator_factory()
|
||||||
|
def view(request):
|
||||||
|
response = HttpResponse()
|
||||||
|
response.csp = {'style-src': ['none'], 'override': True}
|
||||||
|
return response
|
||||||
|
self.assertEqual(view(self.get_request())['Content-Security-Policy-Report-Only'], "style-src 'none'")
|
||||||
|
|
||||||
|
@override_settings(ADVANCED_CSP_REPORT_ONLY={'script-src': ['self']})
|
||||||
|
def test_override_csp_report_both_explicit(self):
|
||||||
|
@self.decorator_factory()
|
||||||
|
def view(request):
|
||||||
|
response = HttpResponse()
|
||||||
|
response.csp = {'style-src': ['none'], 'override': True}
|
||||||
|
response.csp_report = {'script-src': ['none'], 'override': True}
|
||||||
|
return response
|
||||||
|
|
||||||
|
response = view(self.get_request())
|
||||||
|
self.assertEqual(response['Content-Security-Policy-Report-Only'], "script-src 'none'")
|
||||||
|
self.assertTrue('Content-Security-Policy' not in response)
|
||||||
|
|
||||||
|
@override_settings(ADVANCED_CSP_REPORT_ONLY={'script-src': ['self']})
|
||||||
|
def test_override_csp_report_only_explicit(self):
|
||||||
|
@self.decorator_factory()
|
||||||
|
def view(request):
|
||||||
|
response = HttpResponse()
|
||||||
|
response.csp_report = {'script-src': ['none'], 'override': True}
|
||||||
|
return response
|
||||||
|
|
||||||
|
response = view(self.get_request())
|
||||||
|
self.assertEqual(response['Content-Security-Policy-Report-Only'], "script-src 'none'")
|
||||||
|
self.assertTrue('Content-Security-Policy' not in response)
|
||||||
|
|
|
@ -1,34 +1,43 @@
|
||||||
def callable_csp_dict(data, request, response):
|
def is_callable_csp_dict(data):
|
||||||
if callable(data):
|
if callable(data):
|
||||||
return data(request, response)
|
return True
|
||||||
result = {}
|
if not isinstance(data, dict):
|
||||||
for key, value in data.iteritems():
|
return False
|
||||||
if callable(value):
|
return any(callable(value) for value in data.itervalues())
|
||||||
result[key] = value(request, response)
|
|
||||||
else:
|
|
||||||
result[key] = value
|
def call_csp_dict(data, request, response):
|
||||||
return result
|
if callable(data):
|
||||||
|
return data(request, response)
|
||||||
|
|
||||||
def merge_csp_dict(template, override):
|
result = {}
|
||||||
result = template.copy()
|
for key, value in data.iteritems():
|
||||||
for key, value in override.iteritems():
|
if callable(value):
|
||||||
if key not in result:
|
result[key] = value(request, response)
|
||||||
result[key] = value
|
else:
|
||||||
continue
|
result[key] = value
|
||||||
orig = result[key]
|
return result
|
||||||
if isinstance(orig, list):
|
|
||||||
if orig == template[key]:
|
|
||||||
result[key] = orig + list(value)
|
def merge_csp_dict(template, override):
|
||||||
else:
|
result = template.copy()
|
||||||
orig += value
|
for key, value in override.iteritems():
|
||||||
elif isinstance(orig, set):
|
if key not in result:
|
||||||
if orig == template[key]:
|
result[key] = value
|
||||||
result[key] = orig.union(value)
|
continue
|
||||||
else:
|
orig = result[key]
|
||||||
orig.update(value)
|
if isinstance(orig, list):
|
||||||
elif isinstance(orig, tuple):
|
if orig == template[key]:
|
||||||
result[key] = orig + tuple(value)
|
result[key] = orig + list(value)
|
||||||
else:
|
else:
|
||||||
result[key] = value
|
orig += value
|
||||||
return result
|
elif isinstance(orig, set):
|
||||||
|
if orig == template[key]:
|
||||||
|
result[key] = orig.union(value)
|
||||||
|
else:
|
||||||
|
orig.update(value)
|
||||||
|
elif isinstance(orig, tuple):
|
||||||
|
result[key] = orig + tuple(value)
|
||||||
|
else:
|
||||||
|
result[key] = value
|
||||||
|
return result
|
||||||
|
|
Loading…
Reference in a new issue