Source code for cerebro.cerebro

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2020-08-03
# @Filename: cerebro.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import abc
import asyncio
import datetime
import json
import os
import pathlib
import socket
import time
import warnings

from typing import Any, Dict, List, Optional

import ntplib
from rx.scheduler.eventloop.asyncioscheduler import AsyncIOScheduler
from rx.subject.subject import Subject

from sdsstools import read_yaml_file

from . import log
from .observer import Observer, get_observer_subclass
from .sources import Source, get_source_subclass


[docs] class SourceList(list): """A list of `.Source` instances. Provides a thin wrapper around a `list` class, with methods to add and start a data source or stop it. """ def __init__(self, loop: asyncio.AbstractEventLoop, on_next, sources=[]): super().__init__() self.on_next = on_next self.scheduler = AsyncIOScheduler(loop) self._name_to_source = {} for source in sources: self.add_source(source) def stop(self): for source in self: if asyncio.iscoroutinefunction(source.stop): close_task = asyncio.create_task(source.stop()) while not close_task.done(): pass else: source.stop() def __del__(self): self.stop() def __add__(self, x): raise NotImplementedError("Addition is not implemented.")
[docs] def insert(self, *args): raise NotImplementedError("Insert is not implemented.")
[docs] def remove(self, source): source.dispose() return super().remove(source)
[docs] def append(self, source): return self.add_source(source)
[docs] def pop(self, index): return self.remove_source(self[index].name)
[docs] def get(self, name): """Retrieves a data source by name.""" if name not in self._name_to_source: raise ValueError(f"Data source {name!r} not found.") return self._name_to_source[name]
[docs] def add_source(self, source: Source): """Adds a `.Source`.""" if not isinstance(source, Source): raise NotImplementedError("Only instances of Source can be passed.") source.subscribe(on_next=self.on_next, scheduler=self.scheduler) assert asyncio.iscoroutinefunction(source.start) # We add the source even if it's not running. self._name_to_source[source.name] = source super().append(source) timeout = getattr(source, "timeout", None) asyncio.create_task(self._start_source(source, timeout=timeout))
async def _start_source(self, source, timeout=None): """Starts the source.""" try: await asyncio.wait_for(source.start(), timeout=timeout) except asyncio.TimeoutError: log.error(f"Timed out trying to start source {source.name}.") source.running = False except BaseException as exception: log.error(f"Failed starting source {source.name}: {exception!s}") source.running = False else: log.debug(f"Started source {source.name}.")
[docs] def remove_source(self, source_name): """Removes a source.""" source = self._name_to_source[source_name] self.remove(source) self._name_to_source.pop(source_name)
class Cerebellum(type):
    """Metaclass for Cerebro.

    Intercepts instantiation so that, when a ``config`` keyword is given, the
    initialisation parameters are loaded from the configuration and the
    sources and observers it describes are instantiated before ``__init__``
    runs.

    """

    def __call__(cls, *args, **kwargs):
        """Creates the instance after resolving the configuration."""

        args, kwargs = cls.__parse_config__(*args, **kwargs)

        obj = Cerebro.__new__(cls)
        obj.__init__(*args, **kwargs)

        return obj

    @staticmethod
    def __parse_config__(*args, **kwargs):
        """Overrides initialisation parameters from a configuration file.

        If ``config`` is not passed (or is `None`) the arguments are returned
        unchanged. Otherwise the configuration is loaded (from a YAML file or a
        dictionary), updated with the remaining keyword arguments, and its
        ``sources`` and ``observers`` sections are replaced with lists of
        instantiated objects. The input configuration dictionary is not
        modified.

        Returns
        -------
        tuple
            The positional arguments and the keyword arguments with which to
            call ``__init__``.

        Raises
        ------
        ValueError
            If both ``profile`` and ``sources`` are given, or if ``config`` is
            not a path or a dictionary.
        TypeError
            If an item in a profile is neither a string nor a dictionary.

        """

        if kwargs.get("config", None) is None:
            return args, kwargs

        if kwargs.get("sources", {}) and kwargs.get("profile", None):
            raise ValueError("profile and sources are incompatible.")

        # Remove input sources and observers. The sources keyword is only
        # used to select which of the configured sources to load.
        sources_kw = kwargs.pop("sources", {})
        kwargs.pop("observers", None)

        profile_name = kwargs.get("profile", None)

        config_file = kwargs.pop("config")
        if isinstance(config_file, (str, pathlib.Path)):
            config = read_yaml_file(config_file)
        elif isinstance(config_file, dict):
            config = config_file.copy()
        else:
            raise ValueError(f"Invalid type {type(config_file)} for config.")

        config.update(kwargs)

        profiles_data = config.pop("profiles", {})

        if profile_name:
            assert (
                profile_name in profiles_data
            ), f"Profile {profile_name!r} not found in the configuration."
            profile = profiles_data[profile_name]

            sources = Cerebellum._from_profile(
                profile["sources"],
                config.get("sources", {}),
                Cerebellum.__get_source,
                "sources",
            )
            observers = Cerebellum._from_profile(
                profile["observers"],
                config.get("observers", {}),
                Cerebellum.__get_observer,
                "observers",
            )

        else:
            sources = []
            # An empty YAML section loads as None; treat it as no entries.
            for source_name, params in (config.pop("sources", None) or {}).items():
                if sources_kw and source_name not in sources_kw:
                    continue
                sources.append(Cerebellum.__get_source(source_name, params))

            observers = [
                Cerebellum.__get_observer(observer_name, params)
                for observer_name, params in (
                    config.pop("observers", None) or {}
                ).items()
            ]

        config["sources"] = sources
        config["observers"] = observers

        return args, config

    @staticmethod
    def _from_profile(entries, global_entries, factory, kind):
        """Instantiates the ``kind`` items listed in a profile.

        String entries are looked up in ``global_entries``; dictionary entries
        map names to their own parameters. ``factory`` is called with the name
        and the parameters of each item.

        """

        built = []

        for entry in entries:
            if isinstance(entry, str):
                built.append(factory(entry, global_entries[entry]))
            elif isinstance(entry, dict):
                built.extend(factory(name, params) for name, params in entry.items())
            else:
                raise TypeError(f"Profile {kind} must be strings or dicts.")

        return built

    @staticmethod
    def __get_source(source_name, params):
        """Instantiates the source subclass named by ``params["type"]``."""

        # Work on a copy: the parameters belong to the caller's configuration,
        # which may be reused to create another instance.
        params = dict(params)
        type_ = params.pop("type")

        Subclass = get_source_subclass(type_)
        if Subclass is None:
            raise ValueError(f"Source type {type_} is not valid.")

        return Subclass(source_name, **params)

    @staticmethod
    def __get_observer(source_name, params):
        """Instantiates the observer subclass named by ``params["type"]``."""

        # Work on a copy; see __get_source.
        params = dict(params)
        type_ = params.pop("type")

        Subclass = get_observer_subclass(type_)
        if Subclass is None:
            raise ValueError(f"Observer type {type_} is not valid.")

        return Subclass(source_name, **params)
