Source code for cerebro.observer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2020-08-11
# @Filename: observer.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import abc
import asyncio
import os

from typing import Optional, Type

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import ASYNCHRONOUS
from influxdb_client.rest import ApiException
from rx.core.observer.observer import Observer as RXObserver
from rx.scheduler.eventloop.asyncioscheduler import AsyncIOScheduler

from . import log


[docs] class Observer(RXObserver, metaclass=abc.ABCMeta): """An observer class that subscribes to `.Cerebro` updates. Observers are subscribed to an instance of `.Cerebro` by calling `.set_cerebro`. The `.on_next` method must be overridden to perform an action with the received data. Parameters ---------- name The name of the observer. """ observer_type = None def __init__(self, name: str): if self.observer_type is None: raise ValueError( f"observer_type is not defined for class {self.__class__.__name__}." ) super().__init__(on_next=self.on_next) self.cerebro = None self.name = name self.loop = asyncio.get_running_loop() self.scheduler = AsyncIOScheduler(self.loop)
[docs] def set_cerebro(self, cerebro): """Sets the instance of `.Cerebro` and subscribes to it.""" self.cerebro = cerebro self.cerebro.subscribe(self, scheduler=self.scheduler)
[docs] @abc.abstractmethod def on_next(self, data): pass
[docs] class InfluxDB(Observer): """A observer that loads data into an InfluxDB database. Parameters ---------- url The port-qualified URL of the InfluxDB v2 database. Defaults to ``http://localhost:9999``. org The InfluxDB organisation. token The token to be used to write to the database buckets. If `None`, uses the value from ``$INFLUXDB_V2_TOKEN``. default_bucket The default bucket where to write data. Can be overridden by each individual `data source <.Source>`. """ observer_type = "influxdb" def __init__( self, name: str, org: str, url: str = "http://localhost:9999", token: Optional[str] = None, default_bucket: Optional[str] = None, ): super().__init__(name) self.default_bucket = default_bucket if token is None: if "INFLUXDB_V2_TOKEN" in os.environ: token = os.environ["INFLUXDB_V2_TOKEN"] else: raise ValueError("Token not provided or found in INFLUXDB_V2_TOKEN") # Establish connection to InfluxDB self.client = InfluxDBClient(url=url, token=token, org=org) self.write_client = self.client.write_api(write_options=ASYNCHRONOUS)
[docs] def dispose(self): """Disposes of the observer and closes the connection to InfluxDB.""" super().dispose() if hasattr(self, "client"): self.client.__del__()
[docs] def on_next(self, data): """Loads data to InfluxDB.""" bucket = data.bucket or self.default_bucket if not bucket: raise ValueError("bucket is not defined.") try: result = self.write_client.write(bucket=bucket, record=data.data) result.get() except ApiException as ee: log.error(f"Failed writing to bucket {bucket}: {ee}")
[docs] def get_observer_subclass(type_: str) -> Type[Observer] | None: """Returns a `.Observer` subclass based on its ``data_type``.""" for subclass in Observer.__subclasses__(): if subclass.observer_type == type_: return subclass return None