love.producer package

Submodules

love.producer.love_manager_client module

class love.producer.love_manager_client.LoveManagerClient(log)

Bases: object

Provides connectivity between the LOVE manager and the producer.

Attributes:
manager_hostname
manager_password
url

Methods

connect_to_manager()

An async generator that will try to connect to the LOVE manager as long as done_task is not set.

handle_connected()

Handle a recently established connection with the LOVE manager.

handle_connection_with_manager()

Keep connection to manager alive and handle incoming requests.

handle_producers_reply_to_server(message_data)

Given the input message data, handle any reply needed from the producers to the server.

handle_wait_retry()

Handle retrying to connect to manager.

need_reply_from_producers(message_data)

Determine if input message_data from the server requires a reply from the producers.

parse_websocket_message(websocket_message)

Parse input JSON message string to a dictionary if the message is of type aiohttp.WSMsgType.TEXT; otherwise return an empty dictionary.

reset_tasks()

Reset both the connected_task and done_task futures.

send_message(message)

Send a given message through websockets.

close

create_producers

async close()
async connect_to_manager() None

An async generator that will try to connect to the LOVE manager as long as done_task is not set.

Yields:

connection_attempt (int) – How many attempts to connect were made before succeeding.

Notes

One way to use this async generator is in an async for loop.

See also

handle_connection_with_manager

Connects to manager and handles the connection.
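
The async for usage mentioned in the notes can be sketched as follows. This is a minimal, self-contained illustration: fake_connect_to_manager is a hypothetical stand-in for the real generator (it fails a fixed number of times instead of opening a websocket), so only the consumption pattern is meant to carry over.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_connect_to_manager(failures_before_success: int) -> AsyncIterator[int]:
    """Hypothetical stand-in: yield the attempt count once "connected"."""
    attempt = 0
    while True:
        attempt += 1
        await asyncio.sleep(0)  # placeholder for a real connection attempt
        if attempt > failures_before_success:
            yield attempt
            return


async def main() -> List[int]:
    attempts = []
    # Each yielded value reports how many attempts were needed to connect.
    async for connection_attempt in fake_connect_to_manager(failures_before_success=2):
        attempts.append(connection_attempt)
    return attempts


result = asyncio.run(main())
```

In the real client the generator presumably keeps running until done_task is set; the stand-in above simply stops after the first successful attempt.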

create_producers(components: list, **kwargs) None
async handle_connected() None

Handle a recently established connection with the LOVE manager.

async handle_connection_with_manager() None

Keep connection to manager alive and handle incoming requests. If the connection is closed, try to reconnect until the process is stopped.

async handle_producers_reply_to_server(message_data: dict) None

Given the input message data, handle any reply needed from the producers to the server.

Parameters:

message_data (dict) – Data payload from the message received from the server.

async handle_wait_retry() None

Handle retrying to connect to manager.

property manager_hostname: str
property manager_password: str
need_reply_from_producers(message_data: dict) bool

Determine if input message_data from the server requires a reply from the producers.

Parameters:

message_data (dict) – Data from the server to process.

Returns:

Does the data need a reply?

Return type:

bool

parse_websocket_message(websocket_message: str) dict

Parse input JSON message string to a dictionary if the message is of type aiohttp.WSMsgType.TEXT; otherwise return an empty dictionary.

Parameters:

websocket_message (str) – Input json string.

Returns:

Resulting dictionary from parsing json string.

Return type:

dict
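
The parsing rule described above can be sketched without aiohttp. MsgType and Message below are hypothetical stand-ins for aiohttp.WSMsgType and a websocket message object, and parse_message is an illustrative name, not the method documented here.

```python
import enum
import json
from dataclasses import dataclass


class MsgType(enum.Enum):
    """Stand-in for aiohttp.WSMsgType (only two members shown)."""

    TEXT = 1
    BINARY = 2


@dataclass
class Message:
    """Stand-in for a websocket message with a type and a payload."""

    type: MsgType
    data: str


def parse_message(message: Message) -> dict:
    """Return the decoded JSON payload for TEXT messages, else an empty dict."""
    if message.type is MsgType.TEXT:
        return json.loads(message.data)
    return {}


text_result = parse_message(Message(MsgType.TEXT, '{"category": "event"}'))
binary_result = parse_message(Message(MsgType.BINARY, ""))
```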

reset_tasks() None

Reset both the connected_task and done_task futures.

These asyncio.Future objects are used to determine whether the client is connected to the manager and how long the client should keep running.

See also

connect_to_manager

Async generator to connect to manager.
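
As a rough sketch of the two-future pattern described above, the snippet below replaces both futures with fresh, pending ones. TaskHolder is a hypothetical class, and signalling with set_result(None) is an assumption about how the futures are resolved.

```python
import asyncio


class TaskHolder:
    """Hypothetical holder of the connected_task and done_task futures."""

    def __init__(self) -> None:
        self.connected_task: asyncio.Future = asyncio.Future()
        self.done_task: asyncio.Future = asyncio.Future()

    def reset_tasks(self) -> None:
        # Replace both futures with new, pending ones.
        self.connected_task = asyncio.Future()
        self.done_task = asyncio.Future()


async def demo() -> tuple:
    holder = TaskHolder()
    holder.connected_task.set_result(None)  # assumed way to mark "connected"
    holder.done_task.set_result(None)  # assumed way to mark "stop running"
    before = (holder.connected_task.done(), holder.done_task.done())
    holder.reset_tasks()
    after = (holder.connected_task.done(), holder.done_task.done())
    return before, after


states = asyncio.run(demo())
```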

async send_message(message: str) None

Send a given message through websockets.

Parameters:

message (str) – JSON string to send to manager.

property url: str

love.producer.love_manager_message module

class love.producer.love_manager_message.LoveManagerMessage(component_name: str)

Bases: object

Methods

add_metadata

get_message_as_json

get_message_category

get_message_category_as_json

get_message_initial_state

get_message_initial_state_all

get_message_initial_state_all_as_json

get_message_initial_state_as_json

get_message_initial_state_as_json_for_csc

get_message_initial_state_for_csc

add_metadata(**kwargs) None
get_message_as_json(data: dict) str
get_message_category(category: str, data: dict) dict
get_message_category_as_json(category: str, data: dict) str
get_message_initial_state() dict
get_message_initial_state_all() dict
get_message_initial_state_all_as_json() str
get_message_initial_state_as_json() str
get_message_initial_state_as_json_for_csc(csc) str
get_message_initial_state_for_csc(csc) dict
class love.producer.love_manager_message.NumpyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

Bases: JSONEncoder

Methods

default(obj)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

encode(o)

Return a JSON string representation of a Python data structure.

iterencode(o[, _one_shot])

Encode the given object and yield each string representation as available.

default(obj: Any) JSONEncoder

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
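
A related sketch, in the spirit of an encoder for array-like data: the subclass below serializes any object that exposes a tolist() method. This is an illustration only and is not taken from the NumpyEncoder source; it is exercised with array.array from the standard library so that it runs without NumPy, whose arrays also provide tolist().

```python
import json
from array import array
from typing import Any


class ToListEncoder(json.JSONEncoder):
    """Illustrative encoder: serialize objects exposing tolist() as lists."""

    def default(self, obj: Any) -> Any:
        if hasattr(obj, "tolist"):
            return obj.tolist()
        # Let the base class raise TypeError for unsupported types.
        return super().default(obj)


encoded = json.dumps({"values": array("i", [1, 2, 3])}, cls=ToListEncoder)
```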

love.producer.love_producer_base module

class love.producer.love_producer_base.LoveProducerBase(component_name: str | None = None, log: Logger | None = None, **kwargs)

Bases: object

Base class for Love Producer.

This base class implements the basic behavior required to produce messages for the LOVE manager.

When subclassing to deal with different kinds of data, users will mostly have to override the _convert_data_to_dict private method. This method converts special data types into a dictionary, which is then transmitted as a JSON string by the appropriate methods.

Parameters:
  • component_name (str, optional) – Name of the component for which data will be produced.

  • log (logging.Logger, optional) – Logger facility.

log

Logger facility.

Type:

logging.Logger

done_task

An asyncio future to keep the producer running.

Type:

asyncio.Future
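
The subclassing pattern described for this class, overriding _convert_data_to_dict to handle a special data type, might look like the sketch below. ProducerBaseSketch is a hypothetical stand-in that only mimics the conversion hook; as_json, Temperature and TemperatureProducer are illustrative names, and the real base class does considerably more.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any


class ProducerBaseSketch:
    """Hypothetical stand-in for the base class; not the real implementation."""

    def _convert_data_to_dict(self, data: Any) -> dict:
        # Default behavior in this sketch: assume the data is dict-like.
        return dict(data)

    def as_json(self, data: Any) -> str:
        return json.dumps(self._convert_data_to_dict(data))


@dataclass
class Temperature:
    sensor: str
    value: float


class TemperatureProducer(ProducerBaseSketch):
    def _convert_data_to_dict(self, data: Temperature) -> dict:
        # Convert the special data type into a plain dictionary.
        return asdict(data)


message = TemperatureProducer().as_json(Temperature(sensor="m1", value=12.5))
```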

Attributes:
component_name
period_default_in_seconds
send_message

Send message function.

Methods

assert_component_name_is_set([message])

Assert that the component name is set.

generate_data_name(data_repr)

Generate data name.

get_initial_state_messages_as_json()

Asynchronously generate all initial state messages.

get_message_initial_state_all_as_json()

Return the initial subscription message to all producers.

get_message_initial_state_as_json()

Return the initial subscription message for this producer.

get_message_initial_state_as_json_for_csc(csc)

Return the initial subscription message for a given CSC.

handle_asynchronous_data_callback(data)

Callback function to handle asynchronous data.

has_matched_metadata(message_data)

Does the message data have the correct metadata?

is_data_stream_stored(data_stream)

Determines if data stream is part of the internal data.

register_additional_action(data_key, ...)

Register additional actions to take for specific data key.

register_monitor_data_periodically(get_data, ...)

Register a callable method so it is periodically polled for data to be transmitted.

reply_to_message_data(message_data)

Generate a reply based on the input message_data.

retrieve_one_sample(sample_name)

Retrieve one sample from internal_asynchronous table.

retrieve_samples(*args)

Return samples from internal asynchronous table.

send_initial_data()

Send initial data.

send_reply_to_message_data(message_data)

Send reply to message data.

should_reply_to_message_data(message_data)

Determines whether a reply to message data should be sent.

store_samples(**kwargs)

Store samples in internal asynchronous table.

add_metadata

close

get_asynchronous_data_category

get_data_from_message_data

get_message_category_as_json

get_metadata

get_sample_name

register_asynchronous_data_category

add_metadata(**kwargs) None
assert_component_name_is_set(message: str = '') None

Assert that the component name is set.

Parameters:

message (str) – Additional message to append to exception message in case of assertion error.

Raises:

AssertionError – If component name is not set.

async close()
property component_name: str | None
generate_data_name(data_repr: str) str

Generate data name.

Parameters:

data_repr (str) – Data representation, typically repr(data.keys()) for dictionaries.

Returns:

Hash code for the input data representation.

Return type:

str

get_asynchronous_data_category(name) str
get_data_from_message_data(message_data: dict) dict
async get_initial_state_messages_as_json() AsyncIterator[int]

Asynchronously generate all initial state messages.

Yields:

initial_state (str) – Initial state messages as json.

get_message_category_as_json(category: str, data_as_dict: dict) str
get_message_initial_state_all_as_json() str

Return the initial subscription message to all producers.

Returns:

Initial state as a json string.

Return type:

str

get_message_initial_state_as_json() str

Return the initial subscription message for this producer.

Returns:

Initial state as a json string.

Return type:

str

Raises:

RuntimeError – If component_name is not set.

get_message_initial_state_as_json_for_csc(csc: str) str

Return the initial subscription message for a given CSC.

Returns:

Initial state as a json string.

Return type:

str

get_metadata() dict
get_sample_name(data_stream: dict) str
async handle_asynchronous_data_callback(data: Any) None

Callback function to handle asynchronous data.

Parameters:

data – Input data.

has_matched_metadata(message_data: dict) bool

Does the message data have the correct metadata?

Parameters:

message_data (dict) – Input dictionary with information used by the producer to determine some action to take.

Returns:

Do message_data and metadata have matching information?

Return type:

bool

is_data_stream_stored(data_stream: dict) bool

Determines if data stream is part of the internal data.

Parameters:

data_stream (str) – Name of the data stream.

Returns:

Is data stream stored in the asynchronous data samples?

Return type:

bool

property period_default_in_seconds: float
register_additional_action(data_key: str, additional_action: Coroutine) None

Register additional actions to take for specific data key.

Parameters:
  • data_key (str) – Data key.

  • additional_action (coroutine) – Function to call when specified data is received.

register_asynchronous_data_category(name: str, category: str) None
register_monitor_data_periodically(get_data: Callable[[], Any], category: str) None

Register a callable method so it is periodically polled for data to be transmitted.

Parameters:
  • get_data (func or coroutine) – Function or coroutine to be called/awaited periodically for data.

  • category (str) – The data category. Usual options are “telemetry” (default) and “event”.

Notes

The output of the get_data function can be in any format. It will be converted to a dictionary by calling self._convert_data_to_dict in the data monitor loop (e.g. self._monitor_periodic_data). If producing special data types (not dict), make sure to override self._convert_data_to_dict appropriately.

See also

_convert_data_to_dict

Convert data type to dictionary.

_monitor_periodic_data

Method running in the background polling for data.
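
The polling behavior described in the notes (call or await get_data, then convert the result to a dictionary) can be sketched as below. poll_periodically and its parameters are hypothetical; the real monitor loop presumably runs until the producer is closed, whereas this sketch stops after n_samples iterations so it terminates.

```python
import asyncio
import inspect
from typing import Any, Callable, List


async def poll_periodically(
    get_data: Callable[[], Any],
    convert: Callable[[Any], dict],
    period: float,
    n_samples: int,
) -> List[dict]:
    """Call (or await) get_data every period seconds and convert the result."""
    samples = []
    for _ in range(n_samples):
        data = get_data()
        if inspect.isawaitable(data):
            # get_data was a coroutine function; await its result.
            data = await data
        samples.append(convert(data))
        await asyncio.sleep(period)
    return samples


def get_tuple() -> tuple:
    return ("elevation", 45.0)


async def get_tuple_async() -> tuple:
    return ("azimuth", 10.0)


def to_dict(data: tuple) -> dict:
    # Plays the role of _convert_data_to_dict for a non-dict data type.
    return {data[0]: data[1]}


sync_samples = asyncio.run(poll_periodically(get_tuple, to_dict, 0.01, 2))
async_samples = asyncio.run(poll_periodically(get_tuple_async, to_dict, 0.01, 1))
```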

async reply_to_message_data(message_data: dict) None

Generate a reply based on the input message_data.

Parameters:

message_data (dict) – Input dictionary with information used by the producer to determine some action to take.

retrieve_one_sample(sample_name: str) dict

Retrieve one sample from internal_asynchronous table.

Parameters:

sample_name (str) – Name of the sample in internal data structure.

Returns:

Sample.

Return type:

dict

retrieve_samples(*args: List[str]) List[dict]

Return samples from internal asynchronous table.

Parameters:

*args (list of str) – List of names of samples to retrieve.

Returns:

List of dictionaries with the requested samples.

Return type:

list of dict

async send_initial_data()

Send initial data.

property send_message: Callable[[str], None]

Send message function.

This must be set before the producer can send messages with it. If not set and called it will generate a RuntimeError.
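
One way to obtain the behavior described above is to default the property to a function that raises when called. The sketch below assumes exactly that; SenderSketch and _send_message_not_set are hypothetical names, and whether the real class raises on call or on attribute access is not confirmed here.

```python
from typing import Callable, List


class SenderSketch:
    """Hypothetical class with a settable send_message property."""

    def __init__(self) -> None:
        self._send_message: Callable[[str], None] = self._send_message_not_set

    @staticmethod
    def _send_message_not_set(message: str) -> None:
        raise RuntimeError("send_message is not set.")

    @property
    def send_message(self) -> Callable[[str], None]:
        return self._send_message

    @send_message.setter
    def send_message(self, func: Callable[[str], None]) -> None:
        self._send_message = func


sent: List[str] = []
producer = SenderSketch()
try:
    producer.send_message("{}")  # not set yet, so this call raises
    raised = False
except RuntimeError:
    raised = True

producer.send_message = sent.append
producer.send_message('{"ok": true}')
```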

async send_reply_to_message_data(message_data: dict) None

Send reply to message data.

should_reply_to_message_data(message_data: dict) bool

Determines whether a reply to message data should be sent.

Parameters:

message_data (dict) – Input dictionary with information used by the producer to determine some action to take.

Returns:

Should reply to message data?

Return type:

bool

store_samples(**kwargs: dict) None

Store samples in internal asynchronous table.

Parameters:

**kwargs (dict) – Names and values to store.
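
The store_samples, retrieve_one_sample and retrieve_samples signatures suggest a name-to-sample table. A minimal sketch of such a table follows, with a plain dictionary standing in for the internal data structure. SampleTable and the sample names are hypothetical, and raising KeyError for an unknown name is an assumption of the sketch.

```python
from typing import Dict, List


class SampleTable:
    """Hypothetical name -> sample table."""

    def __init__(self) -> None:
        self._samples: Dict[str, dict] = {}

    def store_samples(self, **kwargs: dict) -> None:
        # Each keyword is a sample name, each value the sample itself.
        self._samples.update(kwargs)

    def retrieve_one_sample(self, sample_name: str) -> dict:
        return self._samples[sample_name]

    def retrieve_samples(self, *args: str) -> List[dict]:
        # Samples are returned in the order the names were requested.
        return [self.retrieve_one_sample(name) for name in args]


table = SampleTable()
table.store_samples(heartbeat={"heartbeat": True}, summaryState={"summaryState": 2})
samples = table.retrieve_samples("summaryState", "heartbeat")
```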

love.producer.love_producer_csc module

class love.producer.love_producer_csc.LoveProducerCSC(domain: Domain, csc: str, log: Logger | None = None, **kwargs)

Bases: LoveProducerBase

Specialized LOVE producer to deal with generic CSC behavior.

Attributes:
component_name
period_default_in_seconds
reply_names
send_message

Send message function.

Methods

assert_component_name_is_set([message])

Assert that the component name is set.

create_revcode_mapping(topic_attribute_name)

For a given topic name, create a revcode mapping.

create_template_manager_message(topic_name)

Create template manager message for given topic.

generate_data_name(data_repr)

Generate data name.

generate_valid_topic_attribute_names(...)

For each entry in periodic_data check that it is part of the producer list of topics and return a valid list.

get_initial_state_messages_as_json()

Asynchronously generate all initial state messages.

get_message_initial_state_all_as_json()

Return the initial subscription message to all producers.

get_message_initial_state_as_json()

Return the initial subscription message for this producer.

get_message_initial_state_as_json_for_csc(csc)

Return the initial subscription message for a given CSC.

get_sample_name(data_stream)

Override base class default behavior.

get_topic_attribute_name(rev_code)

Returns the associated topic attribute name (e.g. evt_heartbeat) for a given rev_code.

handle_asynchronous_data_callback(data)

Callback function to handle asynchronous data.

has_matched_metadata(message_data)

Does the message data have the correct metadata?

is_data_stream_stored(data_stream)

Determines if data stream is part of the internal data.

register_additional_action(data_key, ...)

Register additional actions to take for specific data key.

register_monitor_data_periodically(get_data, ...)

Register a callable method so it is periodically polled for data to be transmitted.

reply_to_message_data(message_data)

Generate a reply based on the input message_data.

retrieve_one_sample(sample_name)

Retrieve one sample from internal_asynchronous table.

retrieve_samples(*args)

Return samples from internal asynchronous table.

send_initial_data()

Send initial data.

send_reply_to_message_data(message_data)

Send reply to message data.

set_topic_name_revcode_mapping()

Create a mapping between topic name and revcode for all topics so the producer can assign the name of the topics from the samples.

set_topic_template_manager_message_format()

Generate manager message format for each registered topic.

should_reply_to_message_data(message_data)

Determines whether a reply to message data should be sent.

store_samples(**kwargs)

Store samples in internal asynchronous table.

add_metadata

close

get_asynchronous_data

get_asynchronous_data_category

get_data_from_message_data

get_event_attribute_names_and_category

get_message_category_as_json

get_metadata

get_periodic_data

get_telemetry_attribute_names_and_category

register_asynchronous_data_category

set_asynchronous_monitor_for

set_monitor_asynchronous_data

set_monitor_heartbeat

set_monitor_periodic_data

start

store_last_sample

async close()
create_revcode_mapping(topic_attribute_name: str) None

For a given topic name, create a revcode mapping.

Parameters:

topic_attribute_name (str) – Name of the topic attribute, e.g. evt_heartbeat.

create_template_manager_message(topic_name)

Create template manager message for given topic.

generate_valid_topic_attribute_names(periodic_data: list) list

For each entry in periodic_data check that it is part of the producer list of topics and return a valid list.

The names in the input list must follow the same rule as the topics in the remote. For instance, telemetry topics must be preceded by tel_ and events by evt_.

Parameters:

periodic_data (list of str) – List of topic names (preceded by type).

Returns:

periodic_data_list – Validated list of periodic names.

Return type:

list
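
The validation rule above (names carry a tel_ or evt_ prefix and must belong to the producer's topics) can be sketched as a simple filter. filter_valid_topic_names, known_topics and the example topic names are hypothetical, and silently dropping invalid entries is an assumption; the real method may log or handle them differently.

```python
from typing import Iterable, List


def filter_valid_topic_names(
    periodic_data: Iterable[str], known_topics: Iterable[str]
) -> List[str]:
    """Keep names with a tel_ or evt_ prefix that are also known topics."""
    known = set(known_topics)
    return [
        name
        for name in periodic_data
        if name.startswith(("tel_", "evt_")) and name in known
    ]


valid = filter_valid_topic_names(
    ["tel_position", "evt_heartbeat", "heartbeat", "tel_unknown"],
    known_topics=["tel_position", "evt_heartbeat"],
)
```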

get_asynchronous_data(**kwargs) dict
get_event_attribute_names_and_category() list
get_periodic_data(**kwargs) dict
get_sample_name(data_stream: dict) str

Override base class default behavior.

get_telemetry_attribute_names_and_category() list
get_topic_attribute_name(rev_code: str) str

Returns the associated topic attribute name (e.g. evt_heartbeat) for a given rev_code.

Parameters:

rev_code (str) – Revision code of the topic.

Returns:

Topic attribute name, e.g. evt_heartbeat.

Return type:

str

property reply_names: str
async set_asynchronous_monitor_for(asynchronous_data_name: str, asynchronous_data_category: str) None
async set_monitor_asynchronous_data() None
async set_monitor_heartbeat()
async set_monitor_periodic_data() None
set_topic_name_revcode_mapping() None

Create a mapping between topic name and revcode for all topics so the producer can assign the name of the topics from the samples.

set_topic_template_manager_message_format() None

Generate manager message format for each registered topic.

should_reply_to_message_data(message_data: dict) bool

Determines whether a reply to message data should be sent.

Override base class default behavior.

Parameters:

message_data (dict) – Input dictionary with information used by the producer to determine some action to take.

Returns:

Should reply to message data?

Return type:

bool

async start() None
async store_last_sample(sample_name: str) None

love.producer.love_producer_factory module

class love.producer.love_producer_factory.LoveProducerFactory

Bases: object

Methods

get_love_producer_from_name

get_love_producer_from_type

available_love_producer_type = {'base': <class 'love.producer.love_producer_base.LoveProducerBase'>, 'csc': <class 'love.producer.love_producer_csc.LoveProducerCSC'>, 'scriptqueue': <class 'love.producer.love_producer_script_queue.LoveProducerScriptQueue'>, 'watcher': <class 'love.producer.love_producer_watcher.LoveProducerWatcher'>}
classmethod get_love_producer_from_name(component_name: str, **kwargs) LoveProducerBase
classmethod get_love_producer_from_type(love_producer_type: str, **kwargs) LoveProducerBase
named_love_producer_type = {'ScriptQueue': 'scriptqueue', 'Watcher': 'watcher'}
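
The two class attributes above suggest a registry-based factory: one mapping from producer type to class and one from component name to producer type. The sketch below illustrates that pattern with hypothetical stand-in classes. The fallback to "base" for names without a dedicated entry is an assumption of the sketch and may not match the real factory, and ATDome is only an example component name.

```python
class BaseSketch:
    """Hypothetical stand-in for a generic producer."""

    def __init__(self, component_name=None, **kwargs):
        self.component_name = component_name


class ScriptQueueSketch(BaseSketch):
    """Hypothetical stand-in for a specialized producer."""


class FactorySketch:
    # Producer type -> class, mirroring available_love_producer_type.
    available = {"base": BaseSketch, "scriptqueue": ScriptQueueSketch}
    # Component name -> producer type, mirroring named_love_producer_type.
    named = {"ScriptQueue": "scriptqueue"}

    @classmethod
    def from_type(cls, producer_type, **kwargs):
        return cls.available[producer_type](**kwargs)

    @classmethod
    def from_name(cls, component_name, **kwargs):
        # Assumption: names without a dedicated entry fall back to "base".
        producer_type = cls.named.get(component_name, "base")
        return cls.from_type(producer_type, component_name=component_name, **kwargs)


script_queue = FactorySketch.from_name("ScriptQueue")
generic = FactorySketch.from_name("ATDome")
```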

love.producer.love_producer_script_queue module

class love.producer.love_producer_script_queue.LoveProducerScriptQueue(domain: Domain, log: Logger | None = None, **kwargs)

Bases: LoveProducerCSC

Specialized LOVE producer to deal with the ScriptQueue CSC.

Attributes:
component_name
period_default_in_seconds
reply_names
scriptqueue_state_message_data

Script queue state message.

send_message

Send message function.

Methods

add_new_script(salindex)

Add new script to the internal script database.

assert_component_name_is_set([message])

Assert that the component name is set.

create_revcode_mapping(topic_attribute_name)

For a given topic name, create a revcode mapping.

create_template_manager_message(topic_name)

Create template manager message for given topic.

generate_data_name(data_repr)

Generate data name.

generate_valid_topic_attribute_names(...)

For each entry in periodic_data check that it is part of the producer list of topics and return a valid list.

get_empty_script(salindex)

Return an empty script data structure.

get_empty_script_heartbeat(script_index)

Heartbeat message data structure.

get_initial_state_messages_as_json()

Asynchronously generate all initial state messages.

get_message_initial_state_all_as_json()

Return the initial subscription message to all producers.

get_message_initial_state_as_json()

Return the initial subscription message for this producer.

get_message_initial_state_as_json_for_csc(csc)

Return the initial subscription message for a given CSC.

get_sample_name(data_stream)

Override base class default behavior.

get_scriptqueue_state_message_as_json()

Parses the current state into a LOVE friendly format.

get_topic_attribute_name(rev_code)

Returns the associated topic attribute name (e.g. evt_heartbeat) for a given rev_code.

handle_asynchronous_data_callback(data)

Callback function to handle asynchronous data.

handle_event_script_checkpoints(event)

Callback for the logevent_checkpoints.

handle_event_script_description(event)

Callback for the logevent_description.

handle_event_script_log_level(event)

Listens to the logLevel event.

handle_event_script_metadata(event)

Callback for the logevent_metadata.

handle_event_script_state(event)

Callback for the Script_logevent_state event.

handle_event_scriptqueue_available_scripts(data)

Additional action for availableScripts events.

handle_event_scriptqueue_config_schema(event)

Additional action for configSchema event.

handle_event_scriptqueue_queue(event)

Saves the queue state using the event data and queries the state of each script that does not exist or has not been set up.

handle_event_scriptqueue_script(event)

Additional action for script queue script event.

has_matched_metadata(message_data)

Does the message data have the correct metadata?

is_data_stream_stored(data_stream)

Determines if data stream is part of the internal data.

monitor_script_heartbeat_callback(data)

Callback to monitor script heartbeat.

monitor_script_log_message_callback(data)

Callback to monitor script log messages.

register_additional_action(data_key, ...)

Register additional actions to take for specific data key.

register_monitor_data_periodically(get_data, ...)

Register a callable method so it is periodically polled for data to be transmitted.

reply_to_message_data(message_data)

Generate a reply based on the input message_data.

retrieve_one_sample(sample_name)

Retrieve one sample from internal_asynchronous table.

retrieve_samples(*args)

Return samples from internal asynchronous table.

send_initial_data()

Send initial data.

send_reply_to_message_data(message_data)

Send reply to message data.

send_script_heartbeat(salindex)

Send heartbeat message from script.

send_script_log_message(message_data)

Send log messages from the current script.

send_scriptqueue_state()

Send script queue state.

set_topic_name_revcode_mapping()

Create a mapping between topic name and revcode for all topics so the producer can assign the name of the topics from the samples.

set_topic_template_manager_message_format()

Generate manager message format for each registered topic.

should_reply_to_message_data(message_data)

Determines whether a reply to message data should be sent.

store_samples(**kwargs)

Store samples in internal asynchronous table.

update_scripts_schema()

Update scripts schema.

add_metadata

close

get_asynchronous_data

get_asynchronous_data_category

get_data_from_message_data

get_event_attribute_names_and_category

get_message_category_as_json

get_metadata

get_periodic_data

get_telemetry_attribute_names_and_category

register_asynchronous_data_category

set_asynchronous_monitor_for

set_monitor_asynchronous_data

set_monitor_heartbeat

set_monitor_periodic_data

set_script_heartbeat_producer

start

store_last_sample

add_new_script(salindex: int) None

Add new script to the internal script database.

Parameters:

salindex (int) – The SAL index of the script.

async close()
cmd_timeout = 5.0
get_empty_script(salindex: int) dict

Return an empty script data structure.

Parameters:

salindex (int) – The SAL index of the script.

Returns:

The empty, default, script.

Return type:

dict

get_empty_script_heartbeat(script_index: int) dict

Heartbeat message data structure.

Returns:

Heartbeat message.

Return type:

dict

async get_initial_state_messages_as_json() AsyncIterator[int]

Asynchronously generate all initial state messages.

Yields:

initial_state (str) – Initial state messages as json.

get_scriptqueue_state_message_as_json() str

Parses the current state into a LOVE friendly format.

Returns:

ScriptQueue state message.

Return type:

str

async handle_event_script_checkpoints(event: Any) None

Callback for the logevent_checkpoints.

Used to extract Script’s checkpoint pause and stop information.

Parameters:

event (Script_logevent_checkpoints) – Event data.

async handle_event_script_description(event: Any) None

Callback for the logevent_description.

Used to extract Script’s description, classname and remotes.

Parameters:

event (Script_logevent_description) – Event data.

async handle_event_script_log_level(event: Any) None

Listens to the logLevel event.

Parameters:

event (Script_logevent_logLevel) – Event data.

async handle_event_script_metadata(event: Any) None

Callback for the logevent_metadata.

Used to extract the expected duration of the script.

Parameters:

event (Script_logevent_metadata) – Event data.

async handle_event_script_state(event: Any) None

Callback for the Script_logevent_state event.

Used to update the state of the script.

Parameters:

event (Script_logevent_state) – Event data.

async handle_event_scriptqueue_available_scripts(data: Any) None

Additional action for availableScripts events.

Updates the list of available_scripts in the state according to the availableScripts event info.

Parameters:

data (ScriptQueue_logevent_availableScripts) – The SAL event data.

async handle_event_scriptqueue_config_schema(event: Any) None

Additional action for configSchema event.

Parameters:

event (ScriptQueue_logevent_configSchema) – ScriptQueue_logevent_configSchema event data.

async handle_event_scriptqueue_queue(event: Any) None

Saves the queue state using the event data and queries the state of each script that does not exist or has not been set up.

Parameters:

event (ScriptQueue_logevent_queue) – The SAL event data.

async handle_event_scriptqueue_script(event: Any) None

Additional action for script queue script event.

Parameters:

event (ScriptQueue_logevent_script) – ScriptQueue_logevent_script event data.

max_script_log_messages = 20
monitor_script_heartbeat_callback(data: Any) None

Callback to monitor script heartbeat.

Parameters:

data (Script_logevent_heartbeat) – Script heartbeat topic sample.

async monitor_script_log_message_callback(data: Any) None

Callback to monitor script log messages.

Parameters:

data (Script_logevent_logMessage) – Script log message topic sample.

property reply_names: str
property scriptqueue_state_message_data: dict

Script queue state message.

The data structure matches the one required by the manager/frontend view.

async send_reply_to_message_data(message_data: dict) None

Send reply to message data.

async send_script_heartbeat(salindex: int) None

Send heartbeat message from script.

Parameters:

salindex (int) – Index of the script.

async send_script_log_message(message_data: dict) None

Send log messages from the current script.

Parameters:

message_data (dict) – Payload of the message with the request to send script log message.

async send_scriptqueue_state()

Send script queue state.

async set_script_heartbeat_producer() None
should_reply_to_message_data(message_data: dict) bool

Determines whether a reply to message data should be sent.

Override base class default behavior.

Parameters:

message_data (dict) – Input dictionary with information used by the producer to determine some action to take.

Returns:

Should reply to message data?

Return type:

bool

async start() None
async update_scripts_schema() None

Update scripts schema.

love.producer.love_producer_set module

class love.producer.love_producer_set.LoveProducerSet(components, log_level=20)

Bases: object

Container class to configure and host a list of LOVE producers.

Methods

amain()

Parse command line arguments, create and run a LoveManagerClient.

make_argument_parser()

Make command line arguments.

run_producer

signal_handler

async classmethod amain()

Parse command line arguments, create and run a LoveManagerClient.

classmethod make_argument_parser()

Make command line arguments.

async run_producer()
signal_handler()
love.producer.love_producer_set.run_love_producer()

Run love producer.

love.producer.love_producer_watcher module

class love.producer.love_producer_watcher.LoveProducerWatcher(domain: Domain, log: Logger | None = None, **kwargs)

Bases: LoveProducerCSC

Specialized LOVE producer to deal with the Watcher CSC.

Attributes:
alarms_state_message_data

Get the alarms state as a dictionary.

component_name
period_default_in_seconds
reply_names
send_message

Send message function.

Methods

add_new_alarm(alarm)

Add a new alarm to the alarms state.

assert_component_name_is_set([message])

Assert that the component name is set.

create_revcode_mapping(topic_attribute_name)

For a given topic name, create a revcode mapping.

create_template_manager_message(topic_name)

Create template manager message for given topic.

generate_data_name(data_repr)

Generate data name.

generate_valid_topic_attribute_names(...)

For each entry in periodic_data check that it is part of the producer list of topics and return a valid list.

get_alarms_state_as_json()

Get the alarms state as a JSON string.

get_initial_state_messages_as_json()

Asynchronously generate all initial state messages.

get_message_initial_state_all_as_json()

Return the initial subscription message to all producers.

get_message_initial_state_as_json()

Return the initial subscription message for this producer.

get_message_initial_state_as_json_for_csc(csc)

Return the initial subscription message for a given CSC.

get_sample_name(data_stream)

Override base class default behavior.

get_topic_attribute_name(rev_code)

Returns the associated topic attribute name (e.g. evt_heartbeat) for a given rev_code.

handle_asynchronous_data_callback(data)

Callback function to handle asynchronous data.

handle_event_watcher_alarm(event)

Handle the Watcher_logevent_alarm event.

has_matched_metadata(message_data)

Does the message data have the correct metadata?

is_data_stream_stored(data_stream)

Determines if data stream is part of the internal data.

register_additional_action(data_key, ...)

Register additional actions to take for specific data key.

register_monitor_data_periodically(get_data, ...)

Register a callable method so it is periodically polled for data to be transmitted.

reply_to_message_data(message_data)

Generate a reply based on the input message_data.

retrieve_one_sample(sample_name)

Retrieve one sample from internal_asynchronous table.

retrieve_samples(*args)

Return samples from internal asynchronous table.

send_initial_data()

Send initial data.

send_reply_to_message_data(message_data)

Send reply to message data.

send_watcher_alarms()

Send the watcher alarms to the LOVE manager.

set_topic_name_revcode_mapping()

Create a mapping between topic name and revcode for all topics so the producer can assign the name of the topics from the samples.

set_topic_template_manager_message_format()

Generate manager message format for each registered topic.

should_reply_to_message_data(message_data)

Determines whether a reply to message data should be sent.

store_samples(**kwargs)

Store samples in internal asynchronous table.

add_metadata

close

get_asynchronous_data

get_asynchronous_data_category

get_data_from_message_data

get_event_attribute_names_and_category

get_message_category_as_json

get_metadata

get_periodic_data

get_telemetry_attribute_names_and_category

register_asynchronous_data_category

set_asynchronous_monitor_for

set_monitor_asynchronous_data

set_monitor_heartbeat

set_monitor_periodic_data

start

store_last_sample

add_new_alarm(alarm: dict) None

Add a new alarm to the alarms state.

property alarms_state_message_data: dict

Get the alarms state as a dictionary.

get_alarms_state_as_json() str

Get the alarms state as a JSON string.

async handle_event_watcher_alarm(event: Any) None

Handle the Watcher_logevent_alarm event.

Parameters:

event (Watcher_logevent_alarm) – Watcher_logevent_alarm event data.

Notes

This method is registered as an additional action for the Watcher_logevent_alarm event. It stores alarms in the alarms_state attribute and sends them to the LOVE manager.

async send_watcher_alarms() None

Send the watcher alarms to the LOVE manager.
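
The alarm bookkeeping described above (store each incoming alarm in the alarms state, then expose that state as a dictionary or JSON string for the LOVE manager) could be sketched as follows. This is only an illustration: the class name AlarmsState, the use of a "name" key to index alarms, and the {"alarms": [...]} layout are assumptions, not the producer's actual implementation.

```python
import json


class AlarmsState:
    """Hypothetical stand-in for the producer's alarm bookkeeping."""

    def __init__(self):
        # Latest alarm per alarm name (assumed key: "name").
        self._alarms = {}

    def add_new_alarm(self, alarm: dict) -> None:
        """Store the alarm, replacing any earlier alarm with the same name."""
        self._alarms[alarm["name"]] = alarm

    @property
    def alarms_state_message_data(self) -> dict:
        """Return the alarms state as a dictionary (assumed layout)."""
        return {"alarms": list(self._alarms.values())}

    def get_alarms_state_as_json(self) -> str:
        """Return the alarms state as a JSON string."""
        return json.dumps(self.alarms_state_message_data)


state = AlarmsState()
state.add_new_alarm({"name": "Enabled.ATDome", "severity": 2})
# A later event for the same alarm replaces the stored one.
state.add_new_alarm({"name": "Enabled.ATDome", "severity": 3})
print(state.get_alarms_state_as_json())
```

Keying the state by alarm name means a reconnecting client receives only the latest value of each alarm rather than the full event history.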

love.producer.producer_utils module

exception love.producer.producer_utils.ConnectedTaskDoneError

Bases: Exception

Exception raised internally by the LoveManagerClient.connect_to_manager method to handle the condition where the connection to the manager was closed unexpectedly.

exception love.producer.producer_utils.MissingMessageParameterError

Bases: Exception

Exception class to be raised on a missing message parameter.

exception love.producer.producer_utils.MissingMessageStreamError

Bases: Exception

Exception class to be raised on a missing message stream.

class love.producer.producer_utils.NumpyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

Bases: JSONEncoder

Methods

default(obj)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

encode(o)

Return a JSON string representation of a Python data structure.

iterencode(o[, _one_shot])

Encode the given object and yield each string representation as available.

default(obj)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
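
For NumpyEncoder specifically, the override presumably converts NumPy values into plain Python types before serialization. A minimal sketch of that idea follows. It avoids importing NumPy by relying on the tolist() method that NumPy arrays and scalars provide. The names ArrayLikeEncoder and FakeArray are hypothetical, and the actual NumpyEncoder implementation may differ.

```python
import json


class ArrayLikeEncoder(json.JSONEncoder):
    """Hypothetical encoder: serializes objects that expose tolist()."""

    def default(self, obj):
        # NumPy arrays and scalars both provide tolist(), which returns
        # plain Python lists and numbers that json can serialize.
        to_list = getattr(obj, "tolist", None)
        if callable(to_list):
            return to_list()
        # Let the base class default method raise the TypeError.
        return super().default(obj)


class FakeArray:
    """Stand-in for a NumPy array so the sketch runs without NumPy."""

    def __init__(self, values):
        self._values = list(values)

    def tolist(self):
        return list(self._values)


payload = {"position": FakeArray([1.0, 2.5, 3.0])}
print(json.dumps(payload, cls=ArrayLikeEncoder))
```

The encoder is passed through the cls argument of json.dumps, so callers do not need to convert values by hand before serializing.
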
class love.producer.producer_utils.Settings

Bases: object

Methods

config_path

trace_timestamps

ws_host

ws_pass

static config_path()
static trace_timestamps()
static ws_host()
static ws_pass()
love.producer.producer_utils.check_event_stream(message, category, csc, salindex, stream_name)

Tries to return the first stream found in a LOVE message. Throws errors if it does not exist.

love.producer.producer_utils.check_stream_from_last_message(message, category, csc, salindex, stream)

Takes a message and returns a parameter for a given (category, csc, salindex, stream). If not found, it raises an error.

love.producer.producer_utils.get_all_csc_names_in_message(message)

Returns a list of all CSC names contained in a message.

love.producer.producer_utils.get_available_components()

Return a list of all CSCs available in the idl directory.

Returns:

CSCs with idl files in the idl directory.

Return type:

set of str

love.producer.producer_utils.get_data_type(value)
love.producer.producer_utils.get_event_stream(message, category, csc, salindex, stream_name)

Tries to return the first stream found in a LOVE message. Throws errors if it does not exist.

love.producer.producer_utils.get_parameter_from_last_message(message, category, csc, salindex, stream, parameter)

Takes a message and returns a parameter for a given (category, csc, salindex, stream). If not found then it will throw an error.

love.producer.producer_utils.get_stream_from_last_message(message, category, csc, salindex, stream)

Takes a message and returns the stream for a given (category, csc, salindex, stream). If not found, it raises an error.

love.producer.producer_utils.make_stream_message(category, csc, salindex, stream, content)

Returns a message for the LOVE-manager group (category-csc-salindex-stream) with a given content.

love.producer.producer_utils.onemsg_generator(category, csc, salindex, streams_dict)

Generates one message for the LOVE-manager from a single (csc, salindex) source.
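
The message helpers above share one pattern: build a message from a (category, csc, salindex) source, then look up a stream or parameter and raise an error when it is missing. The sketch below illustrates that pattern only. The message layout, the function names build_message, lookup_stream and lookup_parameter, and the locally defined exception classes are assumptions made so the block is self-contained; they are not taken from the package.

```python
class MissingMessageStreamError(Exception):
    """Local stand-in for the exception of the same name."""


class MissingMessageParameterError(Exception):
    """Local stand-in for the exception of the same name."""


def build_message(category, csc, salindex, streams_dict):
    """Build one message from a single (csc, salindex) source.

    The layout used here is an assumption made for this sketch.
    """
    return {
        "category": category,
        "data": [{"csc": csc, "salindex": salindex, "data": streams_dict}],
    }


def lookup_stream(message, category, csc, salindex, stream):
    """Return the stream content, or raise if the stream is missing."""
    if message.get("category") == category:
        for source in message.get("data", []):
            same_source = (
                source.get("csc") == csc and source.get("salindex") == salindex
            )
            if same_source and stream in source.get("data", {}):
                return source["data"][stream]
    raise MissingMessageStreamError(
        f"No stream {category}-{csc}-{salindex}-{stream} in message"
    )


def lookup_parameter(message, category, csc, salindex, stream, parameter):
    """Return one parameter of a stream, or raise if it is missing."""
    content = lookup_stream(message, category, csc, salindex, stream)
    if parameter not in content:
        raise MissingMessageParameterError(
            f"No parameter {parameter} in stream {stream}"
        )
    return content[parameter]


message = build_message(
    "telemetry", "ATDome", 0, {"position": {"azimuthPosition": 12.5}}
)
print(lookup_parameter(message, "telemetry", "ATDome", 0, "position", "azimuthPosition"))
```

Raising distinct exceptions for a missing stream and a missing parameter lets callers tell the two failure modes apart.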

Module contents