mirror of
https://github.com/quantum5/nginx-krbauth.git
synced 2025-04-24 04:21:57 -04:00
Added type annotations for mypy
This commit is contained in:
parent
0ece1d7ce9
commit
813878b969
|
@ -6,6 +6,7 @@ import logging
|
|||
import os
|
||||
import struct
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import gssapi
|
||||
import ldap
|
||||
|
@ -42,24 +43,25 @@ COOKIE_SECURE = os.environ.get('KRBAUTH_SECURE_COOKIE', '1').lower() not in ('0'
|
|||
|
||||
|
||||
class Context:
|
||||
def __init__(self, ldap_group):
|
||||
def __init__(self, ldap_group: Optional[str]) -> None:
|
||||
self.ldap_group = ldap_group
|
||||
|
||||
@classmethod
|
||||
def from_request(cls):
|
||||
def from_request(cls) -> 'Context':
|
||||
return cls(ldap_group=request.environ.get('KRBAUTH_LDAP_GROUP'))
|
||||
|
||||
def bytes(self):
|
||||
def bytes(self) -> bytes:
|
||||
assert self.ldap_group
|
||||
return ''.join([self.ldap_group]).encode('utf-8')
|
||||
|
||||
|
||||
def make_cookie(context):
|
||||
def make_cookie(context: Context) -> bytes:
|
||||
message = timestamp.pack(int(time.time()) + DURATION) + os.urandom(RANDOM_SIZE) + context.bytes()
|
||||
signature = hmac.new(HMAC_KEY, message, hmac_digest).digest()
|
||||
return base64.b64encode(signature + message)
|
||||
|
||||
|
||||
def verify_cookie(cookie, context):
|
||||
def verify_cookie(cookie: Optional[str], context: Context) -> bool:
|
||||
if not cookie:
|
||||
return False
|
||||
try:
|
||||
|
@ -77,7 +79,7 @@ def verify_cookie(cookie, context):
|
|||
return hmac.compare_digest(expected, signature)
|
||||
|
||||
|
||||
def make_401(reason, context, negotiate='Negotiate', **kwargs):
|
||||
def make_401(reason: str, negotiate: Optional[str] = 'Negotiate', **kwargs) -> Response:
|
||||
app.logger.info('Returning unauthorized: %s (%s)', reason, kwargs)
|
||||
resp = Response('''\
|
||||
<html>
|
||||
|
@ -98,15 +100,15 @@ def make_401(reason, context, negotiate='Negotiate', **kwargs):
|
|||
return resp
|
||||
|
||||
|
||||
def auth_success(context, next_url):
|
||||
resp = redirect(next_url, code=307)
|
||||
def auth_success(context: Context, next_url: str) -> Response:
|
||||
resp = redirect(next_url, code=307, Response=Response)
|
||||
resp.set_cookie('krbauth', make_cookie(context), secure=COOKIE_SECURE, httponly=True, samesite='Strict')
|
||||
return resp
|
||||
|
||||
|
||||
def auth_spnego(context, next_url):
|
||||
def auth_spnego(context: Context, next_url: str) -> Response:
|
||||
try:
|
||||
in_token = base64.b64decode(request.headers['Authorization'][10:])
|
||||
in_token = base64.b64decode(request.headers['Authorization'][len('Negotiate '):])
|
||||
except binascii.Error:
|
||||
return Response(status=400)
|
||||
|
||||
|
@ -115,13 +117,14 @@ def auth_spnego(context, next_url):
|
|||
out_token = krb5_ctx.step(in_token)
|
||||
|
||||
if not krb5_ctx.complete:
|
||||
return make_401('Negotiation in progress', context, negotiate=['Negotiate ' + base64.b64encode(out_token)])
|
||||
return make_401('Negotiation in progress',
|
||||
negotiate=f'Negotiate {base64.b64encode(out_token).decode("ascii")}')
|
||||
|
||||
krb5_name = krb5_ctx._inquire(initiator_name=True).initiator_name
|
||||
krb5_name = krb5_ctx.initiator_name
|
||||
except BadMechanismError:
|
||||
return make_401('GSSAPI mechanism not supported', context, negotiate=None)
|
||||
return make_401('GSSAPI mechanism not supported', negotiate=None)
|
||||
except (GSSError, GeneralError) as e:
|
||||
return make_401(str(e), context)
|
||||
return make_401(str(e))
|
||||
|
||||
if LDAP_SERVER and LDAP_SEARCH_BASE and context.ldap_group:
|
||||
ldap_ctx = ldap.initialize(LDAP_SERVER)
|
||||
|
@ -130,7 +133,7 @@ def auth_spnego(context, next_url):
|
|||
ldap_filter = '(&(memberOf=%s)(krbPrincipalName=%s))' % (context.ldap_group, krb5_name)
|
||||
result = ldap_ctx.search_s(LDAP_SEARCH_BASE, ldap.SCOPE_SUBTREE, ldap_filter, ['cn'])
|
||||
if not result:
|
||||
return make_401('Did not find LDAP group member', context, krb5_name=krb5_name)
|
||||
return make_401('Did not find LDAP group member', krb5_name=krb5_name)
|
||||
app.logger.info('Authenticated via Kerberos as: %s, %s', krb5_name, result[0][0])
|
||||
else:
|
||||
app.logger.info('Authenticated via Kerberos as: %s', krb5_name)
|
||||
|
@ -138,7 +141,7 @@ def auth_spnego(context, next_url):
|
|||
return auth_success(context, next_url)
|
||||
|
||||
|
||||
def auth_basic(context, next_url):
|
||||
def auth_basic(context: Context, next_url: str) -> Response:
|
||||
try:
|
||||
token = base64.b64decode(request.headers['Authorization'][6:])
|
||||
username, _, password = token.decode('utf-8').partition(':')
|
||||
|
@ -146,18 +149,19 @@ def auth_basic(context, next_url):
|
|||
return Response(status=400)
|
||||
|
||||
if not username or not password:
|
||||
return make_401('Invalid username or password', context)
|
||||
return make_401('Invalid username or password')
|
||||
|
||||
assert LDAP_USER_DN is not None
|
||||
dn = LDAP_USER_DN % (username,)
|
||||
ldap_ctx = ldap.initialize(LDAP_SERVER)
|
||||
try:
|
||||
ldap_ctx.bind_s(dn, password)
|
||||
except ldap.INVALID_CREDENTIALS:
|
||||
return make_401('Failed to authenticate to LDAP', context, dn=dn)
|
||||
return make_401('Failed to authenticate to LDAP', dn=dn)
|
||||
|
||||
if context.ldap_group:
|
||||
if not ldap_ctx.search_s(dn, ldap.SCOPE_BASE, '(memberof=%s)' % (context.ldap_group,)):
|
||||
return make_401('Did not find LDAP group member', context, dn=dn, group=context.ldap_group)
|
||||
return make_401('Did not find LDAP group member', dn=dn, group=context.ldap_group)
|
||||
app.logger.info('Authenticated via LDAP as: %s in %s', dn, context.ldap_group)
|
||||
else:
|
||||
app.logger.info('Authenticated via LDAP as: %s', dn)
|
||||
|
@ -166,7 +170,7 @@ def auth_basic(context, next_url):
|
|||
|
||||
|
||||
@app.route('/krbauth')
|
||||
def auth():
|
||||
def auth() -> Response:
|
||||
next_url = request.args.get('next', '/')
|
||||
context = Context.from_request()
|
||||
authorization = request.headers.get('Authorization', '')
|
||||
|
@ -176,11 +180,11 @@ def auth():
|
|||
if LDAP_USER_DN and authorization.startswith('Basic '):
|
||||
return auth_basic(context, next_url)
|
||||
|
||||
return make_401('No Authorization header sent', context)
|
||||
return make_401('No Authorization header sent')
|
||||
|
||||
|
||||
@app.endpoint('krbauth.check')
|
||||
def check():
|
||||
def check() -> Response:
|
||||
if verify_cookie(request.cookies.get('krbauth'), Context.from_request()):
|
||||
return Response(status=200)
|
||||
return Response(status=401)
|
||||
|
|
Loading…
Reference in a new issue