cerebro Reference
Cerebro
- class cerebro.cerebro.Cerebro(*args, **kwargs)
Handles a list of data sources to which observers can subscribe.
Creates an RX subject that monitors a list of data sources. Observer instances can subscribe to the combined data stream, which is served as a uniformly formatted dictionary.
- Parameters:
name – The name of this Cerebro instance. This value is added as the cerebro tag to all measurements processed. The name can be used to identify the origin of the data when multiple instances of Cerebro are loading data to the database.
tags – A list of tags to add to all the measurements.
sources – A list of Source instances to listen to.
config – A file or dictionary from which to load the configuration. The format exactly follows the signature of Cerebro and the data sources it refers to. For example:

    logfile: /data/logs/cerebro/cerebro.log
    ntp_server: us.pool.ntp.org

    tags:
      observatory: ${OBSERVATORY}

    profiles:
      default:
        sources:
          - tron
        observers:
          - influxdb

    sources:
      tron:
        type: tron
        bucket: Actors
        host: localhost
        port: 6093
        actors:
          - tcc
          - apo

    observers:
      influxdb:
        type: influxdb
        url: http://localhost:9999
        token: null
        org: SDSS
        default_bucket: FPS

Each source and observer must include a type key that corresponds to the source_type and observer_type value of the Source and Writer subclass to be used, respectively. The profiles section, if present, is a dictionary of profiles, each one defining a list of sources and observers. If the items in the sources or observers list are strings, the entries from the global sources and observers sections will be used. The profile sources and observers items can also be dictionaries with the same format as the global sources and observers. If config is defined, the keywords sources and observers are ignored.
profile – The name of the profile to use from the configuration file. If None and config is defined, uses all the global sources and observers.
ntp_server – The address of the NTP server to use. The server is queried every hour to determine the offset between the NTP pool and the local computer.
logfile – If set, the path to which the log file is written. Otherwise logs only to stdout/stderr.
log_rotate – Whether to rotate the log file at midnight UTC.
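The profile rules described for config can be illustrated with a short sketch. This is not Cerebro's own loader: resolve_profile is a hypothetical helper written only for illustration, and it assumes that a dictionary item in a profile list maps a name to its settings, in the same way as the global sections.

```python
def resolve_profile(config, profile=None):
    """Return (sources, observers) as name -> settings dictionaries."""
    global_sources = config.get("sources", {})
    global_observers = config.get("observers", {})

    # No profile requested: use every global source and observer.
    if profile is None:
        return dict(global_sources), dict(global_observers)

    def expand(items, global_section):
        resolved = {}
        for item in items:
            if isinstance(item, str):
                # A string refers to an entry in the global section.
                resolved[item] = global_section[item]
            else:
                # Assumption: an inline dictionary maps name -> settings.
                resolved.update(item)
        return resolved

    selected = config["profiles"][profile]
    return (
        expand(selected.get("sources", []), global_sources),
        expand(selected.get("observers", []), global_observers),
    )


config = {
    "profiles": {"default": {"sources": ["tron"], "observers": ["influxdb"]}},
    "sources": {"tron": {"type": "tron", "host": "localhost", "port": 6093}},
    "observers": {"influxdb": {"type": "influxdb", "org": "SDSS"}},
}

sources, observers = resolve_profile(config, "default")
all_sources, all_observers = resolve_profile(config)
```

Resolving names against the global sections keeps each profile short, since the connection details of a source or observer are written once and reused across profiles.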
- on_next(data)
Processes a list of measurements from a data source.
The data argument is expected to be a namedtuple or other kind of namespace with at least a .data attribute, which must be a list of dictionaries. Each dictionary must contain an individual measurement along with a series of associated values, and optionally a set of tags. The default tags are added to each measurement. If a measurement does not contain a time value, it will be added with the current UTC time, taking into account the offset determined by the NTP server.
Additional parameters can be added to the namespace for any observable to interpret, but are not required.
Once updated, the measurements are propagated to all the observers.
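The processing described for on_next can be sketched as follows. This is an illustration under stated assumptions, not the actual method: the Measurements namedtuple, its bucket field, and the apply_defaults helper are hypothetical, the dictionary keys follow the names used in the description, tags set on a measurement are assumed to take precedence over the defaults, and the NTP offset is assumed to be added to the local clock.

```python
from collections import namedtuple
from datetime import datetime, timedelta, timezone

# Hypothetical container; the text only requires a ``.data`` attribute.
Measurements = namedtuple("Measurements", ["bucket", "data"])


def apply_defaults(measurements, default_tags, ntp_offset=0.0, now=None):
    """Add default tags and a missing ``time`` to every measurement."""
    now = now or datetime.now(timezone.utc)
    # Assumption: the offset (in seconds) is added to the local UTC time.
    corrected = now + timedelta(seconds=ntp_offset)

    updated = []
    for point in measurements.data:
        point = dict(point)  # Do not modify the caller's dictionary.
        # Assumption: tags set on the measurement win over the defaults.
        point["tags"] = {**default_tags, **point.get("tags", {})}
        point.setdefault("time", corrected)
        updated.append(point)

    return measurements._replace(data=updated)


batch = Measurements(
    bucket="FPS",
    data=[{"measurement": "tcc", "values": {"alt": 30.0}}],
)
fixed_now = datetime(2024, 1, 1, tzinfo=timezone.utc)
result = apply_defaults(batch, {"cerebro": "apo"}, ntp_offset=0.5, now=fixed_now)
```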
- class cerebro.cerebro.SourceList(on_next, sources=[])
A list of Source instances.
Provides a thin wrapper around the list class, with methods to add and start a data source or stop it.
- pop(index)
Remove and return item at index (default last).
Raises IndexError if list is empty or index is out of range.
Observer
- class cerebro.observer.InfluxDB(name, org, url='http://localhost:9999', token=None, default_bucket=None)
An observer that loads data into an InfluxDB database.
- Parameters:
url (str) – The port-qualified URL of the InfluxDB v2 database. Defaults to http://localhost:9999.
org (str) – The InfluxDB organisation.
token (Optional[str]) – The token to be used to write to the database buckets. If None, uses the value from $INFLUXDB_V2_TOKEN.
default_bucket (Optional[str]) – The default bucket to which data is written. Can be overridden by each individual data source.
name (str)
- class cerebro.observer.Observer(name)
An observer class that subscribes to Cerebro updates.
Observers are subscribed to an instance of Cerebro by calling set_cerebro. The on_next method must be overridden to perform an action with the received data.
- Parameters:
name (str) – The name of the observer.
Sources
- class cerebro.sources.source.Source(name, bucket=None, tags={})
A base class for a data source.
Data sources are standalone systems that periodically produce measurements for one or multiple fields. An example is a TCP server that outputs temperature and humidity values as plain ASCII. The data source handles the connection and parsing of the stream, and reports new measurements to Cerebro.
Internally a source is a Subject, which acts as both an observer and observable. When the source calls on_next(value) the value is received by Cerebro, which handles the write to the database.
- Parameters:
- async start()
Initialises the source.
This method is called by Cerebro when the data source is added. It can be empty, but more frequently it contains the code to initialise the connection to a TCP server or other data stream. It can be a coroutine, in which case Cerebro will schedule it as a task.
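The idea of a source acting as both observer and observable can be shown with a toy stand-in. The sketch below does not use Cerebro or an RX library: MiniSubject and TemperatureSource are hypothetical classes, and the ASCII line format (two whitespace-separated numbers) is an assumption made for the example.

```python
class MiniSubject:
    """Toy stand-in for an RX Subject: both an observer and an observable."""

    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        # Observable side: register a callback for new values.
        self._observers.append(on_next)

    def on_next(self, value):
        # Observer side: receive a value and forward it to subscribers.
        for callback in self._observers:
            callback(value)


class TemperatureSource(MiniSubject):
    """Hypothetical source that parses one ASCII line into a measurement."""

    def handle_line(self, line):
        temperature, humidity = (float(x) for x in line.split())
        self.on_next({"temperature": temperature, "humidity": humidity})


received = []
source = TemperatureSource()
source.subscribe(received.append)  # Cerebro would play this role.
source.handle_line("21.5 40.0")
```

Here the list received stands in for Cerebro: every value the source emits through on_next reaches each subscriber in turn.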
- class cerebro.sources.source.TCPSource(name, host, port, delay=None, retry=True, retrier_params={}, **kwargs)
A source for a TCP server with robust reconnection and error handling.
- Parameters:
name (str) – The name of the data source.
host (str) – The host to which to connect.
port (int) – The port on which the TCP server runs.
delay (float) – How long to wait between queries to the TCP server. If None, uses the class delay.
kwargs – Other parameters to pass to Source.
retry (bool)
retrier_params (dict)
- cerebro.sources.source.get_source_subclass(type_)
Returns a Source subclass based on its data_type.
- class cerebro.sources.tron.ActorClientSource(name, actor, host, port, commands, interval=60.0, bucket=None, tags={}, casts={}, keyword_tags={}, store_broadcasts=False)
A data source that connects directly to an actor and issues periodic commands.
This source should be used to complement TronSource when one wants a command to be issued periodically without flooding the feed in Tron.
- Parameters:
name (str) – The name of the data source.
actor (str) – The name of the actor.
host (str) – The host on which the actor is running.
port (int) – The port on which the actor is running.
commands (list[str]) – A list of commands to issue to the actor on a timer.
interval (float) – The interval, in seconds, between commands.
bucket (Optional[str]) – The bucket to write to. If not set it will use the default bucket.
tags (Dict[str, Any]) – A dictionary of tags to be associated with all measurements.
casts (dict[str, str]) – A dictionary of actor.keyword.key_name to cast. E.g., {"boss.exposure_time.exptime": "float"}.
keyword_tags (dict[str, dict]) – A dictionary with a keyword value to be added to the data points as a tag. This is useful for keywords in which the values are not independent and one wants to add an additional tag from the keyword itself. For example, fliswarm.status contains several values that all relate to a camera, which is the first value in the keyword. To add the camera name as a tag we can pass {"fliswarm.status": {"index": 0, "name": "camera"}}.
store_broadcasts (bool) – Whether to store broadcast messages that may not be in response to a command.
- async schedule_command(command, interval=None)
Schedules a command to be executed on an interval.
- class cerebro.sources.tron.TronSource(name, bucket=None, tags={}, actors=[], host='localhost', port=6093, keywords=None, commands={}, casts={}, keyword_tags={})
A data source that monitors a Tron connection.
Connects to Tron as a TCP client and parses actor keywords. Data values are sent to cerebro with the actor name as measurement and the keyword name as field_key. If the key contains multiple values, the name of each value is added to the field_key as keyword_keyname. If the value does not have a name, the zero-indexed index of its key is used.
Internally it uses CLU to establish the connection to Tron and parse the keywords. It requires actorkeys to be importable.
- Parameters:
name (str) – The name of the data source.
bucket (Optional[str]) – The bucket to write to. If not set it will use the default bucket.
tags (Dict[str, Any]) – A dictionary of tags to be associated with all measurements.
actors (List[str]) – A list of actor names to monitor.
host (str) – The host on which to connect to Tron.
port (int) – The port on which Tron is running.
keywords (Optional[List[str]]) – A list of keywords to monitor for a given actor. If None, all keywords are monitored and recorded.
commands (dict[str, float]) – A dictionary of command strings to be sent to the actor on an interval. The value of each key-value pair is the interval, in seconds.
casts (dict[str, str]) – A dictionary of actor.keyword.key_name to cast. E.g., {"boss.exposure_time.exptime": "float"}.
keyword_tags (dict[str, dict]) – A dictionary with a keyword value to be added to the data points as a tag. This is useful for keywords in which the values are not independent and one wants to add an additional tag from the keyword itself. For example, fliswarm.status contains several values that all relate to a camera, which is the first value in the keyword. To add the camera name as a tag we can pass {"fliswarm.status": {"index": 0, "name": "camera"}}.
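The naming, casts and keyword_tags rules can be sketched in plain Python. This is not the library's process_keyword: keyword_to_point is a hypothetical helper, the value names given for fliswarm.status are invented for the example, and two behaviours are assumptions: a single-valued keyword uses the bare keyword name as its field key, and a value promoted to a tag is not also stored as a field.

```python
# Supported cast names in this sketch (an assumption, not the library's list).
CASTS = {"float": float, "int": int, "str": str}


def keyword_to_point(actor, keyword, values, names=None, casts=None, keyword_tags=None):
    """Build one data point from the values of a single keyword."""
    names = names or [None] * len(values)
    casts = casts or {}
    keyword_tags = keyword_tags or {}

    tag_spec = keyword_tags.get(f"{actor}.{keyword}")
    tags = {}
    fields = {}
    for index, (name, value) in enumerate(zip(names, values)):
        if tag_spec and tag_spec["index"] == index:
            # Assumption: the tagged value becomes a tag, not a field.
            tags[tag_spec["name"]] = str(value)
            continue
        # Unnamed values fall back to their zero-based index.
        key_name = name if name is not None else str(index)
        cast = casts.get(f"{actor}.{keyword}.{key_name}")
        if cast:
            value = CASTS[cast](value)
        # Assumption: a single-valued keyword uses the bare keyword name.
        field_key = keyword if len(values) == 1 else f"{keyword}_{key_name}"
        fields[field_key] = value

    # "values" maps each field key to its value.
    return {"measurement": actor, "values": fields, "tags": tags}


point = keyword_to_point(
    "fliswarm",
    "status",
    ["gfa1", "idle", "-10.5"],
    names=["camera", "state", "temperature"],  # hypothetical value names
    casts={"fliswarm.status.temperature": "float"},
    keyword_tags={"fliswarm.status": {"index": 0, "name": "camera"}},
)
```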
- cerebro.sources.tron.process_keyword(keyword, actor, tags={}, keyword_tags={}, casts={})
Creates a series of data points out of a keyword.
- class cerebro.sources.AMQP.AMQPSource(name, bucket=None, tags={}, host='localhost', port=5672, user='guest', password='guest', keywords=None, groupers=[], internal=False, commands={})
A source for AMQP actors.
- Parameters:
name (str) – The name of the data source.
bucket (Optional[str]) – The bucket to write to. If not set it will use the default bucket.
tags (dict[str, Any]) – A dictionary of tags to be associated with all measurements.
host (str) – The host on which RabbitMQ is running.
port (int) – The port on which RabbitMQ is running.
user (str) – The username to use to connect to RabbitMQ.
password (str) – The password to use to connect to RabbitMQ.
keywords (list[str] | None) – A list of keyword values to output. The format must be actor.keyword.subkeyword.subsubkeyword. The final value extracted must be a scalar. If None, all keywords will be stored.
groupers (list[str]) – A list of subkeywords that, if found, will be added as tags to the measurement. These are useful when the same keyword can be output for different devices or controllers.
internal (bool) – Mark commands sent to the actor as internal.
commands (dict[str, float]) – A mapping of each command to be issued to its interval, in seconds. For example {"archon status": 5}.
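The dotted keywords format and the role of groupers can be illustrated with a small sketch. It is not the source's own implementation: extract_keyword is a hypothetical helper, and the shape of the reply body (nested dictionaries keyed by keyword and subkeyword) and the sample values are assumptions made for the example.

```python
def extract_keyword(sender, body, path, groupers=()):
    """Return (value, tags) for a dotted ``actor.keyword.sub...`` path.

    Returns ``(None, {})`` if the path belongs to another actor or is
    not present in the reply body.
    """
    actor, *keys = path.split(".")
    if actor != sender:
        return None, {}

    node = body
    tags = {}
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            return None, {}
        node = node[key]
        if isinstance(node, dict):
            # Collect any grouper found at this level as a tag.
            for grouper in groupers:
                if grouper in node:
                    tags[grouper] = node[grouper]

    # The description requires the final value to be a scalar.
    if isinstance(node, (dict, list)):
        raise ValueError(f"{path} does not resolve to a scalar")
    return node, tags


# Hypothetical reply body from an actor named "archon".
body = {"status": {"controller": "sp1", "mod2": {"tempa": -110.2}}}
value, tags = extract_keyword(
    "archon", body, "archon.status.mod2.tempa", groupers=["controller"]
)
```

In this example the controller subkeyword is picked up as a tag, which is what lets the same keyword be stored separately for each device.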
- async process_keyword(reply)
Processes a keyword received from an actor.
- Parameters:
reply (AMQPReply)
- class cerebro.sources.lvm.GoveeSource(*args, address, device=None, **kwargs)
Retrieves temperature and relative humidity (RH) from a TCP server connected to a Govee BT device.
- class cerebro.sources.lvm.LVMIEBSource(name, controller, config, **kwargs)
A source for the LVM IEB boxes that parses the Archon configuration file.
- class cerebro.sources.lvm.Sens4Source(*args, device_id, ccd='NA', delay=1, **kwargs)
Reads pressure and temperature from multiple Sens4 transducers.
The Sens4 provides this information over a serial interface, but we assume there is a bidirectional byte stream between the serial device and a TCP socket.
- class cerebro.sources.lvm.ThermistorsSource(name, host, port=1025, mapping={}, bucket=None, tags={}, interval=None)
Reads the spectrograph thermistors.
Reads the ADAM 6251-B module and outputs the status of each thermistor.
- Parameters:
name (str) – The name of the data source.
host (str) – The ADAM module IP.
port (int) – The UDP port that serves the ASCII service.
mapping (dict[str, str]) – A mapping of channelN to a channel name that will be stored as a tag.
bucket (Optional[str]) – The bucket to write to. If not set it will use the default bucket.
tags (dict[str, Any]) – A dictionary of tags to be associated with all measurements.
interval (float) – How often to read the thermistors.