991 lines
35 KiB
Python

import logging
import os
from collections import namedtuple
from datetime import datetime
from typing import List, Optional, Union
from asn1crypto import cms, x509
from pyhanko_certvalidator import ValidationContext
from pyhanko_certvalidator.path import ValidationPath
from pyhanko.pdf_utils import generic, misc
from pyhanko.pdf_utils.generic import pdf_name
from pyhanko.pdf_utils.reader import PdfFileReader, process_data_at_eof
from pyhanko.sign.diff_analysis import (
DEFAULT_DIFF_POLICY,
DiffPolicy,
DiffResult,
ModificationLevel,
SuspiciousModification,
)
from pyhanko.sign.fields import (
FieldMDPSpec,
MDPPerm,
SeedLockDocument,
SigSeedSubFilter,
SigSeedValFlags,
SigSeedValueSpec,
)
from pyhanko.sign.general import (
SignedDataCerts,
UnacceptableSignerError,
byte_range_digest,
extract_signer_info,
)
from .errors import (
SignatureValidationError,
SigSeedValueValidationError,
ValidationInfoReadingError,
)
from .generic_cms import (
cms_basic_validation,
collect_signer_attr_status,
collect_timing_info,
compute_signature_tst_digest,
extract_certs_for_validation,
extract_self_reported_ts,
extract_tst_data,
validate_tst_signed_data,
)
from .settings import KeyUsageConstraints
from .status import (
DocumentTimestampStatus,
PdfSignatureStatus,
SignatureCoverageLevel,
)
from .utils import CMSAlgorithmUsagePolicy
__all__ = [
'EmbeddedPdfSignature',
'DocMDPInfo',
'read_certification_data',
'async_validate_pdf_signature',
'async_validate_pdf_timestamp',
'report_seed_value_validation',
'extract_contents',
]
logger = logging.getLogger(__name__)
def _extract_reference_dict(
signature_obj, method
) -> Optional[generic.DictionaryObject]:
try:
sig_refs = signature_obj['/Reference']
except KeyError:
return None
for ref in sig_refs:
ref = ref.get_object()
if ref['/TransformMethod'] == method:
return ref
return None
def _extract_docmdp_for_sig(signature_obj) -> Optional[MDPPerm]:
ref = _extract_reference_dict(signature_obj, '/DocMDP')
if ref is None:
return None
try:
raw_perms = ref['/TransformParams'].raw_get('/P')
return MDPPerm(raw_perms)
except (ValueError, KeyError) as e: # pragma: nocover
raise SignatureValidationError(
"Failed to read document permissions"
) from e
def extract_contents(sig_object: generic.DictionaryObject) -> bytes:
"""
Internal function to extract the (DER-encoded) signature bytes from a PDF
signature dictionary.
:param sig_object:
A signature dictionary.
:return:
The extracted contents as a byte string.
"""
try:
cms_content = sig_object.raw_get(
'/Contents', decrypt=generic.EncryptedObjAccess.RAW
)
except KeyError:
raise misc.PdfReadError('Could not read /Contents entry in signature')
if not isinstance(
cms_content, (generic.TextStringObject, generic.ByteStringObject)
):
raise misc.PdfReadError('/Contents must be string-like')
return cms_content.original_bytes
# TODO clarify in docs that "external timestamp" is always None when dealing
# with a /DocTimeStamp, since there the timestamp token is simply the entire
# signature object
class EmbeddedPdfSignature:
"""
Class modelling a signature embedded in a PDF document.
"""
sig_field: generic.DictionaryObject
"""
The field dictionary of the form field containing the signature.
"""
sig_object: generic.DictionaryObject
"""
The signature dictionary.
"""
signed_data: cms.SignedData
"""
CMS signed data in the signature.
"""
def __init__(
self,
reader: PdfFileReader,
sig_field: generic.DictionaryObject,
fq_name: str,
):
self.reader = reader
if isinstance(sig_field, generic.IndirectObject):
sig_field = sig_field.get_object()
self.sig_field = sig_field
sig_object_ref = sig_field.raw_get('/V')
self.sig_object = sig_object = sig_object_ref.get_object()
assert isinstance(sig_object, generic.DictionaryObject)
try:
self.byte_range = sig_object.raw_get('/ByteRange')
except KeyError:
raise misc.PdfReadError(
'Could not read /ByteRange entry in signature'
)
self.pkcs7_content = cms_content = extract_contents(sig_object)
message = cms.ContentInfo.load(cms_content)
signed_data = message['content']
self.signed_data: cms.SignedData = signed_data
self.signer_info = extract_signer_info(signed_data)
self._sd_cert_info: Optional[SignedDataCerts] = None
# The PDF standard does not define a way to specify the digest algorithm
# used other than this one.
# However, RFC 5652 § 11.2 states that the message_digest attribute
# (which in our case is the PDF's ByteRange digest) is to be computed
# using the signer's digest algorithm. This can only refer
# to the corresponding SignerInfo entry.
digest_algo = self.signer_info['digest_algorithm']
self.md_algorithm = digest_algo['algorithm'].native.lower()
eci = signed_data['encap_content_info']
content_type = eci['content_type'].native
if content_type == 'data':
# Case of a normal signature
self.external_md_algorithm = self.md_algorithm
elif content_type == 'tst_info':
# for timestamps, the hash algorithm in the messageImprint
# need not be the same as the one to digest the encapsulated data!
# RFC 8933 recommends to unify them, but it's not a given.
mi = eci['content'].parsed['message_imprint']
self.external_md_algorithm = mi['hash_algorithm'][
'algorithm'
].native
# grab the revision to which the signature applies
# NOTE: We're using get_last_change here as opposed to
# get_introducing_revision. The distinction won't be relevant in most
# legitimate use cases, but get_last_change is more likely to be correct
# in cases where the signature obj was created by overriding an existing
# object (which is weird, but technically possible, I guess).
# Important note: the coverage checker will validate whether the
# xref table for that revision is actually covered by the signature,
# and raise the alarm if that's not the case.
# Therefore shenanigans with updating signature objects will be detected
# even before the diff checker runs.
self.signed_revision = self.reader.xrefs.get_last_change(
sig_object_ref.reference
)
self.coverage = None
self.external_digest: Optional[bytes] = None
self.total_len: Optional[int] = None
self._docmdp: Optional[MDPPerm] = None
self._fieldmdp: Optional[FieldMDPSpec] = None
self._docmdp_queried = self._fieldmdp_queried = False
self.tst_signature_digest: Optional[bytes] = None
self.diff_result = None
self._integrity_checked = False
self.fq_name = fq_name
def _init_cert_info(self) -> SignedDataCerts:
if self._sd_cert_info is None:
self._sd_cert_info = extract_certs_for_validation(self.signed_data)
return self._sd_cert_info
@property
def embedded_attr_certs(self) -> List[cms.AttributeCertificateV2]:
"""
Embedded attribute certificates.
"""
return list(self._init_cert_info().attribute_certs)
@property
def other_embedded_certs(self) -> List[x509.Certificate]:
"""
Embedded X.509 certificates, excluding than that of the signer.
"""
return list(self._init_cert_info().other_certs)
@property
def signer_cert(self) -> x509.Certificate:
"""
Certificate of the signer.
"""
return self._init_cert_info().signer_cert
@property
def sig_object_type(self) -> generic.NameObject:
"""
Returns the type of the embedded signature object.
For ordinary signatures, this will be ``/Sig``.
In the case of a document timestamp, ``/DocTimeStamp`` is returned.
:return:
A PDF name object describing the type of signature.
"""
return self.sig_object.get('/Type', pdf_name('/Sig'))
@property
def field_name(self):
"""
:return:
Name of the signature field.
"""
return self.fq_name
@property
def self_reported_timestamp(self) -> Optional[datetime]:
"""
:return:
The signing time as reported by the signer, if embedded in the
signature's signed attributes or provided as part of the signature
object in the PDF document.
"""
ts = extract_self_reported_ts(self.signer_info)
if ts is not None:
return ts
try:
st_as_pdf_date = self.sig_object['/M']
return generic.parse_pdf_date(
st_as_pdf_date, strict=self.reader.strict
)
except KeyError: # pragma: nocover
return None
@property
def attached_timestamp_data(self) -> Optional[cms.SignedData]:
"""
:return:
The signed data component of the timestamp token embedded in this
signature, if present.
"""
return extract_tst_data(self.signer_info)
def compute_integrity_info(self, diff_policy=None, skip_diff=False):
"""
Compute the various integrity indicators of this signature.
:param diff_policy:
Policy to evaluate potential incremental updates that were appended
to the signed revision of the document.
Defaults to
:const:`~pyhanko.sign.diff_analysis.DEFAULT_DIFF_POLICY`.
:param skip_diff:
If ``True``, skip the difference analysis step entirely.
"""
self._enforce_hybrid_xref_policy()
self.compute_digest()
self.compute_tst_digest()
# TODO in scenarios where we have to verify multiple signatures, we're
# doing a lot of double work here. This could be improved.
self.coverage = self.evaluate_signature_coverage()
diff_policy = diff_policy or DEFAULT_DIFF_POLICY
if not skip_diff:
self.diff_result = self.evaluate_modifications(diff_policy)
self._integrity_checked = True
def summarise_integrity_info(self) -> dict:
"""
Compile the integrity information for this signature into a dictionary
that can later be passed to :class:`.PdfSignatureStatus` as kwargs.
This method is only available after calling
:meth:`.EmbeddedPdfSignature.compute_integrity_info`.
"""
if not self._integrity_checked:
raise SignatureValidationError(
"Call compute_integrity_info() before invoking"
"summarise_integrity_info()"
) # pragma: nocover
docmdp = self.docmdp_level
diff_result = self.diff_result
coverage = self.coverage
docmdp_ok = None
# attempt to set docmdp_ok based on the diff analysis results
if diff_result is not None:
mod_level = (
diff_result.modification_level
if isinstance(diff_result, DiffResult)
else ModificationLevel.OTHER
)
docmdp_ok = not (
mod_level == ModificationLevel.OTHER
or (docmdp is not None and mod_level.value > docmdp.value)
)
elif coverage != SignatureCoverageLevel.ENTIRE_REVISION:
# if the diff analysis didn't run, we can still do something
# meaningful if coverage is not ENTIRE_REVISION:
# - if the signature covers the entire file, we're good.
# - if the coverage level is anything else, not so much
docmdp_ok = coverage == SignatureCoverageLevel.ENTIRE_FILE
status_kwargs = {
'coverage': coverage,
'docmdp_ok': docmdp_ok,
'diff_result': diff_result,
}
return status_kwargs
@property
def seed_value_spec(self) -> Optional[SigSeedValueSpec]:
try:
sig_sv_dict = self.sig_field['/SV']
except KeyError:
return None
return SigSeedValueSpec.from_pdf_object(sig_sv_dict)
@property
def docmdp_level(self) -> Optional[MDPPerm]:
"""
:return:
The document modification policy required by this signature or
its Lock dictionary.
.. warning::
This does not take into account the DocMDP requirements of
earlier signatures (if present).
The specification forbids signing with a more lenient DocMDP
than the one currently in force, so this should not happen
in a compliant document.
That being said, any potential violations will still invalidate
the earlier signature with the stricter DocMDP policy.
"""
if self._docmdp_queried:
return self._docmdp
docmdp = _extract_docmdp_for_sig(signature_obj=self.sig_object)
if docmdp is None:
try:
lock_dict = self.sig_field['/Lock']
docmdp = MDPPerm(lock_dict['/P'])
except KeyError:
pass
self._docmdp = docmdp
self._docmdp_queried = True
return docmdp
@property
def fieldmdp(self) -> Optional[FieldMDPSpec]:
"""
:return:
Read the field locking policy of this signature, if applicable.
See also :class:`~.pyhanko.sign.fields.FieldMDPSpec`.
"""
# TODO as above, fall back to /Lock
if self._fieldmdp_queried:
return self._fieldmdp
ref_dict = _extract_reference_dict(self.sig_object, '/FieldMDP')
self._fieldmdp_queried = True
if ref_dict is None:
return None
try:
sp = FieldMDPSpec.from_pdf_object(ref_dict['/TransformParams'])
except (ValueError, KeyError) as e: # pragma: nocover
raise SignatureValidationError(
"Failed to read /FieldMDP settings"
) from e
self._fieldmdp = sp
return sp
def compute_digest(self) -> bytes:
"""
Compute the ``/ByteRange`` digest of this signature.
The result will be cached.
:return:
The digest value.
"""
if self.external_digest is not None:
return self.external_digest
self.total_len, digest = byte_range_digest(
self.reader.stream,
byte_range=self.byte_range,
md_algorithm=self.external_md_algorithm,
)
self.external_digest = digest
return digest
def compute_tst_digest(self) -> Optional[bytes]:
"""
Compute the digest of the signature needed to validate its timestamp
token (if present).
.. warning::
This computation is only relevant for timestamp tokens embedded
inside a regular signature.
If the signature in question is a document timestamp (where the
entire signature object is a timestamp token), this method
does not apply.
:return:
The digest value, or ``None`` if there is no timestamp token.
"""
if self.tst_signature_digest is not None:
return self.tst_signature_digest
self.tst_signature_digest = digest = compute_signature_tst_digest(
self.signer_info
)
return digest
def evaluate_signature_coverage(self) -> SignatureCoverageLevel:
"""
Internal method used to evaluate the coverage level of a signature.
:return:
The coverage level of the signature.
"""
xref_cache = self.reader.xrefs
# for the coverage check, we're more strict with regards to the byte
# range
stream = self.reader.stream
# nonstandard byte range -> insta-fail
if len(self.byte_range) != 4 or self.byte_range[0] != 0:
return SignatureCoverageLevel.UNCLEAR
_, len1, start2, len2 = self.byte_range
# first check: check if the signature covers the entire file.
# (from a cryptographic point of view)
# In this case, there are no changes at all, so we're good.
# compute file size
stream.seek(0, os.SEEK_END)
# the * 2 is because of the ASCII hex encoding, and the + 2
# is the wrapping <>
embedded_sig_content = len(self.pkcs7_content) * 2 + 2
signed_zone_len = len1 + len2 + embedded_sig_content
file_covered = stream.tell() == signed_zone_len
if file_covered:
return SignatureCoverageLevel.ENTIRE_FILE
# Now we're in the mixed case: the byte range is a standard one
# starting at the beginning of the document, but it doesn't go all
# the way to the end of the file. This can be for legitimate reasons,
# not all of which we can evaluate right now.
# First, check if the signature is a contiguous one.
# In other words, we need to check if the interruption in the byte
# range is "fully explained" by the signature content.
contiguous = start2 == len1 + embedded_sig_content
if not contiguous:
return SignatureCoverageLevel.UNCLEAR
# next, we verify that the revision this signature belongs to
# is completely covered. This requires a few separate checks.
# (1) Verify that the xref container (table or stream) is covered
# (2) Verify the presence of the EOF and startxref markers at the
# end of the signed region, and compare them with the values
# in the xref cache to make sure we are reading the right revision.
# Check (2) first, since it's the quickest
stream.seek(signed_zone_len)
signed_rev = self.signed_revision
try:
startxref = process_data_at_eof(stream)
expected = xref_cache.get_startxref_for_revision(signed_rev)
if startxref != expected:
return SignatureCoverageLevel.CONTIGUOUS_BLOCK_FROM_START
except misc.PdfReadError:
return SignatureCoverageLevel.CONTIGUOUS_BLOCK_FROM_START
# ... then check (1) for all revisions up to and including
# signed_revision
for revision in range(signed_rev + 1):
xref_meta = xref_cache.get_xref_container_info(revision)
if xref_meta.end_location > signed_zone_len:
return SignatureCoverageLevel.CONTIGUOUS_BLOCK_FROM_START
return SignatureCoverageLevel.ENTIRE_REVISION
def _enforce_hybrid_xref_policy(self):
reader = self.reader
if reader.strict and reader.xrefs.hybrid_xrefs_present:
raise SignatureValidationError(
"Settings do not permit validation of signatures in "
"hybrid-reference files."
)
def evaluate_modifications(
self, diff_policy: DiffPolicy
) -> Union[DiffResult, SuspiciousModification]:
"""
Internal method used to evaluate the modification level of a signature.
"""
if self.coverage < SignatureCoverageLevel.ENTIRE_REVISION:
return SuspiciousModification(
'Nonstandard signature coverage level'
)
elif self.coverage == SignatureCoverageLevel.ENTIRE_FILE:
return DiffResult(ModificationLevel.NONE, set())
return diff_policy.review_file(
self.reader,
self.signed_revision,
field_mdp_spec=self.fieldmdp,
doc_mdp=self.docmdp_level,
)
DocMDPInfo = namedtuple('DocMDPInfo', ['permission', 'author_sig'])
"""
Encodes certification information for a signed document, consisting of a
reference to the author signature, together with the associated DocMDP policy.
"""
def read_certification_data(reader: PdfFileReader) -> Optional[DocMDPInfo]:
"""
Read the certification information for a PDF document, if present.
:param reader:
Reader representing the input document.
:return:
A :class:`.DocMDPInfo` object containing the relevant data, or ``None``.
"""
try:
certification_sig = reader.root['/Perms']['/DocMDP']
except KeyError:
return None
perm = _extract_docmdp_for_sig(certification_sig)
return DocMDPInfo(perm, certification_sig)
def _validate_sv_constraints(
emb_sig: EmbeddedPdfSignature, validation_path, timestamp_found
):
sv_spec = emb_sig.seed_value_spec
if sv_spec is None:
return
signing_cert = emb_sig.signer_cert
if sv_spec.cert is not None:
try:
sv_spec.cert.satisfied_by(signing_cert, validation_path)
except UnacceptableSignerError as e:
raise SigSeedValueValidationError(e) from e
if not timestamp_found and sv_spec.timestamp_required:
raise SigSeedValueValidationError(
"The seed value dictionary requires a trusted timestamp, but "
"none was found, or the timestamp did not validate."
)
sig_obj = emb_sig.sig_object
if sv_spec.seed_signature_type is not None:
sv_certify = sv_spec.seed_signature_type.certification_signature()
try:
perms: generic.DictionaryObject = emb_sig.reader.root['/Perms']
cert_sig_ref = perms.get_value_as_reference('/DocMDP')
was_certified = cert_sig_ref == sig_obj.container_ref
except (KeyError, generic.IndirectObjectExpected, AttributeError):
was_certified = False
if sv_certify != was_certified:
def _type(certify):
return 'a certification' if certify else 'an approval'
raise SigSeedValueValidationError(
"The seed value dictionary's /MDP entry specifies that "
f"this field should contain {_type(sv_certify)} "
f"signature, but {_type(was_certified)} "
"appears to have been used."
)
if sv_certify:
sv_mdp_perm = sv_spec.seed_signature_type.mdp_perm
doc_mdp = emb_sig.docmdp_level
if sv_mdp_perm != doc_mdp:
raise SigSeedValueValidationError(
"The seed value dictionary specified that this "
"certification signature should use the MDP policy "
f"'{sv_mdp_perm}', but '{doc_mdp}' was "
"used in the signature."
)
flags = sv_spec.flags
if not flags:
return
selected_sf_str = sig_obj['/SubFilter']
selected_sf = SigSeedSubFilter(selected_sf_str)
if (flags & SigSeedValFlags.SUBFILTER) and sv_spec.subfilters is not None:
# empty array = no supported subfilters
if not sv_spec.subfilters:
raise NotImplementedError(
"The signature encodings mandated by the seed value "
"dictionary are not supported."
)
# standard mandates that we take the first available subfilter
mandated_sf: SigSeedSubFilter = sv_spec.subfilters[0]
if selected_sf is not None and mandated_sf != selected_sf:
raise SigSeedValueValidationError(
"The seed value dictionary mandates subfilter '%s', "
"but '%s' was used in the signature."
% (mandated_sf.value, selected_sf.value)
)
if (
flags & SigSeedValFlags.APPEARANCE_FILTER
) and sv_spec.appearance is not None:
logger.warning(
"The signature's seed value dictionary specifies the "
"/AppearanceFilter entry as mandatory, but this constraint "
"is impossible to validate."
)
if (
flags & SigSeedValFlags.LEGAL_ATTESTATION
) and sv_spec.legal_attestations is not None:
raise NotImplementedError(
"pyHanko does not support legal attestations, but the seed value "
"dictionary mandates that they be restricted to a specific subset."
)
if (
flags & SigSeedValFlags.LOCK_DOCUMENT
) and sv_spec.lock_document is not None:
doc_mdp = emb_sig.docmdp_level
if (
sv_spec.lock_document == SeedLockDocument.LOCK
and doc_mdp != MDPPerm.NO_CHANGES
):
raise SigSeedValueValidationError(
"Document must be locked, but some changes are still allowed."
)
if (
sv_spec.lock_document == SeedLockDocument.DO_NOT_LOCK
and doc_mdp == MDPPerm.NO_CHANGES
):
raise SigSeedValueValidationError(
"Document must not be locked, but the DocMDP level is set to "
"NO_CHANGES."
)
# value 'auto' is OK.
signer_info = emb_sig.signer_info
if (
flags & SigSeedValFlags.ADD_REV_INFO
) and sv_spec.add_rev_info is not None:
from pyhanko.sign.validation.ltv import retrieve_adobe_revocation_info
try:
retrieve_adobe_revocation_info(signer_info)
revinfo_found = True
except ValidationInfoReadingError:
revinfo_found = False
if sv_spec.add_rev_info != revinfo_found:
raise SigSeedValueValidationError(
"The seed value dict mandates that revocation info %sbe "
"added, but it was %sfound in the signature."
% (
"" if sv_spec.add_rev_info else "not ",
"" if revinfo_found else "not ",
)
)
if (
sv_spec.add_rev_info
and selected_sf != SigSeedSubFilter.ADOBE_PKCS7_DETACHED
):
raise SigSeedValueValidationError(
"The seed value dict mandates that Adobe-style revocation "
"info be added; this requires subfilter '%s'"
% (SigSeedSubFilter.ADOBE_PKCS7_DETACHED.value)
)
if (
flags & SigSeedValFlags.DIGEST_METHOD
) and sv_spec.digest_methods is not None:
selected_md = emb_sig.md_algorithm.lower()
if selected_md not in sv_spec.digest_methods:
raise SigSeedValueValidationError(
"The selected message digest %s is not allowed by the "
"seed value dictionary." % selected_md
)
if flags & SigSeedValFlags.REASONS:
# standard says that omission of the /Reasons key amounts to
# a prohibition in this case
reasons = sv_spec.reasons or []
must_omit = not reasons or reasons == ["."]
reason_given = sig_obj.get('/Reason')
if must_omit and reason_given is not None:
raise SigSeedValueValidationError(
"The seed value dictionary prohibits giving a reason "
"for signing."
)
if not must_omit and reason_given not in reasons:
raise SigSeedValueValidationError(
"The reason for signing \"%s\" is not accepted by the "
"seed value dictionary." % (reason_given,)
)
def report_seed_value_validation(
embedded_sig: EmbeddedPdfSignature,
validation_path: ValidationPath,
timestamp_found: bool,
):
"""
Internal API function to enforce seed value constraints (if present)
and report on the result(s).
:param embedded_sig:
The embedded signature.
:param validation_path:
The validation path for the signer's certificate.
:param timestamp_found:
Flag indicating whether a valid timestamp was found or not.
:return:
A ``status_kwargs`` dict.
"""
sv_err: Optional[SigSeedValueValidationError]
try:
_validate_sv_constraints(
embedded_sig, validation_path, timestamp_found=timestamp_found
)
sv_err = None
except SigSeedValueValidationError as e:
logger.warning("Error in seed value validation.", exc_info=e)
sv_err = e
return {
'has_seed_values': embedded_sig.seed_value_spec is not None,
'seed_value_constraint_error': sv_err,
}
def _validate_subfilter(subfilter_str, permitted_subfilters, err_msg):
try:
from pyhanko.sign.fields import SigSeedSubFilter
subfilter_ok = SigSeedSubFilter(subfilter_str) in permitted_subfilters
except ValueError:
subfilter_ok = False
if not subfilter_ok:
raise SignatureValidationError(err_msg % subfilter_str)
async def async_validate_pdf_signature(
embedded_sig: EmbeddedPdfSignature,
signer_validation_context: Optional[ValidationContext] = None,
ts_validation_context: Optional[ValidationContext] = None,
ac_validation_context: Optional[ValidationContext] = None,
diff_policy: Optional[DiffPolicy] = None,
key_usage_settings: Optional[KeyUsageConstraints] = None,
skip_diff: bool = False,
algorithm_policy: Optional[CMSAlgorithmUsagePolicy] = None,
) -> PdfSignatureStatus:
"""
.. versionadded:: 0.9.0
.. versionchanged: 0.11.0
Added ``ac_validation_context`` param.
Validate a PDF signature.
:param embedded_sig:
Embedded signature to evaluate.
:param signer_validation_context:
Validation context to use to validate the signature's chain of trust.
:param ts_validation_context:
Validation context to use to validate the timestamp's chain of trust
(defaults to ``signer_validation_context``).
:param ac_validation_context:
Validation context to use to validate attribute certificates.
If not supplied, no AC validation will be performed.
.. note::
:rfc:`5755` requires attribute authority trust roots to be specified
explicitly; hence why there's no default.
:param diff_policy:
Policy to evaluate potential incremental updates that were appended
to the signed revision of the document.
Defaults to
:const:`~pyhanko.sign.diff_analysis.DEFAULT_DIFF_POLICY`.
:param key_usage_settings:
A :class:`.KeyUsageConstraints` object specifying which key usages
must or must not be present in the signer's certificate.
:param skip_diff:
If ``True``, skip the difference analysis step entirely.
:param algorithm_policy:
The algorithm usage policy for the signature validation.
.. warning::
This is distinct from the algorithm usage policy used for
certificate validation, but the latter will be used as a fallback
if this parameter is not specified.
It is nonetheless recommended to align both policies unless
there is a clear reason to do otherwise.
:return:
The status of the PDF signature in question.
"""
sig_object = embedded_sig.sig_object
if embedded_sig.sig_object_type != '/Sig':
raise SignatureValidationError("Signature object type must be /Sig")
# check whether the subfilter type is one we support
subfilter_str = sig_object.get('/SubFilter', None)
_validate_subfilter(
subfilter_str,
(SigSeedSubFilter.ADOBE_PKCS7_DETACHED, SigSeedSubFilter.PADES),
"%s is not a recognized SubFilter type in signatures.",
)
if ts_validation_context is None:
ts_validation_context = signer_validation_context
embedded_sig.compute_integrity_info(
diff_policy=diff_policy, skip_diff=skip_diff
)
status_kwargs = embedded_sig.summarise_integrity_info()
ts_status_kwargs = await collect_timing_info(
embedded_sig.signer_info,
ts_validation_context,
raw_digest=embedded_sig.compute_digest(),
)
status_kwargs.update(ts_status_kwargs)
if 'signer_reported_dt' not in status_kwargs:
# maybe the PDF signature dictionary declares /M
signer_reported_dt = embedded_sig.self_reported_timestamp
if signer_reported_dt is not None:
status_kwargs['signer_reported_dt'] = signer_reported_dt
key_usage_settings = PdfSignatureStatus.default_usage_constraints(
key_usage_settings
)
status_kwargs = await cms_basic_validation(
embedded_sig.signed_data,
raw_digest=embedded_sig.external_digest,
validation_context=signer_validation_context,
status_kwargs=status_kwargs,
key_usage_settings=key_usage_settings,
algorithm_policy=algorithm_policy,
)
tst_validity = status_kwargs.get('timestamp_validity', None)
timestamp_found = (
tst_validity is not None and tst_validity.valid and tst_validity.trusted
)
sv_update = report_seed_value_validation(
embedded_sig, status_kwargs['validation_path'], timestamp_found
)
status_kwargs.update(sv_update)
if ac_validation_context is not None:
ac_validation_context.certificate_registry.register_multiple(
embedded_sig.other_embedded_certs
)
status_kwargs.update(
await collect_signer_attr_status(
sd_attr_certificates=embedded_sig.embedded_attr_certs,
signer_cert=embedded_sig.signer_cert,
validation_context=ac_validation_context,
sd_signed_attrs=embedded_sig.signer_info['signed_attrs'],
)
)
return PdfSignatureStatus(**status_kwargs)
async def async_validate_pdf_timestamp(
embedded_sig: EmbeddedPdfSignature,
validation_context: Optional[ValidationContext] = None,
diff_policy: Optional[DiffPolicy] = None,
skip_diff: bool = False,
) -> DocumentTimestampStatus:
"""
.. versionadded:: 0.9.0
Validate a PDF document timestamp.
:param embedded_sig:
Embedded signature to evaluate.
:param validation_context:
Validation context to use to validate the timestamp's chain of trust.
:param diff_policy:
Policy to evaluate potential incremental updates that were appended
to the signed revision of the document.
Defaults to
:const:`~pyhanko.sign.diff_analysis.DEFAULT_DIFF_POLICY`.
:param skip_diff:
If ``True``, skip the difference analysis step entirely.
:return:
The status of the PDF timestamp in question.
"""
if embedded_sig.sig_object_type != '/DocTimeStamp':
raise SignatureValidationError(
"Signature object type must be /DocTimeStamp"
)
# check whether the subfilter type is one we support
subfilter_str = embedded_sig.sig_object.get('/SubFilter', None)
_validate_subfilter(
subfilter_str,
(SigSeedSubFilter.ETSI_RFC3161,),
"%s is not a recognized SubFilter type for timestamps.",
)
embedded_sig.compute_integrity_info(
diff_policy=diff_policy, skip_diff=skip_diff
)
status_kwargs = await validate_tst_signed_data(
embedded_sig.signed_data,
validation_context,
embedded_sig.compute_digest(),
)
status_kwargs['coverage'] = embedded_sig.coverage
status_kwargs['diff_result'] = embedded_sig.diff_result
return DocumentTimestampStatus(**status_kwargs)