231 lines
8.0 KiB
Python
231 lines
8.0 KiB
Python
import abc
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from asn1crypto import algos, cms, keys, x509
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
from cryptography.hazmat.primitives.asymmetric.dsa import DSAPublicKey
|
|
from cryptography.hazmat.primitives.asymmetric.ec import (
|
|
ECDSA,
|
|
EllipticCurvePublicKey,
|
|
)
|
|
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PublicKey
|
|
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
|
|
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
|
|
from pyhanko_certvalidator.policy_decl import (
|
|
AlgorithmUsageConstraint,
|
|
AlgorithmUsagePolicy,
|
|
DisallowWeakAlgorithmsPolicy,
|
|
)
|
|
|
|
from ..general import (
|
|
MultivaluedAttributeError,
|
|
NonexistentAttributeError,
|
|
find_unique_cms_attribute,
|
|
get_pyca_cryptography_hash_for_signing,
|
|
process_pss_params,
|
|
)
|
|
from .errors import DisallowedAlgorithmError, SignatureValidationError
|
|
|
|
|
|
def _ensure_digest_match(
    signature_algo: algos.SignedDigestAlgorithm,
    message_digest_algo: algos.DigestAlgorithm,
) -> AlgorithmUsageConstraint:
    """
    Check that a message digest algorithm agrees with the digest algorithm
    implied by a signature algorithm.

    :param signature_algo:
        The signature mechanism whose implied digest (if any) is compared.
    :param message_digest_algo:
        The digest algorithm used for the message digest.
    :return:
        A constraint with ``allowed=True`` when the signature algorithm
        implies no digest (``ValueError`` while deriving it) or when both
        digest algorithms encode identically; otherwise a constraint with
        ``allowed=False`` and a failure reason.
    """
    try:
        implied_digest = algos.DigestAlgorithm(
            {'algorithm': signature_algo.hash_algo}
        )
    except ValueError:
        # No digest could be derived from the signature algorithm, so there
        # is nothing to compare against.
        return AlgorithmUsageConstraint(allowed=True)

    # The comparison is made on the encoded form of both structures.
    if implied_digest.dump() == message_digest_algo.dump():
        return AlgorithmUsageConstraint(allowed=True)

    return AlgorithmUsageConstraint(
        allowed=False,
        failure_reason=(
            f"Digest algorithm {message_digest_algo['algorithm'].native} "
            f"does not match value implied by signature algorithm "
            f"{signature_algo['algorithm'].native}"
        ),
    )
|
|
|
|
|
|
class CMSAlgorithmUsagePolicy(AlgorithmUsagePolicy, abc.ABC):
    """
    Algorithm usage policy for CMS signatures.

    Extends the base algorithm usage policy with a check on the combination
    of signature algorithm and message digest algorithm.
    """

    def digest_combination_allowed(
        self,
        signature_algo: algos.SignedDigestAlgorithm,
        message_digest_algo: algos.DigestAlgorithm,
        moment: Optional[datetime],
    ) -> AlgorithmUsageConstraint:
        """
        Decide whether a message digest algorithm may be combined with
        the given signature algorithm.

        The default implementation enforces the convention (requirement in
        RFC 8933) that the message digest is computed with the same digest
        algorithm as the one used by the signature, when the signature
        algorithm implies one.

        This method does not judge whether either algorithm is acceptable
        on its own; that is handled elsewhere.

        :param signature_algo:
            A signature mechanism to use
        :param message_digest_algo:
            The digest algorithm used for the message digest
        :param moment:
            The point in time for which the assessment needs to be made.
            The default implementation does not use it.
        :return:
            A usage constraint.
        """
        constraint = _ensure_digest_match(
            signature_algo=signature_algo,
            message_digest_algo=message_digest_algo,
        )
        return constraint

    @staticmethod
    def lift_policy(policy: AlgorithmUsagePolicy) -> 'CMSAlgorithmUsagePolicy':
        """
        Turn a 'base' :class:`.AlgorithmUsagePolicy` into a CMS algorithm
        usage policy with default settings. A policy that already is
        a :class:`.CMSAlgorithmUsagePolicy` is returned unchanged.

        :param policy:
            The underlying original policy
        :return:
            The lifted policy
        """
        if not isinstance(policy, CMSAlgorithmUsagePolicy):
            # Wrap plain policies so they gain the default CMS behaviour.
            return _DefaultPolicyMixin(policy)
        return policy
