Source code for requests.auth

"""
requests.auth
~~~~~~~~~~~~~

This module contains the authentication handlers for Requests.
"""

import hashlib
import os
import re
import threading
import time
import warnings
from base64 import b64encode

from ._internal_utils import to_native_string
from .compat import basestring, str, urlparse
from .cookies import extract_cookies_to_jar
from .utils import parse_dict_header

CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded"
CONTENT_TYPE_MULTI_PART = "multipart/form-data"


def _basic_auth_str(username, password):
    """Returns a Basic Auth string."""

    # "I want us to put a big-ol' comment on top of it that
    # says that this behaviour is dumb but we need to preserve
    # it because people are relying on it."
    #    - Lukasa
    #
    # These are here solely to maintain backwards compatibility
    # for things like ints. This will be removed in 3.0.0.
    if not isinstance(username, basestring):
        warnings.warn(
            "Non-string usernames will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems.".format(username),
            category=DeprecationWarning,
        )
        username = str(username)

    if not isinstance(password, basestring):
        warnings.warn(
            "Non-string passwords will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems.".format(type(password)),
            category=DeprecationWarning,
        )
        password = str(password)
    # -- End Removal --

    if isinstance(username, str):
        username = username.encode("latin1")

    if isinstance(password, str):
        password = password.encode("latin1")

    authstr = "Basic " + to_native_string(
        b64encode(b":".join((username, password))).strip()
    )

    return authstr


