292 lines
9.1 KiB
Python
292 lines
9.1 KiB
Python
from datetime import datetime
|
|
from typing import Dict, Iterable, List, Optional, Set
|
|
|
|
from asn1crypto import crl, ocsp, x509
|
|
|
|
from pyhanko_certvalidator.authority import Authority
|
|
from pyhanko_certvalidator.errors import OCSPFetchError
|
|
from pyhanko_certvalidator.fetchers import Fetchers
|
|
from pyhanko_certvalidator.ltv.poe import (
|
|
KnownPOE,
|
|
POEManager,
|
|
POEType,
|
|
ValidationObject,
|
|
ValidationObjectType,
|
|
digest_for_poe,
|
|
)
|
|
from pyhanko_certvalidator.policy_decl import NonRevokedStatusAssertion
|
|
from pyhanko_certvalidator.registry import CertificateRegistry
|
|
from pyhanko_certvalidator.revinfo.archival import (
|
|
CRLContainer,
|
|
OCSPContainer,
|
|
sort_freshest_first,
|
|
)
|
|
|
|
|
|
class RevinfoManager:
|
|
"""
|
|
.. versionadded:: 0.20.0
|
|
|
|
Class to manage and potentially fetch revocation information.
|
|
|
|
:param certificate_registry:
|
|
The associated certificate registry.
|
|
:param poe_manager:
|
|
The proof-of-existence (POE) data manager.
|
|
:param crls:
|
|
CRL data.
|
|
:param ocsps:
|
|
OCSP response data.
|
|
:param fetchers:
|
|
Fetchers for collecting revocation information.
|
|
If ``None``, no fetching will be performed.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
certificate_registry: CertificateRegistry,
|
|
poe_manager: POEManager,
|
|
crls: Iterable[CRLContainer],
|
|
ocsps: Iterable[OCSPContainer],
|
|
assertions: Iterable[NonRevokedStatusAssertion] = (),
|
|
fetchers: Optional[Fetchers] = None,
|
|
):
|
|
self._certificate_registry = certificate_registry
|
|
self._poe_manager = poe_manager
|
|
|
|
self._revocation_certs: Dict[bytes, x509.Certificate] = {}
|
|
self._crl_issuer_map: Dict[bytes, x509.Certificate] = {}
|
|
|
|
self._crls: List[CRLContainer] = []
|
|
if crls:
|
|
self._crls = sort_freshest_first(crls)
|
|
|
|
self._ocsps: List[OCSPContainer] = []
|
|
if ocsps:
|
|
self._ocsps = ocsps = sort_freshest_first(ocsps)
|
|
for ocsp_response in ocsps:
|
|
self._extract_ocsp_certs(ocsp_response)
|
|
|
|
self._fetchers = fetchers
|
|
self._assertions: Dict[bytes, NonRevokedStatusAssertion] = {
|
|
assertion.cert_sha256: assertion for assertion in assertions
|
|
}
|
|
|
|
@property
|
|
def poe_manager(self) -> POEManager:
|
|
"""
|
|
The proof-of-existence (POE) data manager.
|
|
"""
|
|
return self._poe_manager
|
|
|
|
@property
|
|
def certificate_registry(self) -> CertificateRegistry:
|
|
"""
|
|
The associated certificate registry.
|
|
"""
|
|
return self._certificate_registry
|
|
|
|
@property
|
|
def fetching_allowed(self) -> bool:
|
|
"""
|
|
Boolean indicating whether fetching is allowed.
|
|
"""
|
|
return self._fetchers is not None
|
|
|
|
@property
|
|
def crls(self) -> List[crl.CertificateList]:
|
|
"""
|
|
A list of all cached :class:`crl.CertificateList` objects
|
|
"""
|
|
|
|
raw_crls = [cont.crl_data for cont in self._crls]
|
|
if not self._fetchers:
|
|
return raw_crls
|
|
return list(self._fetchers.crl_fetcher.fetched_crls()) + raw_crls
|
|
|
|
@property
|
|
def ocsps(self) -> List[ocsp.OCSPResponse]:
|
|
"""
|
|
A list of all cached :class:`ocsp.OCSPResponse` objects
|
|
"""
|
|
|
|
raw_ocsps = [cont.ocsp_response_data for cont in self._ocsps]
|
|
if not self._fetchers:
|
|
return raw_ocsps
|
|
|
|
return list(self._fetchers.ocsp_fetcher.fetched_responses()) + raw_ocsps
|
|
|
|
@property
|
|
def new_revocation_certs(self) -> List[x509.Certificate]:
|
|
"""
|
|
A list of newly-fetched :class:`x509.Certificate` objects that were
|
|
obtained from OCSP responses and CRLs
|
|
"""
|
|
|
|
return list(self._revocation_certs.values())
|
|
|
|
def _extract_ocsp_certs(self, ocsp_response: OCSPContainer):
|
|
"""
|
|
Extracts any certificates included with an OCSP response and adds them
|
|
to the certificate registry
|
|
|
|
:param ocsp_response:
|
|
An asn1crypto.ocsp.OCSPResponse object to look for certs inside of
|
|
"""
|
|
|
|
poe_man = self._poe_manager
|
|
ocsp_poe_time = poe_man[ocsp_response]
|
|
|
|
registry = self._certificate_registry
|
|
revo_certs = self._revocation_certs
|
|
|
|
basic = ocsp_response.extract_basic_ocsp_response()
|
|
if basic is not None and basic['certs']:
|
|
for other_cert in basic['certs']:
|
|
if registry.register(other_cert):
|
|
revo_certs[other_cert.issuer_serial] = other_cert
|
|
poe_man.register_known_poe(
|
|
KnownPOE(
|
|
poe_type=POEType.VALIDATION,
|
|
digest=digest_for_poe(other_cert.dump()),
|
|
# register with the same POE time as the OCSP
|
|
# response
|
|
poe_time=ocsp_poe_time,
|
|
validation_object=ValidationObject(
|
|
object_type=ValidationObjectType.CERTIFICATE,
|
|
value=other_cert,
|
|
),
|
|
)
|
|
)
|
|
|
|
def record_crl_issuer(self, certificate_list, cert):
|
|
"""
|
|
Records the certificate that issued a certificate list. Used to reduce
|
|
processing code when dealing with self-issued certificates and multiple
|
|
CRLs.
|
|
|
|
:param certificate_list:
|
|
An ans1crypto.crl.CertificateList object
|
|
|
|
:param cert:
|
|
An ans1crypto.x509.Certificate object
|
|
"""
|
|
|
|
self._crl_issuer_map[certificate_list.signature] = cert
|
|
|
|
def check_crl_issuer(self, certificate_list) -> Optional[x509.Certificate]:
|
|
"""
|
|
Checks to see if the certificate that signed a certificate list has
|
|
been found
|
|
|
|
:param certificate_list:
|
|
An ans1crypto.crl.CertificateList object
|
|
|
|
:return:
|
|
None if not found, or an asn1crypto.x509.Certificate object of the
|
|
issuer
|
|
"""
|
|
|
|
return self._crl_issuer_map.get(certificate_list.signature)
|
|
|
|
async def async_retrieve_crls(self, cert) -> List[CRLContainer]:
|
|
"""
|
|
.. versionadded:: 0.20.0
|
|
|
|
:param cert:
|
|
An asn1crypto.x509.Certificate object
|
|
|
|
:return:
|
|
A list of :class:`CRLContainer` objects
|
|
"""
|
|
if not self._fetchers:
|
|
return self._crls
|
|
|
|
fetchers = self._fetchers
|
|
try:
|
|
crls = fetchers.crl_fetcher.fetched_crls_for_cert(cert)
|
|
except KeyError:
|
|
crls = await fetchers.crl_fetcher.fetch(cert)
|
|
conts = [CRLContainer(crl_data) for crl_data in crls]
|
|
return conts + self._crls
|
|
|
|
async def async_retrieve_ocsps(
|
|
self, cert, authority: Authority
|
|
) -> List[OCSPContainer]:
|
|
"""
|
|
.. versionadded:: 0.20.0
|
|
|
|
:param cert:
|
|
An asn1crypto.x509.Certificate object
|
|
|
|
:param authority:
|
|
The issuing authority for the certificate
|
|
|
|
:return:
|
|
A list of :class:`OCSPContainer` objects
|
|
"""
|
|
|
|
if not self._fetchers:
|
|
return self._ocsps
|
|
|
|
fetchers = self._fetchers
|
|
ocsps = [
|
|
OCSPContainer(resp)
|
|
for resp in fetchers.ocsp_fetcher.fetched_responses_for_cert(cert)
|
|
]
|
|
if not ocsps:
|
|
ocsp_response_data = await fetchers.ocsp_fetcher.fetch(
|
|
cert, authority
|
|
)
|
|
ocsps = OCSPContainer.load_multi(ocsp_response_data)
|
|
|
|
# Responses can contain certificates that are useful in
|
|
# validating the response itself. We can use these since they
|
|
# will be validated using the local trust roots.
|
|
for resp in ocsps:
|
|
try:
|
|
self._extract_ocsp_certs(resp)
|
|
except ValueError:
|
|
raise OCSPFetchError(
|
|
"Failed to extract certificates from "
|
|
"fetched OCSP response"
|
|
)
|
|
|
|
return ocsps + self._ocsps
|
|
|
|
def evict_ocsps(self, hashes_to_evict: Set[bytes]):
|
|
"""
|
|
Internal API to eliminate local OCSP records from consideration.
|
|
|
|
:param hashes_to_evict:
|
|
A collection of OCSP response hashes; see :func:`.digest_for_poe`.
|
|
"""
|
|
|
|
def p(container: OCSPContainer):
|
|
digest = digest_for_poe(container.ocsp_response_data.dump())
|
|
return digest not in hashes_to_evict
|
|
|
|
self._ocsps = list(filter(p, self._ocsps))
|
|
|
|
def evict_crls(self, hashes_to_evict: Set[bytes]):
|
|
"""
|
|
Internal API to eliminate local CRLs from consideration.
|
|
|
|
:param hashes_to_evict:
|
|
A collection of CRL hashes; see :func:`.digest_for_poe`.
|
|
"""
|
|
|
|
def p(container: CRLContainer):
|
|
digest = digest_for_poe(container.crl_data.dump())
|
|
return digest not in hashes_to_evict
|
|
|
|
self._crls = list(filter(p, self._crls))
|
|
|
|
def check_asserted_unrevoked(
|
|
self, cert: x509.Certificate, at: datetime
|
|
) -> bool:
|
|
try:
|
|
return at <= self._assertions[cert.sha256].at
|
|
except KeyError:
|
|
return False
|