253 lines
7.5 KiB
Python
253 lines
7.5 KiB
Python
"""
|
|
Module defining common API types for use by rules and policies.
|
|
|
|
In principle, these aren't relevant to the high-level validation API.
|
|
"""
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from dataclasses import field as dataclass_field
|
|
from typing import Callable, Iterable, Optional, Tuple, Type, TypeVar, Union
|
|
|
|
from pyhanko.pdf_utils.generic import (
|
|
ArrayObject,
|
|
Dereferenceable,
|
|
DictionaryObject,
|
|
IndirectObject,
|
|
Reference,
|
|
TrailerReference,
|
|
)
|
|
from pyhanko.pdf_utils.reader import HistoricalResolver, RawPdfPath
|
|
|
|
from ...pdf_utils import misc
|
|
from ...pdf_utils.rw_common import PdfHandler
|
|
from ...pdf_utils.xref import TrailerDictionary
|
|
from .policy_api import ModificationLevel
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
__all__ = [
|
|
'QualifiedWhitelistRule',
|
|
'WhitelistRule',
|
|
'ReferenceUpdate',
|
|
'Context',
|
|
'RelativeContext',
|
|
'AbsoluteContext',
|
|
]
|
|
|
|
|
|
def _eq_deref(a: Dereferenceable, b: Dereferenceable):
|
|
if isinstance(a, TrailerReference) and isinstance(b, TrailerReference):
|
|
return True
|
|
elif isinstance(a, (Reference, IndirectObject)) and isinstance(
|
|
b, (Reference, IndirectObject)
|
|
):
|
|
return a.idnum == b.idnum and a.generation == b.generation
|
|
else:
|
|
return False
|
|
|
|
|
|
def _hash_deref(a: Dereferenceable):
|
|
if isinstance(a, TrailerReference):
|
|
return hash((0, 0, 0))
|
|
elif isinstance(a, (Reference, IndirectObject)):
|
|
return hash((1, a.idnum, a.generation))
|
|
|
|
|
|
class Context:
|
|
@classmethod
|
|
def from_absolute(
|
|
cls, pdf_handler: PdfHandler, absolute_path: RawPdfPath
|
|
) -> 'AbsoluteContext':
|
|
return AbsoluteContext(pdf_handler=pdf_handler, path=absolute_path)
|
|
|
|
@classmethod
|
|
def relative_to(
|
|
cls,
|
|
start: Union[DictionaryObject, ArrayObject, TrailerDictionary],
|
|
path: Union[RawPdfPath, int, str],
|
|
) -> 'RelativeContext':
|
|
container_ref: Optional[Dereferenceable] = start.container_ref
|
|
assert container_ref is not None
|
|
cur_ref: Dereferenceable = container_ref
|
|
if isinstance(path, (int, str)):
|
|
path = RawPdfPath(path)
|
|
walk = path.walk_nodes(start, transparent_dereference=False)
|
|
rel_path = RawPdfPath()
|
|
for ix, (node, obj) in enumerate(walk):
|
|
# make sure we don't do a reset on the last node
|
|
# (# of nodes is the path length + 1)
|
|
if isinstance(obj, IndirectObject) and ix < len(path):
|
|
cur_ref = obj.reference
|
|
rel_path = RawPdfPath()
|
|
elif isinstance(node, (int, str)):
|
|
rel_path += node
|
|
return RelativeContext(cur_ref, rel_path)
|
|
|
|
def descend(self, path: Union[RawPdfPath, int, str]) -> 'Context':
|
|
raise NotImplementedError
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class RelativeContext(Context):
|
|
anchor: Dereferenceable
|
|
"""
|
|
Reference to the container object. In comparisons, this should be
|
|
the reference tied to the older revision.
|
|
"""
|
|
|
|
relative_path: RawPdfPath
|
|
"""
|
|
Path to the object from the container.
|
|
"""
|
|
|
|
def descend(self, path: Union[RawPdfPath, int, str]) -> 'RelativeContext':
|
|
root = self.anchor.get_object()
|
|
containers = (DictionaryObject, ArrayObject, TrailerDictionary)
|
|
if not isinstance(root, containers):
|
|
raise misc.PdfReadError(
|
|
f"Anchor {self.anchor} is not a container object"
|
|
)
|
|
return self.__class__.relative_to(root, self.relative_path + path)
|
|
|
|
def __hash__(self):
|
|
return hash(('rel', _hash_deref(self.anchor), self.relative_path))
|
|
|
|
def __eq__(self, other):
|
|
return (
|
|
isinstance(other, RelativeContext)
|
|
and other.relative_path == self.relative_path
|
|
and _eq_deref(other.anchor, self.anchor)
|
|
)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AbsoluteContext(Context):
|
|
path: RawPdfPath
|
|
"""
|
|
Absolute path from the trailer.
|
|
"""
|
|
|
|
pdf_handler: PdfHandler = dataclass_field(
|
|
repr=False, hash=False, compare=False
|
|
)
|
|
"""
|
|
The PDF handler to which this context is tied.
|
|
"""
|
|
|
|
@property
|
|
def relative_view(self) -> RelativeContext:
|
|
return RelativeContext.relative_to(
|
|
self.pdf_handler.trailer_view, self.path
|
|
)
|
|
|
|
def descend(self, path: Union[RawPdfPath, int, str]) -> 'AbsoluteContext':
|
|
return AbsoluteContext(self.path + path, self.pdf_handler)
|
|
|
|
|
|
class ApprovalType(misc.OrderedEnum):
|
|
BLANKET_APPROVE = 0
|
|
APPROVE_RELATIVE_CONTEXT = 1
|
|
APPROVE_PATH = 2
|
|
|
|
|
|
RefUpdateType = TypeVar('RefUpdateType', bound='ReferenceUpdate')
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ReferenceUpdate:
|
|
updated_ref: Reference
|
|
"""
|
|
Reference that was (potentially) updated.
|
|
"""
|
|
|
|
context_checked: Optional[Context] = None
|
|
|
|
@classmethod
|
|
def curry_ref(
|
|
cls: Type[RefUpdateType], **kwargs
|
|
) -> Callable[[Reference], RefUpdateType]:
|
|
return lambda ref: cls(updated_ref=ref, **kwargs)
|
|
|
|
@property
|
|
def approval_type(self) -> ApprovalType:
|
|
context = self.context_checked
|
|
if isinstance(context, RelativeContext):
|
|
return ApprovalType.APPROVE_RELATIVE_CONTEXT
|
|
elif isinstance(context, AbsoluteContext):
|
|
return ApprovalType.APPROVE_PATH
|
|
else:
|
|
return ApprovalType.BLANKET_APPROVE
|
|
|
|
|
|
class QualifiedWhitelistRule:
|
|
"""
|
|
Abstract base class for a whitelisting rule that outputs references together
|
|
with the modification level at which they're cleared.
|
|
|
|
This is intended for use by complicated whitelisting rules that need to
|
|
differentiate between multiple levels.
|
|
"""
|
|
|
|
def apply_qualified(
|
|
self, old: HistoricalResolver, new: HistoricalResolver
|
|
) -> Iterable[Tuple[ModificationLevel, ReferenceUpdate]]:
|
|
"""
|
|
Apply the rule to the changes between two revisions.
|
|
|
|
:param old:
|
|
The older, base revision.
|
|
:param new:
|
|
The newer revision to be vetted.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
|
|
class WhitelistRule:
|
|
"""
|
|
Abstract base class for a whitelisting rule that simply outputs
|
|
cleared references without specifying a modification level.
|
|
|
|
These rules are more flexible than rules of type
|
|
:class:`.QualifiedWhitelistRule`, since the modification level can be
|
|
specified separately (see :meth:`.WhitelistRule.as_qualified`).
|
|
"""
|
|
|
|
def apply(
|
|
self, old: HistoricalResolver, new: HistoricalResolver
|
|
) -> Iterable[ReferenceUpdate]:
|
|
"""
|
|
Apply the rule to the changes between two revisions.
|
|
|
|
:param old:
|
|
The older, base revision.
|
|
:param new:
|
|
The newer revision to be vetted.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def as_qualified(self, level: ModificationLevel) -> QualifiedWhitelistRule:
|
|
"""
|
|
Construct a new :class:`QualifiedWhitelistRule` that whitelists the
|
|
object references from this rule at the level specified.
|
|
|
|
:param level:
|
|
The modification level at which the output of this rule should be
|
|
cleared.
|
|
:return:
|
|
A :class:`.QualifiedWhitelistRule` backed by this rule.
|
|
"""
|
|
return _WrappingQualifiedWhitelistRule(self, level)
|
|
|
|
|
|
class _WrappingQualifiedWhitelistRule(QualifiedWhitelistRule):
|
|
def __init__(self, rule: WhitelistRule, level: ModificationLevel):
|
|
self.rule = rule
|
|
self.level = level
|
|
|
|
def apply_qualified(
|
|
self, old: HistoricalResolver, new: HistoricalResolver
|
|
) -> Iterable[Tuple[ModificationLevel, ReferenceUpdate]]:
|
|
for ref in self.rule.apply(old, new):
|
|
yield self.level, ref
|