From 813878b9696010b096d4561f78158b3f4af0de8c Mon Sep 17 00:00:00 2001 From: Quantum Date: Sun, 25 Apr 2021 04:12:52 -0400 Subject: [PATCH] Added type annotations for mypy --- mypy.ini | 2 ++ nginx_krbauth.py | 48 ++++++++++++++++++++++++++---------------------- 2 files changed, 28 insertions(+), 22 deletions(-) create mode 100644 mypy.ini diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..976ba02 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,2 @@ +[mypy] +ignore_missing_imports = True diff --git a/nginx_krbauth.py b/nginx_krbauth.py index b56d2bd..2deb6d8 100644 --- a/nginx_krbauth.py +++ b/nginx_krbauth.py @@ -6,6 +6,7 @@ import logging import os import struct import time +from typing import Optional import gssapi import ldap @@ -42,24 +43,25 @@ COOKIE_SECURE = os.environ.get('KRBAUTH_SECURE_COOKIE', '1').lower() not in ('0' class Context: - def __init__(self, ldap_group): + def __init__(self, ldap_group: Optional[str]) -> None: self.ldap_group = ldap_group @classmethod - def from_request(cls): + def from_request(cls) -> 'Context': return cls(ldap_group=request.environ.get('KRBAUTH_LDAP_GROUP')) - def bytes(self): + def bytes(self) -> bytes: + assert self.ldap_group return ''.join([self.ldap_group]).encode('utf-8') -def make_cookie(context): +def make_cookie(context: Context) -> bytes: message = timestamp.pack(int(time.time()) + DURATION) + os.urandom(RANDOM_SIZE) + context.bytes() signature = hmac.new(HMAC_KEY, message, hmac_digest).digest() return base64.b64encode(signature + message) -def verify_cookie(cookie, context): +def verify_cookie(cookie: Optional[str], context: Context) -> bool: if not cookie: return False try: @@ -77,7 +79,7 @@ def verify_cookie(cookie, context): return hmac.compare_digest(expected, signature) -def make_401(reason, context, negotiate='Negotiate', **kwargs): +def make_401(reason: str, negotiate: Optional[str] = 'Negotiate', **kwargs) -> Response: app.logger.info('Returning unauthorized: %s (%s)', reason, kwargs) resp = Response('''\ @@ -98,15 +100,15 @@ def make_401(reason, context, negotiate='Negotiate', **kwargs): return resp -def auth_success(context, next_url): - resp = redirect(next_url, code=307) +def auth_success(context: Context, next_url: str) -> Response: + resp = redirect(next_url, code=307, Response=Response) resp.set_cookie('krbauth', make_cookie(context), secure=COOKIE_SECURE, httponly=True, samesite='Strict') return resp -def auth_spnego(context, next_url): +def auth_spnego(context: Context, next_url: str) -> Response: try: - in_token = base64.b64decode(request.headers['Authorization'][10:]) + in_token = base64.b64decode(request.headers['Authorization'][len('Negotiate '):]) except binascii.Error: return Response(status=400) @@ -115,13 +117,14 @@ def auth_spnego(context, next_url): out_token = krb5_ctx.step(in_token) if not krb5_ctx.complete: - return make_401('Negotiation in progress', context, negotiate=['Negotiate ' + base64.b64encode(out_token)]) + return make_401('Negotiation in progress', + negotiate=f'Negotiate {base64.b64encode(out_token).decode("ascii")}') - krb5_name = krb5_ctx._inquire(initiator_name=True).initiator_name + krb5_name = krb5_ctx.initiator_name except BadMechanismError: - return make_401('GSSAPI mechanism not supported', context, negotiate=None) + return make_401('GSSAPI mechanism not supported', negotiate=None) except (GSSError, GeneralError) as e: - return make_401(str(e), context) + return make_401(str(e)) if LDAP_SERVER and LDAP_SEARCH_BASE and context.ldap_group: ldap_ctx = ldap.initialize(LDAP_SERVER) @@ -130,7 +133,7 @@ def auth_spnego(context, next_url): ldap_filter = '(&(memberOf=%s)(krbPrincipalName=%s))' % (context.ldap_group, krb5_name) result = ldap_ctx.search_s(LDAP_SEARCH_BASE, ldap.SCOPE_SUBTREE, ldap_filter, ['cn']) if not result: - return make_401('Did not find LDAP group member', context, krb5_name=krb5_name) + return make_401('Did not find LDAP group member', krb5_name=krb5_name) app.logger.info('Authenticated via Kerberos as: %s, %s', krb5_name, result[0][0]) else: app.logger.info('Authenticated via Kerberos as: %s', krb5_name) @@ -138,7 +141,7 @@ def auth_spnego(context, next_url): return auth_success(context, next_url) -def auth_basic(context, next_url): +def auth_basic(context: Context, next_url: str) -> Response: try: token = base64.b64decode(request.headers['Authorization'][6:]) username, _, password = token.decode('utf-8').partition(':') @@ -146,18 +149,19 @@ def auth_basic(context, next_url): return Response(status=400) if not username or not password: - return make_401('Invalid username or password', context) + return make_401('Invalid username or password') + assert LDAP_USER_DN is not None dn = LDAP_USER_DN % (username,) ldap_ctx = ldap.initialize(LDAP_SERVER) try: ldap_ctx.bind_s(dn, password) except ldap.INVALID_CREDENTIALS: - return make_401('Failed to authenticate to LDAP', context, dn=dn) + return make_401('Failed to authenticate to LDAP', dn=dn) if context.ldap_group: if not ldap_ctx.search_s(dn, ldap.SCOPE_BASE, '(memberof=%s)' % (context.ldap_group,)): - return make_401('Did not find LDAP group member', context, dn=dn, group=context.ldap_group) + return make_401('Did not find LDAP group member', dn=dn, group=context.ldap_group) app.logger.info('Authenticated via LDAP as: %s in %s', dn, context.ldap_group) else: app.logger.info('Authenticated via LDAP as: %s', dn) @@ -166,7 +170,7 @@ def auth_basic(context, next_url): @app.route('/krbauth') -def auth(): +def auth() -> Response: next_url = request.args.get('next', '/') context = Context.from_request() authorization = request.headers.get('Authorization', '') @@ -176,11 +180,11 @@ def auth(): if LDAP_USER_DN and authorization.startswith('Basic '): return auth_basic(context, next_url) - return make_401('No Authorization header sent', context) + return make_401('No Authorization header sent') @app.endpoint('krbauth.check') -def check(): +def check() -> Response: if verify_cookie(request.cookies.get('krbauth'), Context.from_request()): return Response(status=200) return Response(status=401)