|
|
|
|
|
|
class _DefaultPolicyMixin(CMSAlgorithmUsagePolicy):
    """
    CMS algorithm usage policy that forwards the per-algorithm checks to
    a wrapped base policy and keeps the inherited digest combination check.
    """

    def __init__(self, underlying_policy: AlgorithmUsagePolicy):
        """
        :param underlying_policy:
            The policy that all per-algorithm decisions are delegated to.
        """
        self._policy = underlying_policy

    def digest_algorithm_allowed(
        self, algo: algos.DigestAlgorithm, moment: Optional[datetime]
    ) -> AlgorithmUsageConstraint:
        """
        Delegate the digest algorithm check to the wrapped policy.

        :param algo:
            The digest algorithm to assess.
        :param moment:
            The point in time for which the assessment is made.
        :return:
            The wrapped policy's usage constraint.
        """
        delegate = self._policy
        return delegate.digest_algorithm_allowed(algo, moment)

    def signature_algorithm_allowed(
        self,
        algo: algos.SignedDigestAlgorithm,
        moment: Optional[datetime],
        public_key: Optional[keys.PublicKeyInfo],
    ) -> AlgorithmUsageConstraint:
        """
        Delegate the signature algorithm check to the wrapped policy.

        :param algo:
            The signature algorithm to assess.
        :param moment:
            The point in time for which the assessment is made.
        :param public_key:
            The public key involved, if available.
        :return:
            The wrapped policy's usage constraint.
        """
        delegate = self._policy
        return delegate.signature_algorithm_allowed(algo, moment, public_key)
|
|
|
|
|
|
# Names of the hash algorithms handed to the default
# DisallowWeakAlgorithmsPolicy below.
DEFAULT_WEAK_HASH_ALGORITHMS = frozenset(('sha1', 'md5', 'md2'))


