Source code for servicemon.plugin_support

import re
from importlib import import_module
from abc import ABC, abstractmethod

from pathlib import Path

from servicemon import builtin_plugins


class SmPluginSupport(ABC):
    """
    Base class providing a name-based registry and loader for plug-ins.

    Every subclass is recorded in a ``_subclasses`` dictionary at class
    creation time (see ``__init_subclass__``).  A subclass that defines its
    own ``_subclasses`` dictionary starts a separate registry; otherwise the
    nearest base class's dictionary is used.
    """

    # Registry mapping plug-in name -> SmPluginDesc.
    _subclasses = {}

    # Path used by load_plugins() when no path is given.  It is allowed to be
    # missing; any other missing path is reported as an error.
    _default_user_plugin_dir = 'plugins'

    @classmethod
    def __init_subclass__(cls, plugin_name=None, description='', **kwargs):
        """
        Register the newly created subclass in ``cls._subclasses``.

        **plugin_name** : str, optional
            registry key; defaults to the subclass's ``__name__``.  A later
            registration under the same name replaces the earlier one.

        **description** : str
            text stored in the registry entry.

        **kwargs** : dict
            forwarded to the next ``__init_subclass__`` in the MRO.
        """
        super().__init_subclass__(**kwargs)
        if plugin_name is None:
            plugin_name = cls.__name__
        cls._subclasses[plugin_name] = SmPluginDesc(plugin_name, cls, description)

    @classmethod
    def load_builtin_plugins(cls):
        """
        Import every ``.py`` file found in the directories listed in
        ``servicemon.builtin_plugins.__path__``.

        Importing the modules is what triggers registration of any plug-in
        classes they define.
        """
        for dirstr in builtin_plugins.__path__:
            # iterdir() yields entries in arbitrary order; sort so that the
            # import order (and therefore which plug-in wins when two share a
            # name) is the same on every platform.
            for child in sorted(Path(dirstr).iterdir()):
                if cls.__is_python_file(child):
                    import_module('servicemon.builtin_plugins.' + child.stem)

    @classmethod
    def load_plugins(cls, plugins=None):
        """
        Execute user plug-in code from a file or from a directory.

        **plugins** : str or path-like, optional
            a ``.py`` file, or a directory whose ``.py`` files are all
            loaded (not recursively).  Defaults to
            ``_default_user_plugin_dir``.

        Raises ``FileNotFoundError`` when *plugins* is neither a ``.py`` file
        nor a directory, unless it is the default directory, which is
        silently skipped.
        """
        if plugins is None:
            plugins = cls._default_user_plugin_dir

        plugin_path = Path(plugins)
        if cls.__is_python_file(plugin_path):
            cls.__load_file(plugin_path)
        elif plugin_path.is_dir():
            # Sorted for a deterministic load order; iterdir() order is
            # arbitrary and later registrations overwrite earlier ones.
            for child in sorted(plugin_path.iterdir()):
                if cls.__is_python_file(child):
                    cls.__load_file(child)
        elif plugins != cls._default_user_plugin_dir:
            # Only error on non-default file/directory.
            raise FileNotFoundError(f"Plugin file/dir doesn't exist: {plugin_path}")

    @classmethod
    def list_plugins(cls):
        """Print one line per entry of ``cls._subclasses`` to stdout."""
        for name, desc in cls._subclasses.items():
            print(f'subclass key = {name}, value = {desc}')

    @classmethod
    def get_plugin(cls, plugin_name):
        """
        Return the registry entry for *plugin_name*, or ``None`` when no
        plug-in is registered under that name.
        """
        return cls._subclasses.get(plugin_name)

    @classmethod
    def get_plugin_from_spec(cls, spec):
        """
        Look up the plug-in named in *spec* and attach the spec's kwargs.

        **spec** : str
            a plug-in specification as accepted by ``parse_spec``.

        Returns a new ``SmPluginDesc`` carrying the registered name, class
        and description plus the keyword arguments parsed from *spec*; the
        registry entry itself is not modified.

        Raises ``ValueError`` when *spec* is malformed or names a plug-in
        that is not registered.
        """
        parsed = cls.parse_spec(spec)
        plugin = cls.get_plugin(parsed['plugin_name'])
        if plugin is None:
            raise ValueError(f'No plugin found for spec: {spec}')
        return SmPluginDesc(plugin.name, plugin.cls, plugin.description,
                            **parsed['kwargs'])

    @classmethod
    def parse_spec(cls, spec):
        """
        Parse a plug-in specification string.

        The plugin specification is a string of the form:

        <plugin> :== <plugin_name> | <plugin_name> ":" <kwargs>
        <kwargs> :== <kwarg> | <kwarg> "," <kwargs>
        <kwarg> :== <key> "=" <value>
        <plugin_name> :== string with no ":"
        <key> :== string with no "=" or ","
        <value> :== string with no ","

        Whitespace will be ignored (it is removed everywhere, including
        inside names and values).

        Returns a dict ``{"plugin_name": str, "kwargs": {key: value}}`` in
        which all values are strings.  When a key is repeated the last
        value wins.

        Raises ``ValueError`` when the text after the first ":" is not a
        comma-separated list of non-empty ``key=value`` pairs.
        """
        stripped = re.sub(r"\s+", "", spec, flags=re.UNICODE)

        # Everything before the first colon is the plug-in name; the colon is
        # optional, but once present at least one kwarg must follow it.
        plugin_name, colon, kwargs_text = stripped.partition(":")

        kwargs = {}
        if colon:
            for item in kwargs_text.split(","):
                # The key ends at the first "="; any further "=" belongs to
                # the value.  Empty keys, values, or items are rejected.
                pair = re.fullmatch(r"(?P<key>[^=,]+)=(?P<value>[^,]+)", item)
                if pair is None:
                    raise ValueError(f'Invalid plug-in specification: "{spec}"')
                kwargs[pair.group('key')] = pair.group('value')

        return {"plugin_name": plugin_name, "kwargs": kwargs}

    @classmethod
    def __load_file(cls, path: Path):
        """
        Compile and execute the Python source in *path*.

        The code runs in this module's global namespace, so names defined
        in this module are visible to the plug-in file, and names the file
        defines are added to this module.
        """
        python_code = path.read_text(encoding='utf-8')
        compiled_code = compile(python_code, path.name, 'exec')
        # NOTE(security): this runs arbitrary code with the privileges of the
        # current process; only point load_plugins() at trusted files.
        exec(compiled_code, globals())

    @classmethod
    def __is_python_file(cls, path: Path):
        """Return True when *path* is an existing regular file ending in ``.py``."""
        return path.is_file() and path.suffix == '.py'


