[zmq] Proxy has to skip broken multi-part message
If received message is malformed all it's parts should be skipped at once, so we will not receive them on next iterations. Change-Id: I5d95a46696ea3190a58c89c3b78eaf57911baa9e Closes-Bug: #1617566
This commit is contained in:
parent
41564c40bd
commit
68669484bf
@ -21,7 +21,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
|
|||||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||||
from oslo_messaging._i18n import _LI
|
from oslo_messaging._i18n import _LE, _LI
|
||||||
|
|
||||||
zmq = zmq_async.import_zmq()
|
zmq = zmq_async.import_zmq()
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -91,8 +91,13 @@ class UniversalQueueProxy(object):
|
|||||||
payload.insert(0, routing_key)
|
payload.insert(0, routing_key)
|
||||||
payload.insert(0, msg_type)
|
payload.insert(0, msg_type)
|
||||||
return payload
|
return payload
|
||||||
except (AssertionError, ValueError, zmq.ZMQError):
|
except (AssertionError, ValueError):
|
||||||
LOG.error("Received message with wrong format")
|
LOG.error(_LE("Received message with wrong format"))
|
||||||
|
if socket.getsockopt(zmq.RCVMORE):
|
||||||
|
# NOTE(ozamiatin): Drop the left parts of broken message
|
||||||
|
socket.recv_multipart()
|
||||||
|
except zmq.ZMQError as e:
|
||||||
|
LOG.exception(e)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@ -114,6 +119,7 @@ class UniversalQueueProxy(object):
|
|||||||
socket.send_multipart(multipart_message)
|
socket.send_multipart(multipart_message)
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
|
self.poller.close()
|
||||||
self.fe_router_socket.close()
|
self.fe_router_socket.close()
|
||||||
self.be_router_socket.close()
|
self.be_router_socket.close()
|
||||||
self.pub_publisher.cleanup()
|
self.pub_publisher.cleanup()
|
||||||
|
@ -85,8 +85,11 @@ class ZmqSocket(object):
|
|||||||
def send(self, *args, **kwargs):
|
def send(self, *args, **kwargs):
|
||||||
self.handle.send(*args, **kwargs)
|
self.handle.send(*args, **kwargs)
|
||||||
|
|
||||||
def send_string(self, *args, **kwargs):
|
def send_string(self, value, *args, **kwargs):
|
||||||
self.handle.send_string(*args, **kwargs)
|
# NOTE(ozamiatin): Not using send_string until
|
||||||
|
# eventlet zmq support this convenience method
|
||||||
|
# in thread-safe manner
|
||||||
|
self.handle.send(six.b(value), *args, **kwargs)
|
||||||
|
|
||||||
def send_json(self, *args, **kwargs):
|
def send_json(self, *args, **kwargs):
|
||||||
self.handle.send_json(*args, **kwargs)
|
self.handle.send_json(*args, **kwargs)
|
||||||
@ -109,7 +112,12 @@ class ZmqSocket(object):
|
|||||||
return self.handle.recv(*args, **kwargs)
|
return self.handle.recv(*args, **kwargs)
|
||||||
|
|
||||||
def recv_string(self, *args, **kwargs):
|
def recv_string(self, *args, **kwargs):
|
||||||
return self.handle.recv_string(*args, **kwargs)
|
# NOTE(ozamiatin): Not using recv_string until
|
||||||
|
# eventlet zmq support this convenience method
|
||||||
|
# in thread-safe manner
|
||||||
|
result = self.handle.recv(*args, **kwargs)
|
||||||
|
return result.decode('utf-8') if six.PY3 and \
|
||||||
|
isinstance(result, six.binary_type) else result
|
||||||
|
|
||||||
def recv_json(self, *args, **kwargs):
|
def recv_json(self, *args, **kwargs):
|
||||||
return self.handle.recv_json(*args, **kwargs)
|
return self.handle.recv_json(*args, **kwargs)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user