185 lines
6.0 KiB
Python
185 lines
6.0 KiB
Python
"""
|
|
Module to handle the timestamping functionality in pyHanko.
|
|
|
|
Many PDF signature profiles require trusted timestamp tokens.
|
|
The tools in this module allow pyHanko to obtain such tokens from
|
|
:rfc:`3161`-compliant time stamping
|
|
authorities.
|
|
"""
|
|
import asyncio
|
|
|
|
from asn1crypto import algos, cms, tsp
|
|
from pyhanko_certvalidator import CertificateValidator
|
|
from pyhanko_certvalidator.registry import SimpleCertificateStore
|
|
|
|
from .common_utils import (
|
|
dummy_digest,
|
|
extract_ts_certs,
|
|
get_nonce,
|
|
handle_tsp_response,
|
|
)
|
|
|
|
__all__ = ['TimeStamper']
|
|
|
|
|
|
class TimeStamper:
|
|
"""
|
|
.. versionchanged:: 0.9.0
|
|
Made API more asyncio-friendly _(breaking change)_
|
|
|
|
Class to make :rfc:`3161` timestamp requests.
|
|
"""
|
|
|
|
def __init__(self, include_nonce=True):
|
|
self._dummy_response_cache = {}
|
|
self._certs = {}
|
|
self.cert_registry = SimpleCertificateStore()
|
|
self.include_nonce = include_nonce
|
|
|
|
def request_cms(self, message_digest, md_algorithm):
|
|
"""
|
|
Format the body of an :rfc:`3161` request as a CMS object.
|
|
Subclasses with more specific needs may want to override this.
|
|
|
|
:param message_digest:
|
|
Message digest to which the timestamp will apply.
|
|
:param md_algorithm:
|
|
Message digest algorithm to use.
|
|
|
|
.. note::
|
|
As per :rfc:`8933`, ``md_algorithm`` should also be the
|
|
algorithm used to compute ``message_digest``.
|
|
:return:
|
|
An :class:`.asn1crypto.tsp.TimeStampReq` object.
|
|
"""
|
|
req = {
|
|
'version': 1,
|
|
'message_imprint': tsp.MessageImprint(
|
|
{
|
|
'hash_algorithm': algos.DigestAlgorithm(
|
|
{'algorithm': md_algorithm}
|
|
),
|
|
'hashed_message': message_digest,
|
|
}
|
|
),
|
|
# we want the server to send along its certs
|
|
'cert_req': True,
|
|
}
|
|
if self.include_nonce:
|
|
nonce = get_nonce()
|
|
req['nonce'] = cms.Integer(nonce)
|
|
else:
|
|
nonce = None
|
|
return nonce, tsp.TimeStampReq(req)
|
|
|
|
async def validation_paths(self, validation_context):
|
|
"""
|
|
Produce validation paths for the certificates gathered by this
|
|
:class:`.TimeStamper`.
|
|
|
|
This is internal API.
|
|
|
|
:param validation_context:
|
|
The validation context to apply.
|
|
:return:
|
|
An asynchronous generator of validation paths.
|
|
"""
|
|
await self._ensure_dummy()
|
|
|
|
def _validation_job(cert):
|
|
validator = CertificateValidator(
|
|
cert,
|
|
intermediate_certs=self.cert_registry,
|
|
validation_context=validation_context,
|
|
)
|
|
return validator.async_validate_usage(set(), {"time_stamping"})
|
|
|
|
jobs = map(_validation_job, self._certs.values())
|
|
|
|
for job in asyncio.as_completed(jobs):
|
|
yield await job
|
|
|
|
def _register_dummy(self, md_algorithm, dummy):
|
|
self._dummy_response_cache[md_algorithm] = dummy
|
|
for cert in extract_ts_certs(dummy, self.cert_registry):
|
|
self._certs[cert.issuer_serial] = cert
|
|
|
|
async def _ensure_dummy(self):
|
|
# if no dummy responses are available, fetch some
|
|
if not self._dummy_response_cache:
|
|
from pyhanko.sign import DEFAULT_MD
|
|
|
|
await self.async_dummy_response(DEFAULT_MD)
|
|
|
|
async def async_dummy_response(self, md_algorithm) -> cms.ContentInfo:
|
|
"""
|
|
Return a dummy response for use in CMS object size estimation.
|
|
|
|
For every new ``md_algorithm`` passed in, this method will call
|
|
the :meth:`timestamp` method exactly once, with a dummy digest.
|
|
The resulting object will be cached and reused for future invocations
|
|
of :meth:`dummy_response` with the same ``md_algorithm`` value.
|
|
|
|
:param md_algorithm:
|
|
Message digest algorithm to use.
|
|
:return:
|
|
A timestamp token, encoded as an
|
|
:class:`.asn1crypto.cms.ContentInfo` object.
|
|
"""
|
|
|
|
# different hashes have different sizes, so the dummy responses
|
|
# might differ in size
|
|
try:
|
|
return self._dummy_response_cache[md_algorithm]
|
|
except KeyError:
|
|
dummy = await self.async_timestamp(
|
|
dummy_digest(md_algorithm), md_algorithm
|
|
)
|
|
self._register_dummy(md_algorithm, dummy)
|
|
return dummy
|
|
|
|
async def async_request_tsa_response(
|
|
self, req: tsp.TimeStampReq
|
|
) -> tsp.TimeStampResp:
|
|
"""
|
|
Submit the specified timestamp request to the server.
|
|
|
|
:param req:
|
|
Request body to submit.
|
|
:return:
|
|
A timestamp response from the server.
|
|
:raises IOError:
|
|
Raised in case of an I/O issue in the communication with the
|
|
timestamping server.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
async def async_timestamp(
|
|
self, message_digest, md_algorithm
|
|
) -> cms.ContentInfo:
|
|
"""
|
|
Request a timestamp for the given message digest.
|
|
|
|
:param message_digest:
|
|
Message digest to which the timestamp will apply.
|
|
:param md_algorithm:
|
|
Message digest algorithm to use.
|
|
|
|
.. note::
|
|
As per :rfc:`8933`, ``md_algorithm`` should also be the
|
|
algorithm used to compute ``message_digest``.
|
|
:return:
|
|
A timestamp token, encoded as an
|
|
:class:`.asn1crypto.cms.ContentInfo` object.
|
|
:raises IOError:
|
|
Raised in case of an I/O issue in the communication with the
|
|
timestamping server.
|
|
:raises TimestampRequestError:
|
|
Raised if the timestamp server did not return a success response,
|
|
or if the server's response is invalid.
|
|
"""
|
|
|
|
nonce, req = self.request_cms(message_digest, md_algorithm)
|
|
res = await self.async_request_tsa_response(req)
|
|
return handle_tsp_response(res, nonce)
|