diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py index 862e6b9d0..056d2665e 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py @@ -12,12 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. +import logging import threading import eventlet from oslo_messaging._drivers.zmq_driver import zmq_poller +LOG = logging.getLogger(__name__) + class GreenPoller(zmq_poller.ZmqPoller): @@ -27,6 +30,7 @@ class GreenPoller(zmq_poller.ZmqPoller): def register(self, socket, recv_method=None): if socket not in self.thread_by_socket: + LOG.debug("Registering socket %s", socket.handle.identity) self.thread_by_socket[socket] = eventlet.spawn( self._socket_receive, socket, recv_method ) @@ -34,6 +38,7 @@ class GreenPoller(zmq_poller.ZmqPoller): def unregister(self, socket): thread = self.thread_by_socket.pop(socket, None) if thread: + LOG.debug("Unregistering socket %s", socket.handle.identity) thread.kill() def _socket_receive(self, socket, recv_method=None): diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py index 9589fd162..0b9b3320c 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py @@ -27,35 +27,40 @@ class ThreadingPoller(zmq_poller.ZmqPoller): def __init__(self): self.poller = zmq.Poller() - self.recv_methods = {} + self.sockets_and_recv_methods = {} def register(self, socket, recv_method=None): - if socket in self.recv_methods: + socket_handle = socket.handle + if socket_handle in self.sockets_and_recv_methods: return - LOG.debug("Registering socket") - if recv_method is not None: - self.recv_methods[socket] = recv_method - self.poller.register(socket, zmq.POLLIN) + LOG.debug("Registering socket %s", socket_handle.identity) + self.sockets_and_recv_methods[socket_handle] = (socket, recv_method) + self.poller.register(socket_handle, zmq.POLLIN) def unregister(self, socket): - self.recv_methods.pop(socket, None) - self.poller.unregister(socket) + socket_handle = socket.handle + socket_and_recv_method = \ + self.sockets_and_recv_methods.pop(socket_handle, None) + if socket_and_recv_method: + LOG.debug("Unregistering socket %s", socket_handle.identity) + self.poller.unregister(socket_handle) def poll(self, timeout=None): if timeout is not None and timeout > 0: timeout *= 1000 # convert seconds to milliseconds - sockets = {} + socket_handles = {} try: - sockets = dict(self.poller.poll(timeout=timeout)) + socket_handles = dict(self.poller.poll(timeout=timeout)) except zmq.ZMQError as e: LOG.debug("Polling terminated with error: %s", e) - if not sockets: + if not socket_handles: return None, None - for socket in sockets: - if socket in self.recv_methods: - return self.recv_methods[socket](socket), socket + for socket_handle in socket_handles: + socket, recv_method = self.sockets_and_recv_methods[socket_handle] + if recv_method: + return recv_method(socket), socket else: return socket.recv_multipart(), socket diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py index 52e79fe1c..1b2ebd433 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py @@ -49,10 +49,8 @@ class UniversalQueueProxy(object): conf.zmq_proxy_opts.backend_port) if port != 0 else \ zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host) - self.poller.register(self.fe_router_socket.handle, - self._receive_in_request) - self.poller.register(self.be_router_socket.handle, - self._receive_in_request) + self.poller.register(self.fe_router_socket, self._receive_in_request) + self.poller.register(self.be_router_socket, self._receive_in_request) self.pub_publisher = zmq_publisher_proxy.PublisherProxy( conf, matchmaker) @@ -73,9 +71,9 @@ class UniversalQueueProxy(object): zmq_names.NOTIFY_TYPE): self.pub_publisher.send_request(message) else: - self._redirect_message(self.be_router_socket.handle - if socket is self.fe_router_socket.handle - else self.fe_router_socket.handle, message) + self._redirect_message(self.be_router_socket + if socket is self.fe_router_socket + else self.fe_router_socket, message) @staticmethod def _receive_in_request(socket): diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py index 678741d9a..c2abd4af3 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py @@ -57,7 +57,7 @@ class ZmqPoller(object): """Register socket to poll :param socket: Socket to subscribe for polling - :type socket: zmq.Socket + :type socket: ZmqSocket :param recv_method: Optional specific receiver procedure Should return received message object :type recv_method: callable @@ -67,7 +67,7 @@ class ZmqPoller(object): """Unregister socket from poll :param socket: Socket to unsubscribe from polling - :type socket: zmq.Socket + :type socket: ZmqSocket """ @abc.abstractmethod diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 7fc113f6b..1b9fc0867 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -82,6 +82,12 @@ class ZmqSocket(object): def setsockopt_string(self, *args, **kwargs): self.handle.setsockopt_string(*args, **kwargs) + def getsockopt(self, *args, **kwargs): + return self.handle.getsockopt(*args, **kwargs) + + def getsockopt_string(self, *args, **kwargs): + return self.handle.getsockopt_string(*args, **kwargs) + def send(self, *args, **kwargs): self.handle.send(*args, **kwargs)