Merge "[zmq] Proxy has to skip broken multi-part message"

This commit is contained in:
Jenkins 2016-08-31 16:31:45 +00:00 committed by Gerrit Code Review
commit 5cb675a2c6
2 changed files with 21 additions and 7 deletions

View File

@ -21,7 +21,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._drivers.zmq_driver import zmq_updater from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._i18n import _LI from oslo_messaging._i18n import _LE, _LI
zmq = zmq_async.import_zmq() zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -91,8 +91,13 @@ class UniversalQueueProxy(object):
payload.insert(0, routing_key) payload.insert(0, routing_key)
payload.insert(0, msg_type) payload.insert(0, msg_type)
return payload return payload
except (AssertionError, ValueError, zmq.ZMQError): except (AssertionError, ValueError):
LOG.error("Received message with wrong format") LOG.error(_LE("Received message with wrong format"))
if socket.getsockopt(zmq.RCVMORE):
# NOTE(ozamiatin): Drop the left parts of broken message
socket.recv_multipart()
except zmq.ZMQError as e:
LOG.exception(e)
return None return None
@staticmethod @staticmethod
@ -114,6 +119,7 @@ class UniversalQueueProxy(object):
socket.send_multipart(multipart_message) socket.send_multipart(multipart_message)
def cleanup(self): def cleanup(self):
self.poller.close()
self.fe_router_socket.close() self.fe_router_socket.close()
self.be_router_socket.close() self.be_router_socket.close()
self.pub_publisher.cleanup() self.pub_publisher.cleanup()

View File

@ -85,8 +85,11 @@ class ZmqSocket(object):
def send(self, *args, **kwargs): def send(self, *args, **kwargs):
self.handle.send(*args, **kwargs) self.handle.send(*args, **kwargs)
def send_string(self, *args, **kwargs): def send_string(self, value, *args, **kwargs):
self.handle.send_string(*args, **kwargs) # NOTE(ozamiatin): Not using send_string until
# eventlet zmq support this convenience method
# in thread-safe manner
self.handle.send(six.b(value), *args, **kwargs)
def send_json(self, *args, **kwargs): def send_json(self, *args, **kwargs):
self.handle.send_json(*args, **kwargs) self.handle.send_json(*args, **kwargs)
@ -109,7 +112,12 @@ class ZmqSocket(object):
return self.handle.recv(*args, **kwargs) return self.handle.recv(*args, **kwargs)
def recv_string(self, *args, **kwargs): def recv_string(self, *args, **kwargs):
return self.handle.recv_string(*args, **kwargs) # NOTE(ozamiatin): Not using recv_string until
# eventlet zmq support this convenience method
# in thread-safe manner
result = self.handle.recv(*args, **kwargs)
return result.decode('utf-8') if six.PY3 and \
isinstance(result, six.binary_type) else result
def recv_json(self, *args, **kwargs): def recv_json(self, *args, **kwargs):
return self.handle.recv_json(*args, **kwargs) return self.handle.recv_json(*args, **kwargs)