cerebro Reference

Cerebro

class cerebro.cerebro.Cerebro(*args, **kwargs)[source]

Handles a list of data sources to which observers can subscribe.

Creates an RX subject that monitors a list of data sources. Observer instances can subscribe to the combined data stream, which is served as a uniformly formatted dictionary.

Parameters:
  • name – The name of this Cerebro instance. This value is added as the cerebro tag to all measurements processed. The name can be used to identify the origin of the data when multiple instances of Cerebro are loading data to the database.

  • tags – A list of tags to add to all the measurements.

  • sources – A list of Source instances to listen to.

  • config

    A file or dictionary from which to load the configuration. The format exactly follows the signature of Cerebro and the data sources it refers to. For example:

    logfile: /data/logs/cerebro/cerebro.log
    ntp_server: us.pool.ntp.org
    tags:
      observatory: ${OBSERVATORY}
    profiles:
      default:
        sources:
          - tron
        observers:
          - influxdb
    sources:
      tron:
        type: tron
        bucket: Actors
        host: localhost
        port: 6093
        actors:
          - tcc
          - apo
    observers:
      influxdb:
        type: influxdb
        url: http://localhost:9999
        token: null
        org: SDSS
        default_bucket: FPS
    

Each source and observer must include a type key that corresponds to the source_type and observer_type value of the Source and Observer subclass to be used, respectively. The profiles section, if present, is a dictionary of profiles, each defining a list of sources and observers. If the items in the sources or observers lists are strings, the entries from the global sources and observers sections are used. Profile sources and observers items can also be dictionaries with the same format as the global sources and observers sections. If config is defined, the keywords sources and observers are ignored.

  • profile – The name of the profile to use from the configuration file. If None and config is defined, uses all the global sources and observers.

  • ntp_server – The address of the NTP server to use. The server is queried every hour to determine the offset between the NTP pool and the local computer.

  • logfile – If set, the path where to write the file log. Otherwise logs only to stdout/stderr.

  • log_rotate – Whether to rotate the log file at midnight UTC.
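As a sketch (assuming the package is installed; names follow the documented signature), the YAML example above could equivalently be supplied as a Python dictionary. The instance name "apo" and the observatory tag value are hypothetical, and the actual instantiation is left commented out:

```python
# Hypothetical config mirroring the YAML example; the instance name "apo"
# and the "APO" tag value are illustrative placeholders.
config = {
    "logfile": "/data/logs/cerebro/cerebro.log",
    "ntp_server": "us.pool.ntp.org",
    "tags": {"observatory": "APO"},
    "profiles": {"default": {"sources": ["tron"], "observers": ["influxdb"]}},
    "sources": {
        "tron": {
            "type": "tron",
            "bucket": "Actors",
            "host": "localhost",
            "port": 6093,
            "actors": ["tcc", "apo"],
        }
    },
    "observers": {
        "influxdb": {
            "type": "influxdb",
            "url": "http://localhost:9999",
            "token": None,
            "org": "SDSS",
            "default_bucket": "FPS",
        }
    },
}

# from cerebro.cerebro import Cerebro
# cerebro = Cerebro("apo", config=config, profile="default")
```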

on_next(data)[source]

Processes a list of measurements from a data source.

The data argument is expected to be a namedtuple or other kind of namespace with at least a .data attribute, which must be a list of dictionaries. Each dictionary must contain an individual measurement along with a series of associated values and, optionally, a set of tags. The default tags are added to each measurement. If a measurement does not contain a time value, it is added with the current UTC time, taking into account the offset determined by the NTP server.

Additional parameters can be added to the namespace for any observer to interpret, but are not required.

Once updated, the measurements are propagated to all the observers.
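An illustrative sketch of the namespace shape on_next expects: a namedtuple with a .data attribute holding a list of measurement dictionaries. The field names mirror the DataPoints tuple documented below, but the per-measurement keys ("measurement", "fields", "tags") are an assumption; the docs only require a time value plus the measurement and its associated values:

```python
from collections import namedtuple
from datetime import datetime, timezone

# Stand-in for cerebro.sources.source.DataPoints (bucket, data).
DataPoints = namedtuple("DataPoints", ["bucket", "data"])

point = {
    "measurement": "temperature",  # assumed key names; see lead-in
    "fields": {"value": 21.3},
    "tags": {"sensor": "govee1"},
}

# If no time is present, Cerebro fills it in with NTP-corrected UTC time;
# this mimics that fallback without the NTP offset.
point.setdefault("time", datetime.now(timezone.utc).isoformat())

measurements = DataPoints(bucket="FPS", data=[point])
```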

async start()[source]

Starts the status file socket.

async status_server_cb(reader, writer)[source]

Handles new connections to the status server.

Parameters:
  • reader (StreamReader)

  • writer (StreamWriter)

stop()[source]

Stops the InfluxDB client and all the sources.

update_time_offset(server)[source]

Updates the internal offset with the NTP server.

Parameters:

server (str)

class cerebro.cerebro.SourceList(on_next, sources=[])[source]

A list of Source instances.

Provides a thin wrapper around the built-in list class, with methods to add, start, and stop data sources.

add_source(source)[source]

Adds a Source.

Parameters:

source (Source)

append(source)[source]

Append object to the end of the list.

get(name)[source]

Retrieves a data source by name.

insert(*args)[source]

Insert object before index.

pop(index)[source]

Remove and return item at index (default last).

Raises IndexError if list is empty or index is out of range.

remove(source)[source]

Remove first occurrence of value.

Raises ValueError if the value is not present.

remove_source(source_name)[source]

Removes a source.

Observer

class cerebro.observer.InfluxDB(name, org, url='http://localhost:9999', token=None, default_bucket=None)[source]

An observer that loads data into an InfluxDB database.

Parameters:
  • url (str) – The port-qualified URL of the InfluxDB v2 database. Defaults to http://localhost:9999.

  • org (str) – The InfluxDB organisation.

  • token (Optional[str]) – The token to be used to write to the database buckets. If None, uses the value from $INFLUXDB_V2_TOKEN.

  • default_bucket (Optional[str]) – The default bucket where to write data. Can be overridden by each individual data source.

  • name (str)
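The documented token fallback (if token is None, the value of $INFLUXDB_V2_TOKEN is used) can be sketched as follows. This reimplements only that one rule for illustration; it is not the actual observer code:

```python
import os

def resolve_token(token=None):
    """Return the explicit token, else the INFLUXDB_V2_TOKEN env variable."""
    return token if token is not None else os.environ.get("INFLUXDB_V2_TOKEN")

os.environ["INFLUXDB_V2_TOKEN"] = "secret"
resolve_token()       # -> "secret" (from the environment)
resolve_token("abc")  # -> "abc" (an explicit token wins)
```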

dispose()[source]

Disposes of the observer and closes the connection to InfluxDB.

on_next(data)[source]

Loads data to InfluxDB.

class cerebro.observer.Observer(name)[source]

An observer class that subscribes to Cerebro updates.

Observers are subscribed to an instance of Cerebro by calling set_cerebro. The on_next method must be overridden to perform an action with the received data.

Parameters:

name (str) – The name of the observer.
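A minimal illustration of the Observer contract: subclass, override on_next, then attach with set_cerebro. The base class below is a self-contained stand-in for cerebro.observer.Observer, not the real class:

```python
import abc

class Observer(abc.ABC):
    """Stand-in for cerebro.observer.Observer so the sketch runs on its own."""

    def __init__(self, name):
        self.name = name

    @abc.abstractmethod
    def on_next(self, data):
        """Handle a new element in the sequence."""

class CollectingObserver(Observer):
    """Keeps the last batch it was notified about."""

    def on_next(self, data):
        self.last = data

obs = CollectingObserver("collector")
obs.on_next({"data": [{"measurement": "temp", "fields": {"value": 1.0}}]})
# obs.last now holds the batch; a real observer would write it somewhere.
```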

abstract on_next(data)[source]

Notify the observer of a new element in the sequence.

set_cerebro(cerebro)[source]

Sets the instance of Cerebro and subscribes to it.

cerebro.observer.get_observer_subclass(type_)[source]

Returns an Observer subclass based on its observer_type.

Parameters:

type_ (str)

Return type:

Type[Observer] | None

Sources

class cerebro.sources.source.DataPoints(bucket, data)[source]

Parameters:
  • bucket (str | None) – Alias for field number 0

  • data (List[Dict[str, Any]]) – Alias for field number 1

class cerebro.sources.source.Source(name, bucket=None, tags={})[source]

A base class for a data source.

Data sources are standalone systems that periodically produce measurements for one or multiple fields. An example is a TCP server that outputs temperature and humidity values as plain ASCII. The data source handles the connection and parsing of the stream and reports new measurements to Cerebro.

Internally, a source is a Subject, which acts as both an observer and an observable. When the source calls on_next(value), the value is received by Cerebro, which handles the write to the database.

Parameters:
  • name (str) – The name of the data source.

  • bucket (Optional[str]) – The bucket to write to. If not set it will use the default bucket.

  • tags (Dict[str, Any]) – A dictionary of tags to be associated with all measurements.

async restart()[source]

Restarts the source.

async start()[source]

Initialises the source.

This method is called by Cerebro when the data source is added. It can be empty but more frequently it contains the code to initialise the connection to a TCP server or other data stream. It can be a coroutine, in which case Cerebro will schedule it as a task.
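The lifecycle above (start opens the stream, measurements are pushed through on_next, stop closes the connection) can be sketched with a toy asyncio source. The real Source is an RX Subject; this stand-in only mirrors the shape, and the measurement keys are assumptions:

```python
import asyncio

class FakeSource:
    """Toy source: start() schedules a task that pushes two measurements."""

    def __init__(self, name, on_next):
        self.name = name
        self.on_next = on_next  # stand-in for the Subject's on_next
        self._task = None

    async def start(self):
        # Cerebro schedules start() as a task when the source is added.
        self._task = asyncio.create_task(self._run())

    async def _run(self):
        for value in (20.1, 20.2):
            self.on_next({"data": [{"measurement": self.name,
                                    "fields": {"t": value}}]})
            await asyncio.sleep(0)  # yield control between measurements

    def stop(self):
        # Stops the parsing loop; a real source would also close sockets.
        if self._task:
            self._task.cancel()

async def main():
    received = []
    src = FakeSource("toy", received.append)
    await src.start()
    await asyncio.sleep(0.01)  # let the task emit its measurements
    src.stop()
    return received

points = asyncio.run(main())
```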

stop()[source]

Stops the data source parsing and closes any open connection.

source_type: str | None = None

The type of data source.

Type:

str

timeout: float | None = None

Seconds to wait for initialisation.

Type:

float

class cerebro.sources.source.TCPSource(name, host, port, delay=None, retry=True, retrier_params={}, **kwargs)[source]

A source for a TCP server with robust reconnection and error handling.

Parameters:
  • name (str) – The name of the data source.

  • host (str) – The host to which to connect.

  • port (int) – The port on which the TCP server runs.

  • delay (float) – How long to wait between queries to the TCP server. If None, uses the class delay.

  • kwargs – Other parameters to pass to Source.

  • retry (bool)

  • retrier_params (dict)

async read(delay=None)[source]

Queries the TCP server, emits data points, and handles disconnections.

async start()[source]

Connects to the socket.

async stop()[source]

Disconnects from socket.

cerebro.sources.source.get_source_subclass(type_)[source]

Returns a Source subclass based on its source_type.

Parameters:

type_ (str)

Return type:

Type[Source] | None

class cerebro.sources.tron.ActorClientSource(name, actor, host, port, commands, interval=60.0, bucket=None, tags={}, casts={}, keyword_tags={}, store_broadcasts=False)[source]

A data source that connects directly to an actor and issues periodic commands.

This source should be used to complement TronSource when one wants a command to be issued periodically without flooding the feed in Tron.

Parameters:
  • name (str) – The name of the data source.

  • actor (str) – The name of the actor.

  • host (str) – The host on which the actor is running.

  • port (int) – The port on which the actor is running.

  • commands (list[str]) – A list of commands to issue to the actor on a timer.

  • interval (float) – The interval, in seconds, between commands.

  • bucket (Optional[str]) – The bucket to write to. If not set it will use the default bucket.

  • tags (Dict[str, Any]) – A dictionary of tags to be associated with all measurements.

  • casts (dict[str, str]) – A dictionary of actor.keyword.key_name to cast. E.g., {"boss.exposure_time.exptime": "float"}.

  • keyword_tags (dict[str, dict]) – A dictionary specifying a keyword value to be added to the data points as a tag. This is useful for keywords whose values are not independent and one wants to add an additional tag derived from the keyword itself. For example, fliswarm.status contains several values that all relate to a camera, which is the first value in the keyword. To add the camera name as a tag we can pass {"fliswarm.status": {"index": 0, "name": "camera"}}.

  • store_broadcasts (bool) – Whether to store broadcast messages that may not be in response to a command.
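A sketch of how a keyword_tags spec such as {"fliswarm.status": {"index": 0, "name": "camera"}} could be applied: the value at the given index is lifted out of the keyword values and stored as a tag. This illustrates the documented behavior but is not the actual cerebro implementation:

```python
def apply_keyword_tags(key, values, keyword_tags):
    """Return extra tags derived from the keyword's own values."""
    spec = keyword_tags.get(key)
    if spec is None:
        return {}
    return {spec["name"]: values[spec["index"]]}

tags = apply_keyword_tags(
    "fliswarm.status",
    ["gfa1", "idle", 25.0],  # hypothetical keyword values
    {"fliswarm.status": {"index": 0, "name": "camera"}},
)
# tags == {"camera": "gfa1"}
```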

connected()[source]

Checks whether the client is connected.

async schedule_command(command, interval=None)[source]

Schedules a command to be executed on an interval.

async start(get_keys=True)[source]

Starts the connection to Tron.

Parameters:

get_keys (bool) – If True, gets all the keys in the models.

async stop()[source]

Closes the connection.

source_type: str | None = 'actor_client'

The type of data source.

Type:

str

timeout: float | None = 60

Seconds to wait for initialisation.

Type:

float

class cerebro.sources.tron.TronSource(name, bucket=None, tags={}, actors=[], host='localhost', port=6093, keywords=None, commands={}, casts={}, keyword_tags={})[source]

A data source that monitors a Tron connection.

Connects to Tron as a TCP client and parses actor keywords. Data values are sent to Cerebro with the actor name as the measurement and the keyword name as the field_key. If the key contains multiple values, the name of each value is appended to the field_key as keyword_keyname. If a value does not have a name, its zero-based index within the key is used.

Internally it uses CLU to establish the connection to Tron and parse the keywords. It requires actorkeys to be importable.

Parameters:
  • name (str) – The name of the data source.

  • bucket (Optional[str]) – The bucket to write to. If not set it will use the default bucket.

  • tags (Dict[str, Any]) – A dictionary of tags to be associated with all measurements.

  • actors (List[str]) – A list of actor names to monitor.

  • host (str) – The host on which to connect to Tron.

  • port (int) – The port on which Tron is running.

  • keywords (Optional[List[str]]) – A list of keywords to monitor for a given actor. If None, all keywords are monitored and recorded.

  • commands (dict[str, float]) – A dictionary of command string to be sent to the actor on an interval. The value of each key-value pair is the interval, in seconds.

  • casts (dict[str, str]) – A dictionary of actor.keyword.key_name to cast. E.g., {"boss.exposure_time.exptime": "float"}.

  • keyword_tags (dict[str, dict]) – A dictionary specifying a keyword value to be added to the data points as a tag. This is useful for keywords whose values are not independent and one wants to add an additional tag derived from the keyword itself. For example, fliswarm.status contains several values that all relate to a camera, which is the first value in the keyword. To add the camera name as a tag we can pass {"fliswarm.status": {"index": 0, "name": "camera"}}.
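The field_key naming described above (one field per value, named keyword_keyname for named values and keyword_index for unnamed ones) can be sketched as a simplified stand-in for the parsing done via CLU:

```python
def field_keys(keyword, value_names):
    """value_names holds the per-value names, with None for unnamed values."""
    if len(value_names) == 1:
        # A single-valued key uses the keyword name itself as the field_key.
        return [keyword]
    return [
        f"{keyword}_{name if name is not None else i}"
        for i, name in enumerate(value_names)
    ]

field_keys("exposure_time", ["exptime"])
# -> ["exposure_time"]
field_keys("status", ["state", None, "temp"])
# -> ["status_state", "status_1", "status_temp"]
```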

async process_keyword(_, keyword)[source]

Processes a keyword received from Tron.

async schedule_command(command, interval)[source]

Schedules a command to be executed on an interval.

async start()[source]

Starts the connection to Tron.

async stop()[source]

Closes the connection to Tron.

source_type: str | None = 'tron'

The type of data source.

Type:

str

timeout: float | None = 60

Seconds to wait for initialisation.

Type:

float

cerebro.sources.tron.process_keyword(keyword, actor, tags={}, keyword_tags={}, casts={})[source]

Creates a series of data points out of a keyword.

Parameters:
  • keyword (Keyword)

  • actor (str)

  • tags (dict)

  • keyword_tags (dict)

  • casts (dict)

class cerebro.sources.AMQP.AMQPSource(name, bucket=None, tags={}, host='localhost', port=5672, user='guest', password='guest', keywords=None, groupers=[], internal=False, commands={})[source]

A source for AMQP actors.

Parameters:
  • name (str) – The name of the data source.

  • bucket (Optional[str]) – The bucket to write to. If not set it will use the default bucket.

  • tags (dict[str, Any]) – A dictionary of tags to be associated with all measurements.

  • host (str) – The host on which RabbitMQ is running.

  • port (int) – The port on which RabbitMQ is running.

  • user (str) – The username to use to connect to RabbitMQ.

  • password (str) – The password to use to connect to RabbitMQ.

  • keywords (list[str] | None) – A list of keyword values to output. The format must be actor.keyword.subkeyword.subsubkeyword. The final value extracted must be a scalar. If None, all keywords will be stored.

  • groupers (list[str]) – A list of subkeywords that, if found, will be added as tags to the measurement. These are useful when the same keyword can be output for different devices or controllers.

  • internal (bool) – Mark commands sent to the actor as internal.

  • commands (dict[str, float]) – A mapping of commands to be issued to their intervals, in seconds. For example, {"archon status": 5}.
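Resolving a dotted keywords entry such as actor.keyword.subkeyword against a reply's nested body can be sketched with a small helper. The final value must be a scalar per the docs; this function is illustrative, not cerebro's, and the example body is hypothetical:

```python
def extract(path, actor, body):
    """Walk a dotted path (skipping the leading actor name) through body."""
    parts = path.split(".")
    if parts[0] != actor:
        return None  # the reply came from a different actor
    value = body
    for key in parts[1:]:
        value = value[key]
    return value

body = {"status": {"mod2": {"temperature": -100.1}}}  # hypothetical reply body
extract("archon.status.mod2.temperature", "archon", body)  # -> -100.1
```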

async process_keyword(reply)[source]

Processes a keyword received from an actor.

Parameters:

reply (AMQPReply)

async schedule_command(command, interval)[source]

Schedules a command to be executed on an interval.

async start()[source]

Starts the connection to RabbitMQ.

async stop()[source]

Closes the connection to RabbitMQ.

source_type: str | None = 'amqp'

The type of data source.

Type:

str

timeout: float | None = 60

Seconds to wait for initialisation.

Type:

float

class cerebro.sources.lvm.GoveeSource(*args, address, device=None, **kwargs)[source]

Retrieves temperature and RH from a TCP server connected to a Govee BT device.

Parameters:
  • address (str)

  • device (Optional[str])

source_type: str | None = 'govee'

The type of data source.

Type:

str

class cerebro.sources.lvm.LVMIEBSource(name, controller, config, **kwargs)[source]

A source for the LVM IEB boxes that parses the Archon configuration file.

source_type: str | None = 'lvm_ieb'

The type of data source.

Type:

str

class cerebro.sources.lvm.Sens4Source(*args, device_id, ccd='NA', delay=1, **kwargs)[source]

Reads pressure and temperature from multiple Sens4 transducers.

The Sens4 provides this information over a serial interface but we assume there is a bidirectional byte stream between the serial device and a TCP socket.

source_type: str | None = 'sens4'

The type of data source.

Type:

str

class cerebro.sources.lvm.ThermistorsSource(name, host, port=1025, mapping={}, bucket=None, tags={}, interval=None)[source]

Reads the spectrograph thermistors.

Reads the ADAM 6251-B module and outputs the status of each thermistor.

Parameters:
  • name (str) – The name of the data source.

  • host (str) – The ADAM module IP.

  • port (int) – The UDP port that serves the ASCII service.

  • mapping (dict[str, str]) – A mapping of channelN to a channel name that will be stored as a tag.

  • bucket (Optional[str]) – The bucket to write to. If not set it will use the default bucket.

  • tags (dict[str, Any]) – A dictionary of tags to be associated with all measurements.

  • interval (float) – How often to read the thermistors.
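A sketch of applying the documented channelN-to-name mapping when emitting one point per thermistor channel. The point layout (measurement name, field, tag keys) is an assumption for illustration, not the source's actual output:

```python
def thermistor_points(states, mapping):
    """states: per-channel booleans read from the ADAM module."""
    points = []
    for n, state in enumerate(states):
        channel = f"channel{n}"
        points.append({
            "measurement": "thermistors",  # assumed measurement name
            "fields": {"state": int(state)},
            # Fall back to the raw channelN label when no name is mapped.
            "tags": {"channel": mapping.get(channel, channel)},
        })
    return points

pts = thermistor_points([True, False], {"channel0": "b1"})
# pts[0] is tagged "b1"; pts[1] keeps the unmapped "channel1" label
```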

async start()[source]

Starts the runner.

async stop()[source]

Stops the runner.

source_type: str | None = 'lvm_thermistors'

The type of data source.

Type:

str