256 lines
8.4 KiB
Python
256 lines
8.4 KiB
Python
import logging
|
|
from pathlib import Path
|
|
|
|
from asn1crypto import crl, ocsp, pem, x509
|
|
from pyhanko.pdf_utils.incremental_writer import IncrementalPdfFileWriter
|
|
from pyhanko.sign import signers, timestamps
|
|
from pyhanko.sign.fields import SigSeedSubFilter
|
|
from pyhanko_certvalidator import ValidationContext
|
|
|
|
try:
|
|
from pyhanko.sign.pkcs11 import PKCS11Signer, open_pkcs11_session
|
|
except ImportError:
|
|
open_pkcs11_session = PKCS11Signer = None
|
|
|
|
from xhtml2pdf.files import getFile
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class PDFSignature:
    """Static helpers that sign a PDF with pyHanko from a plain ``config`` mapping.

    Keys read by this class:

    * ``engine``: ``"pkcs12"``, ``"pkcs11"`` or ``"simple"`` (see ``get_signers``).
    * ``type``: ``"lta"`` selects ``lta_sign``; anything else ``simple_sign``.
    * ``passphrase``, ``key``, ``cert``, ``ca_chain``, ``pfx_file``: key material.
    * ``lib_location``, ``slot_no``, ``token_label``, ``user_pin``: PKCS#11 session.
    * ``tsa``: URL of a time stamping authority.
    * ``validation_context`` and ``meta``: keyword arguments forwarded to
      ``ValidationContext`` and ``PdfSignatureMetadata`` respectively.
    """

    @staticmethod
    def _read_der(source):
        """Read *source* through ``getFile`` and return a list of DER blobs.

        PEM-armored input yields one blob per PEM block, so a bundle holding
        several objects is read completely; any other input is returned
        unchanged as a single blob.

        NOTE(review): assumes ``getFile(...).getData()`` returns ``bytes``;
        confirm against ``xhtml2pdf.files``.
        """
        data = getFile(source).getData()
        if pem.detect(data):
            return [der for _, _, der in pem.unarmor(data, multiple=True)]
        return [data]

    @staticmethod
    def get_passphrase(config):
        """Return ``config["passphrase"]`` as bytes, or ``None`` when absent.

        A ``str`` passphrase is encoded with the default codec; any other
        value is returned unchanged.
        """
        if "passphrase" not in config:
            return None
        passphrase = config["passphrase"]
        if isinstance(passphrase, str):
            passphrase = passphrase.encode()
        return passphrase

    @staticmethod
    def get_chains(config, _key):
        """Load the certificates stored under ``config[_key]``.

        The value may be a single entry or a list/tuple of entries. ``str``
        and ``Path`` entries are read via ``getFile`` and parsed into
        ``x509.Certificate`` objects; every other entry is passed through
        untouched, so already-loaded certificates are accepted.

        :param config: mapping that holds the entry (the top-level signing
            config, or its ``validation_context`` sub-mapping).
        :param _key: name of the entry to read, e.g. ``"ca_chain"``.
        :return: list of certificates, or ``None`` when the entry is
            missing, ``None`` or empty.
        """
        entries = config.get(_key)
        if entries is None:
            return None
        if not isinstance(entries, (list, tuple)):
            entries = [entries]

        chains = []
        for entry in entries:
            if isinstance(entry, (Path, str)):
                chains.extend(
                    x509.Certificate.load(der)
                    for der in PDFSignature._read_der(entry)
                )
            else:
                chains.append(entry)
        return chains or None

    @staticmethod
    def test_simple_signer(config):
        """Build a ``SimpleSigner`` from ``key``/``cert`` files, or return ``None``.

        ``key``, ``cert`` and a non-empty ``passphrase`` are all required;
        otherwise no signer is created.
        """
        passphrase = PDFSignature.get_passphrase(config)
        if "key" not in config or "cert" not in config or not passphrase:
            return None
        # get_chains returns parsed certificates, not file names, so they are
        # handed over as ``other_certs`` instead of ``ca_chain_files``.
        chain = PDFSignature.get_chains(config, "ca_chain")
        return signers.SimpleSigner.load(
            config["key"],
            config["cert"],
            key_passphrase=passphrase,
            other_certs=chain,
        )

    @staticmethod
    def test_pkcs12_signer(config):
        """Build a ``SimpleSigner`` from a PKCS#12 file, or return ``None``.

        Both ``pfx_file`` and a non-empty ``passphrase`` are required.
        """
        passphrase = PDFSignature.get_passphrase(config)
        if "pfx_file" not in config or not passphrase:
            return None
        return signers.SimpleSigner.load_pkcs12(
            pfx_file=config["pfx_file"], passphrase=passphrase
        )

    @staticmethod
    def test_pkcs11_signer(config):
        """Build a ``PKCS11Signer`` from *config*, or return ``None``.

        ``None`` is returned when no PKCS#11 session can be opened from the
        configuration (see ``get_session``). Recognised options override the
        defaults listed below; ``ca_chain`` is loaded through ``get_chains``.
        """
        session = PDFSignature.get_session(config)
        if session is None:
            log.warning(
                "PKCS#11 signing skipped: 'lib_location', 'user_pin' and one of"
                " 'slot_no'/'token_label' are required to open a session."
            )
            return None

        # Defaults for every keyword this helper forwards to PKCS11Signer.
        keys = {
            "pkcs11_session": session,
            "cert_label": None,
            "signing_cert": None,
            "ca_chain": None,
            "key_label": None,
            "prefer_pss": False,
            "embed_roots": True,
            "other_certs_to_pull": (),
            "bulk_fetch": True,
            "key_id": None,
            "cert_id": None,
            "use_raw_mechanism": False,
        }

        for key in keys:
            if key not in config:
                continue
            if key == "ca_chain":
                keys[key] = PDFSignature.get_chains(config, "ca_chain")
            else:
                keys[key] = config[key]

        return PKCS11Signer(**keys)

    @staticmethod
    def get_timestamps(config):
        """Return an ``HTTPTimeStamper`` for ``config["tsa"]``, or ``None``."""
        if "tsa" in config:
            return timestamps.HTTPTimeStamper(url=config["tsa"])
        return None

    @staticmethod
    def get_signers(config):
        """Return the signer selected by ``config["engine"]``.

        :return: a signer object, or ``None`` when ``engine`` is missing,
            unknown, or the engine-specific settings are incomplete.
        :raises ImportError: when the ``pkcs11`` engine is requested but
            pyHanko's PKCS#11 support could not be imported.
        """
        if "engine" not in config:
            return None

        signer = None
        engine = config["engine"]
        if engine == "pkcs12":
            signer = PDFSignature.test_pkcs12_signer(config)
        elif engine == "pkcs11":
            if PKCS11Signer is None:
                msg = (
                    "pyhanko.sign.pkcs11 requires pyHanko to be installed with the"
                    " [pkcs11] option. You can install missing dependencies by running"
                    " \"pip install 'pyHanko[pkcs11]'\"."
                )
                raise ImportError(msg)

            signer = PDFSignature.test_pkcs11_signer(config)
        elif engine == "simple":
            signer = PDFSignature.test_simple_signer(config)
        return signer

    @staticmethod
    def sign(inputfile, output, config):
        """Sign *inputfile* into *output* using the configured signature type.

        ``config["type"] == "lta"`` selects ``lta_sign``; any other value,
        or a missing ``type``, selects ``simple_sign``.

        :return: ``True`` when a signature was written, ``None`` otherwise.
        """
        if config.get("type") == "lta":
            return PDFSignature.lta_sign(inputfile, output, config)
        return PDFSignature.simple_sign(inputfile, output, config)

    @staticmethod
    def parse_crls(crls):
        """Load CRLs: paths are read and parsed, other objects pass through."""
        list_crls = []
        for item in crls:
            if isinstance(item, (Path, str)):
                list_crls.extend(
                    crl.CertificateList.load(der)
                    for der in PDFSignature._read_der(item)
                )
            else:
                list_crls.append(item)
        return list_crls

    @staticmethod
    def parse_oscp(oscps):
        """Load OCSP responses: paths are read and parsed, other objects pass through.

        Mirrors ``parse_crls`` so that already-loaded responses are accepted.
        """
        list_oscp = []
        for item in oscps:
            if isinstance(item, (Path, str)):
                list_oscp.extend(
                    ocsp.OCSPResponse.load(der)
                    for der in PDFSignature._read_der(item)
                )
            else:
                list_oscp.append(item)
        return list_oscp

    @staticmethod
    def get_validation_context(config):
        """Build a ``ValidationContext`` from ``config["validation_context"]``.

        ``allow_fetching`` defaults to ``True``. File references under
        ``crls``, ``ocsps``, ``trust_roots``, ``extra_trust_roots`` and
        ``other_certs`` are loaded first; every other key is forwarded
        unchanged. The caller's mapping is never modified.
        """
        context = {"allow_fetching": True}
        user_context = config.get("validation_context")
        if user_context:
            # Work on a shallow copy so the caller's config keeps its
            # original (unparsed) values and can be reused.
            user_context = dict(user_context)
            if "crls" in user_context:
                user_context["crls"] = PDFSignature.parse_crls(user_context["crls"])
            if "ocsps" in user_context:
                user_context["ocsps"] = PDFSignature.parse_oscp(
                    user_context["ocsps"]
                )
            # The certificate lists live inside the validation context, so
            # they are looked up there under their own key.
            for key in ("trust_roots", "extra_trust_roots", "other_certs"):
                if key in user_context:
                    user_context[key] = PDFSignature.get_chains(user_context, key)
            context.update(user_context)
        return ValidationContext(**context)

    @staticmethod
    def get_signature_meta(config):
        """Return keyword arguments for ``PdfSignatureMetadata``.

        Starts from the defaults below (including a validation context built
        by ``get_validation_context``) and overlays ``config["meta"]``.
        """
        meta = {
            "field_name": "Signature1",
            "md_algorithm": "sha256",
            "location": None,
            "reason": None,
            "name": None,
            "certify": False,
            "embed_validation_info": True,
            "use_pades_lta": True,
            "subfilter": SigSeedSubFilter.PADES,
            "timestamp_field_name": None,
            "validation_context": PDFSignature.get_validation_context(config),
        }
        if "meta" in config:
            meta.update(config["meta"])
        return meta

    @staticmethod
    def simple_sign(inputfile, output, config):
        """Write a signature into the field ``Signature1``.

        A timestamper is used when ``tsa`` is configured.

        :return: ``True`` when signed, ``None`` when no signer is available.
        """
        signer = PDFSignature.get_signers(config)
        if not signer:
            return None

        w = IncrementalPdfFileWriter(inputfile)
        timestamper = PDFSignature.get_timestamps(config)
        signers.sign_pdf(
            w,
            signers.PdfSignatureMetadata(field_name="Signature1"),
            signer=signer,
            output=output,
            timestamper=timestamper,
        )
        return True

    @staticmethod
    def lta_sign(inputfile, output, config):
        """Sign using the metadata from ``get_signature_meta``.

        Both a signer and a timestamper (``tsa``) are required.

        :return: ``True`` when signed, ``None`` when either is missing.
        """
        signer = PDFSignature.get_signers(config)
        timestamper = PDFSignature.get_timestamps(config)
        if not (signer and timestamper):
            # Bail out before parsing the PDF or building a validation
            # context that would never be used.
            log.warning(
                "LTA signing skipped: both a signer and a 'tsa' timestamper"
                " are required."
            )
            return None

        w = IncrementalPdfFileWriter(inputfile)
        meta = PDFSignature.get_signature_meta(config)
        signature_meta = signers.PdfSignatureMetadata(**meta)
        signers.sign_pdf(
            w,
            signature_meta=signature_meta,
            signer=signer,
            timestamper=timestamper,
            output=output,
        )
        return True

    @staticmethod
    def get_session(config):
        """Open a PKCS#11 session from *config*, or return ``None``.

        Requires ``user_pin`` and ``lib_location`` plus at least one of
        ``slot_no`` / ``token_label``.
        """
        lib_location = config.get("lib_location", None)
        slot_no = config.get("slot_no", None)
        token_label = config.get("token_label", None)
        user_pin = config.get("user_pin", None)

        if user_pin is None or lib_location is None:
            return None
        if slot_no is None and token_label is None:
            return None
        return open_pkcs11_session(
            lib_location,
            slot_no=slot_no,
            token_label=token_label,
            user_pin=user_pin,
        )
|