dal.movaidb package

Submodules

dal.movaidb.database module

Copyright (C) Mov.ai - All Rights Reserved Unauthorized copying of this file, via any medium is strictly prohibited Proprietary and confidential

Developers: - Manuel Silva (manuel.silva@mov.ai) - 2020 - Tiago Paulino (tiago@mov.ai) - 2020 - Moawiya Mograbi (moawiya@mov.ai) - 2022

class dal.movaidb.database.AioRedisClient(*args, **kwargs)

Bases: object

A Singleton class implementing AioRedis API.

classmethod enable_db(db_name)

will enable the given db name

Parameters:

db_name (str) – database name

async classmethod get_client()

will return class singleton instance

Returns:

object of class AioRedisClient

Return type:

AioRedisClient

loop = <_UnixSelectorEventLoop running=False closed=False debug=False>
async shutdown()

shutdown connections

class dal.movaidb.database.MovaiDB(db: str = 'global', _api_version: str = 'latest', *, loop=None, databases=None)

Bases: object

Main MovaiDB

class API(version: str = 'latest', url: str = 'file:///__w/data-access-layer/data-access-layer/dal/validation/schema')

Bases: dict

# Represents the API template dict. Can be Imported or saved into Redis

get_api()

return the current API

keys() a set-like object providing a view on D's keys
load_schemas_from_files(url, version)
property url

Base uri

values() an object providing a view on D's values
property version

Current version

REDIS_LOCAL_HOST = 'redis-local'
REDIS_LOCAL_PORT = 6379
REDIS_MASTER_HOST = 'redis-master'
REDIS_MASTER_PORT = 6379
REDIS_SLAVE_HOST = 'redis-master'
REDIS_SLAVE_PORT = 6379
static args_to_dict(_input: dict, _replaces: dict) dict
static break_paths(path: List[str] | str) List[str]

Return list with paths broken down

static calc_scope_update(old_dict: dict, new_dict: dict, structure: dict) List[Dict[str, Any]]

Calculate scope updates dicts

check_registration(key: str)
create_pipe(write=True) Pipeline

Create a pipeline

decode_hash(_hash)

Decodes a full hash from redis

decode_list(_list)

Decodes a full list from redis

decode_value(_value)

Decodes a value from redis

decode_value_by_type(type_: Literal['string', 'list', 'hash'], value: Any)

Decode value by Redis type.

Parameters:
  • type (str) – Redis type (“string”, “list”, or “hash”)

  • value (Any) – Value to decode

Returns:

Decoded value

delete(_input: dict, pipe=None) int | None

deletes _input :returns: number of deleted entries.

delete_by_args(scope, **kwargs)

Delete keys in redis giving arguments

static dict_to_args(_input: dict) dict

Get a plain args dict from the nested one

dict_to_keys(_input: dict, validate=None)
static dict_to_paths(value, path: str = '') Generator[str, None, None]

Returns list with all dict as string paths

static execute_pipe(pipe: Pipeline)
exists(_input: dict) bool

assumes it get one or more full keys, no * allowed here

exists_by_args(scope: str, **kwargs) bool

Check if some key exists in redis giving arguments

find(_input: dict) Dict[str, Any]

Search redis for a certain structure, returns a dict with matching result

classmethod generate_search_wild_key(_input: Dict[str, Any], only_pattern: bool, symbol: str = ':', scan_key: str = '') str
get(_input: dict) Dict[str, Any]

Receives a full or partial dict and returns the values matching in the DB

Returns:

dict

get2(_input: dict) Dict[str, Any]
get_by_args(scope, **kwargs) dict

Get keys from redis giving arguments

static get_from_path(base_dict: dict, path: str) Any

Returns value from the specified path

get_hash(_input: dict, search=True) Any

Gets a full hash from Redis

async get_key_val(_conn, key: str)

Get value by type of key.

Parameters:
  • _conn – Redis connection (aioredis.Redis)

  • key (str) – Redis key

Returns:

Value of the key

Return type:

value

Raises:

ValueError – If the Redis type is unexpected

async get_key_values(keys)

Get key value

Parameters:

keys (Union[str, List[str]]) – The key or list of keys to fetch values for.

Returns:

A dictionary mapping keys to their corresponding values.

Return type:

dict

async get_keys(pattern: str) list

Get all redis keys matching pattern.

Parameters:

pattern (str) – Redis key pattern

