250 lines
7.7 KiB
Python
250 lines
7.7 KiB
Python
"""
Module defining common helpers for use by rules and policies.

In principle, these aren't relevant to the high-level validation API.
"""
|
|
|
|
|
|
import logging
|
|
from typing import Callable, FrozenSet, Generator, Optional, Set, Tuple, TypeVar
|
|
|
|
from pyhanko.pdf_utils import generic, misc
|
|
from pyhanko.pdf_utils.generic import PdfObject, Reference
|
|
from pyhanko.pdf_utils.reader import HistoricalResolver
|
|
|
|
from .policy_api import ModificationLevel, SuspiciousModification
|
|
from .rules_api import ReferenceUpdate
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Public names exported by this module.
__all__ = [
    'qualify', 'qualify_transforming',
    'safe_whitelist',
    'compare_key_refs', 'compare_dicts',
    'assert_not_stream',
]

# A pair of PDF objects in which either member may be ``None``.
# ``PdfObject`` is the same class as ``generic.PdfObject`` (imported above).
TwoVersions = Tuple[Optional[PdfObject], Optional[PdfObject]]
|
|
|
|
|
|
def assert_not_stream(obj):
    """
    Reject stream objects.

    :param obj:
        The object to inspect.
    :raises SuspiciousModification:
        If ``obj`` is an instance of :class:`.generic.StreamObject`.
        The message mentions the object's ``container_ref``.
    """
    # Anything that is not a stream passes silently.
    if not isinstance(obj, generic.StreamObject):
        return
    raise SuspiciousModification(
        f"Unexpected stream encountered at {obj.container_ref}!"
    )
|
|
|
|
|
|
def safe_whitelist(
    old: HistoricalResolver, old_ref, new_ref
) -> Generator[Reference, None, None]:
    """
    Decide whether an indirect reference in a PDF structure may be updated
    without clobbering an older object in a way that has consequences
    at the PDF syntax level.

    The checks performed are:

     - If an old reference is given, it must not resolve to a stream.
     - If the new reference equals the old one, the new reference
       must not resolve to a stream either.
     - If the new reference differs from the old one, it must be
       available (i.e. unused) in the old revision, as reported by
       ``old.is_ref_available``.

    This is written as a generator for syntactical convenience and
    integration with internal APIs; it yields at most one element.

    :param old:
        Resolver for the older revision; queried for reference availability.
    :param old_ref:
        The reference used in the older revision, if any.
    :param new_ref:
        The reference used in the newer revision.
    :raises SuspiciousModification:
        If any of the checks above fails.
    """

    if old_ref:
        assert_not_stream(old_ref.get_object())

    if old_ref == new_ref:
        # Same reference on both sides: the (possibly rewritten) target
        # must still be a non-stream object.
        assert_not_stream(new_ref.get_object())
    elif not old.is_ref_available(new_ref):
        raise SuspiciousModification(
            f"Update clobbers or reuses {new_ref} in an unexpected way."
        )

    # Both acceptable cases end up whitelisting the new reference.
    yield new_ref
|
|
|
|
|
|
def _lookup_with_ref(dictionary, key):
    """
    Fetch ``key`` from ``dictionary`` via ``raw_get`` and split the result
    into a ``(reference, value)`` pair.

    For an indirect entry, the pair holds its reference and the dereferenced
    object; for a direct entry, the reference slot is ``None``.
    Any :class:`KeyError` is left to propagate to the caller.
    """
    raw = dictionary.raw_get(key)
    if isinstance(raw, generic.IndirectObject):
        return raw.reference, raw.get_object()
    return None, raw


def compare_key_refs(
    key,
    old: HistoricalResolver,
    old_dict: generic.DictionaryObject,
    new_dict: generic.DictionaryObject,
) -> Generator[Reference, None, TwoVersions]:
    """
    Check that updating a single dictionary key has no undesirable
    side effects.

    These scenarios are accepted:

    0. a direct value is replaced by another direct value
    1. the key is newly added in ``new_dict``
    2. a direct value in ``old_dict`` is replaced by a reference
       in ``new_dict``
    3. the reverse of the previous scenario (no reference is yielded then)
    4. a reference is replaced by another reference (one that doesn't
       override anything else)

    Whenever the new value is a reference, the restrictions of
    :func:`safe_whitelist` apply as well.

    Note: this routine is only safe to use if the structure of the resulting
    values is also checked. Otherwise, it can lead to reference leaks if
    one is not careful.

    :param key:
        The dictionary key to compare.
    :param old:
        Resolver for the older revision, passed on to
        :func:`safe_whitelist`.
    :param old_dict:
        The dictionary as it appears in the older revision.
    :param new_dict:
        The dictionary as it appears in the newer revision.
    :return:
        A generator that yields the reference to whitelist (if any), and
        returns the dereferenced ``(old_value, new_value)`` pair, or
        ``(None, None)`` if the key has no value in either dictionary.
    :raises SuspiciousModification:
        If the key was present in ``old_dict`` but is absent from
        ``new_dict``.
    """

    try:
        old_value_ref, old_value = _lookup_with_ref(old_dict, key)
    except KeyError:
        old_value_ref = old_value = None

    try:
        new_value_ref, new_value = _lookup_with_ref(new_dict, key)
    except KeyError:
        if old_value is None:
            return None, None  # nothing to do
        raise SuspiciousModification(
            f"Key {key} was deleted from dictionary"
        )

    # Direct new values need no whitelisting; only references do.
    if new_value_ref is not None:
        yield from safe_whitelist(old, old_value_ref, new_value_ref)

    return old_value, new_value
