[zmq] Make ThreadingPoller work with ZmqSocket
This patch makes zmq-driver-pollers work only with zmq-driver-sockets since the lower-level zmq-socket class have to be used only as an underlying implementation for the higher-level zmq-driver-socket class. Change-Id: Icb6fe89a3af7af6760b724a313ad2a7fb2a106c7 Closes-Bug: #1620543
This commit is contained in:
parent
0706361141
commit
3aa59828f9
oslo_messaging/_drivers/zmq_driver
@ -12,12 +12,15 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
|
||||
import eventlet
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_poller
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GreenPoller(zmq_poller.ZmqPoller):
|
||||
|
||||
@ -27,6 +30,7 @@ class GreenPoller(zmq_poller.ZmqPoller):
|
||||
|
||||
def register(self, socket, recv_method=None):
|
||||
if socket not in self.thread_by_socket:
|
||||
LOG.debug("Registering socket %s", socket.handle.identity)
|
||||
self.thread_by_socket[socket] = eventlet.spawn(
|
||||
self._socket_receive, socket, recv_method
|
||||
)
|
||||
@ -34,6 +38,7 @@ class GreenPoller(zmq_poller.ZmqPoller):
|
||||
def unregister(self, socket):
|
||||
thread = self.thread_by_socket.pop(socket, None)
|
||||
if thread:
|
||||
LOG.debug("Unregistering socket %s", socket.handle.identity)
|
||||
thread.kill()
|
||||
|
||||
def _socket_receive(self, socket, recv_method=None):
|
||||
|
@ -27,35 +27,40 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
|
||||
|
||||
def __init__(self):
|
||||
self.poller = zmq.Poller()
|
||||
self.recv_methods = {}
|
||||
self.sockets_and_recv_methods = {}
|
||||
|
||||
def register(self, socket, recv_method=None):
|
||||
if socket in self.recv_methods:
|
||||
socket_handle = socket.handle
|
||||
if socket_handle in self.sockets_and_recv_methods:
|
||||
return
|
||||
LOG.debug("Registering socket")
|
||||
if recv_method is not None:
|
||||
self.recv_methods[socket] = recv_method
|
||||
self.poller.register(socket, zmq.POLLIN)
|
||||
LOG.debug("Registering socket %s", socket_handle.identity)
|
||||
self.sockets_and_recv_methods[socket_handle] = (socket, recv_method)
|
||||
self.poller.register(socket_handle, zmq.POLLIN)
|
||||
|
||||
def unregister(self, socket):
|
||||
self.recv_methods.pop(socket, None)
|
||||
self.poller.unregister(socket)
|
||||
socket_handle = socket.handle
|
||||
socket_and_recv_method = \
|
||||
self.sockets_and_recv_methods.pop(socket_handle, None)
|
||||
if socket_and_recv_method:
|
||||
LOG.debug("Unregistering socket %s", socket_handle.identity)
|
||||
self.poller.unregister(socket_handle)
|
||||
|
||||
def poll(self, timeout=None):
|
||||
if timeout is not None and timeout > 0:
|
||||
timeout *= 1000 # convert seconds to milliseconds
|
||||
|
||||
sockets = {}
|
||||
socket_handles = {}
|
||||
try:
|
||||
sockets = dict(self.poller.poll(timeout=timeout))
|
||||
socket_handles = dict(self.poller.poll(timeout=timeout))
|
||||
except zmq.ZMQError as e:
|
||||
LOG.debug("Polling terminated with error: %s", e)
|
||||
|
||||
if not sockets:
|
||||
if not socket_handles:
|
||||
return None, None
|
||||
for socket in sockets:
|
||||
if socket in self.recv_methods:
|
||||
return self.recv_methods[socket](socket), socket
|
||||
for socket_handle in socket_handles:
|
||||
socket, recv_method = self.sockets_and_recv_methods[socket_handle]
|
||||
if recv_method:
|
||||
return recv_method(socket), socket
|
||||
else:
|
||||
return socket.recv_multipart(), socket
|
||||
|
||||
|
@ -49,10 +49,8 @@ class UniversalQueueProxy(object):
|
||||
conf.zmq_proxy_opts.backend_port) if port != 0 else \
|
||||
zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host)
|
||||
|
||||
self.poller.register(self.fe_router_socket.handle,
|
||||
self._receive_in_request)
|
||||
self.poller.register(self.be_router_socket.handle,
|
||||
self._receive_in_request)
|
||||
self.poller.register(self.fe_router_socket, self._receive_in_request)
|
||||
self.poller.register(self.be_router_socket, self._receive_in_request)
|
||||
|
||||
self.pub_publisher = zmq_publisher_proxy.PublisherProxy(
|
||||
conf, matchmaker)
|
||||
@ -73,9 +71,9 @@ class UniversalQueueProxy(object):
|
||||
zmq_names.NOTIFY_TYPE):
|
||||
self.pub_publisher.send_request(message)
|
||||
else:
|
||||
self._redirect_message(self.be_router_socket.handle
|
||||
if socket is self.fe_router_socket.handle
|
||||
else self.fe_router_socket.handle, message)
|
||||
self._redirect_message(self.be_router_socket
|
||||
if socket is self.fe_router_socket
|
||||
else self.fe_router_socket, message)
|
||||
|
||||
@staticmethod
|
||||
def _receive_in_request(socket):
|
||||
|
@ -57,7 +57,7 @@ class ZmqPoller(object):
|
||||
"""Register socket to poll
|
||||
|
||||
:param socket: Socket to subscribe for polling
|
||||
:type socket: zmq.Socket
|
||||
:type socket: ZmqSocket
|
||||
:param recv_method: Optional specific receiver procedure
|
||||
Should return received message object
|
||||
:type recv_method: callable
|
||||
@ -67,7 +67,7 @@ class ZmqPoller(object):
|
||||
"""Unregister socket from poll
|
||||
|
||||
:param socket: Socket to unsubscribe from polling
|
||||
:type socket: zmq.Socket
|
||||
:type socket: ZmqSocket
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
|
@ -82,6 +82,12 @@ class ZmqSocket(object):
|
||||
def setsockopt_string(self, *args, **kwargs):
|
||||
self.handle.setsockopt_string(*args, **kwargs)
|
||||
|
||||
def getsockopt(self, *args, **kwargs):
|
||||
return self.handle.getsockopt(*args, **kwargs)
|
||||
|
||||
def getsockopt_string(self, *args, **kwargs):
|
||||
return self.handle.getsockopt_string(*args, **kwargs)
|
||||
|
||||
def send(self, *args, **kwargs):
|
||||
self.handle.send(*args, **kwargs)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user