107 lines
3.4 KiB
Python

from typing import Optional, Union
from asn1crypto import cms, tsp
from pyhanko_certvalidator.fetchers.aiohttp_fetchers.util import LazySession
from pyhanko.sign.timestamps import TimeStamper, TimestampRequestError
from pyhanko.sign.timestamps.common_utils import set_tsp_headers
try:
import aiohttp
except ImportError as _e: # pragma: nocover
raise ImportError(
"Install pyHanko with the [async_http] optional dependency group"
) from _e
class AIOHttpTimeStamper(TimeStamper):
def __init__(
self,
url,
session: Union[aiohttp.ClientSession, LazySession],
https=False,
timeout=5,
headers=None,
auth: Optional[aiohttp.BasicAuth] = None,
):
"""
Initialise the timestamp client.
:param url:
URL where the server listens for timestamp requests.
:param https:
Enforce HTTPS.
:param timeout:
Timeout (in seconds)
:param auth:
`aiohttp.BasicAuth` object with authentication credentials.
:param headers:
Other headers to include.
"""
if https and not url.startswith('https:'): # pragma: nocover
raise ValueError('Timestamp URL is not HTTPS.')
self.url = url
self.timeout = timeout
self.auth = auth
self.headers = headers
self._session = session
super().__init__()
async def async_request_headers(self) -> dict:
"""
Format the HTTP request headers.
Subclasses can override this to perform their own header generation
logic.
:return:
Header dictionary.
"""
return set_tsp_headers(self.headers or {})
async def get_session(self) -> aiohttp.ClientSession:
session = self._session
if isinstance(session, LazySession):
return await session.get_session()
else:
return session
async def async_timestamp(
self, message_digest, md_algorithm
) -> cms.ContentInfo:
return await super().async_timestamp(message_digest, md_algorithm)
async def async_request_tsa_response(
self, req: tsp.TimeStampReq
) -> tsp.TimeStampResp:
session = await self.get_session()
cl_timeout = aiohttp.ClientTimeout(total=self.timeout)
headers = await self.async_request_headers()
try:
async with session.post(
url=self.url,
headers=headers,
data=req.dump(),
auth=self.auth,
raise_for_status=True,
timeout=cl_timeout,
) as response:
response_data = await response.read()
ct = response.headers.get('Content-Type')
if ct != 'application/timestamp-reply':
msg = (
f'Bad content type. Expected '
f'application/timestamp-reply,but got {ct}.'
)
raise aiohttp.ContentTypeError(
response.request_info,
response.history,
message=msg,
headers=response.headers,
)
except aiohttp.ClientError as e:
raise TimestampRequestError(
"Error while contacting timestamp service",
) from e
return tsp.TimeStampResp.load(response_data)