diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 0e0684309..c506c2e6f 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -106,7 +106,7 @@ def _serialize(data): def _deserialize(data): """Deserialization wrapper.""" - LOG.debug("Deserializing: %s", data) + LOG.debug("Deserializing: %r", data) return jsonutils.loads(data) @@ -180,7 +180,10 @@ class ZmqSocket(object): LOG.debug("Subscribing to %s", msg_filter) try: - self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter) + arg = msg_filter + if six.PY3: + arg = arg.encode('utf-8') + self.sock.setsockopt(zmq.SUBSCRIBE, arg) except Exception: return @@ -190,7 +193,10 @@ class ZmqSocket(object): """Unsubscribe.""" if msg_filter not in self.subscriptions: return - self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter) + arg = msg_filter + if six.PY3: + arg = arg.encode('utf-8') + self.sock.setsockopt(zmq.UNSUBSCRIBE, arg) self.subscriptions.remove(msg_filter) @property @@ -240,7 +246,10 @@ class ZmqClient(object): self.outq = ZmqSocket(addr, zmq.PUSH, bind=False) def cast(self, msg_id, topic, data, envelope): - msg_id = msg_id or 0 + msg_id = msg_id or '0' + + if six.PY3: + msg_id = msg_id.encode('utf-8') if not envelope: data = _serialize(data) @@ -574,13 +583,13 @@ class ZmqReactor(ZmqBaseReactor): proxy = self.proxies[sock] - if data[2] == 'cast': # Legacy protocol + if data[2] == b'cast': # Legacy protocol packenv = data[3] ctx, msg = _deserialize(packenv) request = rpc_common.deserialize_msg(msg) ctx = RpcContext.unmarshal(ctx) - elif data[2] == 'impl_zmq_v2': + elif data[2] == b'impl_zmq_v2': packenv = data[4:] msg = unflatten_envelope(packenv) @@ -724,9 +733,9 @@ def _call(addr, context, topic, msg, timeout=None, LOG.debug("Received message: %s", msg) LOG.debug("Unpacking response") - if msg[2] == 'cast': # Legacy version + if msg[2] == b'cast': # Legacy version raw_msg = _deserialize(msg[-1])[-1] - elif msg[2] == 'impl_zmq_v2': + elif msg[2] == b'impl_zmq_v2': rpc_envelope = unflatten_envelope(msg[4:]) raw_msg = rpc_common.deserialize_msg(rpc_envelope) else: @@ -748,7 +757,7 @@ def _call(addr, context, topic, msg, timeout=None, # One effect of this is that we're checking all # responses for Exceptions. for resp in responses: - if isinstance(resp, types.DictType) and 'exc' in resp: + if isinstance(resp, dict) and 'exc' in resp: raise rpc_common.deserialize_remote_exception( resp['exc'], allowed_remote_exmods) diff --git a/test-requirements-py3.txt b/test-requirements-py3.txt index 6d8f1d5fe..01db53d90 100644 --- a/test-requirements-py3.txt +++ b/test-requirements-py3.txt @@ -17,6 +17,9 @@ oslotest>=1.5.1,<1.6.0 # Apache-2.0 # for test_matchmaker_redis redis>=2.10.0 +# for test_impl_zmq +pyzmq>=14.3.1 # LGPL+BSD + # when we can require tox>= 1.4, this can go into tox.ini: # [testenv:cover] # deps = {[testenv]deps} coverage