# Listing metadata: 715 lines, 24 KiB, Python
import hashlib
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from dataclasses import field as data_field
|
|
from typing import Iterable, Optional
|
|
|
|
from asn1crypto import crl as asn1_crl
|
|
from asn1crypto import ocsp as asn1_ocsp
|
|
from asn1crypto import x509
|
|
from asn1crypto.x509 import Certificate
|
|
from pyhanko_certvalidator import CertificateValidator, ValidationContext
|
|
from pyhanko_certvalidator.path import ValidationPath
|
|
|
|
from pyhanko.pdf_utils import generic, misc
|
|
from pyhanko.pdf_utils.generic import pdf_name
|
|
from pyhanko.pdf_utils.incremental_writer import IncrementalPdfFileWriter
|
|
from pyhanko.pdf_utils.misc import get_and_apply
|
|
from pyhanko.pdf_utils.rw_common import PdfHandler
|
|
from pyhanko.pdf_utils.writer import BasePdfFileWriter
|
|
|
|
from ..general import extract_certificate_info
|
|
from .errors import NoDSSFoundError, ValidationInfoReadingError
|
|
from .pdf_embedded import EmbeddedPdfSignature
|
|
|
|
# Names exported by a star-import of this module.
__all__ = [
    'VRI',
    'DocumentSecurityStore',
    'async_add_validation_info',
    'collect_validation_info',
    'enumerate_ocsp_certs',
]
|
|
|
|
from ...pdf_utils.crypt import SerialisedCredential
|
|
from ...pdf_utils.reader import PdfFileReader
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class VRI:
    """
    VRI dictionary in the sense of PAdES / ISO 32000-2.

    A VRI entry groups the data that may be relevant when validating one
    specific signature.

    .. note::
        Members are stored as PDF indirect objects rather than asn1crypto
        values, which in particular ties them to a specific PDF handler.
    """

    certs: set = data_field(default_factory=set)
    """
    Certificates relevant to the signature.
    """

    ocsps: set = data_field(default_factory=set)
    """
    OCSP responses relevant to the signature.
    """

    crls: set = data_field(default_factory=set)
    """
    CRLs relevant to the signature.
    """

    def as_pdf_object(self) -> generic.DictionaryObject:
        """
        Render this VRI entry as a PDF dictionary.

        :return:
            A PDF dictionary of type ``/VRI``. ``/OCSP`` and ``/CRL`` are
            only present when non-empty; ``/Cert`` is always present.
        """
        result = generic.DictionaryObject(
            {pdf_name('/Type'): pdf_name('/VRI')}
        )
        # Revocation data is optional: leave the key out when there is none.
        optional_entries = (('/OCSP', self.ocsps), ('/CRL', self.crls))
        for key, refs in optional_entries:
            if refs:
                result[pdf_name(key)] = generic.ArrayObject(refs)
        # The certificate array is written unconditionally, even when empty.
        result[pdf_name('/Cert')] = generic.ArrayObject(self.certs)
        return result
|
|
|
|
|
|
def enumerate_ocsp_certs(ocsp_response):
    """
    Yield the certificates embedded in an OCSP response.

    Nothing is yielded unless the response status is ``successful`` and the
    response type is ``basic_ocsp_response``.

    Essentially nabbed from _extract_ocsp_certs in ValidationContext.

    :param ocsp_response:
        The OCSP response to inspect.
    :return:
        A generator producing the entries of the response's ``certs`` field.
    """
    if ocsp_response['response_status'].native != 'successful':
        return

    payload = ocsp_response['response_bytes']
    if payload['response_type'].native != 'basic_ocsp_response':
        return

    basic_response = payload['response'].parsed
    for embedded_cert in basic_response['certs']:
        yield embedded_cert
