Port ZMQ driver to Python 3
* Fix bytes/unicode issues in the ZMQ driver * Add pyzmq test dependency on Python 3 * Enable (indirectly) ZMQ driver tests on Python 3 Change-Id: I812f1ec7ad2dcd8e16af70d6f8f9bc3cf7a2225c Depends-on: I0efae1c91c5d830156b867d7d21b5c0065094665
This commit is contained in:
parent
1305d3e3b8
commit
45ca27a8a7
@ -106,7 +106,7 @@ def _serialize(data):
|
|||||||
|
|
||||||
def _deserialize(data):
|
def _deserialize(data):
|
||||||
"""Deserialization wrapper."""
|
"""Deserialization wrapper."""
|
||||||
LOG.debug("Deserializing: %s", data)
|
LOG.debug("Deserializing: %r", data)
|
||||||
return jsonutils.loads(data)
|
return jsonutils.loads(data)
|
||||||
|
|
||||||
|
|
||||||
@ -180,7 +180,10 @@ class ZmqSocket(object):
|
|||||||
LOG.debug("Subscribing to %s", msg_filter)
|
LOG.debug("Subscribing to %s", msg_filter)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
|
arg = msg_filter
|
||||||
|
if six.PY3:
|
||||||
|
arg = arg.encode('utf-8')
|
||||||
|
self.sock.setsockopt(zmq.SUBSCRIBE, arg)
|
||||||
except Exception:
|
except Exception:
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -190,7 +193,10 @@ class ZmqSocket(object):
|
|||||||
"""Unsubscribe."""
|
"""Unsubscribe."""
|
||||||
if msg_filter not in self.subscriptions:
|
if msg_filter not in self.subscriptions:
|
||||||
return
|
return
|
||||||
self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
|
arg = msg_filter
|
||||||
|
if six.PY3:
|
||||||
|
arg = arg.encode('utf-8')
|
||||||
|
self.sock.setsockopt(zmq.UNSUBSCRIBE, arg)
|
||||||
self.subscriptions.remove(msg_filter)
|
self.subscriptions.remove(msg_filter)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@ -240,7 +246,10 @@ class ZmqClient(object):
|
|||||||
self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
|
self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
|
||||||
|
|
||||||
def cast(self, msg_id, topic, data, envelope):
|
def cast(self, msg_id, topic, data, envelope):
|
||||||
msg_id = msg_id or 0
|
msg_id = msg_id or '0'
|
||||||
|
|
||||||
|
if six.PY3:
|
||||||
|
msg_id = msg_id.encode('utf-8')
|
||||||
|
|
||||||
if not envelope:
|
if not envelope:
|
||||||
data = _serialize(data)
|
data = _serialize(data)
|
||||||
@ -574,13 +583,13 @@ class ZmqReactor(ZmqBaseReactor):
|
|||||||
|
|
||||||
proxy = self.proxies[sock]
|
proxy = self.proxies[sock]
|
||||||
|
|
||||||
if data[2] == 'cast': # Legacy protocol
|
if data[2] == b'cast': # Legacy protocol
|
||||||
packenv = data[3]
|
packenv = data[3]
|
||||||
|
|
||||||
ctx, msg = _deserialize(packenv)
|
ctx, msg = _deserialize(packenv)
|
||||||
request = rpc_common.deserialize_msg(msg)
|
request = rpc_common.deserialize_msg(msg)
|
||||||
ctx = RpcContext.unmarshal(ctx)
|
ctx = RpcContext.unmarshal(ctx)
|
||||||
elif data[2] == 'impl_zmq_v2':
|
elif data[2] == b'impl_zmq_v2':
|
||||||
packenv = data[4:]
|
packenv = data[4:]
|
||||||
|
|
||||||
msg = unflatten_envelope(packenv)
|
msg = unflatten_envelope(packenv)
|
||||||
@ -724,9 +733,9 @@ def _call(addr, context, topic, msg, timeout=None,
|
|||||||
LOG.debug("Received message: %s", msg)
|
LOG.debug("Received message: %s", msg)
|
||||||
LOG.debug("Unpacking response")
|
LOG.debug("Unpacking response")
|
||||||
|
|
||||||
if msg[2] == 'cast': # Legacy version
|
if msg[2] == b'cast': # Legacy version
|
||||||
raw_msg = _deserialize(msg[-1])[-1]
|
raw_msg = _deserialize(msg[-1])[-1]
|
||||||
elif msg[2] == 'impl_zmq_v2':
|
elif msg[2] == b'impl_zmq_v2':
|
||||||
rpc_envelope = unflatten_envelope(msg[4:])
|
rpc_envelope = unflatten_envelope(msg[4:])
|
||||||
raw_msg = rpc_common.deserialize_msg(rpc_envelope)
|
raw_msg = rpc_common.deserialize_msg(rpc_envelope)
|
||||||
else:
|
else:
|
||||||
@ -748,7 +757,7 @@ def _call(addr, context, topic, msg, timeout=None,
|
|||||||
# One effect of this is that we're checking all
|
# One effect of this is that we're checking all
|
||||||
# responses for Exceptions.
|
# responses for Exceptions.
|
||||||
for resp in responses:
|
for resp in responses:
|
||||||
if isinstance(resp, types.DictType) and 'exc' in resp:
|
if isinstance(resp, dict) and 'exc' in resp:
|
||||||
raise rpc_common.deserialize_remote_exception(
|
raise rpc_common.deserialize_remote_exception(
|
||||||
resp['exc'], allowed_remote_exmods)
|
resp['exc'], allowed_remote_exmods)
|
||||||
|
|
||||||
|
@ -17,6 +17,9 @@ oslotest>=1.5.1,<1.6.0 # Apache-2.0
|
|||||||
# for test_matchmaker_redis
|
# for test_matchmaker_redis
|
||||||
redis>=2.10.0
|
redis>=2.10.0
|
||||||
|
|
||||||
|
# for test_impl_zmq
|
||||||
|
pyzmq>=14.3.1 # LGPL+BSD
|
||||||
|
|
||||||
# when we can require tox>= 1.4, this can go into tox.ini:
|
# when we can require tox>= 1.4, this can go into tox.ini:
|
||||||
# [testenv:cover]
|
# [testenv:cover]
|
||||||
# deps = {[testenv]deps} coverage
|
# deps = {[testenv]deps} coverage
|
||||||
|
Loading…
x
Reference in New Issue
Block a user