# Default CMS algorithm usage policy: a DisallowWeakAlgorithmsPolicy built
# from DEFAULT_WEAK_HASH_ALGORITHMS, lifted to a CMSAlgorithmUsagePolicy.
DEFAULT_ALGORITHM_USAGE_POLICY = CMSAlgorithmUsagePolicy.lift_policy(
    policy=DisallowWeakAlgorithmsPolicy(DEFAULT_WEAK_HASH_ALGORITHMS)
)
|
|
|
|
|
|
def validate_raw(
    signature: bytes,
    signed_data: bytes,
    cert: x509.Certificate,
    signature_algorithm: algos.SignedDigestAlgorithm,
    md_algorithm: str,
    prehashed=False,
    algorithm_policy: Optional[
        CMSAlgorithmUsagePolicy
    ] = DEFAULT_ALGORITHM_USAGE_POLICY,
    time_indic: Optional[datetime] = None,
):
    """
    Validate a raw signature. Internal API.

    The algorithm policy (if any) is consulted first; the signature is then
    verified with the public key of ``cert`` using the mechanism named by
    ``signature_algorithm``. Nothing is returned on success.

    :param signature:
        The raw signature value to verify.
    :param signed_data:
        The payload handed to the public key's ``verify`` method.
    :param cert:
        The certificate whose public key is used for verification.
    :param signature_algorithm:
        The signature mechanism that was used.
    :param md_algorithm:
        Name of the message digest algorithm. It is used for the policy's
        digest combination check, for processing RSASSA-PSS parameters, and
        as the verification digest when reading
        ``signature_algorithm.hash_algo`` raises :class:`ValueError`.
    :param prehashed:
        Forwarded to the digest and PSS helper functions; presumably
        indicates that ``signed_data`` is already a digest -- confirm
        against those helpers.
    :param algorithm_policy:
        The algorithm usage policy to enforce. Passing ``None`` skips all
        policy checks.
    :param time_indic:
        The point in time passed to the policy checks.
    :raises DisallowedAlgorithmError:
        If the policy rejects the signature algorithm or its combination
        with the message digest algorithm.
    :raises NotImplementedError:
        If the signature mechanism is not one of the supported ones.

    Errors raised by the public key's ``verify`` method (e.g. for an invalid
    signature) are not caught here and propagate to the caller.
    """
    if algorithm_policy is not None:
        sig_algo_allowed = algorithm_policy.signature_algorithm_allowed(
            signature_algorithm, moment=time_indic, public_key=cert.public_key
        )
        if not sig_algo_allowed:
            msg = (
                f"Signature algorithm "
                f"{signature_algorithm['algorithm'].native} is not allowed "
                f"by the current usage policy."
            )
            if sig_algo_allowed.failure_reason is not None:
                msg += f" Reason: {sig_algo_allowed.failure_reason}"
            raise DisallowedAlgorithmError(
                msg, permanent=sig_algo_allowed.not_allowed_after is None
            )

        # Evaluate the combination check at the same point in time as the
        # signature algorithm check above, so that time-dependent policies
        # receive the caller's time indication.
        digest_compatible = algorithm_policy.digest_combination_allowed(
            signature_algo=signature_algorithm,
            message_digest_algo=algos.DigestAlgorithm(
                {'algorithm': md_algorithm}
            ),
            moment=time_indic,
        )
        if not digest_compatible:
            raise DisallowedAlgorithmError(
                failure_message=digest_compatible.failure_reason,
                permanent=digest_compatible.not_allowed_after is None,
            )

    try:
        verify_md_algo = signature_algorithm.hash_algo
    except ValueError:
        # The signature algorithm does not determine a digest; fall back
        # to the declared message digest algorithm.
        verify_md_algo = md_algorithm

    verify_md = get_pyca_cryptography_hash_for_signing(
        verify_md_algo, prehashed=prehashed
    )

    pub_key = serialization.load_der_public_key(cert.public_key.dump())

    # NOTE: the isinstance assertions below are removed when Python runs
    # with -O; they are kept as-is so the exception behaviour seen by
    # callers does not change.
    sig_algo = signature_algorithm.signature_algo
    if sig_algo == 'rsassa_pkcs1v15':
        assert isinstance(pub_key, RSAPublicKey)
        pub_key.verify(signature, signed_data, padding.PKCS1v15(), verify_md)
    elif sig_algo == 'rsassa_pss':
        assert isinstance(pub_key, RSAPublicKey)
        pss_padding, hash_algo = process_pss_params(
            signature_algorithm['parameters'], md_algorithm, prehashed=prehashed
        )
        pub_key.verify(signature, signed_data, pss_padding, hash_algo)
    elif sig_algo == 'dsa':
        assert isinstance(pub_key, DSAPublicKey)
        pub_key.verify(signature, signed_data, verify_md)
    elif sig_algo == 'ecdsa':
        assert isinstance(pub_key, EllipticCurvePublicKey)
        pub_key.verify(signature, signed_data, ECDSA(verify_md))
    # Compare with ==, not `in`: `x in 'ed25519'` is a substring test on the
    # literal and would also accept fragments such as '' or 'ed'.
    elif sig_algo == 'ed25519':
        assert isinstance(pub_key, Ed25519PublicKey)
        pub_key.verify(signature, signed_data)
    elif sig_algo == 'ed448':
        assert isinstance(pub_key, Ed448PublicKey)
        pub_key.verify(signature, signed_data)
    else:  # pragma: nocover
        raise NotImplementedError(
            f"Signature mechanism {sig_algo} is not supported."
        )
|
|
|
|
|
|
def extract_message_digest(signer_info: cms.SignerInfo):
    """
    Retrieve the value of the ``message_digest`` signed attribute from
    a CMS signer info structure.

    :param signer_info:
        The signer info whose signed attributes are searched.
    :return:
        The native value of the unique ``message_digest`` attribute.
    :raises SignatureValidationError:
        If the attribute is absent or occurs with multiple values.
    """
    try:
        digest_attr = find_unique_cms_attribute(
            signer_info['signed_attrs'], 'message_digest'
        )
    except (NonexistentAttributeError, MultivaluedAttributeError):
        raise SignatureValidationError(
            'Message digest not found in signature, or multiple message '
            'digest attributes present.'
        )
    return digest_attr.native
|