From 68669484bf03a87b396478d4d957614b8911883a Mon Sep 17 00:00:00 2001 From: ozamiatin Date: Sat, 27 Aug 2016 16:14:41 +0300 Subject: [PATCH] [zmq] Proxy has to skip broken multi-part message If received message is malformed all it's parts should be skipped at once, so we will not receive them on next iterations. Change-Id: I5d95a46696ea3190a58c89c3b78eaf57911baa9e Closes-Bug: #1617566 --- .../_drivers/zmq_driver/proxy/zmq_queue_proxy.py | 14 ++++++++++---- oslo_messaging/_drivers/zmq_driver/zmq_socket.py | 14 +++++++++++--- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py index 0ff776a73..52e79fe1c 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py @@ -21,7 +21,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._drivers.zmq_driver import zmq_updater -from oslo_messaging._i18n import _LI +from oslo_messaging._i18n import _LE, _LI zmq = zmq_async.import_zmq() LOG = logging.getLogger(__name__) @@ -91,9 +91,14 @@ class UniversalQueueProxy(object): payload.insert(0, routing_key) payload.insert(0, msg_type) return payload - except (AssertionError, ValueError, zmq.ZMQError): - LOG.error("Received message with wrong format") - return None + except (AssertionError, ValueError): + LOG.error(_LE("Received message with wrong format")) + if socket.getsockopt(zmq.RCVMORE): + # NOTE(ozamiatin): Drop the left parts of broken message + socket.recv_multipart() + except zmq.ZMQError as e: + LOG.exception(e) + return None @staticmethod def _redirect_message(socket, multipart_message): @@ -114,6 +119,7 @@ class UniversalQueueProxy(object): socket.send_multipart(multipart_message) def cleanup(self): + self.poller.close() self.fe_router_socket.close() self.be_router_socket.close() self.pub_publisher.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index e3c76a143..7fc113f6b 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -85,8 +85,11 @@ class ZmqSocket(object): def send(self, *args, **kwargs): self.handle.send(*args, **kwargs) - def send_string(self, *args, **kwargs): - self.handle.send_string(*args, **kwargs) + def send_string(self, value, *args, **kwargs): + # NOTE(ozamiatin): Not using send_string until + # eventlet zmq support this convenience method + # in thread-safe manner + self.handle.send(six.b(value), *args, **kwargs) def send_json(self, *args, **kwargs): self.handle.send_json(*args, **kwargs) @@ -109,7 +112,12 @@ class ZmqSocket(object): return self.handle.recv(*args, **kwargs) def recv_string(self, *args, **kwargs): - return self.handle.recv_string(*args, **kwargs) + # NOTE(ozamiatin): Not using recv_string until + # eventlet zmq support this convenience method + # in thread-safe manner + result = self.handle.recv(*args, **kwargs) + return result.decode('utf-8') if six.PY3 and \ + isinstance(result, six.binary_type) else result def recv_json(self, *args, **kwargs): return self.handle.recv_json(*args, **kwargs)