|
|
|
|
|
|
# Return type of the generator being wrapped.
R = TypeVar('R', covariant=True)
# Item type produced by the generator handed to ``qualify_transforming``.
QualifyIn = TypeVar('QualifyIn', contravariant=True)
# Reference-update type passed through unchanged by ``qualify``.
RefToUpd = TypeVar('RefToUpd', bound=ReferenceUpdate)
# Reference-update type produced by the ``transform`` callable.
OutRefUpd = TypeVar('OutRefUpd', covariant=True, bound=ReferenceUpdate)
|
|
|
|
|
|
def qualify(
    level: ModificationLevel,
    rule_result: Generator[RefToUpd, None, R],
) -> Generator[Tuple[ModificationLevel, RefToUpd], None, R]:
    """
    Helper for rule implementors that tags every item of a reference update
    generator with one fixed modification level, while preserving the
    wrapped generator's return value (if relevant).

    Typical usage looks like this:

    .. code-block:: python

        def some_generator_function():
            # do stuff
            for ref in some_list:
                # do stuff
                yield ref

            # do more stuff
            return summary_value

        # ...

        def some_qualified_generator_function():
            summary_value = yield from qualify(
                ModificationLevel.FORM_FILLING,
                some_generator_function()
            )

    If ``some_generator_function`` yields :class:`.ReferenceUpdate` objects,
    the resulting generator yields tuples of the form ``(level, ref)``.

    :param level:
        The modification level to set.
    :param rule_result:
        A generator that outputs references to be whitelisted.
    :return:
        A converted generator that outputs references qualified at the
        modification level specified.
    """

    def _unchanged(ref):
        # No transformation: items are passed through as-is.
        return ref

    return qualify_transforming(level, rule_result, transform=_unchanged)
|
|
|
|
|
|
def qualify_transforming(
    level: ModificationLevel,
    rule_result: Generator[QualifyIn, None, R],
    transform: Callable[[QualifyIn], OutRefUpd],
) -> Generator[Tuple[ModificationLevel, OutRefUpd], None, R]:
    """
    Variant of :func:`.qualify` that also applies a transformation to
    each item produced by the rule.

    :param level:
        The modification level to set.
    :param rule_result:
        A generator that outputs references to be whitelisted.
    :param transform:
        Function applied to each item before it is paired with
        the modification level and yielded.
    :return:
        A converted generator that outputs ``(level, transform(item))``
        tuples.
    """

    def _attach_level(ref):
        # Pair the fixed level with the transformed item.
        return level, transform(ref)

    return misc.map_with_return(rule_result, _attach_level)
|
|
|
|
|
|
def compare_dicts(
    old_dict: Optional[PdfObject],
    new_dict: Optional[PdfObject],
    ignored: FrozenSet[str] = frozenset(),
    raise_exc: bool = True,
) -> bool:
    """
    Compare entries in two dictionaries, optionally ignoring certain keys.

    Values are compared as returned by ``raw_get``, i.e. without
    dereferencing indirect entries.

    :param old_dict:
        The dictionary from the prior revision.
    :param new_dict:
        The dictionary from the new revision.
    :param ignored:
        Keys to leave out of the comparison.
    :param raise_exc:
        If ``True`` (the default), a difference in keys or values raises
        :class:`.SuspiciousModification`. If ``False``, such a difference
        makes the function return ``False`` instead.
    :return:
        ``True`` if both dictionaries have the same non-ignored keys with
        equal raw values; ``False`` otherwise (only when ``raise_exc`` is
        ``False``).
    :raises misc.PdfReadError:
        If ``old_dict`` is not a dictionary object. Raised regardless
        of ``raise_exc``.
    :raises SuspiciousModification:
        If ``new_dict`` is not a dictionary object or either argument is a
        stream (both regardless of ``raise_exc``), or if the dictionaries
        differ and ``raise_exc`` is ``True``.
    """

    if not isinstance(old_dict, generic.DictionaryObject):
        raise misc.PdfReadError(
            "Encountered unexpected non-dictionary object in prior revision."
        )  # pragma: nocover
    if not isinstance(new_dict, generic.DictionaryObject):
        raise SuspiciousModification(
            "Dict is overridden by non-dict in new revision"
        )

    assert_not_stream(old_dict)
    assert_not_stream(new_dict)

    new_dict_keys = set(new_dict.keys()) - ignored
    old_dict_keys = set(old_dict.keys()) - ignored
    if new_dict_keys != old_dict_keys:
        if raise_exc:
            raise SuspiciousModification(
                f"Dict keys differ: {new_dict_keys} vs. {old_dict_keys}."
            )
        return False

    # The key sets are equal at this point, so every key exists on both sides.
    for k in new_dict_keys:
        new_val = new_dict.raw_get(k)
        old_val = old_dict.raw_get(k)
        if new_val != old_val:
            if raise_exc:
                # Adjacent literals are concatenated verbatim, so the
                # separating space has to be spelled out explicitly.
                raise SuspiciousModification(
                    f"Values for dict key {k} differ: "
                    f"{old_val} changed to {new_val}"
                )
            return False

    return True
|