|
|
|
|
|
|
class DocumentSecurityStore:
    """
    Representation of a DSS in Python.

    The store tracks PDF references to certificates, OCSP responses, CRLs
    and per-signature VRI dictionaries. When constructed with a writer, new
    validation data can be registered through :meth:`register_vri`; without
    one, attempts to update the store raise :class:`TypeError`.
    """

    def __init__(
        self,
        writer: Optional[BasePdfFileWriter],
        certs=None,
        ocsps=None,
        crls=None,
        vri_entries=None,
        backing_pdf_object=None,
    ):
        """
        :param writer:
            PDF writer used to add new objects, or ``None`` for a DSS that
            does not support updates.
        :param certs:
            Dictionary mapping the ``issuer_serial`` of each certificate to
            the PDF reference of its stream. Defaults to an empty dictionary.
        :param ocsps:
            List of PDF references to OCSP response streams.
        :param crls:
            List of PDF references to CRL streams.
        :param vri_entries:
            Dictionary of VRI entries, keyed by signature identifier.
        :param backing_pdf_object:
            PDF dictionary that :meth:`as_pdf_object` writes into.
            A fresh dictionary is created if none is supplied.
        """
        self.vri_entries = vri_entries if vri_entries is not None else {}
        self.certs = certs if certs is not None else {}
        self.ocsps = ocsps if ocsps is not None else []
        self.crls = crls if crls is not None else []

        self.writer = writer
        self.backing_pdf_object = (
            backing_pdf_object
            if backing_pdf_object is not None
            else generic.DictionaryObject()
        )

        # Index the existing streams by their raw content, so that identical
        # OCSP responses / CRLs are not embedded a second time.
        self._ocsps_seen = {
            ocsp_ref.get_object().data: ocsp_ref for ocsp_ref in self.ocsps
        }
        self._crls_seen = {
            crl_ref.get_object().data: crl_ref for crl_ref in self.crls
        }
        self._modified = False

    @property
    def modified(self):
        """
        ``True`` once new data has been registered in this DSS.
        """
        return self._modified

    def _mark_modified(self):
        """
        Flag the DSS as modified.

        On the first call, the writer is also told that the backing PDF
        object needs updating.
        """
        if not self._modified:
            self._modified = True
            if self.backing_pdf_object is not None:
                self.writer.update_container(self.backing_pdf_object)

    def _cms_objects_to_streams(self, objs, seen, dest):
        """
        Lazily yield a PDF reference for each object in ``objs``.

        Objects whose serialised form is already a key of ``seen`` reuse the
        existing reference. All others are added to the writer as new stream
        objects, recorded in ``seen`` and appended to ``dest``.

        :param objs:
            Objects exposing a ``dump()`` method returning their encoding.
        :param seen:
            Dictionary mapping encoded content to PDF references.
        :param dest:
            List collecting the references of newly embedded streams.
        """
        for obj in objs:
            obj_bytes = obj.dump()
            try:
                yield seen[obj_bytes]
            except KeyError:
                ref = self.writer.add_object(
                    generic.StreamObject(stream_data=obj_bytes)
                )
                self._mark_modified()
                seen[obj_bytes] = ref
                dest.append(ref)
                yield ref

    def _embed_certs_from_ocsp(self, ocsps):
        """
        Embed all certificates enumerated from the given OCSP responses.

        :return:
            A list of PDF references, one per enumerated certificate.
        """
        return [
            self._embed_cert(cert_)
            for resp in ocsps
            for cert_ in enumerate_ocsp_certs(resp)
        ]

    def _embed_cert(self, cert):
        """
        Embed a certificate, unless one with the same ``issuer_serial`` is
        already registered.

        :param cert:
            The certificate to embed.
        :return:
            The PDF reference to the certificate's stream.
        :raises TypeError:
            If this DSS has no writer.
        """
        if self.writer is None:
            raise TypeError('This DSS does not support updates.')

        try:
            return self.certs[cert.issuer_serial]
        except KeyError:
            pass

        ref = self.writer.add_object(
            generic.StreamObject(stream_data=cert.dump())
        )
        self._mark_modified()
        self.certs[cert.issuer_serial] = ref
        return ref

    @staticmethod
    def sig_content_identifier(contents) -> generic.NameObject:
        """
        Hash the contents of a signature object to get the corresponding VRI
        identifier.

        This is internal API.

        :param contents:
            Signature contents.
        :return:
            A name object to put into the DSS, consisting of the uppercase
            hexadecimal SHA-1 digest of ``contents``.
        """
        ident = hashlib.sha1(contents).hexdigest().upper()
        return pdf_name('/' + ident)

    def register_vri(self, identifier, *, certs=(), ocsps=(), crls=()):
        """
        Register validation information for a set of signing certificates
        associated with a particular signature.

        :param identifier:
            Identifier of the signature object (see `sig_content_identifier`).
            If ``None``, only embed the data into the DSS without associating
            it with any VRI.
        :param certs:
            Certificates to add.
        :param ocsps:
            OCSP responses to add.
        :param crls:
            CRLs to add.
        :raises TypeError:
            If this DSS has no writer.
        """

        if self.writer is None:
            raise TypeError('This DSS does not support updates.')

        # The inputs may be one-shot iterables; the OCSP responses are
        # traversed twice below, so materialise them first.
        ocsps = list(ocsps)
        crls = list(crls)

        ocsp_refs = set()
        crl_refs = set()
        cert_refs = {self._embed_cert(cert) for cert in certs}
        if ocsps:
            ocsp_refs = set(
                self._cms_objects_to_streams(
                    ocsps, self._ocsps_seen, self.ocsps
                )
            )
        if crls:
            crl_refs = set(
                self._cms_objects_to_streams(crls, self._crls_seen, self.crls)
            )
        # TODO while somewhat less common, CRL signing can also be delegated
        #  we should take that into account
        cert_refs.update(self._embed_certs_from_ocsp(ocsps))

        # TODO do a better job of determining whether the VRI dictionary even
        #  needs updating.
        if identifier is not None:
            vri = VRI(certs=cert_refs, ocsps=ocsp_refs, crls=crl_refs)
            self.vri_entries[identifier] = self.writer.add_object(
                vri.as_pdf_object()
            )
            self._mark_modified()

    def as_pdf_object(self):
        """
        Write the current state of the :class:`.DocumentSecurityStore` into
        its backing PDF dictionary. This method also handles DSS updates.

        ``/Certs`` is always written; ``/VRI``, ``/OCSPs`` and ``/CRLs`` are
        only written when there is data for them.

        :return:
            A PDF object representing this DSS (the backing dictionary).
        """
        pdf_dict = self.backing_pdf_object
        pdf_dict['/Certs'] = generic.ArrayObject(list(self.certs.values()))
        if self.vri_entries:
            pdf_dict['/VRI'] = generic.DictionaryObject(self.vri_entries)

        if self.ocsps:
            pdf_dict[pdf_name('/OCSPs')] = generic.ArrayObject(self.ocsps)

        if self.crls:
            pdf_dict[pdf_name('/CRLs')] = generic.ArrayObject(self.crls)

        return pdf_dict

    def load_certs(self) -> Iterable[x509.Certificate]:
        """
        Return a generator that parses and yields all certificates in the DSS.

        :return:
            A generator yielding :class:`.Certificate` objects.
        """
        for cert_ref in self.certs.values():
            cert_stream: generic.StreamObject = cert_ref.get_object()
            yield Certificate.load(cert_stream.data)

    def as_validation_context(
        self, validation_context_kwargs, include_revinfo=True
    ) -> ValidationContext:
        """
        Construct a validation context from the data in this DSS.

        Values supplied under the ``other_certs``, ``ocsps`` and ``crls``
        keys are combined with the data from the DSS. The dictionary passed
        in is copied and not modified.

        :param validation_context_kwargs:
            Extra kwargs to pass to the ``__init__`` function.
        :param include_revinfo:
            If ``False``, revocation info is skipped.
        :return:
            A validation context preloaded with information from this DSS.
        """

        validation_context_kwargs = dict(validation_context_kwargs)
        # Accept any iterable (or None) for other_certs, in the same way as
        # is done for ocsps and crls below; concatenating a list with
        # a tuple or None would raise a TypeError.
        extra_certs = list(
            validation_context_kwargs.pop('other_certs', None) or ()
        )
        certs = list(self.load_certs()) + extra_certs

        if include_revinfo:
            ocsps = list(validation_context_kwargs.pop('ocsps', ()))
            for ocsp_ref in self.ocsps:
                ocsp_stream: generic.StreamObject = ocsp_ref.get_object()
                resp = asn1_ocsp.OCSPResponse.load(ocsp_stream.data)
                ocsps.append(resp)
            validation_context_kwargs['ocsps'] = ocsps

            crls = list(validation_context_kwargs.pop('crls', ()))
            for crl_ref in self.crls:
                crl_stream: generic.StreamObject = crl_ref.get_object()
                crl = asn1_crl.CertificateList.load(crl_stream.data)
                crls.append(crl)
            validation_context_kwargs['crls'] = crls

        return ValidationContext(other_certs=certs, **validation_context_kwargs)

    @classmethod
    def read_dss(cls, handler: PdfHandler) -> 'DocumentSecurityStore':
        """
        Read the DSS from the document catalog of a PDF handler.

        :param handler:
            PDF handler from which to read the DSS.
        :return:
            A DocumentSecurityStore object describing the current state of the
            DSS.
        :raises NoDSSFoundError:
            If the document catalog has no ``/DSS`` entry.
        """
        try:
            dss_dict = handler.root['/DSS']
        except KeyError as e:
            raise NoDSSFoundError() from e

        cert_refs = {}
        cert_ref_list = get_and_apply(dss_dict, '/Certs', list, default=[])
        for cert_ref in cert_ref_list:
            cert_stream: generic.StreamObject = cert_ref.get_object()
            cert: Certificate = Certificate.load(cert_stream.data)
            cert_refs[cert.issuer_serial] = cert_ref

        # The revocation data is loaded once here, so a stream that cannot be
        # loaded raises at this point; only the references are retained.
        ocsp_refs = get_and_apply(dss_dict, '/OCSPs', list, default=[])
        for ocsp_ref in ocsp_refs:
            ocsp_stream: generic.StreamObject = ocsp_ref.get_object()
            asn1_ocsp.OCSPResponse.load(ocsp_stream.data)

        crl_refs = get_and_apply(dss_dict, '/CRLs', list, default=[])
        for crl_ref in crl_refs:
            crl_stream: generic.StreamObject = crl_ref.get_object()
            asn1_crl.CertificateList.load(crl_stream.data)

        # shallow-copy the VRI dictionary
        try:
            vri_entries = dict(dss_dict['/VRI'])
        except KeyError:
            vri_entries = None

        # if the handler is a writer, the DSS will support updates
        if isinstance(handler, BasePdfFileWriter):
            writer = handler
        else:
            writer = None

        # the DSS returned will be backed by the original DSS object, so CRLs
        # are automagically preserved if they happened to be included in
        # the original file
        dss = cls(
            writer=writer,
            certs=cert_refs,
            ocsps=ocsp_refs,
            vri_entries=vri_entries,
            crls=crl_refs,
            backing_pdf_object=dss_dict,
        )
        return dss

    @classmethod
    def supply_dss_in_writer(
        cls,
        pdf_out: BasePdfFileWriter,
        sig_contents,
        *,
        certs=None,
        ocsps=None,
        crls=None,
        paths=None,
        validation_context=None,
        embed_roots: bool = True,
    ) -> 'DocumentSecurityStore':
        """
        Add or update a DSS, and optionally associate the new information with a
        VRI entry tied to a signature object.

        You can either specify the CMS objects to include directly, or
        pass them in as output from `pyhanko_certvalidator`.

        :param pdf_out:
            PDF writer to write to.
        :param sig_contents:
            Contents of the new signature (used to compute the VRI hash), as
            a hexadecimal string, including any padding.
            If ``None``, the information will not be added to any VRI
            dictionary.
        :param certs:
            Certificates to include in the VRI entry.
        :param ocsps:
            OCSP responses to include in the VRI entry.
        :param crls:
            CRLs to include in the VRI entry.
        :param paths:
            Validation paths that have been established, and need to be added
            to the DSS.
        :param validation_context:
            Validation context from which to draw OCSP responses and CRLs.
        :param embed_roots:
            .. versionadded:: 0.9.0

            Option that controls whether the root certificate of each validation
            path should be embedded into the DSS. The default is ``True``.

            .. note::
                Trust roots are configured by the validator, so embedding them
                typically does nothing in a typical validation process.
                Therefore they can be safely omitted in most cases.
                Nonetheless, embedding the roots can be useful for documentation
                purposes.

            .. warning::
                This only applies to paths, not the ``certs`` parameter.

        :return:
            a :class:`DocumentSecurityStore` object containing both the new
            and existing contents of the DSS (if any).
        """
        try:
            dss = cls.read_dss(pdf_out)
            created = False
        except ValidationInfoReadingError:
            created = True
            dss = cls(writer=pdf_out)

        if sig_contents is not None:
            identifier = DocumentSecurityStore.sig_content_identifier(
                sig_contents
            )
        else:
            identifier = None

        def _certs() -> Iterable[x509.Certificate]:
            yield from certs or ()
            path: ValidationPath
            for path in paths or ():
                path_parts = iter(path)
                if not embed_roots:
                    # skip the first cert (i.e. the root); the default keeps
                    # an empty path from leaking StopIteration out of this
                    # generator, which would surface as a RuntimeError
                    next(path_parts, None)
                yield from path_parts

        def _ocsps():
            yield from ocsps or ()
            if validation_context is not None:
                yield from validation_context.ocsps

        def _crls():
            yield from crls or ()
            if validation_context is not None:
                yield from validation_context.crls

        dss.register_vri(
            identifier, certs=_certs(), ocsps=_ocsps(), crls=_crls()
        )
        dss_dict = dss.as_pdf_object()
        # if we're updating the DSS, this is all we need to do.
        # if we're adding a fresh DSS, we need to register it.

        if created:
            dss_ref = pdf_out.add_object(dss_dict)
            pdf_out.root[pdf_name('/DSS')] = dss_ref
            pdf_out.update_root()
        return dss

    @classmethod
    def add_dss(
        cls,
        output_stream,
        sig_contents,
        *,
        certs=None,
        ocsps=None,
        crls=None,
        paths=None,
        validation_context=None,
        force_write: bool = False,
        embed_roots: bool = True,
        file_credential: Optional[SerialisedCredential] = None,
    ):
        """
        Wrapper around :meth:`supply_dss_in_writer`.

        The result is applied to the output stream as an incremental update.

        :param output_stream:
            Output stream to write to.
        :param sig_contents:
            Contents of the new signature (used to compute the VRI hash), as
            a hexadecimal string, including any padding.
            If ``None``, the information will not be added to any VRI
            dictionary.
        :param certs:
            Certificates to include in the VRI entry.
        :param ocsps:
            OCSP responses to include in the VRI entry.
        :param crls:
            CRLs to include in the VRI entry.
        :param paths:
            Validation paths that have been established, and need to be added
            to the DSS.
        :param force_write:
            Force a write even if the DSS doesn't have any new content.
        :param validation_context:
            Validation context from which to draw OCSP responses and CRLs.
        :param embed_roots:
            .. versionadded:: 0.9.0

            Option that controls whether the root certificate of each validation
            path should be embedded into the DSS. The default is ``True``.

            .. note::
                Trust roots are configured by the validator, so embedding them
                typically does nothing in a typical validation process.
                Therefore they can be safely omitted in most cases.
                Nonetheless, embedding the roots can be useful for documentation
                purposes.

            .. warning::
                This only applies to paths, not the ``certs`` parameter.
        :param file_credential:
            .. versionadded:: 0.13.0

            Serialised file credential, to update encrypted files.
        """
        pdf_out = IncrementalPdfFileWriter(output_stream)
        if pdf_out.security_handler is not None and file_credential is not None:
            pdf_out.security_handler.authenticate(file_credential)
        dss = cls.supply_dss_in_writer(
            pdf_out,
            sig_contents,
            certs=certs,
            ocsps=ocsps,
            crls=crls,
            paths=paths,
            validation_context=validation_context,
            embed_roots=embed_roots,
        )
        # only write a new revision if there is something to write
        if force_write or dss.modified:
            pdf_out.write_in_place()
