""" This module contains the low-level building blocks for dealing with bookkeeping around ``/ByteRange`` digests in PDF files. """ import binascii from dataclasses import dataclass from datetime import datetime from io import BytesIO from typing import IO, Optional, Union from asn1crypto import cms from cryptography.hazmat.primitives import hashes from pyhanko.pdf_utils import generic, misc from pyhanko.pdf_utils.generic import pdf_date, pdf_name, pdf_string from pyhanko.pdf_utils.incremental_writer import IncrementalPdfFileWriter from pyhanko.pdf_utils.writer import BasePdfFileWriter from pyhanko.sign.general import SigningError, get_pyca_cryptography_hash from ..fields import SigAuthType, SigSeedSubFilter from . import constants __all__ = [ # Serialisable object used to track placeholder locations, # part of PdfCMSEmbedder / PdfSigner protocol 'PreparedByteRangeDigest', # PDF-level signature containers 'PdfByteRangeDigest', 'PdfSignedData', 'SignatureObject', 'DocumentTimestamp', 'BuildProps', ] BYTE_RANGE_ARR_PLACE_HOLDER_LENGTH = 60 class SigByteRangeObject(generic.PdfObject): """ Internal class to handle the ``/ByteRange`` arrays themselves. """ def __init__(self): self._filled = False self._range_object_offset = None self.first_region_len = 0 self.second_region_offset = 0 self.second_region_len = 0 def fill_offsets(self, stream, sig_start, sig_end, eof): if self._filled: raise ValueError('Offsets already filled') # pragma: nocover if self._range_object_offset is None: raise ValueError( 'Could not determine where to write /ByteRange value' ) # pragma: nocover old_seek = stream.tell() self.first_region_len = sig_start self.second_region_offset = sig_end self.second_region_len = eof - sig_end # our ArrayObject is rigged to have fixed width # so we can just write over it stream.seek(self._range_object_offset) self.write_to_stream(stream, None) stream.seek(old_seek) self._filled = True def write_to_stream(self, stream, handler=None, container_ref=None): if self._range_object_offset is None: self._range_object_offset = stream.tell() stream.write(b"[]") stream.write(b" " * BYTE_RANGE_ARR_PLACE_HOLDER_LENGTH) else: string_repr = b"[%d %d %d %d]" % ( 0, self.first_region_len, self.second_region_offset, self.second_region_len, ) assert len(string_repr) <= BYTE_RANGE_ARR_PLACE_HOLDER_LENGTH + 2 stream.write(string_repr) class DERPlaceholder(generic.PdfObject): """ Internal class to handle placeholders for DER content. """ def __init__(self, bytes_reserved=None): self.value = b'0' * (bytes_reserved or 16 * 1024) self._offsets = None @property def offsets(self): if self._offsets is None: raise ValueError('No offsets available') # pragma: nocover return self._offsets # always ignore encryption key, since this is a placeholder def write_to_stream(self, stream, handler=None, container_ref=None): start = stream.tell() stream.write(b'<') stream.write(self.value) stream.write(b'>') end = stream.tell() if self._offsets is None: self._offsets = start, end @dataclass(frozen=True) class PreparedByteRangeDigest: """ .. versionadded:: 0.7.0 .. versionchanged:: 0.14.0 Removed ``md_algorithm`` attribute since it was unused. Bookkeeping class that contains the digest of a document that is about to be signed (or otherwise authenticated) based on said digest. It also keeps track of the region in the output stream that is omitted in the byte range. Instances of this class can easily be serialised, which allows for interrupting the signing process partway through. """ document_digest: bytes """ Digest of the document, computed over the appropriate ``/ByteRange``. """ reserved_region_start: int """ Start of the reserved region in the output stream that is not part of the ``/ByteRange``. """ reserved_region_end: int """ End of the reserved region in the output stream that is not part of the ``/ByteRange``. """ def fill_with_cms( self, output: IO, cms_data: Union[bytes, cms.ContentInfo] ): """ Write a DER-encoded CMS object to the reserved region indicated by :attr:`reserved_region_start` and :attr:`reserved_region_end` in the output stream. :param output: Output stream to use. Must be writable and seekable. :param cms_data: CMS object to write. Can be provided as an :class:`asn1crypto.cms.ContentInfo` object, or as raw DER-encoded bytes. :return: A :class:`bytes` object containing the contents that were written, plus any additional padding. """ if isinstance(cms_data, bytes): der_bytes = cms_data else: der_bytes = cms_data.dump() return self.fill_reserved_region(output, der_bytes) def fill_reserved_region(self, output: IO, content_bytes: bytes): """ Write hex-encoded contents to the reserved region indicated by :attr:`reserved_region_start` and :attr:`reserved_region_end` in the output stream. :param output: Output stream to use. Must be writable and seekable. :param content_bytes: Content bytes. These will be padded, hexadecimally encoded and written to the appropriate location in output stream. :return: A :class:`bytes` object containing the contents that were written, plus any additional padding. """ content_hex = binascii.hexlify(content_bytes).upper() start = self.reserved_region_start end = self.reserved_region_end # might as well compute this bytes_reserved = end - start - 2 length = len(content_hex) if length > bytes_reserved: raise SigningError( f"Final ByteRange payload larger than expected: " f"allocated {bytes_reserved} bytes, but contents " f"required {length} bytes." ) # pragma: nocover # +1 to skip the '<' output.seek(start + 1) # NOTE: the PDF spec is not completely clear on this, but # signature contents are NOT supposed to be encrypted. # Perhaps this falls under the "strings in encrypted containers" # denominator in ยง 7.6.1? # Addition: the PDF 2.0 spec *does* spell out that this content # is not to be encrypted. output.write(content_hex) output.seek(0) padding = bytes(bytes_reserved // 2 - len(content_bytes)) return content_bytes + padding class PdfByteRangeDigest(generic.DictionaryObject): """ General class to model a PDF Dictionary that has a ``/ByteRange`` entry and a another data entry (named ``/Contents`` by default) that will contain a value based on a digest computed over said ``/ByteRange``. The ``/ByteRange`` will cover the entire file, except for the value of the data entry itself. .. danger:: This is internal API. :param data_key: Name of the data key, which is ``/Contents`` by default. :param bytes_reserved: Number of bytes to reserve for the contents placeholder. If ``None``, a generous default is applied, but you should try to estimate the size as accurately as possible. """ def __init__(self, data_key=pdf_name('/Contents'), *, bytes_reserved=None): super().__init__() if bytes_reserved is not None and bytes_reserved % 2 == 1: raise ValueError('bytes_reserved must be even') self.data_key = data_key contents = DERPlaceholder(bytes_reserved=bytes_reserved) self[data_key] = self.contents = contents byte_range = SigByteRangeObject() self[pdf_name('/ByteRange')] = self.byte_range = byte_range def fill( self, writer: BasePdfFileWriter, md_algorithm, in_place=False, output=None, chunk_size=misc.DEFAULT_CHUNK_SIZE, ): """ Generator coroutine that handles the document hash computation and the actual filling of the placeholder data. .. danger:: This is internal API; you should use use :class:`.PdfSigner` wherever possible. If you *really* need fine-grained control, use :class:`~pyhanko.sign.signers.cms_embedder.PdfCMSEmbedder` instead. """ if in_place: if not isinstance(writer, IncrementalPdfFileWriter): raise TypeError( "in_place is only meaningful for incremental writers." ) # pragma: nocover output = writer.prev.stream writer.write_in_place() else: output = misc.prepare_rw_output_stream(output) writer.write(output) # retcon time: write the proper values of the /ByteRange entry # in the signature object eof = output.tell() sig_start, sig_end = self.contents.offsets self.byte_range.fill_offsets(output, sig_start, sig_end, eof) # compute the digests md_spec = get_pyca_cryptography_hash(md_algorithm) md = hashes.Hash(md_spec) # attempt to get a memoryview for automatic buffering output_buffer = None if isinstance(output, BytesIO): output_buffer = output.getbuffer() else: try: output_buffer = memoryview(output) except (TypeError, IOError): pass if output_buffer is not None: # these are memoryviews, so slices should not copy stuff around # (also, the interface files for pyca/cryptography don't specify # that memoryviews are allowed, but they are) # noinspection PyTypeChecker md.update(output_buffer[:sig_start]) # noinspection PyTypeChecker md.update(output_buffer[sig_end:eof]) output_buffer.release() else: temp_buffer = bytearray(chunk_size) output.seek(0) misc.chunked_digest(temp_buffer, output, md, max_read=sig_start) output.seek(sig_end) misc.chunked_digest(temp_buffer, output, md, max_read=eof - sig_end) digest_value = md.finalize() prepared_br_digest = PreparedByteRangeDigest( document_digest=digest_value, reserved_region_start=sig_start, reserved_region_end=sig_end, ) cms_data = yield prepared_br_digest, output yield prepared_br_digest.fill_with_cms(output, cms_data) class PdfSignedData(PdfByteRangeDigest): """ Generic class to model signature dictionaries in a PDF file. See also :class:`.SignatureObject` and :class:`.DocumentTimestamp`. :param obj_type: The type of signature object. :param subfilter: See :class:`.SigSeedSubFilter`. :param timestamp: The timestamp to embed into the ``/M`` entry. :param bytes_reserved: The number of bytes to reserve for the signature. Defaults to 16 KiB. .. warning:: Since the CMS object is written to the output file as a hexadecimal string, you should request **twice** the (estimated) number of bytes in the DER-encoded version of the CMS object. """ def __init__( self, obj_type, subfilter: SigSeedSubFilter = constants.DEFAULT_SIG_SUBFILTER, timestamp: Optional[datetime] = None, bytes_reserved=None, ): super().__init__(bytes_reserved=bytes_reserved) self.update( { pdf_name('/Type'): obj_type, pdf_name('/Filter'): pdf_name('/Adobe.PPKLite'), pdf_name('/SubFilter'): subfilter.value, } ) if timestamp is not None: self[pdf_name('/M')] = pdf_date(timestamp) @dataclass(frozen=True) class BuildProps: """ Entries in a signature build properties dictionary; see Adobe PDF Signature Build Dictionary Specification. """ name: str """ The application's name. """ revision: Optional[str] = None """ The application's revision ID string. .. note:: This corresponds to the **REx** entry in the build properties dictionary. """ def as_pdf_object(self) -> generic.DictionaryObject: """ Render the build properties as a PDF object. :return: A PDF dictionary. """ props = generic.DictionaryObject( {pdf_name("/Name"): pdf_name("/" + self.name)} ) if self.revision: props['/REx'] = generic.TextStringObject(self.revision) return props class SignatureObject(PdfSignedData): """ Class modelling a (placeholder for) a regular PDF signature. :param timestamp: The (optional) timestamp to embed into the ``/M`` entry. :param subfilter: See :class:`.SigSeedSubFilter`. :param bytes_reserved: The number of bytes to reserve for the signature. Defaults to 16 KiB. .. warning:: Since the CMS object is written to the output file as a hexadecimal string, you should request **twice** the (estimated) number of bytes in the DER-encoded version of the CMS object. :param name: Signer name. You probably want to leave this blank, viewers should default to the signer's subject name. :param location: Optional signing location. :param reason: Optional signing reason. May be restricted by seed values. :params contact_info: Optional information from the signer to enable the receiver to contact the signer and verify the signature. :param app_build_props: Optional dictionary containing informations about the computer environment used for signing. See :class:`.BuildProps`. :param prop_auth_time: Optional information representing the number of seconds since signer was last authenticated. :param prop_auth_type: Optional information about the method of user's authentication See :class:`.SigAuthType`. """ def __init__( self, timestamp: Optional[datetime] = None, subfilter: SigSeedSubFilter = constants.DEFAULT_SIG_SUBFILTER, name=None, location=None, reason=None, contact_info=None, app_build_props: Optional[BuildProps] = None, prop_auth_time: Optional[int] = None, prop_auth_type: Optional[SigAuthType] = None, bytes_reserved=None, ): super().__init__( obj_type=pdf_name('/Sig'), subfilter=subfilter, timestamp=timestamp, bytes_reserved=bytes_reserved, ) if name: self[pdf_name('/Name')] = pdf_string(name) if location: self[pdf_name('/Location')] = pdf_string(location) if reason: self[pdf_name('/Reason')] = pdf_string(reason) if contact_info: self[pdf_name('/ContactInfo')] = pdf_string(contact_info) if app_build_props: self[pdf_name('/Prop_Build')] = generic.DictionaryObject( {pdf_name("/App"): app_build_props.as_pdf_object()} ) if prop_auth_time: self[pdf_name('/Prop_AuthTime')] = generic.NumberObject( prop_auth_time ) if prop_auth_type: self[pdf_name('/Prop_AuthType')] = prop_auth_type.value class DocumentTimestamp(PdfSignedData): """ Class modelling a (placeholder for) a regular PDF signature. :param bytes_reserved: The number of bytes to reserve for the signature. Defaults to 16 KiB. .. warning:: Since the CMS object is written to the output file as a hexadecimal string, you should request **twice** the (estimated) number of bytes in the DER-encoded version of the CMS object. """ def __init__(self, bytes_reserved=None): super().__init__( obj_type=pdf_name('/DocTimeStamp'), subfilter=SigSeedSubFilter.ETSI_RFC3161, bytes_reserved=bytes_reserved, ) # use of Name/Location/Reason is discouraged in document timestamps by # PAdES, so we don't set those