Source code for ambra_sdk.api.async_api

"""Ambra async api."""

import logging
from asyncio import Lock, sleep
from time import monotonic
from typing import Awaitable, Callable, Optional, TypeVar

import aiohttp

from ambra_sdk.api.base_api import BaseApi
from ambra_sdk.async_addon.addon import Addon
from ambra_sdk.clear_params import clear_params
from ambra_sdk.exceptions.service import AuthorizationRequired
from ambra_sdk.exceptions.storage import AccessDenied
from ambra_sdk.request_args import AioHTTPRequestArgs
from ambra_sdk.service.entrypoints import (
    AsyncAccount,
    AsyncActivity,
    AsyncAnalytics,
    AsyncAnnotation,
    AsyncAnonymization,
    AsyncAppointment,
    AsyncAudit,
    AsyncCase,
    AsyncCustomcode,
    AsyncCustomfield,
    AsyncDestination,
    AsyncDicomdata,
    AsyncDictionary,
    AsyncFilter,
    AsyncGroup,
    AsyncHelp,
    AsyncHl7,
    AsyncKeyimage,
    AsyncLink,
    AsyncLocation,
    AsyncMeeting,
    AsyncMessage,
    AsyncNamespace,
    AsyncNode,
    AsyncNpi,
    AsyncOrder,
    AsyncPatient,
    AsyncPurge,
    AsyncQctask,
    AsyncQuery,
    AsyncRadreport,
    AsyncRadreportmacro,
    AsyncReport,
    AsyncRole,
    AsyncRoute,
    AsyncRsna,
    AsyncScanner,
    AsyncSession,
    AsyncSetting,
    AsyncSite,
    AsyncStudy,
    AsyncTag,
    AsyncTerminology,
    AsyncTraining,
    AsyncUser,
    AsyncValidate,
    AsyncWebhook,
)
from ambra_sdk.storage.storage import AsyncStorage

logger = logging.getLogger(__name__)


