Source code for requests_gracedb.auth

#
# Copyright (C) 2019-2020  Leo P. Singer <leo.singer@ligo.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
from os import access, environ, getuid, R_OK
from os.path import expanduser, join
from six.moves.urllib.parse import urlparse

from safe_netrc import netrc

from .cert_reload import CertReloadingHTTPAdapter


def _find_cert():
    """Try to find a user's X509 certificate and key.

    Checks environment variables first, then expected location for default
    proxy.

    Notes
    -----
    This function is adapted from the original ``_find_x509_credentials()``
    method in https://git.ligo.org/lscsoft/gracedb-client/blob/gracedb-2.5.0/ligo/gracedb/
    rest.py, which is copyright (C) Brian Moe, Branson Stephens (2015).

    """  # noqa: E501
    result = tuple(environ.get(key)
                   for key in ('X509_USER_CERT', 'X509_USER_KEY'))
    if all(result):
        return result

    result = environ.get('X509_USER_PROXY')
    if result:
        return result

    result = join('/tmp', 'x509up_u{}'.format(getuid()))
    if access(result, R_OK):
        return result

    result = tuple(expanduser(join('~', '.globus', filename))
                   for filename in ('usercert.pem', 'userkey.pem'))
    if all(access(path, R_OK) for path in result):
        return result


def _find_username_password(url):
    host = urlparse(url).hostname

    try:
        result = netrc().authenticators(host)
    except IOError:
        result = None

    if result is not None:
        username, _, password = result
        result = (username, password)

    return result


class SessionAuthMixin(object):
    """A mixin for :class:`requests.Session` to add support for all GraceDB
    authentication mechanisms.

    Parameters
    ----------
    url : str
        GraceDB Client URL.
    cert : str, tuple
        Client-side X.509 certificate. May be either a single filename
        if the certificate and private key are concatenated together, or
        a tuple of the filenames for the certificate and private key.
    username : str
        Username for basic auth.
    password : str
        Password for basic auth.
    force_noauth : bool, default=False
        If true, then do not use any authentication at all.
    fail_if_noauth : bool, default=False
        If true, then raise an exception if authentication credentials are
        not provided.
    cert_reload : bool, default=False
        If true, then automatically reload the client certificate before it
        expires.
    cert_reload_timeout : int, default=300
        Reload the certificate this many seconds before it expires.

    Raises
    ------
    ValueError
        If both :obj:`force_noauth` and :obj:`fail_if_noauth` are true, if
        only one of :obj:`username` and :obj:`password` is provided, or if
        :obj:`fail_if_noauth` is true and no authentication source was
        found.

    Notes
    -----
    When a new Session instance is created, the following sources of
    authentication are tried, in order:

    1.  If the :obj:`force_noauth` keyword argument is true, then perform no
        authentication at all.

    2.  If the :obj:`cert` keyword argument is provided, then use X.509 client
        certificate authentication.

    3.  If the :obj:`username` and :obj:`password` keyword arguments are
        provided, then use basic auth.

    4.  Look for a default X.509 client certificate in:

        a.  the environment variables :envvar:`X509_USER_CERT` and
            :envvar:`X509_USER_KEY`
        b.  the environment variable :envvar:`X509_USER_PROXY`
        c.  the file :file:`/tmp/x509up_u{UID}`, where :samp:`{UID}` is your
            numeric user ID, if the file exists and is readable
        d.  the files :file:`~/.globus/usercert.pem` and
            :file:`~/.globus/userkey.pem`, if they exist and are readable

    5.  Read the netrc file [1]_ located at :file:`~/.netrc`, or at the path
        stored in the environment variable :envvar:`NETRC`, and look for a
        username and password matching the hostname in the URL.

    6.  If the :obj:`fail_if_noauth` keyword argument is true, and no
        authentication source was found, then raise a :class:`ValueError`.

    The lookups in steps 4 and 5 are performed lazily: each one runs only if
    no earlier step has already selected an authentication source.

    References
    ----------
    .. [1] The .netrc file.
           https://www.gnu.org/software/inetutils/manual/html_node/The-_002enetrc-file.html

    """  # noqa: E501

    def __init__(self, url=None, cert=None, username=None, password=None,
                 force_noauth=False, fail_if_noauth=False, cert_reload=False,
                 cert_reload_timeout=300, **kwargs):
        super(SessionAuthMixin, self).__init__(**kwargs)

        # Support for reloading client certificates
        if cert_reload:
            self.mount('https://', CertReloadingHTTPAdapter(
                cert_reload_timeout=cert_reload_timeout))

        # Argument validation
        if fail_if_noauth and force_noauth:
            raise ValueError(
                'Must not set both force_noauth and fail_if_noauth.')
        if (username is None) ^ (password is None):
            raise ValueError('Must provide username and password, or neither.')

        if force_noauth:
            pass
        elif cert is not None:
            self.cert = cert
        elif username is not None:
            self.auth = (username, password)
        else:
            # Only fall back to the default credential sources when nothing
            # was requested explicitly. Each lookup is deferred until it is
            # actually needed, so that explicit credentials never trigger
            # (or fail because of) unrelated certificate or netrc I/O.
            default_cert = _find_cert()
            if default_cert is not None:
                self.cert = default_cert
            else:
                default_basic_auth = _find_username_password(url)
                if default_basic_auth is not None:
                    self.auth = default_basic_auth
                elif fail_if_noauth:
                    raise ValueError('No authentication credentials found.')