|
|
|
|
|
|
async def collect_validation_info(
    embedded_sig: EmbeddedPdfSignature,
    validation_context: ValidationContext,
    skip_timestamp=False,
):
    """
    Query revocation info for a PDF signature using a validation context,
    and store the results in a validation context.

    This works by validating the signer's certificate against the provided
    validation context, which causes revocation info to be cached for
    later retrieval.

    .. warning::
        This function does *not* actually validate the signature, but merely
        checks the signer certificate's chain of trust.

    :param embedded_sig:
        Embedded PDF signature to operate on.
    :param validation_context:
        Validation context to use.
    :param skip_timestamp:
        If ``True``, do not collect revocation information for the time stamp
        token attached to the signature. By default, the timestamp is
        processed as well whenever one is present.
    :return:
        A list of validation paths: one for the signer's certificate,
        followed by one for the timestamp if it was processed.
    """

    revinfo_fetch_policy = (
        validation_context.revinfo_policy.revocation_checking_policy
    )
    if not revinfo_fetch_policy.essential:
        logger.warning(
            "Revocation mode is set to soft-fail/tolerant mode; collected "
            "revocation information may be incomplete."
        )

    paths = []

    async def _validate_signed_data(signed_data):
        # Validate the signer certificate found in the given signed data
        # and record the resulting path.
        cert_info = extract_certificate_info(signed_data)
        cert = cert_info.signer_cert
        other_certs = cert_info.other_certs

        validator = CertificateValidator(
            cert,
            intermediate_certs=other_certs,
            validation_context=validation_context,
        )
        # An empty key usage set is passed: no particular usage is requested.
        path = await validator.async_validate_usage(key_usage=set())
        paths.append(path)

    await _validate_signed_data(embedded_sig.signed_data)
    if not skip_timestamp and embedded_sig.attached_timestamp_data is not None:
        await _validate_signed_data(embedded_sig.attached_timestamp_data)

    return paths
