1889 lines
67 KiB
Python
1889 lines
67 KiB
Python
"""
|
|
This module defines utility classes to format CMS objects for use in PDF
|
|
signatures.
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import warnings
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import IO, Callable, Iterable, List, Optional, Union
|
|
|
|
import tzlocal
|
|
from asn1crypto import algos, cms, core, keys
|
|
from asn1crypto import pdf as asn1_pdf
|
|
from asn1crypto import x509
|
|
from asn1crypto.algos import SignedDigestAlgorithm
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric.dsa import DSAPrivateKey
|
|
from cryptography.hazmat.primitives.asymmetric.ec import (
|
|
ECDSA,
|
|
EllipticCurvePrivateKey,
|
|
)
|
|
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey
|
|
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
|
|
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
|
|
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
|
|
from cryptography.hazmat.primitives.serialization import pkcs12
|
|
from pyhanko_certvalidator._asyncio_compat import to_thread
|
|
from pyhanko_certvalidator.registry import (
|
|
CertificateStore,
|
|
SimpleCertificateStore,
|
|
)
|
|
|
|
from pyhanko.pdf_utils import misc
|
|
from pyhanko.sign import attributes
|
|
from pyhanko.sign.ades.api import CAdESSignedAttrSpec
|
|
from pyhanko.sign.attributes import (
|
|
CMSAttributeProvider,
|
|
SignedAttributeProviderSpec,
|
|
UnsignedAttributeProviderSpec,
|
|
)
|
|
from pyhanko.sign.general import (
|
|
SigningError,
|
|
get_cms_hash_algo_for_mechanism,
|
|
get_pyca_cryptography_hash,
|
|
optimal_pss_params,
|
|
process_pss_params,
|
|
simple_cms_attribute,
|
|
)
|
|
from pyhanko.sign.timestamps import TimeStamper
|
|
|
|
from ...keys import (
|
|
_translate_pyca_cryptography_cert_to_asn1,
|
|
_translate_pyca_cryptography_key_to_asn1,
|
|
load_cert_from_pemder,
|
|
load_certs_from_pemder,
|
|
load_private_key_from_pemder,
|
|
)
|
|
from . import constants
|
|
|
|
__all__ = [
|
|
'Signer',
|
|
'SimpleSigner',
|
|
'ExternalSigner',
|
|
'PdfCMSSignedAttributes',
|
|
'format_attributes',
|
|
'format_signed_attributes',
|
|
'asyncify_signer',
|
|
'select_suitable_signing_md',
|
|
'signer_from_p12_config',
|
|
'signer_from_pemder_config',
|
|
]
|
|
|
|
from ...config.errors import ConfigurationError
|
|
from ...config.local_keys import PemDerSignatureConfig, PKCS12SignatureConfig
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class CMSSignedAttributes:
|
|
"""
|
|
.. versionadded:: 0.14.0
|
|
|
|
Serialisable container class describing input for various signed attributes
|
|
in a signature CMS object.
|
|
"""
|
|
|
|
signing_time: Optional[datetime] = None
|
|
"""
|
|
Timestamp for the ``signingTime`` attribute. Will be ignored in a PAdES
|
|
context.
|
|
"""
|
|
|
|
cades_signed_attrs: Optional[CAdESSignedAttrSpec] = None
|
|
"""
|
|
Optional settings for CAdES-style signed attributes.
|
|
"""
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class PdfCMSSignedAttributes(CMSSignedAttributes):
|
|
"""
|
|
.. versionadded:: 0.7.0
|
|
|
|
.. versionchanged:: 0.14.0
|
|
Split off some fields into :class:`.CMSSignedAttributes`.
|
|
|
|
|
|
Serialisable container class describing input for various signed attributes
|
|
in a CMS object for a PDF signature.
|
|
"""
|
|
|
|
adobe_revinfo_attr: Optional[asn1_pdf.RevocationInfoArchival] = None
|
|
"""
|
|
Adobe-style signed revocation info attribute.
|
|
"""
|
|
|
|
|
|
def _ensure_content_type(encap_content_info):
|
|
encap_content_info = encap_content_info or {'content_type': 'data'}
|
|
if isinstance(encap_content_info, core.Sequence):
|
|
# could be cms.ContentInfo or cms.EncapsulatedContentInfo depending
|
|
# on circumstances, so let's just stick to Sequence
|
|
content_type = encap_content_info['content_type'].native
|
|
else:
|
|
content_type = encap_content_info.get('content_type', 'data')
|
|
|
|
return encap_content_info, content_type
|
|
|
|
|
|
def _cms_version(
|
|
content_type: Union[str, core.ObjectIdentifier], has_attribute_certs
|
|
):
|
|
if has_attribute_certs:
|
|
return 'v4'
|
|
|
|
if isinstance(content_type, core.ObjectIdentifier):
|
|
content_type = content_type.native
|
|
|
|
# We don't support signing with subjectKeyIdentifier, so there's no
|
|
# need to take that into account here. We also don't do version 1 attribute
|
|
# certificates.
|
|
# (these matter for distinguishing v1<>v3)
|
|
return 'v1' if content_type == 'data' else 'v3'
|
|
|
|
|
|
def _prepare_encap_content(
|
|
input_data: Union[IO, bytes, cms.ContentInfo, cms.EncapsulatedContentInfo],
|
|
digest_algorithm: str,
|
|
detached=True,
|
|
chunk_size=misc.DEFAULT_CHUNK_SIZE,
|
|
max_read=None,
|
|
):
|
|
h = hashes.Hash(get_pyca_cryptography_hash(digest_algorithm))
|
|
encap_content_info = None
|
|
if isinstance(input_data, (cms.ContentInfo, cms.EncapsulatedContentInfo)):
|
|
h.update(bytes(input_data['content']))
|
|
if detached:
|
|
encap_content_info = {'content_type': input_data['content_type']}
|
|
else:
|
|
encap_content_info = input_data
|
|
elif isinstance(input_data, bytes):
|
|
h.update(input_data)
|
|
if not detached:
|
|
# use dicts instead of Asn1Value objects, to leave asn1crypto
|
|
# to decide whether to use cms.ContentInfo or
|
|
# cms.EncapsulatedContentInfo (for backwards compat with PCKS#7)
|
|
encap_content_info = {'content_type': 'data', 'content': input_data}
|
|
elif not detached:
|
|
# input stream is a buffer, and we're in 'enveloping' mode
|
|
# read the entire thing into memory, since we need to embed
|
|
# it anyway
|
|
input_bytes = input_data.read(max_read)
|
|
h.update(input_bytes)
|
|
# see above
|
|
encap_content_info = {'content_type': 'data', 'content': input_bytes}
|
|
else:
|
|
temp_buf = bytearray(chunk_size)
|
|
misc.chunked_digest(temp_buf, input_data, h, max_read=max_read)
|
|
digest_bytes = h.finalize()
|
|
return encap_content_info, digest_bytes
|
|
|
|
|
|
async def format_attributes(
|
|
attr_provs: List[CMSAttributeProvider],
|
|
other_attrs: Iterable[cms.CMSAttributes] = (),
|
|
dry_run: bool = False,
|
|
) -> cms.CMSAttributes:
|
|
"""
|
|
Format CMS attributes obtained from attribute providers.
|
|
|
|
:param attr_provs:
|
|
List of attribute providers.
|
|
:param other_attrs:
|
|
Other (predetermined) attributes to include.
|
|
:param dry_run:
|
|
Whether to invoke the attribute providers in dry-run mode or not.
|
|
:return:
|
|
A :class:`cms.CMSAttributes` value.
|
|
"""
|
|
|
|
attrs = list(other_attrs)
|
|
jobs = [prov.get_attribute(dry_run=dry_run) for prov in attr_provs]
|
|
for attr_coro in asyncio.as_completed(jobs):
|
|
attr = await attr_coro
|
|
if attr is not None:
|
|
attrs.append(attr)
|
|
|
|
return cms.CMSAttributes(attrs)
|
|
|
|
|
|
async def format_signed_attributes(
|
|
data_digest: bytes,
|
|
attr_provs: List[CMSAttributeProvider],
|
|
content_type='data',
|
|
dry_run=False,
|
|
) -> cms.CMSAttributes:
|
|
"""
|
|
Format signed attributes for a CMS ``SignerInfo`` value.
|
|
|
|
:param data_digest:
|
|
The byte string to put in the ``messageDigest`` attribute.
|
|
:param attr_provs:
|
|
List of attribute providers to source attributes from.
|
|
:param content_type:
|
|
The content type of the data being signed (default is ``data``).
|
|
:param dry_run:
|
|
Whether to invoke the attribute providers in dry-run mode or not.
|
|
:return:
|
|
A :class:`cms.CMSAttributes` value representing the signed attributes.
|
|
"""
|
|
attrs = [
|
|
simple_cms_attribute('content_type', content_type),
|
|
simple_cms_attribute('message_digest', data_digest),
|
|
]
|
|
return await format_attributes(
|
|
attr_provs, dry_run=dry_run, other_attrs=attrs
|
|
)
|
|
|
|
|
|
class Signer:
|
|
"""
|
|
Abstract signer object that is agnostic as to where the cryptographic
|
|
operations actually happen.
|
|
|
|
As of now, pyHanko provides two implementations:
|
|
|
|
* :class:`.SimpleSigner` implements the easy case where all the key material
|
|
can be loaded into memory.
|
|
* :class:`~pyhanko.sign.pkcs11.PKCS11Signer` implements a signer that is
|
|
capable of interfacing with a PKCS#11 device
|
|
(see also :class:`~pyhanko.sign.beid.BEIDSigner`).
|
|
|
|
:param prefer_pss:
|
|
When signing using an RSA key, prefer PSS padding to legacy PKCS#1 v1.5
|
|
padding. Default is ``False``. This option has no effect on non-RSA
|
|
signatures.
|
|
:param embed_roots:
|
|
.. versionadded:: 0.9.0
|
|
|
|
Option that controls whether or not additional self-signed certificates
|
|
should be embedded into the CMS payload. The default is ``True``.
|
|
|
|
.. note::
|
|
The signer's certificate is always embedded, even if it is
|
|
self-signed.
|
|
|
|
.. note::
|
|
Trust roots are configured by the validator, so embedding them
|
|
doesn't affect the semantics of a typical validation process.
|
|
Therefore, they can be safely omitted in most cases.
|
|
Nonetheless, embedding the roots can be useful for documentation
|
|
purposes. In addition, some validators are poorly implemented,
|
|
and will refuse to build paths if the roots are not present
|
|
in the file.
|
|
|
|
.. warning::
|
|
To be precise, if this flag is ``False``, a certificate will be
|
|
dropped if (a) it is not the signer's, (b) it is self-issued and
|
|
(c) its subject and authority key identifiers match (or either is
|
|
missing). In other words, we never validate the actual
|
|
self-signature. This heuristic is sufficiently accurate
|
|
for most applications.
|
|
:param signature_mechanism:
|
|
The (cryptographic) signature mechanism to use for all signing
|
|
operations. If unset, the default behaviour is to try to impute
|
|
a reasonable one given the preferred digest algorithm and public key.
|
|
:param signing_cert:
|
|
See :attr:`signing_cert`.
|
|
:param attribute_certs:
|
|
See :attr:`attribute_certs`.
|
|
:param cert_registry:
|
|
Initial value for :attr:`cert_registry`. If unset, an empty certificate
|
|
store will be initialised.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
prefer_pss: bool = False,
|
|
embed_roots: bool = True,
|
|
signature_mechanism: Optional[SignedDigestAlgorithm] = None,
|
|
signing_cert: Optional[x509.Certificate] = None,
|
|
cert_registry: Optional[CertificateStore] = None,
|
|
attribute_certs: Iterable[cms.AttributeCertificateV2] = (),
|
|
):
|
|
self.prefer_pss = prefer_pss
|
|
self.embed_roots = embed_roots
|
|
self.signed_attr_prov_spec: Optional[SignedAttributeProviderSpec] = None
|
|
self.unsigned_attr_prov_spec: Optional[
|
|
UnsignedAttributeProviderSpec
|
|
] = None
|
|
self._signature_mechanism = signature_mechanism
|
|
self._signing_cert = signing_cert
|
|
self._cert_registry = cert_registry or SimpleCertificateStore()
|
|
self._attribute_certs = attribute_certs
|
|
|
|
@property
|
|
def signature_mechanism(self) -> Optional[SignedDigestAlgorithm]:
|
|
"""
|
|
.. versionchanged:: 0.18.0
|
|
Turned into a property instead of a class attribute.
|
|
|
|
The (cryptographic) signature mechanism to use for all signing
|
|
operations.
|
|
"""
|
|
return self._signature_mechanism
|
|
|
|
@property
|
|
def signing_cert(self) -> Optional[x509.Certificate]:
|
|
"""
|
|
.. versionchanged:: 0.14.0
|
|
Made optional (see note)
|
|
|
|
.. versionchanged:: 0.18.0
|
|
Turned into a property instead of a class attribute.
|
|
|
|
The certificate that will be used to create the signature.
|
|
|
|
.. note::
|
|
This is an optional field only to a limited extent. Subclasses may
|
|
require it to be present, and not setting it at the beginning of
|
|
the signing process implies that certain high-level convenience
|
|
features will no longer work or be limited in function (e.g.,
|
|
automatic hash selection, appearance generation, revocation
|
|
information collection, ...).
|
|
|
|
However, making :attr:`signing_cert` optional enables certain
|
|
signing workflows where the certificate of the signer is not known
|
|
until the signature has actually been produced. This is most
|
|
relevant in certain types of remote signing scenarios.
|
|
"""
|
|
return self._signing_cert
|
|
|
|
@property
|
|
def cert_registry(self) -> CertificateStore:
|
|
"""
|
|
.. versionchanged:: 0.18.0
|
|
Turned into a property instead of a class attribute.
|
|
|
|
Collection of certificates associated with this signer.
|
|
Note that this is simply a bookkeeping tool; in particular it
|
|
doesn't care about trust.
|
|
"""
|
|
return self._cert_registry
|
|
|
|
@property
|
|
def attribute_certs(self) -> Iterable[cms.AttributeCertificateV2]:
|
|
"""
|
|
.. versionchanged:: 0.18.0
|
|
Turned into a property instead of a class attribute.
|
|
|
|
Attribute certificates to include with the signature.
|
|
|
|
.. note::
|
|
Only ``v2`` attribute certificates are supported.
|
|
"""
|
|
return self._attribute_certs
|
|
|
|
def get_signature_mechanism_for_digest(
|
|
self, digest_algorithm: Optional[str]
|
|
) -> SignedDigestAlgorithm:
|
|
"""
|
|
Get the signature mechanism for this signer to use.
|
|
If :attr:`signature_mechanism` is set, it will be used.
|
|
Otherwise, this method will attempt to put together a default
|
|
based on mechanism used in the signer's certificate.
|
|
|
|
:param digest_algorithm:
|
|
Digest algorithm to use as part of the signature mechanism.
|
|
Only used if a signature mechanism object has to be put together
|
|
on-the-fly.
|
|
:return:
|
|
A :class:`.SignedDigestAlgorithm` object.
|
|
"""
|
|
|
|
if self.signature_mechanism is not None:
|
|
return self.signature_mechanism
|
|
if self.signing_cert is None:
|
|
raise SigningError(
|
|
"Could not set up a default signature mechanism."
|
|
)
|
|
# Grab the certificate's algorithm (but forget about the digest)
|
|
# and use that to set up the default.
|
|
# We'll specify the digest somewhere else.
|
|
algo = self.signing_cert.public_key.algorithm
|
|
params = None
|
|
if algo == 'ec':
|
|
# with ECDSA, RFC 5753 requires us to encode the digest
|
|
# algorithm together with the signing algorithm.
|
|
# The correspondence with the digestAlgorithm field in CMS is
|
|
# verified separately.
|
|
if digest_algorithm is None:
|
|
raise ValueError("Digest algorithm required for ECDSA")
|
|
mech = digest_algorithm + '_ecdsa'
|
|
elif algo == 'dsa':
|
|
if digest_algorithm is None:
|
|
raise ValueError("Digest algorithm required for DSA")
|
|
# Note: DSA isn't specified with sha384 and sha512, but we
|
|
# don't check that here; let's allow the error to percolate
|
|
# further down for now.
|
|
# TODO (but maybe it's worth revisiting the issue of checking
|
|
# signature <> hash algorithm combinations in greater generality)
|
|
mech = digest_algorithm + '_dsa'
|
|
elif algo == 'rsa':
|
|
if self.prefer_pss:
|
|
mech = 'rsassa_pss'
|
|
if digest_algorithm is None:
|
|
raise ValueError("Digest algorithm required for RSASSA-PSS")
|
|
params = optimal_pss_params(self.signing_cert, digest_algorithm)
|
|
elif digest_algorithm is not None:
|
|
mech = digest_algorithm + '_rsa'
|
|
else:
|
|
mech = 'rsassa_pkcs1v15'
|
|
elif algo in ('ed25519', 'ed448'):
|
|
mech = algo
|
|
else: # pragma: nocover
|
|
raise SigningError(f"Signature mechanism {algo} is unsupported.")
|
|
|
|
sda_kwargs = {'algorithm': mech}
|
|
if params is not None:
|
|
sda_kwargs['parameters'] = params
|
|
return SignedDigestAlgorithm(sda_kwargs)
|
|
|
|
@property
|
|
def subject_name(self) -> Optional[str]:
|
|
"""
|
|
:return:
|
|
The subject's common name as a string, extracted from
|
|
:attr:`signing_cert`, or ``None`` if no signer's certificate is
|
|
available
|
|
"""
|
|
if self.signing_cert is None:
|
|
return None
|
|
|
|
name: x509.Name = self.signing_cert.subject
|
|
try:
|
|
result = name.native['common_name']
|
|
except KeyError:
|
|
result = name.native['organization_name']
|
|
try:
|
|
email = name.native['email_address']
|
|
result = '%s <%s>' % (result, email)
|
|
except KeyError:
|
|
pass
|
|
return result
|
|
|
|
@staticmethod
|
|
def format_revinfo(
|
|
ocsp_responses: Optional[list] = None, crls: Optional[list] = None
|
|
):
|
|
"""
|
|
Format Adobe-style revocation information for inclusion into a CMS
|
|
object.
|
|
|
|
:param ocsp_responses:
|
|
A list of OCSP responses to include.
|
|
:param crls:
|
|
A list of CRLs to include.
|
|
"""
|
|
|
|
revinfo_dict = {}
|
|
if ocsp_responses:
|
|
revinfo_dict['ocsp'] = ocsp_responses
|
|
|
|
if crls:
|
|
revinfo_dict['crl'] = crls
|
|
|
|
if revinfo_dict:
|
|
return asn1_pdf.RevocationInfoArchival(revinfo_dict)
|
|
|
|
def _signed_attr_provider_spec(
|
|
self,
|
|
attr_settings: PdfCMSSignedAttributes,
|
|
timestamper=None,
|
|
use_cades=False,
|
|
is_pdf_sig=True,
|
|
):
|
|
"""
|
|
Internal method to select a default attribute provider spec if none
|
|
is available already.
|
|
"""
|
|
|
|
if self.signed_attr_prov_spec is not None:
|
|
return self.signed_attr_prov_spec
|
|
elif use_cades:
|
|
return CAdESSignedAttributeProviderSpec(
|
|
attr_settings=attr_settings,
|
|
signing_cert=self.signing_cert,
|
|
is_pades=is_pdf_sig,
|
|
timestamper=timestamper,
|
|
)
|
|
elif is_pdf_sig:
|
|
return GenericPdfSignedAttributeProviderSpec(
|
|
attr_settings=attr_settings,
|
|
signing_cert=self.signing_cert,
|
|
signature_mechanism=self.get_signature_mechanism_for_digest,
|
|
timestamper=timestamper,
|
|
)
|
|
else:
|
|
return GenericCMSSignedAttributeProviderSpec(
|
|
attr_settings=attr_settings,
|
|
signing_cert=self.signing_cert,
|
|
signature_mechanism=self.get_signature_mechanism_for_digest,
|
|
timestamper=timestamper,
|
|
)
|
|
|
|
def _signed_attr_providers(
|
|
self,
|
|
data_digest: bytes,
|
|
digest_algorithm: str,
|
|
attr_settings: PdfCMSSignedAttributes,
|
|
timestamper=None,
|
|
use_cades=False,
|
|
is_pdf_sig=True,
|
|
):
|
|
"""
|
|
Prepare "standard" signed attribute providers. Internal API.
|
|
"""
|
|
|
|
spec = self._signed_attr_provider_spec(
|
|
attr_settings=attr_settings,
|
|
timestamper=timestamper,
|
|
use_cades=use_cades,
|
|
is_pdf_sig=is_pdf_sig,
|
|
)
|
|
|
|
return spec.signed_attr_providers(
|
|
data_digest=data_digest,
|
|
digest_algorithm=digest_algorithm,
|
|
)
|
|
|
|
def _unsigned_attr_provider_spec(
|
|
self, timestamper: Optional[TimeStamper] = None
|
|
):
|
|
if self.unsigned_attr_prov_spec is not None:
|
|
return self.unsigned_attr_prov_spec
|
|
else:
|
|
return DefaultUnsignedAttributes(timestamper=timestamper)
|
|
|
|
def _unsigned_attr_providers(
|
|
self,
|
|
digest_algorithm: str,
|
|
signature: bytes,
|
|
signed_attrs: cms.CMSAttributes,
|
|
timestamper: Optional[TimeStamper] = None,
|
|
):
|
|
"""
|
|
Prepare "standard" unsigned attribute providers. Internal API.
|
|
"""
|
|
spec = self._unsigned_attr_provider_spec(timestamper)
|
|
return spec.unsigned_attr_providers(
|
|
digest_algorithm=digest_algorithm,
|
|
signature=signature,
|
|
signed_attrs=signed_attrs,
|
|
)
|
|
|
|
def signer_info(self, digest_algorithm: str, signed_attrs, signature):
|
|
"""
|
|
Format the ``SignerInfo`` entry for a CMS signature.
|
|
|
|
:param digest_algorithm:
|
|
Digest algorithm to use.
|
|
:param signed_attrs:
|
|
Signed attributes (see :meth:`signed_attrs`).
|
|
:param signature:
|
|
The raw signature to embed (see :meth:`sign_raw`).
|
|
:return:
|
|
An :class:`.asn1crypto.cms.SignerInfo` object.
|
|
"""
|
|
digest_algorithm_obj = algos.DigestAlgorithm(
|
|
{'algorithm': digest_algorithm}
|
|
)
|
|
|
|
signing_cert = self.signing_cert
|
|
if signing_cert is None:
|
|
raise SigningError(
|
|
"The signer\'s certificate must be available for "
|
|
"SignerInfo assembly to proceed."
|
|
)
|
|
# build the signer info object that goes into the PKCS7 signature
|
|
# (see RFC 2315 § 9.2)
|
|
sig_info = cms.SignerInfo(
|
|
{
|
|
'version': 'v1',
|
|
'sid': cms.SignerIdentifier(
|
|
{
|
|
'issuer_and_serial_number': cms.IssuerAndSerialNumber(
|
|
{
|
|
'issuer': signing_cert.issuer,
|
|
'serial_number': signing_cert.serial_number,
|
|
}
|
|
)
|
|
}
|
|
),
|
|
'digest_algorithm': digest_algorithm_obj,
|
|
'signature_algorithm': self.get_signature_mechanism_for_digest(
|
|
digest_algorithm
|
|
),
|
|
'signed_attrs': signed_attrs,
|
|
'signature': signature,
|
|
}
|
|
)
|
|
return sig_info
|
|
|
|
def _package_signature(
|
|
self,
|
|
*,
|
|
digest_algorithm: str,
|
|
cms_version,
|
|
signed_attrs: cms.CMSAttributes,
|
|
signature: bytes,
|
|
unsigned_attrs: cms.CMSAttributes,
|
|
encap_content_info,
|
|
) -> cms.ContentInfo:
|
|
encap_content_info = encap_content_info or {'content_type': 'data'}
|
|
digest_algorithm_obj = algos.DigestAlgorithm(
|
|
{'algorithm': digest_algorithm}
|
|
)
|
|
sig_info = self.signer_info(digest_algorithm, signed_attrs, signature)
|
|
|
|
if unsigned_attrs is not None:
|
|
sig_info['unsigned_attrs'] = unsigned_attrs
|
|
|
|
cert_registry = self.cert_registry or ()
|
|
# Note: we do not add the TS certs at this point
|
|
if self.embed_roots:
|
|
certs = {
|
|
cms.CertificateChoices(name='certificate', value=cert)
|
|
for cert in cert_registry
|
|
}
|
|
else:
|
|
# asn1crypto's heuristic is good enough for now, we won't check the
|
|
# actual signatures. CAs that make use of self-issued certificates
|
|
# for things like key rollover probably also use SKI/AKI to
|
|
# distinguish between different certs, which will be picked up by
|
|
# asn1crypto either way.
|
|
certs = {
|
|
cms.CertificateChoices(name='certificate', value=cert)
|
|
for cert in cert_registry
|
|
if cert.self_signed == 'no'
|
|
}
|
|
certs.add(
|
|
cms.CertificateChoices(name='certificate', value=self.signing_cert)
|
|
)
|
|
|
|
# add attribute certs
|
|
for ac in self.attribute_certs:
|
|
certs.add(cms.CertificateChoices(name='v2_attr_cert', value=ac))
|
|
|
|
# this is the SignedData object for our message (see RFC 2315 § 9.1)
|
|
signed_data = {
|
|
'version': cms_version,
|
|
'digest_algorithms': cms.DigestAlgorithms((digest_algorithm_obj,)),
|
|
'encap_content_info': encap_content_info,
|
|
'certificates': certs,
|
|
'signer_infos': [sig_info],
|
|
}
|
|
|
|
return cms.ContentInfo(
|
|
{
|
|
'content_type': cms.ContentType('signed_data'),
|
|
'content': cms.SignedData(signed_data),
|
|
}
|
|
)
|
|
|
|
def _check_digest_algorithm(self, digest_algorithm: str):
|
|
implied_hash_algo = None
|
|
try:
|
|
# Just using self.signature_mechanism is not good enough,
|
|
# we need to cover cases like ed448 and ed25519 (where
|
|
# the hash algorithm choice is fixed) also when the signature
|
|
# mechanism is not explicitly passed in.
|
|
mech = self.get_signature_mechanism_for_digest(None)
|
|
except ValueError:
|
|
# This could happen if there's no explicit mechanism defined,
|
|
# and we're signing with ECDSA, for example. In that case
|
|
# we simply use the digest algorithm passed in.
|
|
mech = None
|
|
try:
|
|
if mech is not None:
|
|
implied_hash_algo = get_cms_hash_algo_for_mechanism(mech)
|
|
except ValueError:
|
|
# this is OK, just use the specified message digest
|
|
pass
|
|
if (
|
|
implied_hash_algo is not None
|
|
and implied_hash_algo != digest_algorithm
|
|
):
|
|
raise SigningError(
|
|
f"Selected signature mechanism specifies message digest "
|
|
f"{implied_hash_algo}, but {digest_algorithm} "
|
|
f"was requested."
|
|
)
|
|
|
|
async def async_sign_raw(
|
|
self, data: bytes, digest_algorithm: str, dry_run=False
|
|
) -> bytes:
|
|
"""
|
|
Compute the raw cryptographic signature of the data provided, hashed
|
|
using the digest algorithm provided.
|
|
|
|
:param data:
|
|
Data to sign.
|
|
:param digest_algorithm:
|
|
Digest algorithm to use.
|
|
|
|
.. warning::
|
|
If :attr:`signature_mechanism` also specifies a digest, they
|
|
should match.
|
|
:param dry_run:
|
|
Do not actually create a signature, but merely output placeholder
|
|
bytes that would suffice to contain an actual signature.
|
|
:return:
|
|
Signature bytes.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
async def unsigned_attrs(
|
|
self,
|
|
digest_algorithm: str,
|
|
signature: bytes,
|
|
signed_attrs: cms.CMSAttributes,
|
|
timestamper=None,
|
|
dry_run=False,
|
|
) -> Optional[cms.CMSAttributes]:
|
|
"""
|
|
.. versionchanged:: 0.9.0
|
|
Made asynchronous _(breaking change)_
|
|
|
|
.. versionchanged:: 0.14.0
|
|
Added ``signed_attrs`` parameter _(breaking change)_
|
|
|
|
Compute the unsigned attributes to embed into the CMS object.
|
|
This function is called after signing the hash of the signed attributes
|
|
(see :meth:`signed_attrs`).
|
|
|
|
By default, this method only handles timestamp requests, but other
|
|
functionality may be added by subclasses
|
|
|
|
If this method returns ``None``, no unsigned attributes will be
|
|
embedded.
|
|
|
|
:param digest_algorithm:
|
|
Digest algorithm used to hash the signed attributes.
|
|
:param signed_attrs:
|
|
Signed attributes of the signature.
|
|
:param signature:
|
|
Signature of the signed attribute hash.
|
|
:param timestamper:
|
|
Timestamp supplier to use.
|
|
:param dry_run:
|
|
Flag indicating "dry run" mode. If ``True``, only the approximate
|
|
size of the output matters, so cryptographic
|
|
operations can be replaced by placeholders.
|
|
:return:
|
|
The unsigned attributes to add, or ``None``.
|
|
"""
|
|
provs = self._unsigned_attr_providers(
|
|
signature=signature,
|
|
signed_attrs=signed_attrs,
|
|
digest_algorithm=digest_algorithm,
|
|
timestamper=timestamper,
|
|
)
|
|
attrs = await format_attributes(list(provs), dry_run=dry_run)
|
|
return attrs or None
|
|
|
|
async def signed_attrs(
|
|
self,
|
|
data_digest: bytes,
|
|
digest_algorithm: str,
|
|
attr_settings: Optional[PdfCMSSignedAttributes] = None,
|
|
content_type='data',
|
|
use_pades=False,
|
|
timestamper=None,
|
|
dry_run=False,
|
|
is_pdf_sig=True,
|
|
):
|
|
"""
|
|
.. versionchanged:: 0.4.0
|
|
Added positional ``digest_algorithm`` parameter _(breaking change)_.
|
|
.. versionchanged:: 0.5.0
|
|
Added ``dry_run``, ``timestamper`` and ``cades_meta`` parameters.
|
|
.. versionchanged:: 0.9.0
|
|
Made asynchronous, grouped some parameters under ``attr_settings``
|
|
_(breaking change)_
|
|
|
|
Format the signed attributes for a CMS signature.
|
|
|
|
:param data_digest:
|
|
Raw digest of the data to be signed.
|
|
:param digest_algorithm:
|
|
.. versionadded:: 0.4.0
|
|
|
|
Name of the digest algorithm used to compute the digest.
|
|
:param use_pades:
|
|
Respect PAdES requirements.
|
|
:param dry_run:
|
|
.. versionadded:: 0.5.0
|
|
|
|
Flag indicating "dry run" mode. If ``True``, only the approximate
|
|
size of the output matters, so cryptographic
|
|
operations can be replaced by placeholders.
|
|
:param attr_settings:
|
|
:class:`.PdfCMSSignedAttributes` object describing the attributes
|
|
to be added.
|
|
:param timestamper:
|
|
.. versionadded:: 0.5.0
|
|
|
|
Timestamper to use when creating timestamp tokens.
|
|
:param content_type:
|
|
CMS content type of the encapsulated data. Default is `data`.
|
|
|
|
.. danger::
|
|
This parameter is internal API, and non-default values must not
|
|
be used to produce PDF signatures.
|
|
:param is_pdf_sig:
|
|
Whether the signature being generated is for use in a PDF document.
|
|
|
|
.. danger::
|
|
This parameter is internal API.
|
|
:return:
|
|
An :class:`.asn1crypto.cms.CMSAttributes` object.
|
|
"""
|
|
|
|
attr_settings = attr_settings or PdfCMSSignedAttributes()
|
|
|
|
provs = self._signed_attr_providers(
|
|
data_digest=data_digest,
|
|
digest_algorithm=digest_algorithm,
|
|
use_cades=use_pades,
|
|
attr_settings=attr_settings,
|
|
timestamper=timestamper,
|
|
is_pdf_sig=is_pdf_sig,
|
|
)
|
|
|
|
return await format_signed_attributes(
|
|
data_digest,
|
|
attr_provs=list(provs),
|
|
content_type=content_type,
|
|
dry_run=dry_run,
|
|
)
|
|
|
|
async def async_sign(
|
|
self,
|
|
data_digest: bytes,
|
|
digest_algorithm: str,
|
|
dry_run=False,
|
|
use_pades=False,
|
|
timestamper=None,
|
|
signed_attr_settings: Optional[PdfCMSSignedAttributes] = None,
|
|
is_pdf_sig=True,
|
|
encap_content_info=None,
|
|
) -> cms.ContentInfo:
|
|
"""
|
|
.. versionadded:: 0.9.0
|
|
|
|
Produce a detached CMS signature from a raw data digest.
|
|
|
|
:param data_digest:
|
|
Digest of the actual content being signed.
|
|
:param digest_algorithm:
|
|
Digest algorithm to use. This should be the same digest method
|
|
as the one used to hash the (external) content.
|
|
:param dry_run:
|
|
If ``True``, the actual signing step will be replaced with
|
|
a placeholder.
|
|
|
|
In a PDF signing context, this is necessary to estimate the size
|
|
of the signature container before computing the actual digest of
|
|
the document.
|
|
:param signed_attr_settings:
|
|
:class:`.PdfCMSSignedAttributes` object describing the attributes
|
|
to be added.
|
|
:param use_pades:
|
|
Respect PAdES requirements.
|
|
:param timestamper:
|
|
:class:`~.timestamps.TimeStamper` used to obtain a trusted timestamp
|
|
token that can be embedded into the signature container.
|
|
|
|
.. note::
|
|
If ``dry_run`` is true, the timestamper's
|
|
:meth:`~.timestamps.TimeStamper.dummy_response` method will be
|
|
called to obtain a placeholder token.
|
|
Note that with a standard :class:`~.timestamps.HTTPTimeStamper`,
|
|
this might still hit the timestamping server (in order to
|
|
produce a realistic size estimate), but the dummy response will
|
|
be cached.
|
|
:param is_pdf_sig:
|
|
Whether the signature being generated is for use in a PDF document.
|
|
|
|
.. danger::
|
|
This parameter is internal API.
|
|
:param encap_content_info:
|
|
Data to encapsulate in the CMS object.
|
|
|
|
.. danger::
|
|
This parameter is internal API, and must not be used to produce
|
|
PDF signatures.
|
|
:return:
|
|
An :class:`~.asn1crypto.cms.ContentInfo` object.
|
|
"""
|
|
|
|
encap_content_info, content_type = _ensure_content_type(
|
|
encap_content_info
|
|
)
|
|
signed_attrs = await self.signed_attrs(
|
|
data_digest,
|
|
digest_algorithm,
|
|
attr_settings=signed_attr_settings,
|
|
use_pades=use_pades,
|
|
timestamper=timestamper,
|
|
dry_run=dry_run,
|
|
content_type=content_type,
|
|
is_pdf_sig=is_pdf_sig,
|
|
)
|
|
return await self.async_sign_prescribed_attributes(
|
|
digest_algorithm,
|
|
signed_attrs,
|
|
cms_version=_cms_version(content_type, bool(self.attribute_certs)),
|
|
dry_run=dry_run,
|
|
timestamper=timestamper,
|
|
encap_content_info=encap_content_info,
|
|
)
|
|
|
|
async def async_sign_prescribed_attributes(
|
|
self,
|
|
digest_algorithm: str,
|
|
signed_attrs: cms.CMSAttributes,
|
|
cms_version='v1',
|
|
dry_run=False,
|
|
timestamper=None,
|
|
encap_content_info=None,
|
|
) -> cms.ContentInfo:
|
|
"""
|
|
.. versionadded:: 0.9.0
|
|
|
|
Start the CMS signing process with the prescribed set of signed
|
|
attributes.
|
|
|
|
:param digest_algorithm:
|
|
Digest algorithm to use. This should be the same digest method
|
|
as the one used to hash the (external) content.
|
|
:param signed_attrs:
|
|
CMS attributes to sign.
|
|
:param dry_run:
|
|
If ``True``, the actual signing step will be replaced with
|
|
a placeholder.
|
|
|
|
In a PDF signing context, this is necessary to estimate the size
|
|
of the signature container before computing the actual digest of
|
|
the document.
|
|
:param timestamper:
|
|
:class:`~.timestamps.TimeStamper` used to obtain a trusted timestamp
|
|
token that can be embedded into the signature container.
|
|
|
|
.. note::
|
|
If ``dry_run`` is true, the timestamper's
|
|
:meth:`~.timestamps.TimeStamper.dummy_response` method will be
|
|
called to obtain a placeholder token.
|
|
Note that with a standard :class:`~.timestamps.HTTPTimeStamper`,
|
|
this might still hit the timestamping server (in order to
|
|
produce a realistic size estimate), but the dummy response will
|
|
be cached.
|
|
:param cms_version:
|
|
CMS version to use.
|
|
:param encap_content_info:
|
|
Data to encapsulate in the CMS object.
|
|
|
|
.. danger::
|
|
This parameter is internal API, and must not be used to produce
|
|
PDF signatures.
|
|
:return:
|
|
An :class:`~.asn1crypto.cms.ContentInfo` object.
|
|
"""
|
|
|
|
digest_algorithm = digest_algorithm.lower()
|
|
self._check_digest_algorithm(digest_algorithm)
|
|
|
|
signature = await self.async_sign_raw(
|
|
signed_attrs.dump(), digest_algorithm.lower(), dry_run
|
|
)
|
|
unsigned_attrs = await self.unsigned_attrs(
|
|
digest_algorithm,
|
|
signature,
|
|
signed_attrs=signed_attrs,
|
|
timestamper=timestamper,
|
|
dry_run=dry_run,
|
|
)
|
|
return self._package_signature(
|
|
digest_algorithm=digest_algorithm,
|
|
cms_version=cms_version,
|
|
signed_attrs=signed_attrs,
|
|
signature=signature,
|
|
unsigned_attrs=unsigned_attrs,
|
|
encap_content_info=encap_content_info,
|
|
)
|
|
|
|
async def async_sign_general_data(
|
|
self,
|
|
input_data: Union[
|
|
IO, bytes, cms.ContentInfo, cms.EncapsulatedContentInfo
|
|
],
|
|
digest_algorithm: str,
|
|
detached=True,
|
|
use_cades=False,
|
|
timestamper=None,
|
|
chunk_size=misc.DEFAULT_CHUNK_SIZE,
|
|
signed_attr_settings: Optional[PdfCMSSignedAttributes] = None,
|
|
max_read=None,
|
|
) -> cms.ContentInfo:
|
|
"""
|
|
.. versionadded:: 0.9.0
|
|
|
|
Produce a CMS signature for an arbitrary data stream
|
|
(not necessarily PDF data).
|
|
|
|
:param input_data:
|
|
The input data to sign. This can be either a :class:`bytes` object
|
|
a file-type object, a :class:`cms.ContentInfo` object or
|
|
a :class:`cms.EncapsulatedContentInfo` object.
|
|
|
|
.. warning::
|
|
``asn1crypto`` mandates :class:`cms.ContentInfo` for CMS v1
|
|
signatures. In practical terms, this means that you need to
|
|
use :class:`cms.ContentInfo` if the content type is ``data``,
|
|
and :class:`cms.EncapsulatedContentInfo` otherwise.
|
|
|
|
.. warning::
|
|
We currently only support CMS v1, v3 and v4 signatures.
|
|
This is only a concern if you need certificates or CRLs
|
|
of type 'other', in which case you can change the version
|
|
yourself (this will not invalidate any signatures).
|
|
You'll also need to do this if you need support for version 1
|
|
attribute certificates, or if you want to sign with
|
|
``subjectKeyIdentifier`` in the ``sid`` field.
|
|
:param digest_algorithm:
|
|
The name of the digest algorithm to use.
|
|
:param detached:
|
|
If ``True``, create a CMS detached signature (i.e. an object where
|
|
the encapsulated content is not embedded in the signature object
|
|
itself). This is the default. If ``False``, the content to be
|
|
signed will be embedded as encapsulated content.
|
|
:param signed_attr_settings:
|
|
:class:`.PdfCMSSignedAttributes` object describing the attributes
|
|
to be added.
|
|
:param use_cades:
|
|
Construct a CAdES-style CMS object.
|
|
:param timestamper:
|
|
:class:`.PdfTimeStamper` to use to create a signature timestamp
|
|
|
|
.. note::
|
|
If you want to create a *content* timestamp (as opposed to
|
|
a *signature* timestamp), see :class:`.CAdESSignedAttrSpec`.
|
|
:param chunk_size:
|
|
Chunk size to use when consuming input data.
|
|
:param max_read:
|
|
Maximal number of bytes to read from the input stream.
|
|
:return:
|
|
A CMS ContentInfo object of type signedData.
|
|
"""
|
|
|
|
encap_content_info, digest_bytes = _prepare_encap_content(
|
|
input_data=input_data,
|
|
digest_algorithm=digest_algorithm,
|
|
detached=detached,
|
|
chunk_size=chunk_size,
|
|
max_read=max_read,
|
|
)
|
|
return await self.async_sign(
|
|
data_digest=digest_bytes,
|
|
digest_algorithm=digest_algorithm,
|
|
use_pades=use_cades,
|
|
is_pdf_sig=False,
|
|
timestamper=timestamper,
|
|
encap_content_info=encap_content_info,
|
|
signed_attr_settings=signed_attr_settings,
|
|
)
|
|
|
|
def sign(
|
|
self,
|
|
data_digest: bytes,
|
|
digest_algorithm: str,
|
|
timestamp: Optional[datetime] = None,
|
|
dry_run=False,
|
|
revocation_info=None,
|
|
use_pades=False,
|
|
timestamper=None,
|
|
cades_signed_attr_meta: Optional[CAdESSignedAttrSpec] = None,
|
|
encap_content_info=None,
|
|
) -> cms.ContentInfo:
|
|
"""
|
|
.. deprecated:: 0.9.0
|
|
Use :meth:`async_sign` instead.
|
|
The implementation of this method will invoke :meth:`async_sign`
|
|
using ``asyncio.run()``.
|
|
|
|
Produce a detached CMS signature from a raw data digest.
|
|
|
|
:param data_digest:
|
|
Digest of the actual content being signed.
|
|
:param digest_algorithm:
|
|
Digest algorithm to use. This should be the same digest method
|
|
as the one used to hash the (external) content.
|
|
:param timestamp:
|
|
Signing time to embed into the signed attributes
|
|
(will be ignored if ``use_pades`` is ``True``).
|
|
|
|
.. note::
|
|
This timestamp value is to be interpreted as an unfounded
|
|
assertion by the signer, which may or may not be good enough
|
|
for your purposes.
|
|
:param dry_run:
|
|
If ``True``, the actual signing step will be replaced with
|
|
a placeholder.
|
|
|
|
In a PDF signing context, this is necessary to estimate the size
|
|
of the signature container before computing the actual digest of
|
|
the document.
|
|
:param revocation_info:
|
|
Revocation information to embed; this should be the output
|
|
of a call to :meth:`.Signer.format_revinfo`
|
|
(ignored when ``use_pades`` is ``True``).
|
|
:param use_pades:
|
|
Respect PAdES requirements.
|
|
:param timestamper:
|
|
:class:`~.timestamps.TimeStamper` used to obtain a trusted timestamp
|
|
token that can be embedded into the signature container.
|
|
|
|
.. note::
|
|
If ``dry_run`` is true, the timestamper's
|
|
:meth:`~.timestamps.TimeStamper.dummy_response` method will be
|
|
called to obtain a placeholder token.
|
|
Note that with a standard :class:`~.timestamps.HTTPTimeStamper`,
|
|
this might still hit the timestamping server (in order to
|
|
produce a realistic size estimate), but the dummy response will
|
|
be cached.
|
|
:param cades_signed_attr_meta:
|
|
.. versionadded:: 0.5.0
|
|
|
|
Specification for CAdES-specific signed attributes.
|
|
:param encap_content_info:
|
|
Data to encapsulate in the CMS object.
|
|
|
|
.. danger::
|
|
This parameter is internal API, and must not be used to produce
|
|
PDF signatures.
|
|
:return:
|
|
An :class:`~.asn1crypto.cms.ContentInfo` object.
|
|
"""
|
|
warnings.warn(
|
|
"'Signer.sign' is deprecated, use 'Signer.async_sign' instead",
|
|
DeprecationWarning,
|
|
)
|
|
signed_attr_settings = PdfCMSSignedAttributes(
|
|
signing_time=timestamp,
|
|
adobe_revinfo_attr=revocation_info,
|
|
cades_signed_attrs=cades_signed_attr_meta,
|
|
)
|
|
sign_coro = self.async_sign(
|
|
data_digest=data_digest,
|
|
digest_algorithm=digest_algorithm,
|
|
dry_run=dry_run,
|
|
use_pades=use_pades,
|
|
timestamper=timestamper,
|
|
signed_attr_settings=signed_attr_settings,
|
|
encap_content_info=encap_content_info,
|
|
)
|
|
return asyncio.run(sign_coro)
|
|
|
|
def sign_prescribed_attributes(
|
|
self,
|
|
digest_algorithm: str,
|
|
signed_attrs: cms.CMSAttributes,
|
|
cms_version='v1',
|
|
dry_run=False,
|
|
timestamper=None,
|
|
encap_content_info=None,
|
|
) -> cms.ContentInfo:
|
|
"""
|
|
.. versionadded: 0.7.0
|
|
|
|
.. deprecated:: 0.9.0
|
|
Use :meth:`async_sign_prescribed_attributes` instead.
|
|
The implementation of this method will invoke
|
|
:meth:`async_sign_prescribed_attributes` using
|
|
``asyncio.run()``.
|
|
|
|
Start the CMS signing process with the prescribed set of signed
|
|
attributes.
|
|
|
|
:param digest_algorithm:
|
|
Digest algorithm to use. This should be the same digest method
|
|
as the one used to hash the (external) content.
|
|
:param signed_attrs:
|
|
CMS attributes to sign.
|
|
:param dry_run:
|
|
If ``True``, the actual signing step will be replaced with
|
|
a placeholder.
|
|
|
|
In a PDF signing context, this is necessary to estimate the size
|
|
of the signature container before computing the actual digest of
|
|
the document.
|
|
:param timestamper:
|
|
:class:`~.timestamps.TimeStamper` used to obtain a trusted timestamp
|
|
token that can be embedded into the signature container.
|
|
|
|
.. note::
|
|
If ``dry_run`` is true, the timestamper's
|
|
:meth:`~.timestamps.TimeStamper.dummy_response` method will be
|
|
called to obtain a placeholder token.
|
|
Note that with a standard :class:`~.timestamps.HTTPTimeStamper`,
|
|
this might still hit the timestamping server (in order to
|
|
produce a realistic size estimate), but the dummy response will
|
|
be cached.
|
|
:param cms_version:
|
|
CMS version to use.
|
|
:param encap_content_info:
|
|
Data to encapsulate in the CMS object.
|
|
|
|
.. danger::
|
|
This parameter is internal API, and must not be used to produce
|
|
PDF signatures.
|
|
:return:
|
|
An :class:`~.asn1crypto.cms.ContentInfo` object.
|
|
"""
|
|
warnings.warn(
|
|
"'Signer.sign_prescribed_attributes' is deprecated, use "
|
|
"'Signer.async_sign_prescribed_attributes' instead",
|
|
DeprecationWarning,
|
|
)
|
|
sign_coro = self.async_sign_prescribed_attributes(
|
|
digest_algorithm=digest_algorithm,
|
|
signed_attrs=signed_attrs,
|
|
cms_version=cms_version,
|
|
dry_run=dry_run,
|
|
timestamper=timestamper,
|
|
encap_content_info=encap_content_info,
|
|
)
|
|
return asyncio.run(sign_coro)
|
|
|
|
def sign_general_data(
|
|
self,
|
|
input_data: Union[
|
|
IO, bytes, cms.ContentInfo, cms.EncapsulatedContentInfo
|
|
],
|
|
digest_algorithm: str,
|
|
detached=True,
|
|
timestamp: Optional[datetime] = None,
|
|
use_cades=False,
|
|
timestamper=None,
|
|
cades_signed_attr_meta: Optional[CAdESSignedAttrSpec] = None,
|
|
chunk_size=misc.DEFAULT_CHUNK_SIZE,
|
|
max_read=None,
|
|
) -> cms.ContentInfo:
|
|
"""
|
|
.. versionadded:: 0.7.0
|
|
|
|
.. deprecated:: 0.9.0
|
|
Use :meth:`async_sign_general_data` instead.
|
|
The implementation of this method will invoke
|
|
:meth:`async_sign_general_data` using ``asyncio.run()``.
|
|
|
|
Produce a CMS signature for an arbitrary data stream
|
|
(not necessarily PDF data).
|
|
|
|
|
|
:param input_data:
|
|
The input data to sign. This can be either a :class:`bytes` object
|
|
a file-type object, a :class:`cms.ContentInfo` object or
|
|
a :class:`cms.EncapsulatedContentInfo` object.
|
|
|
|
.. warning::
|
|
``asn1crypto`` mandates :class:`cms.ContentInfo` for CMS v1
|
|
signatures. In practical terms, this means that you need to
|
|
use :class:`cms.ContentInfo` if the content type is ``data``,
|
|
and :class:`cms.EncapsulatedContentInfo` otherwise.
|
|
|
|
.. warning::
|
|
We currently only support CMS v1, v3 and v4 signatures.
|
|
This is only a concern if you need certificates or CRLs
|
|
of type 'other', in which case you can change the version
|
|
yourself (this will not invalidate any signatures).
|
|
You'll also need to do this if you need support for version 1
|
|
attribute certificates, or if you want to sign with
|
|
``subjectKeyIdentifier`` in the ``sid`` field.
|
|
:param digest_algorithm:
|
|
The name of the digest algorithm to use.
|
|
:param detached:
|
|
If ``True``, create a CMS detached signature (i.e. an object where
|
|
the encapsulated content is not embedded in the signature object
|
|
itself). This is the default. If ``False``, the content to be
|
|
signed will be embedded as encapsulated content.
|
|
|
|
:param timestamp:
|
|
Signing time to embed into the signed attributes
|
|
(will be ignored if ``use_cades`` is ``True``).
|
|
|
|
.. note::
|
|
This timestamp value is to be interpreted as an unfounded
|
|
assertion by the signer, which may or may not be good enough
|
|
for your purposes.
|
|
:param use_cades:
|
|
Construct a CAdES-style CMS object.
|
|
:param timestamper:
|
|
:class:`.PdfTimeStamper` to use to create a signature timestamp
|
|
|
|
.. note::
|
|
If you want to create a *content* timestamp (as opposed to
|
|
a *signature* timestamp), see :class:`.CAdESSignedAttrSpec`.
|
|
:param cades_signed_attr_meta:
|
|
Specification for CAdES-specific signed attributes.
|
|
:param chunk_size:
|
|
Chunk size to use when consuming input data.
|
|
:param max_read:
|
|
Maximal number of bytes to read from the input stream.
|
|
:return:
|
|
A CMS ContentInfo object of type signedData.
|
|
"""
|
|
warnings.warn(
|
|
"'Signer.sign_general_data' is deprecated, use "
|
|
"'Signer.async_sign_general_data' instead",
|
|
DeprecationWarning,
|
|
)
|
|
signed_attr_settings = PdfCMSSignedAttributes(
|
|
signing_time=timestamp, cades_signed_attrs=cades_signed_attr_meta
|
|
)
|
|
sign_coro = self.async_sign_general_data(
|
|
input_data=input_data,
|
|
digest_algorithm=digest_algorithm,
|
|
detached=detached,
|
|
use_cades=use_cades,
|
|
timestamper=timestamper,
|
|
chunk_size=chunk_size,
|
|
signed_attr_settings=signed_attr_settings,
|
|
max_read=max_read,
|
|
)
|
|
return asyncio.run(sign_coro)
|
|
|
|
|
|
def asyncify_signer(signer_cls):
|
|
"""
|
|
Decorator to turn a legacy :class:`Signer` subclass into one that works
|
|
with the new async API.
|
|
"""
|
|
|
|
async def async_sign_raw(
|
|
self, data: bytes, digest_algorithm: str, dry_run=False
|
|
) -> bytes:
|
|
coro = to_thread(
|
|
lambda: signer_cls.sign_raw(
|
|
self,
|
|
data=data,
|
|
digest_algorithm=digest_algorithm,
|
|
dry_run=dry_run,
|
|
)
|
|
)
|
|
return await coro
|
|
|
|
signer_cls.async_sign_raw = async_sign_raw
|
|
return signer_cls
|
|
|
|
|
|
class SimpleSigner(Signer):
|
|
"""
|
|
Simple signer implementation where the key material is available in local
|
|
memory.
|
|
"""
|
|
|
|
signing_key: keys.PrivateKeyInfo
|
|
"""
|
|
Private key associated with the certificate in :attr:`signing_cert`.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
signing_cert: x509.Certificate,
|
|
signing_key: keys.PrivateKeyInfo,
|
|
cert_registry: CertificateStore,
|
|
signature_mechanism: Optional[SignedDigestAlgorithm] = None,
|
|
prefer_pss: bool = False,
|
|
embed_roots: bool = True,
|
|
attribute_certs: Optional[Iterable[cms.AttributeCertificateV2]] = None,
|
|
):
|
|
self.signing_key = signing_key
|
|
super().__init__(
|
|
prefer_pss=prefer_pss,
|
|
embed_roots=embed_roots,
|
|
cert_registry=cert_registry,
|
|
signature_mechanism=signature_mechanism,
|
|
signing_cert=signing_cert,
|
|
)
|
|
if attribute_certs is not None:
|
|
self._attribute_certs = list(attribute_certs)
|
|
|
|
async def async_sign_raw(
|
|
self, data: bytes, digest_algorithm: str, dry_run=False
|
|
) -> bytes:
|
|
return self.sign_raw(data, digest_algorithm)
|
|
|
|
def sign_raw(self, data: bytes, digest_algorithm: str) -> bytes:
|
|
"""
|
|
Synchronous raw signature implementation.
|
|
|
|
:param data:
|
|
Data to be signed.
|
|
:param digest_algorithm:
|
|
Digest algorithm to use.
|
|
:return:
|
|
Raw signature encoded according to the conventions of the
|
|
signing algorithm used.
|
|
"""
|
|
signature_mechanism = self.get_signature_mechanism_for_digest(
|
|
digest_algorithm
|
|
)
|
|
mechanism = signature_mechanism.signature_algo
|
|
priv_key = serialization.load_der_private_key(
|
|
self.signing_key.dump(), password=None
|
|
)
|
|
|
|
if mechanism == 'rsassa_pkcs1v15':
|
|
padding = PKCS1v15()
|
|
hash_algo = get_pyca_cryptography_hash(digest_algorithm)
|
|
assert isinstance(priv_key, RSAPrivateKey)
|
|
return priv_key.sign(data, padding, hash_algo)
|
|
elif mechanism == 'rsassa_pss':
|
|
params = signature_mechanism['parameters']
|
|
padding, hash_algo = process_pss_params(params, digest_algorithm)
|
|
assert isinstance(priv_key, RSAPrivateKey)
|
|
return priv_key.sign(data, padding, hash_algo)
|
|
elif mechanism == 'ecdsa':
|
|
hash_algo = get_pyca_cryptography_hash(digest_algorithm)
|
|
assert isinstance(priv_key, EllipticCurvePrivateKey)
|
|
return priv_key.sign(data, signature_algorithm=ECDSA(hash_algo))
|
|
elif mechanism == 'dsa':
|
|
hash_algo = get_pyca_cryptography_hash(digest_algorithm)
|
|
assert isinstance(priv_key, DSAPrivateKey)
|
|
return priv_key.sign(data, hash_algo)
|
|
elif mechanism == 'ed25519':
|
|
assert isinstance(priv_key, Ed25519PrivateKey)
|
|
return priv_key.sign(data)
|
|
elif mechanism == 'ed448':
|
|
assert isinstance(priv_key, Ed448PrivateKey)
|
|
return priv_key.sign(data)
|
|
else: # pragma: nocover
|
|
raise SigningError(
|
|
f"The signature mechanism {mechanism} "
|
|
"is unsupported by this signer."
|
|
)
|
|
|
|
@classmethod
|
|
def _load_ca_chain(cls, ca_chain_files=None):
|
|
try:
|
|
return set(load_certs_from_pemder(ca_chain_files))
|
|
except (IOError, ValueError) as e: # pragma: nocover
|
|
logger.error('Could not load CA chain', exc_info=e)
|
|
return None
|
|
|
|
@classmethod
|
|
def load_pkcs12(
|
|
cls,
|
|
pfx_file,
|
|
ca_chain_files=None,
|
|
other_certs=None,
|
|
passphrase=None,
|
|
signature_mechanism=None,
|
|
prefer_pss=False,
|
|
):
|
|
"""
|
|
Load certificates and key material from a PCKS#12 archive
|
|
(usually ``.pfx`` or ``.p12`` files).
|
|
|
|
:param pfx_file:
|
|
Path to the PKCS#12 archive.
|
|
:param ca_chain_files:
|
|
Path to (PEM/DER) files containing other relevant certificates
|
|
not included in the PKCS#12 file.
|
|
:param other_certs:
|
|
Other relevant certificates, specified as a list of
|
|
:class:`.asn1crypto.x509.Certificate` objects.
|
|
:param passphrase:
|
|
Passphrase to decrypt the PKCS#12 archive, if required.
|
|
:param signature_mechanism:
|
|
Override the signature mechanism to use.
|
|
:param prefer_pss:
|
|
Prefer PSS signature mechanism over RSA PKCS#1 v1.5 if
|
|
there's a choice.
|
|
:return:
|
|
A :class:`.SimpleSigner` object initialised with key material loaded
|
|
from the PKCS#12 file provided.
|
|
"""
|
|
# TODO support MAC integrity checking?
|
|
|
|
try:
|
|
with open(pfx_file, 'rb') as f:
|
|
pfx_bytes = f.read()
|
|
except IOError as e: # pragma: nocover
|
|
logger.error(f'Could not open PKCS#12 file {pfx_file}.', exc_info=e)
|
|
return None
|
|
|
|
ca_chain = (
|
|
cls._load_ca_chain(ca_chain_files) if ca_chain_files else set()
|
|
)
|
|
if ca_chain is None: # pragma: nocover
|
|
return None
|
|
try:
|
|
(
|
|
private_key,
|
|
cert,
|
|
other_certs_pkcs12,
|
|
) = pkcs12.load_key_and_certificates(pfx_bytes, passphrase)
|
|
except (IOError, ValueError, TypeError) as e:
|
|
logger.error(
|
|
'Could not load key material from PKCS#12 file', exc_info=e
|
|
)
|
|
return None
|
|
kinfo = _translate_pyca_cryptography_key_to_asn1(private_key)
|
|
cert = _translate_pyca_cryptography_cert_to_asn1(cert)
|
|
other_certs_pkcs12 = set(
|
|
map(_translate_pyca_cryptography_cert_to_asn1, other_certs_pkcs12)
|
|
)
|
|
|
|
cs = SimpleCertificateStore()
|
|
certs_to_register = ca_chain | other_certs_pkcs12
|
|
if other_certs is not None:
|
|
certs_to_register |= set(other_certs)
|
|
cs.register_multiple(certs_to_register)
|
|
return SimpleSigner(
|
|
signing_key=kinfo,
|
|
signing_cert=cert,
|
|
cert_registry=cs,
|
|
signature_mechanism=signature_mechanism,
|
|
prefer_pss=prefer_pss,
|
|
)
|
|
|
|
@classmethod
|
|
def load(
|
|
cls,
|
|
key_file,
|
|
cert_file,
|
|
ca_chain_files=None,
|
|
key_passphrase=None,
|
|
other_certs=None,
|
|
signature_mechanism=None,
|
|
prefer_pss=False,
|
|
):
|
|
"""
|
|
Load certificates and key material from PEM/DER files.
|
|
|
|
:param key_file:
|
|
File containing the signer's private key.
|
|
:param cert_file:
|
|
File containing the signer's certificate.
|
|
:param ca_chain_files:
|
|
File containing other relevant certificates.
|
|
:param key_passphrase:
|
|
Passphrase to decrypt the private key (if required).
|
|
:param other_certs:
|
|
Other relevant certificates, specified as a list of
|
|
:class:`.asn1crypto.x509.Certificate` objects.
|
|
:param signature_mechanism:
|
|
Override the signature mechanism to use.
|
|
:param prefer_pss:
|
|
Prefer PSS signature mechanism over RSA PKCS#1 v1.5 if
|
|
there's a choice.
|
|
:return:
|
|
A :class:`.SimpleSigner` object initialised with key material loaded
|
|
from the files provided.
|
|
"""
|
|
try:
|
|
# load cryptographic data (both PEM and DER are supported)
|
|
signing_key = load_private_key_from_pemder(
|
|
key_file, passphrase=key_passphrase
|
|
)
|
|
signing_cert = load_cert_from_pemder(cert_file)
|
|
except (IOError, ValueError, TypeError) as e:
|
|
logger.error('Could not load cryptographic material', exc_info=e)
|
|
return None
|
|
|
|
ca_chain = cls._load_ca_chain(ca_chain_files) if ca_chain_files else []
|
|
if ca_chain is None: # pragma: nocover
|
|
return None
|
|
|
|
other_certs = (
|
|
ca_chain if other_certs is None else ca_chain + other_certs
|
|
)
|
|
|
|
cert_reg = SimpleCertificateStore()
|
|
cert_reg.register_multiple(other_certs)
|
|
return SimpleSigner(
|
|
signing_cert=signing_cert,
|
|
signing_key=signing_key,
|
|
cert_registry=cert_reg,
|
|
signature_mechanism=signature_mechanism,
|
|
prefer_pss=prefer_pss,
|
|
)
|
|
|
|
|
|
def signer_from_p12_config(
|
|
config: PKCS12SignatureConfig,
|
|
provided_pfx_passphrase: Optional[bytes] = None,
|
|
):
|
|
passphrase = config.pfx_passphrase or provided_pfx_passphrase
|
|
result = SimpleSigner.load_pkcs12(
|
|
pfx_file=config.pfx_file,
|
|
passphrase=passphrase,
|
|
other_certs=config.other_certs,
|
|
prefer_pss=config.prefer_pss,
|
|
)
|
|
if result is None:
|
|
raise ConfigurationError("Error while loading key material")
|
|
return result
|
|
|
|
|
|
def signer_from_pemder_config(
|
|
config: PemDerSignatureConfig,
|
|
provided_key_passphrase: Optional[bytes] = None,
|
|
):
|
|
key_passphrase = config.key_passphrase or provided_key_passphrase
|
|
result = SimpleSigner.load(
|
|
key_file=config.key_file,
|
|
cert_file=config.cert_file,
|
|
other_certs=config.other_certs,
|
|
prefer_pss=config.prefer_pss,
|
|
key_passphrase=key_passphrase,
|
|
)
|
|
if result is None:
|
|
raise ConfigurationError("Error while loading key material")
|
|
return result
|
|
|
|
|
|
class ExternalSigner(Signer):
|
|
"""
|
|
Class to help formatting CMS objects for use with remote signing.
|
|
It embeds a fixed signature value into the CMS, set at initialisation.
|
|
|
|
Intended for use with :ref:`interrupted-signing`.
|
|
|
|
:param signing_cert:
|
|
The signer's certificate.
|
|
:param cert_registry:
|
|
The certificate registry to use in CMS generation.
|
|
:param signature_value:
|
|
The value of the signature as a byte string, a placeholder length,
|
|
or ``None``.
|
|
:param signature_mechanism:
|
|
The signature mechanism used by the external signing service.
|
|
:param prefer_pss:
|
|
Switch to prefer PSS when producing RSA signatures, as opposed to
|
|
RSA with PKCS#1 v1.5 padding.
|
|
:param embed_roots:
|
|
Whether to embed relevant root certificates into the CMS payload.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
signing_cert: Optional[x509.Certificate],
|
|
cert_registry: Optional[CertificateStore],
|
|
signature_value: Union[bytes, int, None] = None,
|
|
signature_mechanism: Optional[SignedDigestAlgorithm] = None,
|
|
prefer_pss: bool = False,
|
|
embed_roots: bool = True,
|
|
):
|
|
if isinstance(signature_value, bytes):
|
|
self._signature_value = signature_value
|
|
else:
|
|
self._signature_value = bytes(signature_value or 256)
|
|
super().__init__(
|
|
prefer_pss=prefer_pss,
|
|
embed_roots=embed_roots,
|
|
signing_cert=signing_cert,
|
|
cert_registry=cert_registry,
|
|
signature_mechanism=signature_mechanism,
|
|
)
|
|
|
|
async def async_sign_raw(
|
|
self, data: bytes, digest_algorithm: str, dry_run=False
|
|
) -> bytes:
|
|
"""
|
|
Return a fixed signature value.
|
|
"""
|
|
return self._signature_value
|
|
|
|
|
|
class GenericCMSSignedAttributeProviderSpec(SignedAttributeProviderSpec):
|
|
"""
|
|
Signed attribute provider spec for generic CMS signatures.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
attr_settings: CMSSignedAttributes,
|
|
signing_cert: Optional[x509.Certificate],
|
|
signature_mechanism: Union[
|
|
Callable[[str], algos.SignedDigestAlgorithm], None
|
|
],
|
|
timestamper: Optional[TimeStamper],
|
|
):
|
|
self.signing_cert = signing_cert
|
|
self.attr_settings = attr_settings
|
|
self.signature_mechanism = signature_mechanism
|
|
self.timestamper = timestamper
|
|
|
|
def signed_attr_providers(
|
|
self, data_digest: bytes, digest_algorithm: str
|
|
) -> Iterable[CMSAttributeProvider]:
|
|
attr_settings = self.attr_settings
|
|
if self.signing_cert is not None:
|
|
yield attributes.SigningCertificateV2Provider(
|
|
signing_cert=self.signing_cert
|
|
)
|
|
signing_time = attr_settings.signing_time
|
|
if signing_time is not None:
|
|
yield attributes.SigningTimeProvider(timestamp=signing_time)
|
|
if attr_settings.cades_signed_attrs is not None:
|
|
yield from attr_settings.cades_signed_attrs.prepare_providers(
|
|
message_digest=data_digest,
|
|
md_algorithm=digest_algorithm,
|
|
timestamper=self.timestamper,
|
|
)
|
|
|
|
if self.signature_mechanism is not None:
|
|
mech = self.signature_mechanism(digest_algorithm)
|
|
# TODO not sure if PAdES/CAdES allow this, need to check.
|
|
# It *should*, but perhaps the version of CMS it is based on is too
|
|
# old, or it might not allow undefined signed attributes.
|
|
# In the meantime, we only add this attribute to non-PAdES sigs
|
|
yield attributes.CMSAlgorithmProtectionProvider(
|
|
digest_algo=digest_algorithm, signature_algo=mech
|
|
)
|
|
|
|
|
|
class GenericPdfSignedAttributeProviderSpec(
|
|
GenericCMSSignedAttributeProviderSpec
|
|
):
|
|
"""
|
|
Signed attribute provider spec for generic PDF signatures.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
attr_settings: PdfCMSSignedAttributes,
|
|
signing_cert: Optional[x509.Certificate],
|
|
signature_mechanism: Union[
|
|
Callable[[str], algos.SignedDigestAlgorithm], None
|
|
],
|
|
timestamper: Optional[TimeStamper],
|
|
):
|
|
super().__init__(
|
|
attr_settings=attr_settings,
|
|
signing_cert=signing_cert,
|
|
signature_mechanism=signature_mechanism,
|
|
timestamper=timestamper,
|
|
)
|
|
|
|
def signed_attr_providers(
|
|
self, data_digest: bytes, digest_algorithm: str
|
|
) -> Iterable[CMSAttributeProvider]:
|
|
yield from super().signed_attr_providers(
|
|
data_digest=data_digest, digest_algorithm=digest_algorithm
|
|
)
|
|
attr_settings = self.attr_settings
|
|
assert isinstance(attr_settings, PdfCMSSignedAttributes)
|
|
if attr_settings.adobe_revinfo_attr is not None:
|
|
yield attributes.AdobeRevinfoProvider(
|
|
value=attr_settings.adobe_revinfo_attr
|
|
)
|
|
|
|
|
|
class CAdESSignedAttributeProviderSpec(SignedAttributeProviderSpec):
|
|
"""
|
|
Signed attribute provider spec for CAdES and PAdES signatures.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
attr_settings: CMSSignedAttributes,
|
|
signing_cert: x509.Certificate,
|
|
is_pades: bool,
|
|
timestamper: Optional[TimeStamper],
|
|
):
|
|
self.signing_cert = signing_cert
|
|
self.attr_settings = attr_settings
|
|
self.is_pades = is_pades
|
|
self.timestamper = timestamper
|
|
|
|
def signed_attr_providers(
|
|
self, data_digest: bytes, digest_algorithm: str
|
|
) -> Iterable[CMSAttributeProvider]:
|
|
yield attributes.SigningCertificateV2Provider(
|
|
signing_cert=self.signing_cert
|
|
)
|
|
attr_settings = self.attr_settings
|
|
if not self.is_pades:
|
|
# NOTE: PAdES actually forbids this, but CAdES requires it!
|
|
signing_time = attr_settings.signing_time
|
|
if signing_time is None:
|
|
# Ensure CAdES mandate is followed
|
|
signing_time = datetime.now(tz=tzlocal.get_localzone())
|
|
if signing_time is not None:
|
|
yield attributes.SigningTimeProvider(timestamp=signing_time)
|
|
cades_meta = attr_settings.cades_signed_attrs
|
|
if cades_meta is not None:
|
|
yield from cades_meta.prepare_providers(
|
|
message_digest=data_digest,
|
|
md_algorithm=digest_algorithm,
|
|
timestamper=self.timestamper,
|
|
)
|
|
|
|
|
|
class DefaultUnsignedAttributes(UnsignedAttributeProviderSpec):
|
|
"""
|
|
Default unsigned attribute provider spec.
|
|
"""
|
|
|
|
def __init__(self, timestamper: Optional[TimeStamper]):
|
|
self.timestamper = timestamper
|
|
|
|
def unsigned_attr_providers(
|
|
self,
|
|
signature: bytes,
|
|
signed_attrs: cms.CMSAttributes,
|
|
digest_algorithm: str,
|
|
) -> Iterable[CMSAttributeProvider]:
|
|
timestamper = self.timestamper
|
|
if timestamper is not None:
|
|
# the timestamp server needs to cross-sign our signature
|
|
yield attributes.TSTProvider(
|
|
digest_algorithm=digest_algorithm,
|
|
data_to_ts=signature,
|
|
timestamper=timestamper,
|
|
)
|
|
|
|
|
|
RSA_THRESHOLDS = [(2048, 'sha256'), (3072, 'sha384')]
|
|
ECC_THRESHOLDS = [(256, 'sha256'), (384, 'sha384')]
|
|
|
|
|
|
def select_suitable_signing_md(key: keys.PublicKeyInfo) -> str:
|
|
"""
|
|
Choose a reasonable default signing message digest given the properties of
|
|
(the public part of) a key.
|
|
|
|
The fallback value is :const:`constants.DEFAULT_MD`.
|
|
|
|
:param key:
|
|
A :class:`keys.PublicKeyInfo` object.
|
|
:return:
|
|
The name of a message digest algorithm.
|
|
"""
|
|
|
|
def _with_thresholds(key_size, thresholds):
|
|
for sz, md in thresholds:
|
|
if key_size <= sz:
|
|
return md
|
|
return 'sha512'
|
|
|
|
key_algo = key.algorithm
|
|
if key_algo == 'rsa':
|
|
return _with_thresholds(key.bit_size, RSA_THRESHOLDS)
|
|
elif key_algo == 'ec':
|
|
return _with_thresholds(key.bit_size, ECC_THRESHOLDS)
|
|
elif key_algo == 'ed25519':
|
|
return 'sha512'
|
|
elif key_algo == 'ed448':
|
|
return 'shake256'
|
|
return constants.DEFAULT_MD
|