Source code for requests_gracedb.cert_reload
#
# Copyright (C) 2019-2020 Leo P. Singer <leo.singer@ligo.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""HTTPS adapter to close connections with expired client certificates."""
from __future__ import absolute_import
from datetime import datetime, timedelta
from functools import partial
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_pem_x509_certificate
from requests.packages.urllib3.connection import HTTPSConnection
from requests.packages.urllib3.connectionpool import (HTTPConnectionPool,
HTTPSConnectionPool)
from requests.adapters import HTTPAdapter
_backend = default_backend()
[docs]def load_x509_certificate(filename):
"""Load an X.509 certificate from a file.
Parameters
----------
filename : str
The name of the certificate file.
Returns
-------
cert : cryptography.x509.Certificate
The parsed certificate.
"""
with open(filename, 'rb') as f:
data = f.read()
return load_pem_x509_certificate(data, _backend)
class _CertReloadingHTTPSConnection(HTTPSConnection):
def __init__(self, host, cert_reload_timeout=0, **kwargs):
super(_CertReloadingHTTPSConnection, self).__init__(host, **kwargs)
self._not_valid_after = datetime.max
self._reload_timeout = timedelta(seconds=cert_reload_timeout)
@property
def cert_has_expired(self):
expires = self._not_valid_after - datetime.utcnow()
return expires <= self._reload_timeout
def connect(self):
if self.cert_file:
cert = load_x509_certificate(self.cert_file)
self._not_valid_after = cert.not_valid_after
super(_CertReloadingHTTPSConnection, self).connect()
class _CertReloadingHTTPSConnectionPool(HTTPSConnectionPool):
ConnectionCls = _CertReloadingHTTPSConnection
def __init__(self, host, port=None, cert_reload_timeout=0, **kwargs):
super(_CertReloadingHTTPSConnectionPool, self).__init__(
host, port=port, **kwargs)
self.conn_kw['cert_reload_timeout'] = cert_reload_timeout
def _get_conn(self, timeout=None):
while True:
conn = super(_CertReloadingHTTPSConnectionPool, self)._get_conn(
timeout)
# Note: this loop is guaranteed to terminate because, even if the
# pool is completely drained, when we create a new connection, its
# `_not_valid_after` property is set to `datetime.max`, and the
# condition below will evaulate to `True`.
if not conn.cert_has_expired:
return conn
conn.close()
[docs]class CertReloadingHTTPAdapter(HTTPAdapter):
"""A mixin for :class:`requests.Session` to automatically reload the client
X.509 certificates if the version that is stored in the session is going to
expire soon.
Parameters
----------
cert_reload_timeout : int
Reload the certificate if it expires within this many seconds from now.
"""
def __init__(self, cert_reload_timeout=0, **kwargs):
super(CertReloadingHTTPAdapter, self).__init__(**kwargs)
https_pool_cls = partial(
_CertReloadingHTTPSConnectionPool,
cert_reload_timeout=cert_reload_timeout)
self.poolmanager.pool_classes_by_scheme = {
'http': HTTPConnectionPool,
'https': https_pool_cls}