|
|
|
|
|
|
async def async_add_validation_info(
    embedded_sig: EmbeddedPdfSignature,
    validation_context: ValidationContext,
    skip_timestamp=False,
    add_vri_entry=True,
    in_place=False,
    output=None,
    force_write=False,
    chunk_size=misc.DEFAULT_CHUNK_SIZE,
    embed_roots: bool = True,
):
    """
    .. versionadded:: 0.9.0

    Add validation info (CRLs, OCSP responses, extra certificates) for a
    signature to the DSS of a document in an incremental update.
    This is a wrapper around :func:`collect_validation_info`.

    :param embedded_sig:
        The signature for which the revocation information needs to be
        collected.
    :param validation_context:
        The validation context to use.
    :param skip_timestamp:
        If ``True``, do not attempt to validate the timestamp attached to
        the signature, if one is present.
    :param add_vri_entry:
        Add a ``/VRI`` entry for this signature to the document security store.
        Default is ``True``.
    :param output:
        Write the output to the specified output stream.
        If ``None``, write to a new :class:`.BytesIO` object.
        Default is ``None``.
    :param in_place:
        Write the update to the original input stream in-place.
        This parameter overrides ``output``.
    :param chunk_size:
        Chunk size parameter to use when copying output to a new stream
        (irrelevant if ``in_place`` is ``True``).
    :param force_write:
        Force a new revision to be written, even if not necessary (i.e.
        when all data in the validation context is already present in the DSS).
    :param embed_roots:
        Option that controls whether the root certificate of each validation
        path should be embedded into the DSS. The default is ``True``.

        .. note::
            Trust roots are configured by the validator, so embedding them
            typically does nothing in a typical validation process.
            Therefore they can be safely omitted in most cases.
            Nonetheless, embedding the roots can be useful for documentation
            purposes.
    :return:
        The (file-like) output object to which the result was written.
    """

    reader: PdfFileReader = embedded_sig.reader
    # Take care of this first, so we get any errors re: stream properties
    # out of the way before doing the (potentially) expensive validation
    # operations
    if in_place:
        working_output = output = reader.stream
        # Note: random-access I/O is checked at this point.
        # At the time of writing, this means that all this assertion does is
        # raise an error if the output is not writable.
        # We put it in for forwards compatibility & consistency with future
        # changes to I/O internals.
        misc.assert_writable_and_random_access(output)
    else:
        working_output = misc.prepare_rw_output_stream(output)

    paths = await collect_validation_info(
        embedded_sig, validation_context, skip_timestamp=skip_timestamp
    )

    if add_vri_entry:
        # the VRI identifier is derived from the hex-encoded contents
        sig_contents = embedded_sig.pkcs7_content.hex().encode('ascii')
    else:
        sig_contents = None

    pdf_out = IncrementalPdfFileWriter.from_reader(reader)
    pdf_out.IO_CHUNK_SIZE = chunk_size
    resulting_dss = DocumentSecurityStore.supply_dss_in_writer(
        pdf_out,
        sig_contents,
        validation_context=validation_context,
        paths=paths,
        embed_roots=embed_roots,
    )
    if force_write or resulting_dss.modified:
        if in_place:
            pdf_out.write_in_place()
        else:
            pdf_out.write(working_output)
    elif not in_place:
        # not in place, but we don't have anything to add -> copy the input
        # buffer
        reader.stream.seek(0)
        misc.chunked_write(bytearray(chunk_size), reader.stream, working_output)
    return misc.finalise_output(output, working_output)
|