[docs]class AsyncApi(BaseApi): """Ambra Async API. Example: >>> from ambra_sdk.api import AsyncApi >>> >>> api = AsyncApi.with_creds( ... url='https://ambrahealth_host/api/v3', ... username='username', ... password='password', ... client_name='Some name (ex: Hospital-ABC)', ... ) Using some special parameters (In most cases you don't need this!): >>> from ambra_sdk.api import AsyncApi >>> from ambra_sdk.api.base_api import RateLimit, RateLimits >>> >>> MY_RLS = RateLimits( ... default=RateLimit(3, 2), ... get_limit=RateLimit(4, 2), ... special={'special_url': RateLimit(5, 2)}, ... ) >>> >>> api = AsyncApi.with_creds( ... url='https://ambrahealth_host/api/v3', ... username='username', ... password='password', ... client_name='Some name (ex: Hospital-ABC)', ... special_headers_for_login={'Special-H': 'ABC'}, ... rate_limits=MY_RLS, ... autocast_arguments=False, ... ) """ backend = 'AIOHTTP' def __init__(self, *args, **kwargs): """Init api. :param args: args :param kwargs: kwargs """ super().__init__(*args, **kwargs) self._service_session: Optional[aiohttp.ClientSession] = None self._storage_session: Optional[aiohttp.ClientSession] = None # For rate limits self._rate_limits_lock = Lock() self._last_request_time = None self._last_call_period = None # Init services api self._init_service_entrypoints() # Init storage api namespace self.Storage = AsyncStorage(self) # Init addon namespace self.Addon = Addon(self) @property def service_session(self) -> aiohttp.ClientSession: """Service session. :return: service session """ headers = self._default_headers.copy() headers.update(self._service_default_headers) if self._service_session is None: self._service_session = aiohttp.ClientSession(headers=headers) return self._service_session @property def storage_session(self) -> aiohttp.ClientSession: """Storage session. :return: storage session """ headers = self._default_headers.copy() headers.update(self._storage_default_headers) if self._storage_session is None: self._storage_session = aiohttp.ClientSession(headers=headers) return self._storage_session
[docs] async def storage_get( self, url: str, required_sid: bool, **kwargs, ) -> aiohttp.ClientResponse: """Get from storage. :param url: url :param required_sid: is this method required sid :param kwargs: request arguments :return: response obj """ kwargs = await self._prepare_storage_request_args( required_sid=required_sid, **kwargs, ) logger.info( 'Storage get: %s. Params: %s', url, str(clear_params(kwargs.get('params', {}))), ) return await self.storage_session.get(url=url, **kwargs)
[docs] async def storage_delete( self, url: str, required_sid: bool, **kwargs, ) -> aiohttp.ClientResponse: """Delete from storage. :param url: url :param required_sid: is this method required sid :param kwargs: request arguments :return: response obj """ kwargs = await self._prepare_storage_request_args( required_sid=required_sid, **kwargs, ) logger.info( 'Storage delete: %s. Params: %s', url, str(clear_params(kwargs.get('params', {}))), ) return await self.storage_session.delete(url=url, **kwargs)
[docs] async def storage_post( self, url: str, required_sid: bool, **kwargs, ) -> aiohttp.ClientResponse: """Post to storage. :param url: url :param required_sid: is this method required sid :param kwargs: request arguments :return: response obj """ kwargs = await self._prepare_storage_request_args( required_sid=required_sid, **kwargs, ) logger.info( 'Storage post: %s. Params: %s', url, str(clear_params(kwargs.get('params', {}))), ) return await self.storage_session.post(url=url, **kwargs)
[docs] async def service_request( self, request_args: AioHTTPRequestArgs, required_sid: bool, ) -> aiohttp.ClientResponse: """Post data to url. :param request_args: request args :param required_sid: is this method required sid :return: response """ await self._wait_for_service_request(request_args.url) return await self._service_request_without_rate_limits( request_args=request_args, required_sid=required_sid, )
[docs] async def service_post( self, url: str, required_sid: bool, **kwargs, ) -> aiohttp.ClientResponse: """Post data to url. :param url: method url :param required_sid: is this method required sid :param kwargs: request arguments :return: response """ full_url = self.service_full_url(url) request_args = AioHTTPRequestArgs( method='POST', url=url, full_url=full_url, **kwargs, ) return await self.service_request( request_args=request_args, required_sid=required_sid, )
[docs] async def get_sid(self) -> str: """Get or create new sid. :return: sid """ if self._sid is None: return await self.get_new_sid() return self._sid
[docs] async def logout(self): """Logout.""" if self._sid: await self.Session.logout().get() self._sid = None if self._storage_session: await self._storage_session.close() if self._service_session: await self._service_session.close()
[docs] async def get_new_sid(self) -> str: """Get new sid. :raises RuntimeError: Missed credentials :return: sid """ if self._creds is None: raise RuntimeError('Missed credentials') new_sid: str = await self.Session.get_sid( self._creds.username, self._creds.password, special_headers_for_login=self._special_headers_for_login, ) self._sid = new_sid return new_sid
FN_RETURN_TYPE = TypeVar('FN_RETURN_TYPE')
[docs] async def retry_with_new_sid( self, fn: Callable[..., Awaitable[FN_RETURN_TYPE]], ) -> FN_RETURN_TYPE: """Retry with new sid. :param fn: callable method :return: fn result """ try: return await fn() except (AuthorizationRequired, AccessDenied): await self.get_new_sid() return await fn()
async def _wait_for_service_request(self, url): if self._rate_limits: async with self._rate_limits_lock: call_period = self._rate_limits.call_period(url) now = monotonic() if self._last_request_time is None: # This is a first run self._last_request_time = now self._last_call_period = call_period return wait_time = self._last_request_time \ + self._last_call_period - now if wait_time > 0: logger.info('Sleep %s due to rate limits', wait_time) await sleep(wait_time) self._last_request_time = monotonic() self._last_call_period = call_period async def _service_request_without_rate_limits( self, request_args: AioHTTPRequestArgs, required_sid: bool, ) -> aiohttp.ClientResponse: """Post data to url. :param request_args: request args :param required_sid: is this method required sid :return: response """ if required_sid is True: request_data = request_args.data or {} request_data['sid'] = await self.get_sid() request_args.data = request_data # NOQA:WPS110 if request_args.data: request_args.data = { # NOQA:WPS110 key: value for key, value in request_args.data.items() # NOQA:WPS110 if value is not None } logger.info( 'Service post: %s. Params: %s', request_args.full_url, str(clear_params(request_args.data)), ) return await self.service_session.request( method=request_args.method, url=request_args.full_url, **request_args.dict_optional_args( self._autocast_arguments, ), ) async def _prepare_storage_request_args( self, required_sid: bool, **kwargs, ): """Prepare storage request kwargs args. :param required_sid: required sid :param kwargs: kwargs :return: kwargs """ if required_sid: # Sid passed always in url params (?sid=...) request_params = kwargs.pop('params') # Get or create new sid request_params['sid'] = await self.get_sid() kwargs['params'] = request_params return kwargs def _init_service_entrypoints(self): """Init service entrypoint namespaces.""" self.Account = AsyncAccount(self) self.Activity = AsyncActivity(self) self.Analytics = AsyncAnalytics(self) self.Annotation = AsyncAnnotation(self) self.Appointment = AsyncAppointment(self) self.Audit = AsyncAudit(self) self.Case = AsyncCase(self) self.Customcode = AsyncCustomcode(self) self.Customfield = AsyncCustomfield(self) self.Destination = AsyncDestination(self) self.Dicomdata = AsyncDicomdata(self) self.Dictionary = AsyncDictionary(self) self.Filter = AsyncFilter(self) self.Group = AsyncGroup(self) self.Help = AsyncHelp(self) self.Hl7 = AsyncHl7(self) self.Keyimage = AsyncKeyimage(self) self.Link = AsyncLink(self) self.Location = AsyncLocation(self) self.Meeting = AsyncMeeting(self) self.Message = AsyncMessage(self) self.Namespace = AsyncNamespace(self) self.Node = AsyncNode(self) self.Npi = AsyncNpi(self) self.Order = AsyncOrder(self) self.Patient = AsyncPatient(self) self.Purge = AsyncPurge(self) self.Radreportmacro = AsyncRadreportmacro(self) self.Radreport = AsyncRadreport(self) self.Report = AsyncReport(self) self.Role = AsyncRole(self) self.Route = AsyncRoute(self) self.Rsna = AsyncRsna(self) self.Session = AsyncSession(self) self.Setting = AsyncSetting(self) self.Study = AsyncStudy(self) self.Tag = AsyncTag(self) self.Terminology = AsyncTerminology(self) self.Training = AsyncTraining(self) self.User = AsyncUser(self) self.Validate = AsyncValidate(self) self.Webhook = AsyncWebhook(self) self.Query = AsyncQuery(self) self.Scanner = AsyncScanner(self) self.Site = AsyncSite(self) self.AsyncAnonymization = AsyncAnonymization(self) self.AsyncQctask = AsyncQctask(self)