Returns:

List of keys matching the pattern

Return type:

list

get_keys_from_pattern(_pattern) List[str]

Converts a pattern dictionary into a list of keys based on the specified scope.

This method takes a pattern dictionary, extracts the “Scope” key, and uses it to retrieve a search dictionary from the database. It then converts the search dictionary into a list of keys.

Parameters:

_pattern (PatternDict) – A dictionary containing the pattern to be converted. Must include a “Scope” key.

Returns:

A list of keys derived from the pattern. Returns an empty list if an

error occurs during the process.

Return type:

List[str]

get_list(_input: dict, search=True) Any

Gets a full list from Redis

get_search_dict(scope: str, **kwargs) dict

Get the search dictionary from the args

get_value(_input: dict, search=True) Any
hdel(_input: dict, hash_field: str, search=True)

Deletes a key within the hash name

hget(_input: dict, hash_field: str, search=True)

Return the value of a key within the hash name

hset(_input: dict)

Implementation of hset, from redys-py: Set key to value within hash

Returns:

1 if HSET created a new field, otherwise 0 e.g {‘Robot’:{‘lala’:{‘Parameters’: {‘Foo’:2, ‘Bar’:3}}}}

hset_pub(_input: dict)

Same as hset with addition publish in a respective channel

static keys_to_dict(kv: List[Tuple[str, Any]])
lpush(_input: dict, pickl: bool = True)

Push a value to the left of a Redis list

async mget(keys: List[bytes]) Dict[str, Any]

Get values using redis mget.

Parameters:

keys (List[bytes]) – List of keys, e.g. [b’Configuration:app-adminboard,Label:’]

Returns:

Dictionary of key-value pairs

Return type:

Dict[str, Any]

pop(_input: dict)

Pop a value from the left of a Redis list

push(_input: dict, pickl: bool = True)

Push a value to the right of a Redis list

rename(old_input: dict, new_input: dict) bool

Receives two dicts with same struct to replace one with the other

rpop(_input: dict)

Pop a value from the right of a Redis list

search(_input: dict) list

Search redis for a certain structure, returns a list of matching keys Meant to be used by other functions in this class

search_by_args(scope, **kwargs) Tuple[dict, int]

Search keys in redis giving arguments

search_wild(_input: dict, only_pattern=False) str | List[str]

Accepts a not full structure to search and returns a list of matching keys

set(_input: dict, pickl: bool = True, pipe=None, ex=None, px=None, nx=False, xx=False, validate=True) None

Set key values in database.

static set_dict(base: dict, path: str, value: Any) None

Appends value to dict with specified path

classmethod sort_dict(item: dict) dict

Sort nested dict. Adapted from: https://gist.github.com/gyli/f60f0374defc383aa098d44cfbd318eb

Example: Input: {‘a’: 1, ‘c’: 3, ‘b’: {‘b2’: 2, ‘b1’: 1}} Output: {‘a’: 1, ‘b’: {‘b1’: 1, ‘b2’: 2}, ‘c’: 3}

async subscribe(_input: dict, function)

Subscribes to a KeySpace event

subscribe_by_args(scope, function, **kwargs)

Subscribe to a redis pattern giving arguments

subscribe_by_args_decoded(scope, function: Subscriber, **kwargs)

Same as subscribe_by_args but decodes the notification sent back by Redis

async subscribe_channel(_input: dict, function, port_name: str, node_name: str)

Subscribes to a specific channel

async task_subscriber(key: str, callback, port_name: str | None = None, node_name: str | None = None) None

Calls a callback every time it gets a message.

static template_to_star(_input: Dict[str, Any])
unsafe_delete(_input: dict, pipe=None) int | None

deletes _input :returns: number of deleted entries.

static update_dict(d: dict, u: Any)
classmethod validate(d: Dict[str, Any], api: Dict[str, Any], base_key: str, keys: List[Tuple[str, Any, Any]])

Validation before safe in database

static validate_path(path: str, structure: dict) str | None
class dal.movaidb.database.Redis(*args, **kwargs)

Bases: object

A Singleton class implementing Redis API.

property db_global: Redis
property db_local: Redis
property db_slave: Redis
local_pubsub() PubSub
property slave_pubsub: PubSub
class dal.movaidb.database.SubscribeManager(*args, **kwargs)

Bases: object