# Combine ABCMeta and Cerebellum so that we can add it as metaclass to Cerebro
# without getting the "the metaclass of a derived class must be a (non-strict)
# subclass of the metaclasses of all its bases" error.
class MetaCerebro(abc.ABCMeta, Cerebellum):
    """Metaclass that merges `abc.ABCMeta` and `.Cerebellum`.

    Adds no behaviour of its own; it only exists so that a single metaclass
    deriving from both can be given to a class.

    """
class Cerebro(Subject, metaclass=MetaCerebro):
    """Handles a list of data sources to which observers can subscribe.

    Creates an `RX <https://rxpy.readthedocs.io/en/latest/>`__ subject that
    monitors a list of data sources. `.Observer` instances can subscribe to
    the combined data stream, which is served as a uniformly formatted
    dictionary.

    Parameters
    ----------
    name
        The name of this Cerebro instance. This value is added as the
        ``cerebro`` tag to all measurements processed. The name can be used
        to identify the origin of the data when multiple instances of Cerebro
        are loading data to the database.
    tags
        A list of tags to add to all the measurements.
    sources
        A list of `.Source` instances to listen to.
    observers
        A list of `.Observer` instances. Each one is attached to this
        instance by calling its ``set_cerebro`` method.
    config
        A file or dictionary from which to load the configuration. The format
        exactly follows the signature of `.Cerebro` and the data sources it
        refers to. For example:

        .. code-block:: yaml

            logfile: /data/logs/cerebro/cerebro.log
            ntp_server: us.pool.ntp.org
            tags:
                observatory: ${OBSERVATORY}
            profiles:
                default:
                    sources:
                        - tron
                    observers:
                        - influxdb
            sources:
                tron:
                    type: tron
                    bucket: Actors
                    host: localhost
                    port: 6093
                    actors:
                        - tcc
                        - apo
            observers:
                influxdb:
                    type: influxdb
                    url: http://localhost:9999
                    token: null
                    org: SDSS
                    default_bucket: FPS

        Each source and observer must include a ``type`` key that corresponds
        to the ``source_type`` and ``observer_type`` value of the `.Source`
        and `.Observer` subclass to be used, respectively. The profile
        section, if present, is a dictionary of profiles, each one defining a
        list of sources and observers. If the items in the sources or
        observers list are strings, the entries from the global ``sources``
        and ``observers`` sections will be used. The profile sources and
        observers items can also be dictionaries with the same format as the
        global sources and observers.
        If ``config`` is defined, the keywords ``sources`` and ``observers``
        are ignored.
    profile
        The name of the profile to use from the configuration file. If `None`
        and ``config`` is defined, uses all the global sources and observers.
    ntp_server
        The route to the NTP server to use. The server is queried every hour
        to determine the offset between the NTP pool and the local computer.
    logfile
        If set, the path where to write the file log. Otherwise logs only to
        stdout/stderr. If the path is an existing directory the log is
        written to ``<name>.log`` inside it.
    log_rotate
        Whether to rotate the log file at midnight UTC.

    """

    def __init__(
        self,
        name: str = "cerebro",
        tags: Optional[Dict[str, Any]] = None,
        sources: Optional[List[Source | str]] = None,
        observers: Optional[List[Observer]] = None,
        config: Optional[str | dict | pathlib.Path] = None,
        profile: Optional[str] = None,
        ntp_server: str = "us.pool.ntp.org",
        logfile: Optional[str] = None,
        log_rotate: bool = True,
    ):
        Subject.__init__(self)

        self.name = name
        host = socket.getfqdn()

        if logfile:
            if os.path.isdir(logfile):
                logfile = os.path.join(logfile, f"{name}.log")
            log.start_file_logger(logfile, rotating=log_rotate)

        start_time = datetime.datetime.now(datetime.timezone.utc).isoformat()
        log.debug(f"Starting Cerebro at {start_time} on host {host}.")

        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # "There is no current event loop in thread %r"
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)

        # State read by on_next() is set before any source is subscribed, in
        # case a source emits as soon as it is subscribed.
        # Default tags added to every measurement (a copy of the input).
        # NOTE(review): the class docstring says the instance name is added as
        # the ``cerebro`` tag, but only the user-supplied tags are stored
        # here -- confirm which one is intended.
        self.tags = dict(tags) if tags else {}

        # Offset between the NTP server and the local clock, in milliseconds.
        self._offset = 0

        self.status_server = None

        self.sources = SourceList(
            self.loop,
            self.on_next,
            sources if sources is not None else [],
        )

        for observer in observers or ():
            observer.set_cerebro(self)
            log.debug(f"Added observer of type {observer.observer_type}.")

        self.loop.call_soon(self.update_time_offset, ntp_server)

    async def start(self, path: str = "/tmp/cerebro.sock"):
        """Starts the status server on a UNIX socket.

        Parameters
        ----------
        path
            The path of the UNIX socket on which to listen.

        """

        self.status_server = await asyncio.start_unix_server(
            self.status_server_cb,
            path=path,
        )

    def stop(self):
        """Stops all the sources and closes the status server, if running."""

        self.sources.stop()

        if self.status_server is not None:
            self.status_server.close()

    def on_next(self, data):
        """Processes a list of measurements from a data source.

        Measurements is expected to be a namedtuple or other kind of namespace
        with at least a ``.data`` attribute which must be a list of
        dictionaries. Each dictionary must contain an individual
        ``measurement`` along with a series of associated ``values``, and
        optionally a set of ``tags``. The default tags are added to each
        measurement. If a measurement does not contain a ``time`` value, it
        will be added with the current UTC time, taking into account the
        offset determined by the NTP server. Additional parameters can be
        added to the namespace for any observable to interpret, but are not
        required.

        Once updated, the measurements are propagated to all the observers.

        """

        if data.data is None or data.data == []:
            return

        # Time is in nanoseconds since UNIX epoch; _offset is in milliseconds.
        meas_time = int((time.time() + self._offset / 1e3) * 1e9)

        for point in data.data:
            if "time" not in point:
                point["time"] = meas_time
            # Tags are optional in the incoming measurement.
            point.setdefault("tags", {}).update(self.tags)

        # Propagate to all the observers.
        Subject.on_next(self, data)

    def update_time_offset(self, server: str):
        """Updates the internal offset with the NTP server.

        The offset (server time minus local time) is stored in milliseconds.
        Failures are reported as a `UserWarning` and the previous offset is
        kept. The method reschedules itself to run again in one hour.

        """

        try:
            stats = ntplib.NTPClient().request(server, version=3)
            # Use the clock offset, not the network round-trip delay. ntplib
            # reports it in seconds.
            offset = stats.offset
            if offset:
                self._offset = offset * 1e3
        except Exception as ee:
            warnings.warn(f"Failed updating offset from NTP server: {ee}", UserWarning)

        self.loop.call_later(3600.0, self.update_time_offset, server)

    async def status_server_cb(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ):
        """Handles new connections to the status server.

        Commands are newline-terminated. ``status`` replies with a JSON
        dictionary of source name to running flag; ``restart <source>``
        restarts a source and replies ``true`` or ``false``; ``exit`` ends the
        connection. The writer is closed when the handler finishes.

        """

        try:
            while True:
                raw = await reader.readline()
                if not raw:
                    # EOF with nothing left to process.
                    break

                command = raw.decode(errors="replace").strip()

                if command == "status":
                    status = {source.name: source.running for source in self.sources}
                    writer.write(json.dumps(status, indent=None).encode() + b"\n")
                    await writer.drain()
                elif "restart" in command:
                    try:
                        # A missing source name is reported as a failure.
                        source_name = command.split()[1]
                        source = self.sources.get(source_name)
                        await source.restart()
                        writer.write(b"true\n")
                    except Exception:
                        writer.write(b"false\n")
                    await writer.drain()
                elif command == "exit":
                    break
        except ConnectionError as err:
            log.debug(f"Status server connection lost: {err!s}")
        finally:
            writer.close()