686 lines
22 KiB
Python
686 lines
22 KiB
Python
# coding: utf-8
|
|
|
|
import abc
|
|
import asyncio
|
|
from collections import defaultdict
|
|
from typing import AsyncGenerator, Iterable, Iterator, List, Optional, Union
|
|
|
|
from asn1crypto import x509
|
|
from oscrypto import trust_list
|
|
|
|
from .authority import CertTrustAnchor, TrustAnchor
|
|
from .errors import PathBuildingError
|
|
from .fetchers import CertificateFetcher
|
|
from .path import ValidationPath
|
|
from .util import CancelableAsyncIterator, ConsList
|
|
|
|
|
|
class CertificateCollection(abc.ABC):
|
|
"""
|
|
Abstract base class for read-only access to a collection of certificates.
|
|
"""
|
|
|
|
def retrieve_by_key_identifier(self, key_identifier: bytes):
|
|
"""
|
|
Retrieves a cert via its key identifier
|
|
|
|
:param key_identifier:
|
|
A byte string of the key identifier
|
|
|
|
:return:
|
|
None or an asn1crypto.x509.Certificate object
|
|
"""
|
|
candidates = self.retrieve_many_by_key_identifier(key_identifier)
|
|
if not candidates:
|
|
return None
|
|
else:
|
|
return candidates[0]
|
|
|
|
def retrieve_many_by_key_identifier(self, key_identifier: bytes):
|
|
"""
|
|
Retrieves possibly multiple certs via the corresponding key identifiers
|
|
|
|
:param key_identifier:
|
|
A byte string of the key identifier
|
|
|
|
:return:
|
|
A list of asn1crypto.x509.Certificate objects
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def retrieve_by_name(self, name: x509.Name):
|
|
"""
|
|
Retrieves a list certs via their subject name
|
|
|
|
:param name:
|
|
An asn1crypto.x509.Name object
|
|
|
|
:return:
|
|
A list of asn1crypto.x509.Certificate objects
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def retrieve_by_issuer_serial(self, issuer_serial):
|
|
"""
|
|
Retrieve a certificate by its ``issuer_serial`` value.
|
|
|
|
:param issuer_serial:
|
|
The ``issuer_serial`` value of the certificate.
|
|
:return:
|
|
The certificate corresponding to the ``issuer_serial`` key
|
|
passed in.
|
|
:return:
|
|
None or an asn1crypto.x509.Certificate object
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
|
|
class CertificateStore(CertificateCollection, abc.ABC):
|
|
def register(self, cert: x509.Certificate) -> bool:
|
|
"""
|
|
Register a single certificate.
|
|
|
|
:param cert:
|
|
Certificate to add.
|
|
:return:
|
|
``True`` if the certificate was added, ``False`` if it already
|
|
existed in this store.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def register_multiple(self, certs: Iterable[x509.Certificate]):
|
|
"""
|
|
Register multiple certificates.
|
|
|
|
:param certs:
|
|
Certificates to register.
|
|
:return:
|
|
``True`` if at least one certificate was added, ``False``
|
|
if all certificates already existed in this store.
|
|
"""
|
|
|
|
added = False
|
|
for cert in certs:
|
|
added |= self.register(cert)
|
|
return added
|
|
|
|
def __iter__(self):
|
|
raise NotImplementedError
|
|
|
|
|
|
class SimpleCertificateStore(CertificateStore):
|
|
"""
|
|
Simple trustless certificate store.
|
|
"""
|
|
|
|
@classmethod
|
|
def from_certs(cls, certs):
|
|
result = cls()
|
|
for cert in certs:
|
|
result.register(cert)
|
|
return result
|
|
|
|
def __init__(self):
|
|
self.certs = {}
|
|
self._subject_map = defaultdict(list)
|
|
self._key_identifier_map = defaultdict(list)
|
|
|
|
def register(self, cert: x509.Certificate) -> bool:
|
|
"""
|
|
Register a single certificate.
|
|
|
|
:param cert:
|
|
Certificate to add.
|
|
:return:
|
|
``True`` if the certificate was added, ``False`` if it already
|
|
existed in this store.
|
|
"""
|
|
if cert.issuer_serial in self.certs:
|
|
return False
|
|
self.certs[cert.issuer_serial] = cert
|
|
self._subject_map[cert.subject.hashable].append(cert)
|
|
if cert.key_identifier:
|
|
self._key_identifier_map[cert.key_identifier].append(cert)
|
|
else:
|
|
self._key_identifier_map[cert.public_key.sha1].append(cert)
|
|
return True
|
|
|
|
def __getitem__(self, item):
|
|
return self.certs[item]
|
|
|
|
def __iter__(self):
|
|
return iter(self.certs.values())
|
|
|
|
def retrieve_many_by_key_identifier(self, key_identifier: bytes):
|
|
return self._key_identifier_map[key_identifier]
|
|
|
|
def retrieve_by_name(self, name: x509.Name):
|
|
return self._subject_map[name.hashable]
|
|
|
|
def retrieve_by_issuer_serial(self, issuer_serial):
|
|
try:
|
|
return self[issuer_serial]
|
|
except KeyError:
|
|
return None
|
|
|
|
|
|
TrustRootList = Iterable[Union[x509.Certificate, TrustAnchor]]
|
|
|
|
|
|
class TrustManager:
|
|
"""
|
|
Abstract trust manager API.
|
|
"""
|
|
|
|
def is_root(self, cert: x509.Certificate) -> bool:
|
|
"""
|
|
Checks if a certificate is in the list of trust roots in this registry
|
|
|
|
:param cert:
|
|
An asn1crypto.x509.Certificate object
|
|
|
|
:return:
|
|
A boolean - if the certificate is in the CA list
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def find_potential_issuers(
|
|
self, cert: x509.Certificate
|
|
) -> Iterator[TrustAnchor]:
|
|
"""
|
|
Find potential issuers that might have (directly) issued
|
|
a particular certificate.
|
|
|
|
:param cert:
|
|
Issued certificate.
|
|
:return:
|
|
An iterator with potentially relevant trust anchors.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
|
|
class SimpleTrustManager(TrustManager):
|
|
"""
|
|
Trust manager backed by a list of trust roots, possibly in addition to the
|
|
system trust list.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._roots = set()
|
|
self._root_subject_map = defaultdict(list)
|
|
|
|
@classmethod
|
|
def build(
|
|
cls,
|
|
trust_roots: Optional[TrustRootList] = None,
|
|
extra_trust_roots: Optional[TrustRootList] = None,
|
|
) -> 'SimpleTrustManager':
|
|
"""
|
|
:param trust_roots:
|
|
If the operating system's trust list should not be used, instead
|
|
pass a list of asn1crypto.x509.Certificate objects. These
|
|
certificates will be used as the trust roots for the path being
|
|
built.
|
|
|
|
:param extra_trust_roots:
|
|
If the operating system's trust list should be used, but augmented
|
|
with one or more extra certificates. This should be a list of
|
|
asn1crypto.x509.Certificate objects.
|
|
:return:
|
|
"""
|
|
if trust_roots is None:
|
|
trust_roots = [e[0] for e in trust_list.get_list()]
|
|
else:
|
|
trust_roots = list(trust_roots)
|
|
|
|
if extra_trust_roots is not None:
|
|
trust_roots.extend(extra_trust_roots)
|
|
|
|
manager = SimpleTrustManager()
|
|
for trust_root in trust_roots:
|
|
manager._register_root(trust_root)
|
|
return manager
|
|
|
|
def _register_root(self, trust_root: Union[TrustAnchor, x509.Certificate]):
|
|
if isinstance(trust_root, TrustAnchor):
|
|
anchor = trust_root
|
|
else:
|
|
anchor = CertTrustAnchor(trust_root)
|
|
if anchor not in self._roots:
|
|
authority = anchor.authority
|
|
self._roots.add(anchor)
|
|
self._root_subject_map[authority.name.hashable].append(anchor)
|
|
|
|
def is_root(self, cert: x509.Certificate):
|
|
"""
|
|
Checks if a certificate is in the list of trust roots in this registry
|
|
|
|
:param cert:
|
|
An asn1crypto.x509.Certificate object
|
|
|
|
:return:
|
|
A boolean - if the certificate is in the CA list
|
|
"""
|
|
|
|
return CertTrustAnchor(cert) in self._roots
|
|
|
|
def iter_certs(self) -> Iterator[x509.Certificate]:
|
|
return (
|
|
root.certificate
|
|
for root in self._roots
|
|
if isinstance(root, CertTrustAnchor)
|
|
)
|
|
|
|
def find_potential_issuers(
|
|
self, cert: x509.Certificate
|
|
) -> Iterator[TrustAnchor]:
|
|
issuer_hashable = cert.issuer.hashable
|
|
root: TrustAnchor
|
|
for root in self._root_subject_map[issuer_hashable]:
|
|
if root.authority.is_potential_issuer_of(cert):
|
|
yield root
|
|
|
|
|
|
class CertificateRegistry(SimpleCertificateStore):
|
|
"""
|
|
Contains certificate lists used to build validation paths, and
|
|
is also capable of fetching missing certificates if a certificate
|
|
fetcher is supplied.
|
|
"""
|
|
|
|
def __init__(self, *, cert_fetcher: Optional[CertificateFetcher] = None):
|
|
super().__init__()
|
|
self.fetcher = cert_fetcher
|
|
|
|
@classmethod
|
|
def build(
|
|
cls,
|
|
certs: Iterable[x509.Certificate] = (),
|
|
*,
|
|
cert_fetcher: Optional[CertificateFetcher] = None,
|
|
):
|
|
"""
|
|
Convenience method to set up a certificate registry and import
|
|
certs into it.
|
|
|
|
:param certs:
|
|
Initial list of certificates to import.
|
|
:param cert_fetcher:
|
|
Certificate fetcher to handle retrieval of missing certificates
|
|
(in situations where that is possible).
|
|
:return:
|
|
A populated certificate registry.
|
|
"""
|
|
|
|
result: CertificateRegistry = cls(cert_fetcher=cert_fetcher)
|
|
for cert in certs:
|
|
result.register(cert)
|
|
|
|
result.fetcher = cert_fetcher
|
|
return result
|
|
|
|
def retrieve_by_name(
|
|
self,
|
|
name: x509.Name,
|
|
first_certificate: Optional[x509.Certificate] = None,
|
|
):
|
|
"""
|
|
Retrieves a list certs via their subject name
|
|
|
|
:param name:
|
|
An asn1crypto.x509.Name object
|
|
|
|
:param first_certificate:
|
|
An asn1crypto.x509.Certificate object that if found, should be
|
|
placed first in the result list
|
|
|
|
:return:
|
|
A list of asn1crypto.x509.Certificate objects
|
|
"""
|
|
|
|
output = []
|
|
first = None
|
|
for cert in super().retrieve_by_name(name):
|
|
if first_certificate and first_certificate.sha256 == cert.sha256:
|
|
first = cert
|
|
else:
|
|
output.append(cert)
|
|
if first:
|
|
output.insert(0, first)
|
|
return output
|
|
|
|
def find_potential_issuers(
|
|
self, cert: x509.Certificate, trust_manager: TrustManager
|
|
) -> Iterator[Union[TrustAnchor, x509.Certificate]]:
|
|
issuer_hashable = cert.issuer.hashable
|
|
|
|
# Info from the authority key identifier extension can be used to
|
|
# eliminate possible options when multiple keys with the same
|
|
# subject exist, such as during a transition, or with cross-signing.
|
|
|
|
# go through matching trust roots first
|
|
yield from trust_manager.find_potential_issuers(cert)
|
|
|
|
for issuer in self._subject_map[issuer_hashable]:
|
|
if trust_manager.is_root(issuer):
|
|
continue # skip, we've had these in the previous step
|
|
if cert.authority_key_identifier and issuer.key_identifier:
|
|
if cert.authority_key_identifier != issuer.key_identifier:
|
|
continue
|
|
elif cert.authority_issuer_serial:
|
|
if cert.authority_issuer_serial != issuer.issuer_serial:
|
|
continue
|
|
|
|
yield issuer
|
|
|
|
async def fetch_missing_potential_issuers(self, cert: x509.Certificate):
|
|
if self.fetcher is None:
|
|
return
|
|
|
|
issuers = [
|
|
issuer async for issuer in self.fetcher.fetch_cert_issuers(cert)
|
|
]
|
|
self.register_multiple(issuers)
|
|
|
|
for issuer in issuers:
|
|
yield issuer
|
|
|
|
|
|
class PathBuilder:
|
|
"""
|
|
Class to handle path building.
|
|
"""
|
|
|
|
def __init__(
|
|
self, trust_manager: TrustManager, registry: CertificateRegistry
|
|
):
|
|
self.trust_manager = trust_manager
|
|
self.registry = registry
|
|
|
|
def build_paths(self, end_entity_cert):
|
|
"""
|
|
Builds a list of ValidationPath objects from a certificate in the
|
|
operating system trust store to the end-entity certificate
|
|
|
|
.. note::
|
|
This is a synchronous equivalent of :meth:`async_build_paths`
|
|
that calls the latter in a new event loop. As such, it can't be used
|
|
from within asynchronous code.
|
|
|
|
:param end_entity_cert:
|
|
A byte string of a DER or PEM-encoded X.509 certificate, or an
|
|
instance of asn1crypto.x509.Certificate
|
|
|
|
:return:
|
|
A list of pyhanko_certvalidator.path.ValidationPath objects that
|
|
represent the possible paths from the end-entity certificate to one
|
|
of the CA certs.
|
|
"""
|
|
return asyncio.run(self.async_build_paths(end_entity_cert))
|
|
|
|
async def async_build_paths(self, end_entity_cert: x509.Certificate):
|
|
"""
|
|
Builds a list of ValidationPath objects from a certificate in the
|
|
operating system trust store to the end-entity certificate, returning
|
|
all paths in a single list.
|
|
|
|
:param end_entity_cert:
|
|
A byte string of a DER or PEM-encoded X.509 certificate, or an
|
|
instance of asn1crypto.x509.Certificate
|
|
|
|
:return:
|
|
A list of pyhanko_certvalidator.path.ValidationPath objects that
|
|
represent the possible paths from the end-entity certificate to one
|
|
of the CA certs.
|
|
"""
|
|
|
|
paths: List[ValidationPath] = []
|
|
async for result in self.async_build_paths_lazy(end_entity_cert):
|
|
paths.append(result)
|
|
|
|
return paths
|
|
|
|
def async_build_paths_lazy(
|
|
self, end_entity_cert: x509.Certificate
|
|
) -> CancelableAsyncIterator[ValidationPath]:
|
|
"""
|
|
Builds a list of ValidationPath objects from a certificate in the
|
|
operating system trust store to the end-entity certificate, and emit
|
|
them as an asynchronous generator.
|
|
|
|
:param end_entity_cert:
|
|
A byte string of a DER or PEM-encoded X.509 certificate, or an
|
|
instance of asn1crypto.x509.Certificate
|
|
|
|
:return:
|
|
An asynchronous iterator that yields
|
|
pyhanko_certvalidator.path.ValidationPath objects that
|
|
represent the possible paths from the end-entity certificate to one
|
|
of the CA certs, and raises PathBuildingError
|
|
if no paths could be built
|
|
"""
|
|
|
|
walker = _PathWalker(
|
|
self,
|
|
path=ConsList.sing(end_entity_cert),
|
|
certs_seen=ConsList.sing(end_entity_cert.issuer_serial),
|
|
failed_paths=[],
|
|
)
|
|
return LazyPathIterator(walker, end_entity_cert)
|
|
|
|
|
|
class _IssuerFetcher:
|
|
def __init__(
|
|
self,
|
|
path_builder: 'PathBuilder',
|
|
cert: x509.Certificate,
|
|
certs_seen: ConsList[bytes],
|
|
):
|
|
self.cert = cert
|
|
self.path_builder = path_builder
|
|
self.certs_seen = certs_seen
|
|
local_issuers = self.path_builder.registry.find_potential_issuers(
|
|
cert, self.path_builder.trust_manager
|
|
)
|
|
self.local_iss_iter = iter(local_issuers)
|
|
self.local_issuers_found = 0
|
|
self.fetched_issuers_found = 0
|
|
self._fetched_cas: Optional[
|
|
AsyncGenerator[x509.Certificate, None]
|
|
] = None
|
|
self._fetching_done = False
|
|
|
|
@property
|
|
def issuers_found(self):
|
|
return self.local_issuers_found + self.fetched_issuers_found
|
|
|
|
def __aiter__(self):
|
|
return self
|
|
|
|
def __iter__(self):
|
|
return self
|
|
|
|
def __next__(self) -> Union[TrustAnchor, x509.Certificate]:
|
|
for issuer in self.local_iss_iter:
|
|
if isinstance(issuer, x509.Certificate):
|
|
cert_id = issuer.issuer_serial
|
|
if cert_id in self.certs_seen: # no duplicates
|
|
continue
|
|
self.local_issuers_found += 1
|
|
return issuer
|
|
raise StopIteration
|
|
|
|
async def __anext__(self) -> Union[TrustAnchor, x509.Certificate]:
|
|
try:
|
|
return next(self)
|
|
except StopIteration:
|
|
pass
|
|
|
|
if (
|
|
self._fetched_cas is None
|
|
and not self.local_issuers_found
|
|
and not self._fetching_done
|
|
):
|
|
# attempt to download certs only if we didn't find anything locally
|
|
self._fetched_cas = (
|
|
self.path_builder.registry.fetch_missing_potential_issuers(
|
|
self.cert
|
|
)
|
|
)
|
|
|
|
if self._fetched_cas is not None:
|
|
async for issuer in self._fetched_cas:
|
|
cert_id = issuer.issuer_serial
|
|
if cert_id in self.certs_seen:
|
|
continue
|
|
self.fetched_issuers_found += 1
|
|
return issuer
|
|
self._fetching_done = True
|
|
raise StopAsyncIteration
|
|
|
|
async def cancel(self):
|
|
if self._fetched_cas is not None:
|
|
await self._fetched_cas.aclose()
|
|
self._fetched_cas = None
|
|
self._fetching_done = True
|
|
|
|
|
|
class _PathWalker:
|
|
def __init__(
|
|
self,
|
|
path_builder: 'PathBuilder',
|
|
path: ConsList[x509.Certificate],
|
|
certs_seen: ConsList[bytes],
|
|
failed_paths: List[ConsList[x509.Certificate]],
|
|
):
|
|
self.path = path
|
|
self.path_builder = path_builder
|
|
self.certs_seen = certs_seen
|
|
cert = path.head
|
|
assert isinstance(cert, x509.Certificate)
|
|
self._issuer_fetcher = _IssuerFetcher(path_builder, cert, certs_seen)
|
|
self.failed_paths = failed_paths
|
|
self._next_level: Optional[_PathWalker] = None
|
|
|
|
async def cancel(self):
|
|
if self._issuer_fetcher is not None:
|
|
await self._issuer_fetcher.cancel()
|
|
self._issuer_fetcher = None
|
|
if self._next_level is not None:
|
|
await self._next_level.cancel()
|
|
self._next_level = None
|
|
|
|
def __aiter__(self):
|
|
return self
|
|
|
|
async def __anext__(self):
|
|
if self._issuer_fetcher is None:
|
|
raise StopAsyncIteration # pragma: nocover
|
|
next_path = None
|
|
while next_path is None:
|
|
if self._next_level is None:
|
|
# Fetch the next candidate issuer in the list
|
|
try:
|
|
next_issuer = await self._issuer_fetcher.__anext__()
|
|
except StopAsyncIteration as e:
|
|
if not self._issuer_fetcher.issuers_found:
|
|
self.failed_paths.append(self.path)
|
|
self._issuer_fetcher = None
|
|
raise e
|
|
if isinstance(next_issuer, TrustAnchor):
|
|
# We've reached a trust root -> emit path and stop
|
|
certs = list(self.path)
|
|
return ValidationPath(next_issuer, certs[:-1], certs[-1])
|
|
else:
|
|
# if it's not a trust root, we need a new child _PathWalker
|
|
self._next_level = _PathWalker(
|
|
self.path_builder,
|
|
self.path.cons(next_issuer),
|
|
self.certs_seen.cons(next_issuer.issuer_serial),
|
|
self.failed_paths,
|
|
)
|
|
# check if next_level has any paths left, if not we clear it
|
|
# and loop around to look at the next issuer
|
|
try:
|
|
next_path = await self._next_level.__anext__()
|
|
except StopAsyncIteration:
|
|
self._next_level = None
|
|
return next_path
|
|
|
|
|
|
class LazyPathIterator(CancelableAsyncIterator[ValidationPath]):
|
|
_as_root: Optional[ValidationPath] = None
|
|
|
|
def __init__(self, walker: _PathWalker, cert: x509.Certificate):
|
|
# special case for root certs
|
|
if walker.path_builder.trust_manager.is_root(cert):
|
|
self._as_root = ValidationPath(CertTrustAnchor(cert), [], None)
|
|
self._walker: Optional[_PathWalker] = walker
|
|
self.emitted_count = 0
|
|
self._name = cert.subject.human_friendly
|
|
|
|
async def cancel(self):
|
|
if self._walker is not None:
|
|
await self._walker.cancel()
|
|
|
|
def __aiter__(self):
|
|
return self
|
|
|
|
async def __anext__(self) -> ValidationPath:
|
|
if self._walker is None:
|
|
raise StopAsyncIteration
|
|
elif self._as_root is not None:
|
|
self.emitted_count += 1
|
|
self._walker = None
|
|
return self._as_root
|
|
|
|
try:
|
|
next_path = await self._walker.__anext__()
|
|
self.emitted_count += 1
|
|
return next_path
|
|
except StopAsyncIteration:
|
|
pass
|
|
|
|
if self.emitted_count == 0:
|
|
path_head = self._walker.failed_paths[0].head
|
|
assert isinstance(path_head, x509.Certificate)
|
|
missing_issuer_name = path_head.issuer.human_friendly
|
|
self._walker = None
|
|
raise PathBuildingError(
|
|
f"Unable to build a validation path for the certificate "
|
|
f"\"{self._name}\" - no issuer matching "
|
|
f"\"{missing_issuer_name}\" was found"
|
|
)
|
|
raise StopAsyncIteration
|
|
|
|
|
|
class LayeredCertificateStore(CertificateCollection):
|
|
"""
|
|
Trustless certificate store that looks up certificates in other stores
|
|
in a specific order.
|
|
"""
|
|
|
|
def __init__(self, stores: List[CertificateCollection]):
|
|
self._stores = stores
|
|
|
|
def retrieve_many_by_key_identifier(self, key_identifier: bytes):
|
|
def _gen():
|
|
for store in self._stores:
|
|
yield from store.retrieve_many_by_key_identifier(key_identifier)
|
|
|
|
return list(_gen())
|
|
|
|
def retrieve_by_name(self, name: x509.Name):
|
|
def _gen():
|
|
for store in self._stores:
|
|
yield from store.retrieve_by_name(name)
|
|
|
|
return list(_gen())
|
|
|
|
def retrieve_by_issuer_serial(self, issuer_serial):
|
|
for store in self._stores:
|
|
result = store.retrieve_by_issuer_serial(issuer_serial)
|
|
if result is not None:
|
|
return result
|
|
return None
|