classmethod is_registered(key)
classmethod register_sub(key)
classmethod unregister_sub(key)
class dal.movaidb.database.Subscriber(*args, **kwargs)

Bases: Protocol

dal.movaidb.database.longest_common_prefix(strings: List[str]) str

Finds the longest common prefix string amongst an array of strings.

Parameters:

strings (list of str) – A list of strings to evaluate.

Returns:

The longest common prefix shared among all strings in the list.

If the list is empty, returns an empty string. If no common prefix exists, returns an empty string.

Return type:

str

Example

>>> longest_common_prefix(["flower", "flow", "flight"])
'fl'
>>> longest_common_prefix(["dog", "racecar", "car"])
''
>>> longest_common_prefix([])
''

Module contents

Copyright (C) Mov.ai - All Rights Reserved Unauthorized copying of this file, via any medium is strictly prohibited Proprietary and confidential

Developers: - Moawiya Mograbi (moawiya@mov.ai) - 2022

class dal.movaidb.MovaiDB(db: str = 'global', _api_version: str = 'latest', *, loop=None, databases=None)

Bases: object

Main MovaiDB

class API(version: str = 'latest', url: str = 'file:///__w/data-access-layer/data-access-layer/dal/validation/schema')

Bases: dict

# Represents the API template dict. Can be Imported or saved into Redis

get_api()

return the current API

keys() a set-like object providing a view on D's keys
load_schemas_from_files(url, version)
property url

Base uri

values() an object providing a view on D's values
property version

Current version

REDIS_LOCAL_HOST = 'redis-local'
REDIS_LOCAL_PORT = 6379
REDIS_MASTER_HOST = 'redis-master'
REDIS_MASTER_PORT = 6379
REDIS_SLAVE_HOST = 'redis-master'
REDIS_SLAVE_PORT = 6379
static args_to_dict(_input: dict, _replaces: dict) dict
static break_paths(path: List[str] | str) List[str]

Return list with paths broken down

static calc_scope_update(old_dict: dict, new_dict: dict, structure: dict) List[Dict[str, Any]]

Calculate scope updates dicts

check_registration(key: str)
create_pipe(write=True) Pipeline

Create a pipeline

db_read: Redis
db_write: Redis
decode_hash(_hash)

Decodes a full hash from redis

decode_list(_list)

Decodes a full list from redis

decode_value(_value)

Decodes a value from redis

decode_value_by_type(type_: Literal['string', 'list', 'hash'], value: Any)

Decode value by Redis type.

Parameters:
  • type (str) – Redis type (“string”, “list”, or “hash”)

  • value (Any) – Value to decode

Returns:

Decoded value

delete(_input: dict, pipe=None) int | None

deletes _input :returns: number of deleted entries.

delete_by_args(scope, **kwargs)

Delete keys in redis giving arguments

static dict_to_args(_input: dict) dict

Get a plain args dict from the nested one

dict_to_keys(_input: dict, validate=None)
static dict_to_paths(value, path: str = '') Generator[str, None, None]

Returns list with all dict as string paths

static execute_pipe(pipe: Pipeline)
exists(_input: dict) bool

assumes it get one or more full keys, no * allowed here

exists_by_args(scope: str, **kwargs) bool

Check if some key exists in redis giving arguments

find(_input: dict) Dict[str, Any]

Search redis for a certain structure, returns a dict with matching result

classmethod generate_search_wild_key(_input: Dict[str, Any], only_pattern: bool, symbol: str = ':', scan_key: str = '') str
get(_input: dict) Dict[str, Any]

Receives a full or partial dict and returns the values matching in the DB

Returns:

dict

get2(_input: dict) Dict[str, Any]
get_by_args(scope, **kwargs) dict

Get keys from redis giving arguments

static get_from_path(base_dict: dict, path: str) Any

Returns value from the specified path

get_hash(_input: dict, search=True) Any

Gets a full hash from Redis

async get_key_val(_conn, key: str)

Get value by type of key.

Parameters:
  • _conn – Redis connection (aioredis.Redis)

  • key (str) – Redis key

Returns:

Value of the key

Return type:

value

Raises:

ValueError – If the Redis type is unexpected

async get_key_values(keys)

Get key value

Parameters:

keys (Union[str, List[str]]) – The key or list of keys to fetch values for.

Returns:

A dictionary mapping keys to their corresponding values.

Return type:

dict

async get_keys(pattern: str) list

