diff --git a/.gitignore b/.gitignore index e0e9b8b..e6901a8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,193 @@ __pycache__/ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ .nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + + +# General +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + +#Local Ignore +/example/exn diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..c1e7428 --- /dev/null +++ b/Readme.md @@ -0,0 +1,111 @@ +Of course, let's integrate everything into a cohesive guide. + +--- + +## **Using the `exn` Module: A Comprehensive Guide** + +The `exn` module serves as a connector, enabling communication between various components in a messaging infrastructure. This guide will take you through the basics to advanced usage of the module. + +### **Overview** + +- **Core Components:** + - **Bootstrap**: A foundational class setting up the readiness state. + - **CoreHandler**: Manages connection start, message reception, and timed tasks. + - **EXN**: Main connector class initializing connections and configurations. + +### **Basic Usage** + +1. **Initialize** the `EXN` class: + + ```python + connector = connector.EXN('ui', bootstrap=Bootstrap()) + ``` + +2. **Run** the connector to start: + + ```python + connector.start() + ``` + +### **Advanced Usage** + +#### **1. Enabling Health and State Monitoring** + +The `EXN` class offers two flags: `enable_health` and `enable_state`. + +- **`enable_health`:** Enables a scheduled publisher that sends a health-check ping at regular intervals. +- **`enable_state`:** Activates the `StatePublisher` to manage and signal the lifecycle states of the component. + +**Gradual Implementation:** + +**a. Basic Setup (No Flags):** + + ```python + connector = connector.EXN('ui', bootstrap=Bootstrap()) + ``` + +**b. Health Monitoring:** + + ```python + connector = connector.EXN('ui', bootstrap=Bootstrap(), enable_health=True) + ``` + +**c. Lifecycle State Monitoring:** + + ```python + connector = connector.EXN('ui', bootstrap=Bootstrap(), enable_health=True, enable_state=True) + ``` + +#### **2. The `ready` Function** + +The `ready` function in your Bootstrap class is called when the component is initialized. Use it to perform specific operations at startup: + +Example: + +```python +def ready(self, context): + if context.has_publisher('state'): + context.publishers['state'].starting() +``` + +#### **3. The Importance of the 'key' Argument** + +Each publisher is identified using a unique 'key', which is crucial when handling multiple publishers: + +Example: + +```python +context.publishers['state'].starting() +``` + +Here, 'state' is the 'key' for a specific publisher, directing it to send a 'starting' signal. + +### **Working with Context** + +The `Context` class in the `core` section aids in managing publishers and consumers within the application. Here's how: + +- Register a publisher: + + ```python + context.register_publisher(publisher) + ``` + +- Check if a publisher exists: + + ```python + context.has_publisher('key') + ``` + +- Build an address from a link: + + ```python + context.build_address_from_link(link) + ``` + +### **Conclusion** + +The `exn` module provides a robust platform for component communication, health and lifecycle management, and more. Whether you're looking for simple connectivity or advanced message routing with multiple publishers, this module has got you covered. + +--- + +This documentation offers an inclusive overview and guide for the `exn` module. Tailor it as per your specific project requirements or as the module evolves. \ No newline at end of file diff --git a/example/requirements.txt b/example/requirements.txt new file mode 100644 index 0000000..3e338bf --- /dev/null +++ b/example/requirements.txt @@ -0,0 +1 @@ +python-dotenv \ No newline at end of file diff --git a/example/test_exn_publisher.py b/example/test_exn_publisher.py new file mode 100644 index 0000000..37d900f --- /dev/null +++ b/example/test_exn_publisher.py @@ -0,0 +1,51 @@ +import logging +import time + +from exn import connector, core + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logging.getLogger('exn.connector').setLevel(logging.DEBUG) + + +class MyHandler(connector.ConnectorHandler): + + def ready(self, context): + if context.has_publisher('state'): + context.publishers['state'].starting() + context.publishers['state'].started() + context.publishers['state'].custom('forecasting') + context.publishers['state'].stopping() + context.publishers['state'].stopped() + + context.publishers['config'].send({ + 'hello': 'world' + }) + context.publishers['preferences'].send() + + +class MyPublisher(core.publisher.Publisher): + def __init__(self): + super().__init__('preferences', 'preferences.changed', True) + + def send(self, body={}): + body.update({ + "preferences": { + "dark_mode": True + } + }) + super(MyPublisher, self).send(body) + + +connector = connector.EXN('ui', handler=MyHandler() + , publishers=[ + core.publisher.Publisher('config', 'config', True), + MyPublisher(), + ], + enable_health=True, enable_state=False + ,url='localhost' + ,port=5672 + ,username="admin" + ,password="adming" + ) + +connector.start() diff --git a/example/test_exn_receiver.py b/example/test_exn_receiver.py new file mode 100644 index 0000000..6f703a1 --- /dev/null +++ b/example/test_exn_receiver.py @@ -0,0 +1,27 @@ +import logging + +from exn import connector, core + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logging.getLogger('exn.connector').setLevel(logging.DEBUG) + + +class Bootstrap(connector.ConnectorHandler): + + def on_message(self, key, address, body, context, **kwargs): + logging.info(f"Received {key} => {address}") + if key == 'ui_health': + logging.info(f"I am healthy => {body}") + + if key == 'ui_all': + logging.info(f"These are my preferences => {body}") + + +connector = connector.EXN('ui', handler=Bootstrap(), + consumers=[ + core.consumer.Consumer('ui_health', 'health', topic=True), + core.consumer.Consumer('ui_all', 'eu.nebulouscloud.ui.preferences.>', topic=True, + fqdn=True) + ]) + +connector.start() diff --git a/exn/__init__.py b/exn/__init__.py new file mode 100644 index 0000000..74c0c17 --- /dev/null +++ b/exn/__init__.py @@ -0,0 +1 @@ +from . import connector \ No newline at end of file diff --git a/exn/connector.py b/exn/connector.py new file mode 100644 index 0000000..8a62016 --- /dev/null +++ b/exn/connector.py @@ -0,0 +1,155 @@ +import logging +import os + +from dotenv import load_dotenv +from proton.handlers import MessagingHandler +from proton.reactor import Container + +from .core import context as core_context, state_publisher, schedule_publisher +from .settings import base + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +_logger = logging.getLogger(__name__) + +class ConnectorHandler: + def __init__(self): + self.initialized = False + + + def set_ready(self,ready, ctx:core_context.Context): + self.initialized = ready + self.ready(ctx) + + def ready(self, ctx:core_context.Context): + pass + + def on_message(self, key, address, body, context, **kwargs): + pass + + +class CoreHandler(MessagingHandler): + + def __init__(self, + context, + handler: ConnectorHandler, + publishers = [], + consumers = [], + ): + super(CoreHandler, self).__init__() + self.context=context + self.publishers=publishers + self.consumers=consumers + self.handler = handler + self.conn = None + + def on_start(self, event) -> None: + + self.conn = event.container.connect(self.context.connection) + for publisher in self.publishers: + _logger.info(f"{publisher.address} registering sender") + address = self.context.build_address_from_link(publisher) + publisher.set(event.container.create_sender(self.conn,address)) + self.context.register_publisher(publisher) + _logger.debug(f"{self.context.base} Registering timer { hasattr(publisher, 'delay')}") + if hasattr(publisher, "delay"): + _logger.debug(f"{self.context.base} Registering timer") + event.reactor.schedule(publisher.delay, self) + + for consumer in self.consumers: + address = self.context.build_address_from_link(consumer) + _logger.info(f"{self.context.base} Registering consumer {address}") + consumer.set(event.container.create_receiver(self.conn, address)) + self.context.register_consumers(consumer) + + def on_sendable(self, event): + if not self.handler.initialized: + self.handler.set_ready(True, self.context) + + def on_timer_task(self, event): + _logger.debug(f"{self.context.base} On timer") + for publisher in self._delay_publishers(): + publisher.send() + event.reactor.schedule(publisher.delay, self) + + def on_message(self, event): + try: + for consumer in self.consumers: + if consumer.should_handle(event): + _logger.debug(f"Received message: {event.message.address}") + self.handler.on_message(consumer.key, event.message.address, event.message.body, self.context, event=event) + except Exception as e: + _logger.error(f"Received message: {e}") + + + def close(self): + if self.conn: + self.conn.close() + else: + _logger.warning(f"{self.context.base} No open connection") + + def _delay_publishers(self): + return [p for p in self.publishers if hasattr(p,'delay')] + + +class EXN: + def __init__(self, component=None, + handler:ConnectorHandler = None, + publishers=[], + consumers=[], + **kwargs): + + # Load .env file + load_dotenv() + + # Validate and set connector + if not component: + _logger.error("Component cannot be empty or None") + raise ValueError("Component cannot be empty or None") + self.component = component + self.handler = handler + + self.url = kwargs.get('url',os.getenv('NEBULOUS_BROKER_URL')) + self.port = kwargs.get('port', os.getenv('NEBULOUS_BROKER_PORT')) + self.username = kwargs.get('username',os.getenv('NEBULOUS_BROKER_USERNAME')) + self.password = kwargs.get('password', os.getenv('NEBULOUS_BROKER_PASSWORD')) + + # Validate attributes + if not self.url: + _logger.error("URL cannot be empty or None") + raise ValueError("URL cannot be empty or None") + if not self.port: + _logger.error("PORT cannot be empty or None") + raise ValueError("PORT cannot be empty or None") + if not self.username: + _logger.error("USERNAME cannot be empty or None") + raise ValueError("USERNAME cannot be empty or None") + if not self.password: + _logger.error("PASSWORD cannot be empty or None") + raise ValueError("PASSWORD cannot be empty or None") + + ctx = core_context.Context( + connection=f"{self.url}:{self.port}", + base=f"{base.NEBULOUS_BASE_NAME}.{self.component}", + ) + + if kwargs.get("enable_state",False): + publishers.append(state_publisher.Publisher()) + + if kwargs.get("enable_health",False): + publishers.append(schedule_publisher.Publisher( + base.NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT, + 'health', + 'health', + True)) + + core_handler = CoreHandler( + ctx, + handler, + publishers, + consumers + ) + + self.container = Container(core_handler) + + def start(self): + self.container.run() diff --git a/exn/core/__init__.py b/exn/core/__init__.py new file mode 100644 index 0000000..bdc524b --- /dev/null +++ b/exn/core/__init__.py @@ -0,0 +1,8 @@ + + +from . import context +from . import publisher +from . import consumer +from . import state_publisher +from . import schedule_publisher + diff --git a/exn/core/consumer.py b/exn/core/consumer.py new file mode 100644 index 0000000..e5c7424 --- /dev/null +++ b/exn/core/consumer.py @@ -0,0 +1,17 @@ +import datetime + +from proton import Message, Event +from . import link +import logging + +_logger = logging.getLogger(__name__) + + +class Consumer(link.Link): + + def on_message(self, body, **kwargs): + _logger.debug(f"{self.address} Got {body} ") + + def should_handle(self, event: Event): + if event.link == self._link: + return True diff --git a/exn/core/context.py b/exn/core/context.py new file mode 100644 index 0000000..1b99056 --- /dev/null +++ b/exn/core/context.py @@ -0,0 +1,63 @@ +from . import link + + +class Context: + + def __init__(self, connection, base): + + self.connection = connection + self.base = base + self.publishers = {} + self.consumers = {} + + def get_publisher(self, key): + if key in self.publishers: + return self.publishers[key] + return None + + def has_publisher(self, key): + return key in self.publishers + + def has_consumer(self, key): + return key in self.consumers + + def register_publisher(self, publisher): + self.publishers[publisher.key] = publisher + + def register_consumers(self, consumer): + self.consumers[consumer.key] = consumer + + def build_address_from_link(self, link: link.Link): + + if link.fqdn: + address = link.address + if link.topic and not link.address.startswith("topic://"): + address = f"topic://{address}" + return address + + address = f"{self.base}.{link.address}" + if link.topic: + address = f"topic://{address}" + + return address + + def match_address(self, l: link.Link, event): + + if not event \ + or not event.message \ + or not event.message.address: + return False + + address = self.build_address_from_link(l) + return address == event.message.address + + def build_address(self, *actions, topic=False): + + if len(actions) <= 0: + return self.base + + address = f"{self.base}.{'.'.join(actions)}" + if topic: + address = f"topic://{address}" + + return address diff --git a/exn/core/link.py b/exn/core/link.py new file mode 100644 index 0000000..65d6395 --- /dev/null +++ b/exn/core/link.py @@ -0,0 +1,18 @@ + +from proton import Link as pLink +class Link: + + fqdn=False + def __init__(self, key, address, topic=False, fqdn=False): + + self.key = key + self.address = address + self._link = None + self.topic= topic + self.fqdn= fqdn + + + def set(self, link:pLink): + # The proton container creates a sender + # so we just use composition instead of extension + self._link = link diff --git a/exn/core/publisher.py b/exn/core/publisher.py new file mode 100644 index 0000000..2768c5d --- /dev/null +++ b/exn/core/publisher.py @@ -0,0 +1,33 @@ +import datetime +import logging + +from proton import Message + +from . import link + +_logger = logging.getLogger(__name__) + + +class Publisher(link.Link): + + def send(self, body=None): + if not body: + body = {} + + _logger.debug(f"{self.address} Sending {body} ") + msg = self._prepare_message(body) + self._link.send(msg) + + def _prepare_message(self, body=None): + + if not body: + body = {} + + send = {"when": datetime.datetime.utcnow().isoformat()} + send.update(body) + msg = Message( + address=self._link.target.address, + body=send + ) + msg.content_type = "application/json" + return msg diff --git a/exn/core/schedule_publisher.py b/exn/core/schedule_publisher.py new file mode 100644 index 0000000..9cf8b64 --- /dev/null +++ b/exn/core/schedule_publisher.py @@ -0,0 +1,14 @@ +import logging + +from . import publisher + +_logger = logging.getLogger(__name__) + + +class Publisher(publisher.Publisher): + send_next = False + delay = 15 + + def __init__(self, delay, key, address, topic=False): + super(Publisher, self).__init__(key, address, topic) + self.delay = delay diff --git a/exn/core/state_publisher.py b/exn/core/state_publisher.py new file mode 100644 index 0000000..8ea6af7 --- /dev/null +++ b/exn/core/state_publisher.py @@ -0,0 +1,45 @@ +import datetime +import json +from enum import Enum + +from proton import Message + +from . import publisher + +import logging + +_logger = logging.getLogger(__name__) + +class States(Enum): + + STARTING = "starting" + STARTED = "started" + READY = "ready" + STOPPING = "stopping" + STOPPED = "stopped" + +class Publisher(publisher.Publisher): + + def __init__(self): + super().__init__("state","state", True) + + def _send_message(self, message_type): + self.send({"state": message_type,"message": None}) + + def starting(self): + self._send_message(States.STARTING) + + def started(self): + self._send_message(States.STARTED) + + def ready(self): + self._send_message(States.READY) + + def stopping(self): + self._send_message(States.STOPPING) + + def stopped(self): + self._send_message(States.STOPPED) + + def custom(self, state): + self._send_message(state) diff --git a/exn/settings/__init__.py b/exn/settings/__init__.py new file mode 100644 index 0000000..cf50d1c --- /dev/null +++ b/exn/settings/__init__.py @@ -0,0 +1 @@ +from . import base \ No newline at end of file diff --git a/exn/settings/base.py b/exn/settings/base.py new file mode 100644 index 0000000..baf5475 --- /dev/null +++ b/exn/settings/base.py @@ -0,0 +1,2 @@ +NEBULOUS_BASE_NAME="eu.nebulouscloud" +NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT=15 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..bf59ab4 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +python-qpid-proton \ No newline at end of file