[docs]class AuthBase: """Base class that all auth implementations derive from""" def __call__(self, r): raise NotImplementedError("Auth hooks must be callable.")
[docs]class HTTPBasicAuth(AuthBase): """Attaches HTTP Basic Authentication to the given Request object.""" def __init__(self, username, password): self.username = username self.password = password def __eq__(self, other): return all( [ self.username == getattr(other, "username", None), self.password == getattr(other, "password", None), ] ) def __ne__(self, other): return not self == other def __call__(self, r): r.headers["Authorization"] = _basic_auth_str(self.username, self.password) return r
[docs]class HTTPProxyAuth(HTTPBasicAuth): """Attaches HTTP Proxy Authentication to a given Request object.""" def __call__(self, r): r.headers["Proxy-Authorization"] = _basic_auth_str(self.username, self.password) return r
[docs]class HTTPDigestAuth(AuthBase): """Attaches HTTP Digest Authentication to the given Request object.""" def __init__(self, username, password): self.username = username self.password = password # Keep state in per-thread local storage self._thread_local = threading.local() def init_per_thread_state(self): # Ensure state is initialized just once per-thread if not hasattr(self._thread_local, "init"): self._thread_local.init = True self._thread_local.last_nonce = "" self._thread_local.nonce_count = 0 self._thread_local.chal = {} self._thread_local.pos = None self._thread_local.num_401_calls = None def build_digest_header(self, method, url): """ :rtype: str """ realm = self._thread_local.chal["realm"] nonce = self._thread_local.chal["nonce"] qop = self._thread_local.chal.get("qop") algorithm = self._thread_local.chal.get("algorithm") opaque = self._thread_local.chal.get("opaque") hash_utf8 = None if algorithm is None: _algorithm = "MD5" else: _algorithm = algorithm.upper() # lambdas assume digest modules are imported at the top level if _algorithm == "MD5" or _algorithm == "MD5-SESS": def md5_utf8(x): if isinstance(x, str): x = x.encode("utf-8") return hashlib.md5(x).hexdigest() hash_utf8 = md5_utf8 elif _algorithm == "SHA": def sha_utf8(x): if isinstance(x, str): x = x.encode("utf-8") return hashlib.sha1(x).hexdigest() hash_utf8 = sha_utf8 elif _algorithm == "SHA-256": def sha256_utf8(x): if isinstance(x, str): x = x.encode("utf-8") return hashlib.sha256(x).hexdigest() hash_utf8 = sha256_utf8 elif _algorithm == "SHA-512": def sha512_utf8(x): if isinstance(x, str): x = x.encode("utf-8") return hashlib.sha512(x).hexdigest() hash_utf8 = sha512_utf8 KD = lambda s, d: hash_utf8(f"{s}:{d}") # noqa:E731 if hash_utf8 is None: return None # XXX not implemented yet entdig = None p_parsed = urlparse(url) #: path is request-uri defined in RFC 2616 which should not be empty path = p_parsed.path or "/" if p_parsed.query: path += f"?{p_parsed.query}" A1 = f"{self.username}:{realm}:{self.password}" A2 = f"{method}:{path}" HA1 = hash_utf8(A1) HA2 = hash_utf8(A2) if nonce == self._thread_local.last_nonce: self._thread_local.nonce_count += 1 else: self._thread_local.nonce_count = 1 ncvalue = f"{self._thread_local.nonce_count:08x}" s = str(self._thread_local.nonce_count).encode("utf-8") s += nonce.encode("utf-8") s += time.ctime().encode("utf-8") s += os.urandom(8) cnonce = hashlib.sha1(s).hexdigest()[:16] if _algorithm == "MD5-SESS": HA1 = hash_utf8(f"{HA1}:{nonce}:{cnonce}") if not qop: respdig = KD(HA1, f"{nonce}:{HA2}") elif qop == "auth" or "auth" in qop.split(","): noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}" respdig = KD(HA1, noncebit) else: # XXX handle auth-int. return None self._thread_local.last_nonce = nonce # XXX should the partial digests be encoded too? base = ( f'username="{self.username}", realm="{realm}", nonce="{nonce}", ' f'uri="{path}", response="{respdig}"' ) if opaque: base += f', opaque="{opaque}"' if algorithm: base += f', algorithm="{algorithm}"' if entdig: base += f', digest="{entdig}"' if qop: base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"' return f"Digest {base}" def handle_redirect(self, r, **kwargs): """Reset num_401_calls counter on redirects.""" if r.is_redirect: self._thread_local.num_401_calls = 1 def handle_401(self, r, **kwargs): """ Takes the given response and tries digest-auth, if needed. :rtype: requests.Response """ # If response is not 4xx, do not auth # See https://github.com/psf/requests/issues/3772 if not 400 <= r.status_code < 500: self._thread_local.num_401_calls = 1 return r if self._thread_local.pos is not None: # Rewind the file position indicator of the body to where # it was to resend the request. r.request.body.seek(self._thread_local.pos) s_auth = r.headers.get("www-authenticate", "") if "digest" in s_auth.lower() and self._thread_local.num_401_calls < 2: self._thread_local.num_401_calls += 1 pat = re.compile(r"digest ", flags=re.IGNORECASE) self._thread_local.chal = parse_dict_header(pat.sub("", s_auth, count=1)) # Consume content and release the original connection # to allow our new request to reuse the same one. r.content r.close() prep = r.request.copy() extract_cookies_to_jar(prep._cookies, r.request, r.raw) prep.prepare_cookies(prep._cookies) prep.headers["Authorization"] = self.build_digest_header( prep.method, prep.url ) _r = r.connection.send(prep, **kwargs) _r.history.append(r) _r.request = prep return _r self._thread_local.num_401_calls = 1 return r def __call__(self, r): # Initialize per-thread state, if needed self.init_per_thread_state() # If we have a saved nonce, skip the 401 if self._thread_local.last_nonce: r.headers["Authorization"] = self.build_digest_header(r.method, r.url) try: self._thread_local.pos = r.body.tell() except AttributeError: # In the case of HTTPDigestAuth being reused and the body of # the previous request was a file-like object, pos has the # file position of the previous body. Ensure it's set to # None. self._thread_local.pos = None r.register_hook("response", self.handle_401) r.register_hook("response", self.handle_redirect) self._thread_local.num_401_calls = 1 return r def __eq__(self, other): return all( [ self.username == getattr(other, "username", None), self.password == getattr(other, "password", None), ] ) def __ne__(self, other): return not self == other