exn-connector-python/exn/connector.py
Fotis Paraskevopoulos 2144911594 Aligned python and java, dynamic consumers for python
Change-Id: I2b63a46431adc4adcb0a4fcbf12012f0077c3414
2024-01-13 19:28:08 +02:00

85 lines
2.7 KiB
Python

import logging
import os
from proton.reactor import Container
from exn.core import state_publisher, schedule_publisher
from exn.core.context import Context
from .core.manager import Manager
from .settings import base
from .handler import connector_handler
# Import-time logging setup: request INFO-level root logging with a
# "timestamp - level - message" record layout, then create the logger used
# throughout this module.
# NOTE(review): configuring the root logger from library code affects the
# host application's logging; kept as-is because callers may rely on it.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
_logger = logging.getLogger(__name__)
class EXN:
    """Connector that binds a named component to a message broker.

    On construction it resolves the broker connection settings (keyword
    arguments first, ``NEBULOUS_BROKER_*`` environment variables otherwise),
    validates them, creates a :class:`Context` whose base name is
    ``"<NEBULOUS_BASE_NAME>.<component>"`` and registers the supplied
    consumers and publishers on it.  Call :meth:`start` to hand the context a
    :class:`Manager` for the broker address, and :meth:`stop` to stop it.
    """

    # Class-level defaults; ``context`` is replaced per instance in __init__.
    context = None
    container = None

    def __init__(self, component=None,
                 handler: connector_handler.ConnectorHandler = None,
                 publishers=None,
                 consumers=None,
                 **kwargs):
        """Validate the configuration and register consumers/publishers.

        Args:
            component: Non-empty component name; appended to
                ``base.NEBULOUS_BASE_NAME`` to form the context base.
            handler: Connector handler passed to the context on
                :meth:`start`.
            publishers: Optional iterable of publishers to register.  It is
                copied, never modified.
            consumers: Optional iterable of consumers to register.
            **kwargs: Optional settings:
                ``url``, ``port``, ``username``, ``password`` - broker
                connection settings; each defaults to the matching
                ``NEBULOUS_BROKER_URL`` / ``_PORT`` / ``_USERNAME`` /
                ``_PASSWORD`` environment variable.
                ``enable_state`` - when truthy, also register a
                ``state_publisher.Publisher``.
                ``enable_health`` - when truthy, also register a
                ``schedule_publisher.Publisher`` keyed ``'health'``.

        Raises:
            ValueError: If ``component`` or any of url, port, username or
                password is empty or ``None``.
        """
        if not component:
            _logger.error("Component cannot be empty or None")
            raise ValueError("Component cannot be empty or None")

        self.component = component
        self.url = kwargs.get('url', os.getenv('NEBULOUS_BROKER_URL'))
        self.port = kwargs.get('port', os.getenv('NEBULOUS_BROKER_PORT'))
        self.username = kwargs.get('username', os.getenv('NEBULOUS_BROKER_USERNAME'))
        self.password = kwargs.get('password', os.getenv('NEBULOUS_BROKER_PASSWORD'))
        self.handler = handler

        # Validate in a fixed order so the first missing setting is the one
        # reported; only the setting's label is logged, never its value.
        required_settings = (
            ("URL", self.url),
            ("PORT", self.port),
            ("USERNAME", self.username),
            ("PASSWORD", self.password),
        )
        for label, value in required_settings:
            if not value:
                message = f"{label} cannot be empty or None"
                _logger.error(message)
                raise ValueError(message)

        self.context = Context(base=f"{base.NEBULOUS_BASE_NAME}.{self.component}")

        # Work on a copy: appending the optional built-in publishers must not
        # leak into the list object owned by the caller.
        compiled_publishers = list(publishers) if publishers else []

        if kwargs.get("enable_state", False):
            compiled_publishers.append(state_publisher.Publisher())

        if kwargs.get("enable_health", False):
            # NOTE(review): the positional arguments are presumably the
            # publish interval followed by key and address - confirm against
            # schedule_publisher.Publisher.
            compiled_publishers.append(schedule_publisher.Publisher(
                base.NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT,
                'health',
                'health',
                topic=True))

        for consumer in consumers or []:
            self.context.register_consumers(consumer)

        for publisher in compiled_publishers:
            self.context.register_publisher(publisher)

    def start(self):
        """Start the context with a Manager for ``"<url>:<port>"`` and the handler."""
        self.context.start(Manager(f"{self.url}:{self.port}"), self.handler)

    def stop(self):
        """Stop the context."""
        self.context.stop()