""" Audit.
Do not edit this file by hand.
This is generated by parsing api.html service doc.
"""
from ambra_sdk.exceptions.service import FilterNotFound
from ambra_sdk.exceptions.service import InvalidBucket
from ambra_sdk.exceptions.service import InvalidCondition
from ambra_sdk.exceptions.service import InvalidField
from ambra_sdk.exceptions.service import MissingFields
from ambra_sdk.exceptions.service import NotFound
from ambra_sdk.exceptions.service import NotPermitted
from ambra_sdk.service.query import QueryO
from ambra_sdk.service.query import AsyncQueryO
from ambra_sdk.service.query import QueryOP
from ambra_sdk.service.query import AsyncQueryOP
from ambra_sdk.service.query import QueryOPF
from ambra_sdk.service.query import AsyncQueryOPF
class Audit:
    """Builders for the ``/audit/*`` service queries.

    Every method assembles the request payload plus the mapping from
    service error codes to SDK exceptions and returns the matching query
    object. The code in this class does not execute the request itself;
    that is presumably left to the returned query object.
    """

    def __init__(self, api):
        """Keep the API object that is handed to every query.

        :param api: Object passed as ``api`` to the query constructors
        """
        self._api = api

    def object(
        self,
        uuid,
        customfield_detail=None,
        download=None,
        reverse=None,
    ):
        """Object.

        Mapped error codes: FILTER_NOT_FOUND, INVALID_CONDITION,
        INVALID_FIELD, MISSING_FIELDS, NOT_FOUND, NOT_PERMITTED.

        :param uuid: The uuid of the object to audit
        :param customfield_detail: Flag to include the customfield name in the detail (optional)
        :param download: Flag to create a zipped CSV file. A report_id will be returned and the file can be accessed via /report/status and /report/zip (optional)
        :param reverse: Flag to reverse the default sort order (optional)
        :returns: ``QueryOPF`` for ``/audit/object`` paginated on ``events``
        """
        request_data = {
            'customfield_detail': customfield_detail,
            'download': download,
            'reverse': reverse,
            'uuid': uuid,
        }
        # Keys are (error_type, error_subtype); None is used as the subtype
        # for every entry here.
        errors_mapping = {
            ('FILTER_NOT_FOUND', None): FilterNotFound('The filter can not be found. The error_subtype will hold the filter UUID'),
            ('INVALID_CONDITION', None): InvalidCondition('The condition is not support. The error_subtype will hold the filter expression this applies to'),
            ('INVALID_FIELD', None): InvalidField('The field is not valid for this object. The error_subtype will hold the filter expression this applies to'),
            ('MISSING_FIELDS', None): MissingFields('A required field is missing or does not have data in it. The error_subtype holds a array of all the missing fields'),
            ('NOT_FOUND', None): NotFound('The object was not found'),
            ('NOT_PERMITTED', None): NotPermitted('You are not permitted to audit this object'),
        }
        return QueryOPF(
            api=self._api,
            url='/audit/object',
            request_data=request_data,
            errors_mapping=errors_mapping,
            required_sid=True,
            paginated_field='events',
        )

    def user(
        self,
        account_id,
        user_id,
        download=None,
        reverse=None,
        study_fields=None,
    ):
        """User.

        Mapped error codes: FILTER_NOT_FOUND, INVALID_CONDITION,
        INVALID_FIELD, MISSING_FIELDS, NOT_FOUND, NOT_PERMITTED.

        :param account_id: The id of the account
        :param user_id: The id of the user to audit
        :param download: Flag to create a zipped CSV file. A report_id will be returned and the file can be accessed via /report/status and /report/zip (optional)
        :param reverse: Flag to reverse the default sort order (optional)
        :param study_fields: JSON list of study fields to include in the response (optional)
        :returns: ``QueryOPF`` for ``/audit/user`` paginated on ``events``
        """
        request_data = {
            'account_id': account_id,
            'download': download,
            'reverse': reverse,
            'study_fields': study_fields,
            'user_id': user_id,
        }
        errors_mapping = {
            ('FILTER_NOT_FOUND', None): FilterNotFound('The filter can not be found. The error_subtype will hold the filter UUID'),
            ('INVALID_CONDITION', None): InvalidCondition('The condition is not support. The error_subtype will hold the filter expression this applies to'),
            ('INVALID_FIELD', None): InvalidField('The field is not valid for this object. The error_subtype will hold the filter expression this applies to'),
            ('MISSING_FIELDS', None): MissingFields('A required field is missing or does not have data in it. The error_subtype holds a array of all the missing fields'),
            ('NOT_FOUND', None): NotFound('The user was not found'),
            ('NOT_PERMITTED', None): NotPermitted('You are not permitted to access this user record'),
        }
        return QueryOPF(
            api=self._api,
            url='/audit/user',
            request_data=request_data,
            errors_mapping=errors_mapping,
            required_sid=True,
            paginated_field='events',
        )

    def account(
        self,
        account_id,
        download=None,
        reverse=None,
    ):
        """Account.

        Mapped error codes: FILTER_NOT_FOUND, INVALID_CONDITION,
        INVALID_FIELD, MISSING_FIELDS, NOT_FOUND, NOT_PERMITTED.

        :param account_id: The id of the account
        :param download: Flag to create a zipped CSV file. A report_id will be returned and the file can be accessed via /report/status and /report/zip (optional)
        :param reverse: Flag to reverse the default sort order (optional)
        :returns: ``QueryOPF`` for ``/audit/account`` paginated on ``events``
        """
        request_data = {
            'account_id': account_id,
            'download': download,
            'reverse': reverse,
        }
        errors_mapping = {
            ('FILTER_NOT_FOUND', None): FilterNotFound('The filter can not be found. The error_subtype will hold the filter UUID'),
            ('INVALID_CONDITION', None): InvalidCondition('The condition is not support. The error_subtype will hold the filter expression this applies to'),
            ('INVALID_FIELD', None): InvalidField('The field is not valid for this object. The error_subtype will hold the filter expression this applies to'),
            ('MISSING_FIELDS', None): MissingFields('A required field is missing or does not have data in it. The error_subtype holds a array of all the missing fields'),
            ('NOT_FOUND', None): NotFound('The account was not found'),
            ('NOT_PERMITTED', None): NotPermitted('You are not permitted to access this information'),
        }
        return QueryOPF(
            api=self._api,
            url='/audit/account',
            request_data=request_data,
            errors_mapping=errors_mapping,
            required_sid=True,
            paginated_field='events',
        )

    def deleted(
        self,
        account_id,
        type,
    ):
        """Deleted.

        Mapped error codes: MISSING_FIELDS, NOT_PERMITTED.

        :param account_id: The id of the account
        :param type: The type of the object (Study|User etc.)
        :returns: ``QueryOP`` for ``/audit/deleted`` paginated on ``objects``
        """
        # ``type`` shadows the builtin, but it is the public keyword name
        # callers use, so it has to stay as is.
        request_data = {
            'account_id': account_id,
            'type': type,
        }
        errors_mapping = {
            ('MISSING_FIELDS', None): MissingFields('A required field is missing or does not have data in it. The error_subtype holds a array of all the missing fields'),
            ('NOT_PERMITTED', None): NotPermitted('You are not permitted to access this record'),
        }
        return QueryOP(
            api=self._api,
            url='/audit/deleted',
            request_data=request_data,
            errors_mapping=errors_mapping,
            required_sid=True,
            paginated_field='objects',
        )

    def log(
        self,
        bucket,
        rest_params=None,
    ):
        """Log.

        Mapped error codes: INVALID_BUCKET, MISSING_FIELDS.

        :param bucket: Name of the bucket to log to
        :param rest_params: Parameters are logged to a message in the bucket
        :returns: ``QueryO`` for ``/audit/log``
        """
        request_data = {
            'bucket': bucket,
        }
        if rest_params is not None:
            # Extra parameters are merged after ``bucket``, so a ``bucket``
            # key inside rest_params overrides the positional argument.
            # Keys are rendered through str.format, as before.
            request_data.update(
                {'{}'.format(key): value for key, value in rest_params.items()},
            )
        errors_mapping = {
            ('INVALID_BUCKET', None): InvalidBucket('The bucket name can only contain A-z characters and must be between 4 and 16 characters long'),
            ('MISSING_FIELDS', None): MissingFields('A required field is missing or does not have data in it. The error_subtype holds a array of all the missing fields'),
        }
        return QueryO(
            api=self._api,
            url='/audit/log',
            request_data=request_data,
            errors_mapping=errors_mapping,
            required_sid=True,
        )

    def failedlogins(
        self,
        account_id,
        from_time=None,
    ):
        """Failedlogins.

        No service error codes are mapped for this call.

        :param account_id: The id of the account
        :param from_time: Only return events after the epoch time (optional)
        :returns: ``QueryO`` for ``/audit/failedlogins``
        """
        request_data = {
            'account_id': account_id,
            'from_time': from_time,
        }
        return QueryO(
            api=self._api,
            url='/audit/failedlogins',
            request_data=request_data,
            errors_mapping={},
            required_sid=True,
        )
