API module

class API.API(log: Log, config: Config, cache: Cache)

Bases: object

Class to run and handle API requests, retrieve responses from external APIs or cache

Initializes API server

Parameters:
  • log – Log object

  • config – Config object

  • cache – Cache object

class MyDict(d: dict)

Bases: object

Helper class to facilitate handling nested dict

Initialize dict

get(*args)

Get value from nested dict or None

Parameters:

args – dict keys

get_dict()

Get original dict

remove(*args)

Remove key from nested dict and return modified copy

Parameters:

args – dict keys

class UrlItem(*, url: str = '')

Bases: BaseModel

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'url': FieldInfo(annotation=str, required=False, default='')}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

url: str
class User(*, username: str, password: str)

Bases: BaseModel

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'password': FieldInfo(annotation=str, required=True), 'username': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

password: str
username: str
crypt = <CryptContext>
db_path = '/source/EnrichmentEngine.db'
docs_path = '/enrichment-engine/docs/_build/html'
get_parsed_response(response: dict, type: str)

Parse response from services

Parameters:
  • response – response to parse

  • type – type of the response (“ip”, “domain”, “file”, “url”)

Returns:

parsed response

Return type:

dict

is_authenticated() bool
async read(request: Request, path, key, service, url)

GET/DELETE cached response Query external API if not found in cache

Parameters:
  • request – Request object

  • path – path to API endpoint

  • key – key to use when writing to cache

  • service – service to query

Returns:

response from API

Return type:

dict

async read_aggs_auth(response: Response, request: Request, ip: str | None = None, domain: str | None = None, hash: str | None = None, url: UrlItem | None = None, status: bool = Depends(is_authenticated))
async read_aggs_no_auth(request: Request, ip: str | None = None, domain: str | None = None, hash: str | None = None, url: UrlItem | None = None)
async read_all(request: Request, **kwargs)

Read responses from the services

Parameters:
  • request – Request object

  • kwargs – arguments passed by read_* methods

Returns:

response

Return type:

dict | list

async read_domain_auth(response: Response, request: Request, domain, service: str | None = None, parsed: bool = False, status: bool = Depends(is_authenticated))
async read_domain_no_auth(request: Request, domain, service: str | None = None, parsed: bool = False)

GET /domains to query information about domains

Parameters:
  • request – Request object

  • domain – domain to query

  • service – specify type of service

  • parsed – specify if returned response should be parsed or not

Returns:

response

Return type:

dict

async read_file_auth(response: Response, request: Request, hash, service: str | None = None, parsed: bool = False, status: bool = Depends(is_authenticated))
async read_file_no_auth(request: Request, hash, service: str | None = None, parsed: bool = False)

GET /files to query information about files

Parameters:
  • request – Request object

  • hash – query file by its hash

  • service – specify type of service

  • parsed – specify if returned response should be parsed or not

Returns:

response

Return type:

dict

async read_ip_auth(response: Response, request: Request, ip, service: str | None = None, parsed: bool = False, status: bool = Depends(is_authenticated))
async read_ip_no_auth(request: Request, ip, service: str | None = None, parsed: bool = False)

GET /ip_addresses to query information about IP addresses

Parameters:
  • request – Request object

  • ip – IP address to query

  • service – specify type of service

  • parsed – specify if returned response should be parsed or not

Returns:

response

Return type:

dict

async read_url_auth(response: Response, request: Request, item: UrlItem, service: str | None = None, parsed: bool = False, status: bool = Depends(is_authenticated))
async read_url_no_auth(request: Request, item: UrlItem, service: str | None = None, parsed: bool = False)

POST /urls to query information about URLs

Parameters:
  • request – Request object

  • item – URL to query as request body

  • service – specify type of service

  • parsed – specify if returned response should be parsed or not

Returns:

response

Return type:

dict

response_401(response: Response)
root_path = '/enrichment-engine'
security = <fastapi.security.http.HTTPBasic object>
async user_create(response: Response, user: User, status: bool = Depends(is_authenticated))
async user_delete(response: Response, user: str, status: bool = Depends(is_authenticated))
class API.FastAPI(*, debug: bool = False, routes: ~typing.List[~starlette.routing.BaseRoute] | None = None, title: str = 'FastAPI', summary: str | None = None, description: str = '', version: str = '0.1.0', openapi_url: str | None = '/openapi.json', openapi_tags: ~typing.List[~typing.Dict[str, ~typing.Any]] | None = None, servers: ~typing.List[~typing.Dict[str, ~typing.Any | str]] | None = None, dependencies: ~typing.Sequence[~fastapi.params.Depends] | None = None, default_response_class: ~typing.Type[~starlette.responses.Response] = <fastapi.datastructures.DefaultPlaceholder object>, redirect_slashes: bool = True, docs_url: str | None = '/docs', redoc_url: str | None = '/redoc', swagger_ui_oauth2_redirect_url: str | None = '/docs/oauth2-redirect', swagger_ui_init_oauth: ~typing.Dict[str, ~typing.Any] | None = None, middleware: ~typing.Sequence[~starlette.middleware.Middleware] | None = None, exception_handlers: ~typing.Dict[int | ~typing.Type[Exception], ~typing.Callable[[~starlette.requests.Request, ~typing.Any], ~typing.Coroutine[~typing.Any, ~typing.Any, ~starlette.responses.Response]]] | None = None, on_startup: ~typing.Sequence[~typing.Callable[[], ~typing.Any]] | None = None, on_shutdown: ~typing.Sequence[~typing.Callable[[], ~typing.Any]] | None = None, lifespan: ~typing.Callable[[~fastapi.applications.AppType], ~typing.AsyncContextManager[None]] | ~typing.Callable[[~fastapi.applications.AppType], ~typing.AsyncContextManager[~typing.Mapping[str, ~typing.Any]]] | None = None, terms_of_service: str | None = None, contact: ~typing.Dict[str, ~typing.Any | str] | None = None, license_info: ~typing.Dict[str, ~typing.Any | str] | None = None, openapi_prefix: str = '', root_path: str = '', root_path_in_servers: bool = True, responses: ~typing.Dict[int | str, ~typing.Dict[str, ~typing.Any]] | None = None, callbacks: ~typing.List[~starlette.routing.BaseRoute] | None = None, webhooks: ~fastapi.routing.APIRouter | None = None, deprecated: bool | None = None, include_in_schema: bool = True, swagger_ui_parameters: ~typing.Dict[str, ~typing.Any] | None = None, generate_unique_id_function: ~typing.Callable[[~fastapi.routing.APIRoute], str] = <fastapi.datastructures.DefaultPlaceholder object>, separate_input_output_schemas: bool = True, **extra: ~typing.Any)

Bases: FastAPI

add_api_route(path: str, endpoint: Callable[[...], Any], *, include_in_schema: bool = True, **kwargs: Any) None
add_api_route_methods(path: str, endpoint: Callable[[...], Any], methods: list[str], **kwargs: Any) None