aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py2/twisted/web/_auth
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/web/_auth
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/web/_auth')
-rw-r--r--contrib/python/Twisted/py2/twisted/web/_auth/__init__.py7
-rw-r--r--contrib/python/Twisted/py2/twisted/web/_auth/basic.py61
-rw-r--r--contrib/python/Twisted/py2/twisted/web/_auth/digest.py56
-rw-r--r--contrib/python/Twisted/py2/twisted/web/_auth/wrapper.py236
4 files changed, 360 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/web/_auth/__init__.py b/contrib/python/Twisted/py2/twisted/web/_auth/__init__.py
new file mode 100644
index 0000000000..6a58870091
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/web/_auth/__init__.py
@@ -0,0 +1,7 @@
+# -*- test-case-name: twisted.web.test.test_httpauth -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+HTTP header-based authentication migrated from web2
+"""
diff --git a/contrib/python/Twisted/py2/twisted/web/_auth/basic.py b/contrib/python/Twisted/py2/twisted/web/_auth/basic.py
new file mode 100644
index 0000000000..d539457190
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/web/_auth/basic.py
@@ -0,0 +1,61 @@
+# -*- test-case-name: twisted.web.test.test_httpauth -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+HTTP BASIC authentication.
+
+@see: U{http://tools.ietf.org/html/rfc1945}
+@see: U{http://tools.ietf.org/html/rfc2616}
+@see: U{http://tools.ietf.org/html/rfc2617}
+"""
+
+from __future__ import division, absolute_import
+
+import binascii
+
+from zope.interface import implementer
+
+from twisted.cred import credentials, error
+from twisted.web.iweb import ICredentialFactory
+
+
+@implementer(ICredentialFactory)
+class BasicCredentialFactory(object):
+ """
+ Credential Factory for HTTP Basic Authentication
+
+ @type authenticationRealm: L{bytes}
+ @ivar authenticationRealm: The HTTP authentication realm which will be issued in
+ challenges.
+ """
+
+ scheme = b'basic'
+
+ def __init__(self, authenticationRealm):
+ self.authenticationRealm = authenticationRealm
+
+
+ def getChallenge(self, request):
+ """
+ Return a challenge including the HTTP authentication realm with which
+ this factory was created.
+ """
+ return {'realm': self.authenticationRealm}
+
+
+ def decode(self, response, request):
+ """
+ Parse the base64-encoded, colon-separated username and password into a
+ L{credentials.UsernamePassword} instance.
+ """
+ try:
+ creds = binascii.a2b_base64(response + b'===')
+ except binascii.Error:
+ raise error.LoginFailed('Invalid credentials')
+
+ creds = creds.split(b':', 1)
+ if len(creds) == 2:
+ return credentials.UsernamePassword(*creds)
+ else:
+ raise error.LoginFailed('Invalid credentials')
diff --git a/contrib/python/Twisted/py2/twisted/web/_auth/digest.py b/contrib/python/Twisted/py2/twisted/web/_auth/digest.py
new file mode 100644
index 0000000000..5346801f6b
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/web/_auth/digest.py
@@ -0,0 +1,56 @@
+# -*- test-case-name: twisted.web.test.test_httpauth -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Implementation of RFC2617: HTTP Digest Authentication
+
+@see: U{http://www.faqs.org/rfcs/rfc2617.html}
+"""
+
+from __future__ import division, absolute_import
+
+from zope.interface import implementer
+from twisted.cred import credentials
+from twisted.web.iweb import ICredentialFactory
+
+@implementer(ICredentialFactory)
+class DigestCredentialFactory(object):
+ """
+ Wrapper for L{digest.DigestCredentialFactory} that implements the
+ L{ICredentialFactory} interface.
+ """
+
+ scheme = b'digest'
+
+ def __init__(self, algorithm, authenticationRealm):
+ """
+ Create the digest credential factory that this object wraps.
+ """
+ self.digest = credentials.DigestCredentialFactory(algorithm,
+ authenticationRealm)
+
+
+ def getChallenge(self, request):
+ """
+ Generate the challenge for use in the WWW-Authenticate header
+
+ @param request: The L{IRequest} to with access was denied and for the
+ response to which this challenge is being generated.
+
+ @return: The L{dict} that can be used to generate a WWW-Authenticate
+ header.
+ """
+ return self.digest.getChallenge(request.getClientAddress().host)
+
+
+ def decode(self, response, request):
+ """
+ Create a L{twisted.cred.credentials.DigestedCredentials} object
+ from the given response and request.
+
+ @see: L{ICredentialFactory.decode}
+ """
+ return self.digest.decode(response,
+ request.method,
+ request.getClientAddress().host)
diff --git a/contrib/python/Twisted/py2/twisted/web/_auth/wrapper.py b/contrib/python/Twisted/py2/twisted/web/_auth/wrapper.py
new file mode 100644
index 0000000000..1804b7a416
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/web/_auth/wrapper.py
@@ -0,0 +1,236 @@
+# -*- test-case-name: twisted.web.test.test_httpauth -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+A guard implementation which supports HTTP header-based authentication
+schemes.
+
+If no I{Authorization} header is supplied, an anonymous login will be
+attempted by using a L{Anonymous} credentials object. If such a header is
+supplied and does not contain allowed credentials, or if anonymous login is
+denied, a 401 will be sent in the response along with I{WWW-Authenticate}
+headers for each of the allowed authentication schemes.
+"""
+
+from __future__ import absolute_import, division
+
+from twisted.cred import error
+from twisted.cred.credentials import Anonymous
+from twisted.python.compat import unicode
+from twisted.python.components import proxyForInterface
+from twisted.web import util
+from twisted.web.resource import ErrorPage, IResource
+from twisted.logger import Logger
+
+from zope.interface import implementer
+
+
+@implementer(IResource)
+class UnauthorizedResource(object):
+ """
+ Simple IResource to escape Resource dispatch
+ """
+ isLeaf = True
+
+
+ def __init__(self, factories):
+ self._credentialFactories = factories
+
+
+ def render(self, request):
+ """
+ Send www-authenticate headers to the client
+ """
+ def ensureBytes(s):
+ return s.encode('ascii') if isinstance(s, unicode) else s
+
+ def generateWWWAuthenticate(scheme, challenge):
+ l = []
+ for k, v in challenge.items():
+ k = ensureBytes(k)
+ v = ensureBytes(v)
+ l.append(k + b"=" + quoteString(v))
+ return b" ".join([scheme, b", ".join(l)])
+
+ def quoteString(s):
+ return b'"' + s.replace(b'\\', b'\\\\').replace(b'"', b'\\"') + b'"'
+
+ request.setResponseCode(401)
+ for fact in self._credentialFactories:
+ challenge = fact.getChallenge(request)
+ request.responseHeaders.addRawHeader(
+ b'www-authenticate',
+ generateWWWAuthenticate(fact.scheme, challenge))
+ if request.method == b'HEAD':
+ return b''
+ return b'Unauthorized'
+
+
+ def getChildWithDefault(self, path, request):
+ """
+ Disable resource dispatch
+ """
+ return self
+
+
+
+@implementer(IResource)
+class HTTPAuthSessionWrapper(object):
+ """
+ Wrap a portal, enforcing supported header-based authentication schemes.
+
+ @ivar _portal: The L{Portal} which will be used to retrieve L{IResource}
+ avatars.
+
+ @ivar _credentialFactories: A list of L{ICredentialFactory} providers which
+ will be used to decode I{Authorization} headers into L{ICredentials}
+ providers.
+ """
+ isLeaf = False
+ _log = Logger()
+
+ def __init__(self, portal, credentialFactories):
+ """
+ Initialize a session wrapper
+
+ @type portal: C{Portal}
+ @param portal: The portal that will authenticate the remote client
+
+ @type credentialFactories: C{Iterable}
+ @param credentialFactories: The portal that will authenticate the
+ remote client based on one submitted C{ICredentialFactory}
+ """
+ self._portal = portal
+ self._credentialFactories = credentialFactories
+
+
+ def _authorizedResource(self, request):
+ """
+ Get the L{IResource} which the given request is authorized to receive.
+ If the proper authorization headers are present, the resource will be
+ requested from the portal. If not, an anonymous login attempt will be
+ made.
+ """
+ authheader = request.getHeader(b'authorization')
+ if not authheader:
+ return util.DeferredResource(self._login(Anonymous()))
+
+ factory, respString = self._selectParseHeader(authheader)
+ if factory is None:
+ return UnauthorizedResource(self._credentialFactories)
+ try:
+ credentials = factory.decode(respString, request)
+ except error.LoginFailed:
+ return UnauthorizedResource(self._credentialFactories)
+ except:
+ self._log.failure("Unexpected failure from credentials factory")
+ return ErrorPage(500, None, None)
+ else:
+ return util.DeferredResource(self._login(credentials))
+
+
+ def render(self, request):
+ """
+ Find the L{IResource} avatar suitable for the given request, if
+ possible, and render it. Otherwise, perhaps render an error page
+ requiring authorization or describing an internal server failure.
+ """
+ return self._authorizedResource(request).render(request)
+
+
+ def getChildWithDefault(self, path, request):
+ """
+ Inspect the Authorization HTTP header, and return a deferred which,
+ when fired after successful authentication, will return an authorized
+ C{Avatar}. On authentication failure, an C{UnauthorizedResource} will
+ be returned, essentially halting further dispatch on the wrapped
+ resource and all children
+ """
+ # Don't consume any segments of the request - this class should be
+ # transparent!
+ request.postpath.insert(0, request.prepath.pop())
+ return self._authorizedResource(request)
+
+
+ def _login(self, credentials):
+ """
+ Get the L{IResource} avatar for the given credentials.
+
+ @return: A L{Deferred} which will be called back with an L{IResource}
+ avatar or which will errback if authentication fails.
+ """
+ d = self._portal.login(credentials, None, IResource)
+ d.addCallbacks(self._loginSucceeded, self._loginFailed)
+ return d
+
+
+ def _loginSucceeded(self, args):
+ """
+ Handle login success by wrapping the resulting L{IResource} avatar
+ so that the C{logout} callback will be invoked when rendering is
+ complete.
+ """
+ interface, avatar, logout = args
+ class ResourceWrapper(proxyForInterface(IResource, 'resource')):
+ """
+ Wrap an L{IResource} so that whenever it or a child of it
+ completes rendering, the cred logout hook will be invoked.
+
+ An assumption is made here that exactly one L{IResource} from
+ among C{avatar} and all of its children will be rendered. If
+ more than one is rendered, C{logout} will be invoked multiple
+ times and probably earlier than desired.
+ """
+ def getChildWithDefault(self, name, request):
+ """
+ Pass through the lookup to the wrapped resource, wrapping
+ the result in L{ResourceWrapper} to ensure C{logout} is
+ called when rendering of the child is complete.
+ """
+ return ResourceWrapper(self.resource.getChildWithDefault(name, request))
+
+ def render(self, request):
+ """
+ Hook into response generation so that when rendering has
+ finished completely (with or without error), C{logout} is
+ called.
+ """
+ request.notifyFinish().addBoth(lambda ign: logout())
+ return super(ResourceWrapper, self).render(request)
+
+ return ResourceWrapper(avatar)
+
+
+ def _loginFailed(self, result):
+ """
+ Handle login failure by presenting either another challenge (for
+ expected authentication/authorization-related failures) or a server
+ error page (for anything else).
+ """
+ if result.check(error.Unauthorized, error.LoginFailed):
+ return UnauthorizedResource(self._credentialFactories)
+ else:
+ self._log.failure(
+ "HTTPAuthSessionWrapper.getChildWithDefault encountered "
+ "unexpected error",
+ failure=result,
+ )
+ return ErrorPage(500, None, None)
+
+
+ def _selectParseHeader(self, header):
+ """
+ Choose an C{ICredentialFactory} from C{_credentialFactories}
+ suitable to use to decode the given I{Authenticate} header.
+
+ @return: A two-tuple of a factory and the remaining portion of the
+ header value to be decoded or a two-tuple of L{None} if no
+ factory can decode the header value.
+ """
+ elements = header.split(b' ')
+ scheme = elements[0].lower()
+ for fact in self._credentialFactories:
+ if fact.scheme == scheme:
+ return (fact, b' '.join(elements[1:]))
+ return (None, None)