class AsyncAudit:
    """Builders for the ``/audit/*`` service queries, async flavour.

    Each method assembles the request payload plus the mapping from
    service error codes to SDK exceptions and returns the matching
    ``AsyncQuery*`` object. The methods themselves are plain functions
    (not coroutines); the code in this class does not execute the request.
    """

    def __init__(self, api):
        """Keep the API object that is handed to every query.

        :param api: Object passed as ``api`` to the query constructors
        """
        self._api = api

    def object(
        self,
        uuid,
        customfield_detail=None,
        download=None,
        reverse=None,
    ):
        """Object.

        Mapped error codes: FILTER_NOT_FOUND, INVALID_CONDITION,
        INVALID_FIELD, MISSING_FIELDS, NOT_FOUND, NOT_PERMITTED.

        :param uuid: The uuid of the object to audit
        :param customfield_detail: Flag to include the customfield name in the detail (optional)
        :param download: Flag to create a zipped CSV file. A report_id will be returned and the file can be accessed via /report/status and /report/zip (optional)
        :param reverse: Flag to reverse the default sort order (optional)
        :returns: ``AsyncQueryOPF`` for ``/audit/object`` paginated on ``events``
        """
        request_data = {
            'customfield_detail': customfield_detail,
            'download': download,
            'reverse': reverse,
            'uuid': uuid,
        }
        # Keys are (error_type, error_subtype); None is used as the subtype
        # for every entry here.
        errors_mapping = {
            ('FILTER_NOT_FOUND', None): FilterNotFound('The filter can not be found. The error_subtype will hold the filter UUID'),
            ('INVALID_CONDITION', None): InvalidCondition('The condition is not support. The error_subtype will hold the filter expression this applies to'),
            ('INVALID_FIELD', None): InvalidField('The field is not valid for this object. The error_subtype will hold the filter expression this applies to'),
            ('MISSING_FIELDS', None): MissingFields('A required field is missing or does not have data in it. The error_subtype holds a array of all the missing fields'),
            ('NOT_FOUND', None): NotFound('The object was not found'),
            ('NOT_PERMITTED', None): NotPermitted('You are not permitted to audit this object'),
        }
        return AsyncQueryOPF(
            api=self._api,
            url='/audit/object',
            request_data=request_data,
            errors_mapping=errors_mapping,
            required_sid=True,
            paginated_field='events',
        )

    def user(
        self,
        account_id,
        user_id,
        download=None,
        reverse=None,
        study_fields=None,
    ):
        """User.

        Mapped error codes: FILTER_NOT_FOUND, INVALID_CONDITION,
        INVALID_FIELD, MISSING_FIELDS, NOT_FOUND, NOT_PERMITTED.

        :param account_id: The id of the account
        :param user_id: The id of the user to audit
        :param download: Flag to create a zipped CSV file. A report_id will be returned and the file can be accessed via /report/status and /report/zip (optional)
        :param reverse: Flag to reverse the default sort order (optional)
        :param study_fields: JSON list of study fields to include in the response (optional)
        :returns: ``AsyncQueryOPF`` for ``/audit/user`` paginated on ``events``
        """
        request_data = {
            'account_id': account_id,
            'download': download,
            'reverse': reverse,
            'study_fields': study_fields,
            'user_id': user_id,
        }
        errors_mapping = {
            ('FILTER_NOT_FOUND', None): FilterNotFound('The filter can not be found. The error_subtype will hold the filter UUID'),
            ('INVALID_CONDITION', None): InvalidCondition('The condition is not support. The error_subtype will hold the filter expression this applies to'),
            ('INVALID_FIELD', None): InvalidField('The field is not valid for this object. The error_subtype will hold the filter expression this applies to'),
            ('MISSING_FIELDS', None): MissingFields('A required field is missing or does not have data in it. The error_subtype holds a array of all the missing fields'),
            ('NOT_FOUND', None): NotFound('The user was not found'),
            ('NOT_PERMITTED', None): NotPermitted('You are not permitted to access this user record'),
        }
        return AsyncQueryOPF(
            api=self._api,
            url='/audit/user',
            request_data=request_data,
            errors_mapping=errors_mapping,
            required_sid=True,
            paginated_field='events',
        )

    def account(
        self,
        account_id,
        download=None,
        reverse=None,
    ):
        """Account.

        Mapped error codes: FILTER_NOT_FOUND, INVALID_CONDITION,
        INVALID_FIELD, MISSING_FIELDS, NOT_FOUND, NOT_PERMITTED.

        :param account_id: The id of the account
        :param download: Flag to create a zipped CSV file. A report_id will be returned and the file can be accessed via /report/status and /report/zip (optional)
        :param reverse: Flag to reverse the default sort order (optional)
        :returns: ``AsyncQueryOPF`` for ``/audit/account`` paginated on ``events``
        """
        request_data = {
            'account_id': account_id,
            'download': download,
            'reverse': reverse,
        }
        errors_mapping = {
            ('FILTER_NOT_FOUND', None): FilterNotFound('The filter can not be found. The error_subtype will hold the filter UUID'),
            ('INVALID_CONDITION', None): InvalidCondition('The condition is not support. The error_subtype will hold the filter expression this applies to'),
            ('INVALID_FIELD', None): InvalidField('The field is not valid for this object. The error_subtype will hold the filter expression this applies to'),
            ('MISSING_FIELDS', None): MissingFields('A required field is missing or does not have data in it. The error_subtype holds a array of all the missing fields'),
            ('NOT_FOUND', None): NotFound('The account was not found'),
            ('NOT_PERMITTED', None): NotPermitted('You are not permitted to access this information'),
        }
        return AsyncQueryOPF(
            api=self._api,
            url='/audit/account',
            request_data=request_data,
            errors_mapping=errors_mapping,
            required_sid=True,
            paginated_field='events',
        )

    def deleted(
        self,
        account_id,
        type,
    ):
        """Deleted.

        Mapped error codes: MISSING_FIELDS, NOT_PERMITTED.

        :param account_id: The id of the account
        :param type: The type of the object (Study|User etc.)
        :returns: ``AsyncQueryOP`` for ``/audit/deleted`` paginated on ``objects``
        """
        # ``type`` shadows the builtin, but it is the public keyword name
        # callers use, so it has to stay as is.
        request_data = {
            'account_id': account_id,
            'type': type,
        }
        errors_mapping = {
            ('MISSING_FIELDS', None): MissingFields('A required field is missing or does not have data in it. The error_subtype holds a array of all the missing fields'),
            ('NOT_PERMITTED', None): NotPermitted('You are not permitted to access this record'),
        }
        return AsyncQueryOP(
            api=self._api,
            url='/audit/deleted',
            request_data=request_data,
            errors_mapping=errors_mapping,
            required_sid=True,
            paginated_field='objects',
        )

    def log(
        self,
        bucket,
        rest_params=None,
    ):
        """Log.

        Mapped error codes: INVALID_BUCKET, MISSING_FIELDS.

        :param bucket: Name of the bucket to log to
        :param rest_params: Parameters are logged to a message in the bucket
        :returns: ``AsyncQueryO`` for ``/audit/log``
        """
        request_data = {
            'bucket': bucket,
        }
        if rest_params is not None:
            # Extra parameters are merged after ``bucket``, so a ``bucket``
            # key inside rest_params overrides the positional argument.
            # Keys are rendered through str.format, as before.
            request_data.update(
                {'{}'.format(key): value for key, value in rest_params.items()},
            )
        errors_mapping = {
            ('INVALID_BUCKET', None): InvalidBucket('The bucket name can only contain A-z characters and must be between 4 and 16 characters long'),
            ('MISSING_FIELDS', None): MissingFields('A required field is missing or does not have data in it. The error_subtype holds a array of all the missing fields'),
        }
        return AsyncQueryO(
            api=self._api,
            url='/audit/log',
            request_data=request_data,
            errors_mapping=errors_mapping,
            required_sid=True,
        )

    def failedlogins(
        self,
        account_id,
        from_time=None,
    ):
        """Failedlogins.

        No service error codes are mapped for this call.

        :param account_id: The id of the account
        :param from_time: Only return events after the epoch time (optional)
        :returns: ``AsyncQueryO`` for ``/audit/failedlogins``
        """
        request_data = {
            'account_id': account_id,
            'from_time': from_time,
        }
        return AsyncQueryO(
            api=self._api,
            url='/audit/failedlogins',
            request_data=request_data,
            errors_mapping={},
            required_sid=True,
        )