Get all redis keys matching pattern.

Parameters:

pattern (str) – Redis key pattern

Returns:

List of keys matching the pattern

Return type:

list

get_keys_from_pattern(_pattern) List[str]

Converts a pattern dictionary into a list of keys based on the specified scope.

This method takes a pattern dictionary, extracts the “Scope” key, and uses it to retrieve a search dictionary from the database. It then converts the search dictionary into a list of keys.

Parameters:

_pattern (PatternDict) – A dictionary containing the pattern to be converted. Must include a “Scope” key.

Returns:

A list of keys derived from the pattern. Returns an empty list if an

error occurs during the process.

Return type:

List[str]

get_list(_input: dict, search=True) Any

Gets a full list from Redis

get_search_dict(scope: str, **kwargs) dict

Get the search dictionary from the args

get_value(_input: dict, search=True) Any
hdel(_input: dict, hash_field: str, search=True)

Deletes a key within the hash name

hget(_input: dict, hash_field: str, search=True)

Return the value of a key within the hash name

hset(_input: dict)

Implementation of hset, from redys-py: Set key to value within hash

Returns:

1 if HSET created a new field, otherwise 0 e.g {‘Robot’:{‘lala’:{‘Parameters’: {‘Foo’:2, ‘Bar’:3}}}}

hset_pub(_input: dict)

Same as hset with addition publish in a respective channel

static keys_to_dict(kv: List[Tuple[str, Any]])
lpush(_input: dict, pickl: bool = True)

Push a value to the left of a Redis list

async mget(keys: List[bytes]) Dict[str, Any]

Get values using redis mget.

Parameters:

keys (List[bytes]) – List of keys, e.g. [b’Configuration:app-adminboard,Label:’]

Returns:

Dictionary of key-value pairs

Return type:

Dict[str, Any]

pop(_input: dict)

Pop a value from the left of a Redis list

pubsub: PubSub
push(_input: dict, pickl: bool = True)

Push a value to the right of a Redis list

rename(old_input: dict, new_input: dict) bool

Receives two dicts with same struct to replace one with the other

rpop(_input: dict)

Pop a value from the right of a Redis list

search(_input: dict) list

Search redis for a certain structure, returns a list of matching keys Meant to be used by other functions in this class

search_by_args(scope, **kwargs) Tuple[dict, int]

Search keys in redis giving arguments

search_wild(_input: dict, only_pattern=False) str | List[str]

Accepts a not full structure to search and returns a list of matching keys

set(_input: dict, pickl: bool = True, pipe=None, ex=None, px=None, nx=False, xx=False, validate=True) None

Set key values in database.

static set_dict(base: dict, path: str, value: Any) None

Appends value to dict with specified path

classmethod sort_dict(item: dict) dict

Sort nested dict. Adapted from: https://gist.github.com/gyli/f60f0374defc383aa098d44cfbd318eb

Example: Input: {‘a’: 1, ‘c’: 3, ‘b’: {‘b2’: 2, ‘b1’: 1}} Output: {‘a’: 1, ‘b’: {‘b1’: 1, ‘b2’: 2}, ‘c’: 3}

async subscribe(_input: dict, function)

Subscribes to a KeySpace event

subscribe_by_args(scope, function, **kwargs)

Subscribe to a redis pattern giving arguments

subscribe_by_args_decoded(scope, function: Subscriber, **kwargs)

Same as subscribe_by_args but decodes the notification sent back by Redis

async subscribe_channel(_input: dict, function, port_name: str, node_name: str)

Subscribes to a specific channel

async task_subscriber(key: str, callback, port_name: str | None = None, node_name: str | None = None) None

Calls a callback every time it gets a message.

static template_to_star(_input: Dict[str, Any])
unsafe_delete(_input: dict, pipe=None) int | None

deletes _input :returns: number of deleted entries.

static update_dict(d: dict, u: Any)
classmethod validate(d: Dict[str, Any], api: Dict[str, Any], base_key: str, keys: List[Tuple[str, Any, Any]])

Validation before safe in database

static validate_path(path: str, structure: dict) str | None
class dal.movaidb.Redis(*args, **kwargs)

Bases: object

A Singleton class implementing Redis API.

property db_global: Redis
property db_local: Redis
property db_slave: Redis
local_pubsub() PubSub
property slave_pubsub: PubSub
dal.movaidb.RedisClient

alias of AioRedisClient