Source code for cerebro.sources.AMQP

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-05-04
# @Filename: AMQP.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
from collections.abc import MutableMapping
from contextlib import suppress

from typing import TYPE_CHECKING, Any, Callable, Coroutine, Optional

from clu import AMQPClient, AMQPReply

from .source import DataPoints, Source


if TYPE_CHECKING:
    from aio_pika import IncomingMessage

__all__ = ["AMQPSource"]


def flatten_dict(
    d: MutableMapping,
    parent_key: str = "",
    sep: str = ".",
    groupers: list[str] = [],
) -> tuple[MutableMapping, dict]:
    """From https://bit.ly/3KN9v3G."""

    groupings = {}

    items = []
    for k, v in d.items():
        if isinstance(v, (tuple, list)):
            continue

        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            flattened, new_groupings = flatten_dict(
                v,
                new_key,
                groupers=groupers,
                sep=sep,
            )
            items.extend(flattened.items())
            groupings.update(new_groupings)
        else:
            items.append((new_key, v))
            if k in groupers:
                groupings[k] = v

    return dict(items), groupings


class ReplyClient(AMQPClient):
    """A client that monitors all the replies."""

    def __init__(
        self,
        *args,
        callback: Callable[[AMQPReply], Coroutine] | None = None,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)

        self.callback = callback

    async def handle_reply(self, message: IncomingMessage):
        """Prints the formatted reply."""

        reply = await super().handle_reply(message)

        if self.callback:
            asyncio.create_task(self.callback(reply))

        return reply


[docs] class AMQPSource(Source): """A source for AMQP actors. Parameters ---------- name The name of the data source. bucket The bucket to write to. If not set it will use the default bucket. tags A dictionary of tags to be associated with all measurements. host The host on which RabbitMQ is running. port The port on which RabbitMQ is running. user The username to use to use to connect to RabbitMQ. password The password to use to use to connect to RabbitMQ. keywords A list of keyword values to output. The format must be ``actor.keyword.subkeyword.subsubkeyword``. The final value extracted must be a scalar. If `None`, all keywords will be stored. groupers A list of subkeywords that, if found, will be added as tags to the measurement. These are useful when the same keyword can be output for different devices or controllers. internal Mark commands sent to the actor as internal. commands A mapping of commands to be issued to the interval, in seconds. For example ``{"archon status": 5}``. """ source_type = "amqp" timeout = 60 def __init__( self, name: str, bucket: Optional[str] = None, tags: dict[str, Any] = {}, host: str = "localhost", port: int = 5672, user: str = "guest", password: str = "guest", keywords: list[str] | None = None, groupers: list[str] = [], internal: bool = False, commands: dict[str, float] = {}, ): super().__init__(name, bucket=bucket, tags=tags) self.client = ReplyClient( f"cerebro_client_{name}", user=user, password=password, host=host, port=port, callback=self.process_keyword, ) self.keywords = keywords self.groupers = groupers self.internal = internal self.commands = commands self._command_tasks: list[asyncio.Task] = []
[docs] async def start(self): """Starts the connection to RabbitMQ.""" await self.client.start() for model in self.client.models.values(): model.register_callback(self.process_keyword) for command in self.commands: task = self.schedule_command(command, self.commands[command]) self._command_tasks.append(asyncio.create_task(task)) self.running = True
[docs] async def stop(self): """Closes the connection to Tron.""" for task in self._command_tasks: task.cancel() with suppress(asyncio.CancelledError): await task self._command_tasks = [] await self.client.stop() self.running = False
[docs] async def schedule_command(self, command: str, interval: float): """Schedules a command to be executed on an interval.""" actor = command.split(" ")[0] cmd_str = " ".join(command.split(" ")[1:]) while True: await self.client.send_command(actor, cmd_str, internal=self.internal) await asyncio.sleep(interval)
[docs] async def process_keyword(self, reply: AMQPReply): """Processes a keyword received from an actor.""" actor = reply.sender body = reply.body fields, grouppings = flatten_dict(body, groupers=self.groupers) if self.keywords: fields = {k: fields[k] for k in fields if k in self.keywords} tags = self.tags.copy() tags.update(grouppings) points = [ { "measurement": actor, "tags": tags, "fields": fields, } ] data_points = DataPoints(data=points, bucket=self.bucket) self.on_next(data_points)