dal.utils package

Submodules

dal.utils.callback module

Copyright (C) Mov.ai - All Rights Reserved Unauthorized copying of this file, via any medium is strictly prohibited Proprietary and confidential

Developers: - Manuel Silva (manuel.silva@mov.ai) - 2020 - Tiago Paulino (tiago@mov.ai) - 2020

class dal.utils.callback.AsyncCallback(_cb_name: str, _node_name: str, _port_name: str, _update: bool = False)

Bases: object

Callback class used to execute user code

Parameters:
  • _cb_name – The name of the callback

  • _node_name – The name of the node instance

  • _port_name – The name of the input port

  • _update – Real time update of the callback code

async execute(msg: Any | None = None) None

Executes the code

Parameters:

msg – Message received in the callback

Returns:

Result from the callback function, if any

set_transitioning()
async start(code, globais)

Executes the code

Parameters:

msg – Message received in the callback

class dal.utils.callback.Callback(_cb_name: str, _node_name: str, _port_name: str, _update: bool = False)

Bases: object

Callback class used by GD_Node to execute code

Parameters:
  • _cb_name – The name of the callback

  • _node_name – The name of the node instance

  • _port_name – The name of the input port

  • _update – Real time update of the callback code

execute(msg: Any | None = None) None

Executes the code

Parameters:

msg – Message received in the callback

classmethod robot() Robot
classmethod scene() GraphicScene
set_transitioning()
start(code, globais)

Executes the code

Parameters:

msg – Message received in the callback

class dal.utils.callback.LazyInstantiation(cls, *args, **kwargs)

Bases: object

Delay instantiation of an object until first use.

class dal.utils.callback.UserFunctions(_cb_name: str, _node_name: str, _port_name: str, _libraries: list, _message: str, _user='SUPER')

Bases: object

Class that provides functions to the callback execution

globals: Dict[str, Any]
load_classes(_node_name, _port_name, _user)
load_libraries(libraries)
run(cb_name, msg)

Run another callback from a callback

user_print(*args)

Method to redirect the print function into logger

dal.utils.constants module

DAL constants.

dal.utils.constants.REST_SCOPES

Scopes supported via REST API.

dal.utils.constants.SCOPES_TO_TRACK

Scopes for which we update LastUpdate (user and time).

dal.utils.middleware module

This file contains middleware functions to be used by aiohttp servers

Here’s a summary of each of them:
  • JWTMiddleware: Used to authenticate users against the JWT token

  • save_node_type: Used to save the node type when a node is changed

  • remove_flow_exposed_port_links: Used to search and remove exposed ports links when a flow is deleted

  • redirect_not_found: Used to redirect 404 errors to the frontend

class dal.utils.middleware.JWTMiddleware(secret: str, safelist: List[str] | None = None)

Bases: object

JWT authentication middleware

add_safe(paths: str | List[str], prefix: str | None = None) None

Add paths to bypass auth list

async middleware(request, handler)

the actual middleware JWT authentication verify

async dal.utils.middleware.redirect_not_found(request, handler)

Search end remove ExposedPort links

async dal.utils.middleware.save_node_type(request, handler)

Saves the node type when a node is changed

dal.utils.redis_mocks module

class dal.utils.redis_mocks.FakeAsyncConnection(reader, writer, *, address, encoding=None, parser=None, loop=None)

Bases: AbcConnection

Basic mock connection that allows subscribing to a channel pattern (psubscribe)

property address

Connection address.

close()

Perform connection(s) close and resources cleanup.

property closed

Flag indicating if connection is closing or already closed.

property db

Current selected DB index.

property encoding

Current set connection codec.

execute(command, *args, encoding=Ellipsis)

Execute redis command.

execute_pubsub(command, *channels)

Execute Redis (p)subscribe/(p)unsubscribe commands.

property in_pubsub

Returns number of subscribed channels.

Can be tested as bool indicating Pub/Sub mode state.

property pubsub_channels

Read-only channels dict.

property pubsub_patterns

Read-only patterns dict.

async wait_closed()

Coroutine waiting until all resources are closed/released/cleaned up.

class dal.utils.redis_mocks.FakeAsyncPool(address, db=None, password=None, encoding=None, *, minsize, maxsize, ssl=None, parser=None, create_connection_timeout=None, connection_cls=None, loop=None)

Bases: ConnectionsPool

dal.utils.redis_mocks.fake_redis(target, recording_dir)

Module contents

Copyright (C) Mov.ai - All Rights Reserved Unauthorized copying of this file, via any medium is strictly prohibited Proprietary and confidential

Utility modules for DAL.

class dal.utils.DirectFlowUsageItem(*, flow_instance_name: str)

Bases: BaseModel

Single direct usage item for a Flow.

flow_instance_name: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'flow_instance_name': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class dal.utils.DirectNodeUsageItem(*, node_instance_name: str)

Bases: BaseModel

Single direct usage item for a Node.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'node_instance_name': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

node_instance_name: str
class dal.utils.FlowFlowUsage(*, direct: List[DirectFlowUsageItem] = None, indirect: List[IndirectFlowUsageItem] = None)

Bases: BaseModel

Usage details for a Flow in a specific parent Flow.

Can have both direct and indirect usages.

direct: List[DirectFlowUsageItem]
indirect: List[IndirectFlowUsageItem]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'direct': FieldInfo(annotation=List[dal.utils.usage_search.usage_types.DirectFlowUsageItem], required=False, default_factory=list), 'indirect': FieldInfo(annotation=List[dal.utils.usage_search.usage_types.IndirectFlowUsageItem], required=False, default_factory=list)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class dal.utils.IndirectFlowUsageItem(*, flow_template_name: str, flow_instance_name: str)

Bases: BaseModel

Single indirect usage item for a Flow - references immediate child flow.

flow_instance_name: str
flow_template_name: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'flow_instance_name': FieldInfo(annotation=str, required=True), 'flow_template_name': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class dal.utils.IndirectNodeUsageItem(*, flow_template_name: str, flow_instance_name: str)

Bases: BaseModel

Single indirect usage item for a Node - references immediate child flow.

flow_instance_name: str
flow_template_name: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'flow_instance_name': FieldInfo(annotation=str, required=True), 'flow_template_name': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class dal.utils.NodeFlowUsage(*, direct: List[DirectNodeUsageItem] = None, indirect: List[IndirectNodeUsageItem] = None)

Bases: BaseModel

Usage details for a Node in a specific Flow.

Can have both direct and indirect usages.

direct: List[DirectNodeUsageItem]
indirect: List[IndirectNodeUsageItem]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'direct': FieldInfo(annotation=List[dal.utils.usage_search.usage_types.DirectNodeUsageItem], required=False, default_factory=list), 'indirect': FieldInfo(annotation=List[dal.utils.usage_search.usage_types.IndirectNodeUsageItem], required=False, default_factory=list)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class dal.utils.UsageSearchResult(*, scope: str, name: str, usage: UsageData)

Bases: BaseModel

Result structure for usage search.

Format: {

“scope”: “Node” | “Flow”, “name”: “object_name”, “usage”: {

“flow”: {
“flow_name”: {

“direct”: […], “indirect”: […]

}

}

}

}

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'name': FieldInfo(annotation=str, required=True), 'scope': FieldInfo(annotation=str, required=True), 'usage': FieldInfo(annotation=UsageData, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

name: str
scope: str
usage: UsageData
dal.utils.get_usage_search_scope_map()

Get the mapping of scope types that support usage search.

This is a re-export with lazy loading to avoid circular imports.