class SmPluginDesc:
    """
    Description of one plug-in: its name, its class, a description string
    and the keyword arguments to use with it.

    **name** : str
        the name the plug-in is registered under.

    **cls** : type
        the plug-in class.

    **description** : str
        free-form description of the plug-in.

    **kwargs** : dict
        any additional keyword arguments, stored as a dict in ``kwargs``.
    """

    def __init__(self, name, cls, description, **kwargs):
        self.name = name
        self.cls = cls
        self.description = description
        self.kwargs = kwargs

    def __repr__(self):
        # Makes printed or logged descriptors show their contents instead of
        # the default "<... object at 0x...>" form.
        return (f'{type(self).__name__}(name={self.name!r}, cls={self.cls!r}, '
                f'description={self.description!r}, kwargs={self.kwargs!r})')


class AbstractResultWriter(SmPluginSupport):
    """
    Abstract base class for result-writer plug-ins.

    Concrete subclasses must implement ``begin``, ``one_result`` and ``end``.
    The class keeps its own ``_subclasses`` registry, separate from the one
    on ``SmPluginSupport``.
    """

    _subclasses = {}

    @abstractmethod
    def begin(self, args, **kwargs):
        """
        **args** : argparse.Namespace
            the result of an argparse.ArgumentParser's parse_args().

        **kwargs** : dict
            keyword args from the plug-in specification.
        """
        pass

    @abstractmethod
    def one_result(self, stats):
        """
        **stats** : obj
            an object with the following methods:

            **columns()** : list of str
                list of output column names

            **row_values()** : dict
                dict of output values, one key per column name
        """
        pass

    @abstractmethod
    def end(self):
        # NOTE(review): presumably called once after the last one_result();
        # confirm against the caller.
        pass
class AbstractTimedQuery(SmPluginSupport):
    """
    Base class for the timed-query family of plug-ins.

    It declares its own ``_subclasses`` dictionary, so classes derived from
    it are registered here rather than in the ``SmPluginSupport`` registry.
    """

    _subclasses = {}