ZMQ: Removed unused code and tests
The code of ZMQ driver is under the active development now, so after modifications some code can became unused. This patch removes this code. Change-Id: I4cfa75560eabf82618f31584b4645fd2630ac9cb
This commit is contained in:
parent
de629d8104
commit
5920e7bef6
oslo_messaging
_drivers/zmq_driver
tests/drivers/zmq
@ -13,9 +13,7 @@
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
from abc import abstractmethod
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import six
|
||||
|
||||
@ -42,7 +40,6 @@ class Request(object):
|
||||
LOG.error(errmsg)
|
||||
raise KeyError(errmsg)
|
||||
|
||||
self.msg_id = uuid.uuid4().hex
|
||||
self.target = target
|
||||
self.context = context
|
||||
self.message = message
|
||||
@ -55,17 +52,8 @@ class Request(object):
|
||||
def msg_type(self):
|
||||
"""ZMQ message type"""
|
||||
|
||||
@property
|
||||
def is_replied(self):
|
||||
return self.reply is not None
|
||||
|
||||
@property
|
||||
def is_timed_out(self):
|
||||
return False
|
||||
|
||||
def send_request(self):
|
||||
self.socket.send_string(self.msg_type, zmq.SNDMORE)
|
||||
self.socket.send_json(self.target.__dict__, zmq.SNDMORE)
|
||||
self.socket.send_json(self.context, zmq.SNDMORE)
|
||||
self.socket.send_json(self.message)
|
||||
|
||||
@ -73,6 +61,6 @@ class Request(object):
|
||||
self.send_request()
|
||||
return self.receive_reply()
|
||||
|
||||
@abstractmethod
|
||||
@abc.abstractmethod
|
||||
def receive_reply(self):
|
||||
"Receive reply from server side"
|
||||
|
@ -74,8 +74,6 @@ class ZmqServer(base.Listener):
|
||||
assert empty == b'', 'Bad format: empty delimiter expected'
|
||||
msg_type = socket.recv_string()
|
||||
assert msg_type is not None, 'Bad format: msg type expected'
|
||||
target_dict = socket.recv_json()
|
||||
assert target_dict is not None, 'Bad format: target expected'
|
||||
context = socket.recv_json()
|
||||
message = socket.recv_json()
|
||||
LOG.debug("Received CALL message %s" % str(message))
|
||||
|
@ -12,17 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
|
||||
import six
|
||||
|
||||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._i18n import _LE, _LW
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
MESSAGE_CALL_TYPE_POSITION = 1
|
||||
MESSAGE_CALL_TARGET_POSITION = 2
|
||||
MESSAGE_CALL_TOPIC_POSITION = 3
|
||||
@ -37,64 +26,3 @@ FANOUT_TYPE = 'fanout'
|
||||
NOTIFY_TYPE = 'notify'
|
||||
|
||||
MESSAGE_TYPES = (CALL_TYPE, CAST_TYPE, FANOUT_TYPE, NOTIFY_TYPE)
|
||||
|
||||
|
||||
def get_msg_type(message):
|
||||
type = message[MESSAGE_CALL_TYPE_POSITION]
|
||||
if type not in MESSAGE_TYPES:
|
||||
errmsg = _LE("Unknown message type: %s") % str(type)
|
||||
LOG.error(errmsg)
|
||||
raise rpc_common.RPCException(errmsg)
|
||||
return type
|
||||
|
||||
|
||||
def _get_topic_from_msg(message, position):
|
||||
pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
|
||||
badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
|
||||
|
||||
if len(message) < position + 1:
|
||||
errmsg = _LE("Message did not contain a topic")
|
||||
LOG.error("%s: %s" % (errmsg, message))
|
||||
raise rpc_common.RPCException("%s: %s" % (errmsg, message))
|
||||
|
||||
topic = message[position]
|
||||
|
||||
if six.PY3:
|
||||
topic = topic.decode('utf-8')
|
||||
|
||||
# The topic is received over the network, don't trust this input.
|
||||
if badchars.search(topic) is not None:
|
||||
errmsg = _LW("Topic contained dangerous characters")
|
||||
LOG.warn("%s: %s" % (errmsg, topic))
|
||||
raise rpc_common.RPCException("%s: %s" % (errmsg, topic))
|
||||
|
||||
topic_items = topic.split('.', 1)
|
||||
|
||||
if len(topic_items) != 2:
|
||||
errmsg = _LE("Topic was not formatted correctly")
|
||||
LOG.error("%s: %s" % (errmsg, topic))
|
||||
raise rpc_common.RPCException("%s: %s" % (errmsg, topic))
|
||||
|
||||
return topic_items[0], topic_items[1]
|
||||
|
||||
|
||||
def get_topic_from_call_message(message):
|
||||
"""Extract topic and server from message.
|
||||
|
||||
:param message: A message
|
||||
:type message: list
|
||||
|
||||
:returns: (topic: str, server: str)
|
||||
"""
|
||||
return _get_topic_from_msg(message, MESSAGE_CALL_TOPIC_POSITION)
|
||||
|
||||
|
||||
def get_target_from_call_message(message):
|
||||
"""Extract target from message.
|
||||
|
||||
:param message: A message
|
||||
:type message: list
|
||||
|
||||
:returns: target: Target
|
||||
"""
|
||||
return message[MESSAGE_CALL_TARGET_POSITION]
|
||||
|
@ -13,14 +13,6 @@
|
||||
# under the License.
|
||||
|
||||
|
||||
def get_tcp_bind_address(port):
|
||||
return "tcp://*:%s" % port
|
||||
|
||||
|
||||
def get_tcp_address_call(conf, host):
|
||||
return "tcp://%s:%s" % (host, conf.rpc_zmq_port)
|
||||
|
||||
|
||||
def combine_address(host, port):
|
||||
return "%s:%s" % (host, port)
|
||||
|
||||
|
@ -1,97 +0,0 @@
|
||||
# Copyright 2014 Canonical, Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_utils import importutils
|
||||
import testtools
|
||||
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
redis = importutils.try_import('redis')
|
||||
matchmaker_redis = (
|
||||
importutils.try_import('oslo_messaging._drivers.matchmaker_redis'))
|
||||
|
||||
|
||||
def redis_available():
|
||||
'''Helper to see if local redis server is running'''
|
||||
if not redis:
|
||||
return False
|
||||
try:
|
||||
c = redis.StrictRedis(socket_timeout=1)
|
||||
c.ping()
|
||||
return True
|
||||
except redis.exceptions.ConnectionError:
|
||||
return False
|
||||
|
||||
|
||||
@testtools.skipIf(not matchmaker_redis, "matchmaker/eventlet unavailable")
|
||||
@testtools.skipIf(not redis_available(), "redis unavailable")
|
||||
class RedisMatchMakerTest(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(RedisMatchMakerTest, self).setUp()
|
||||
self.ring_data = {
|
||||
"conductor": ["controller1", "node1", "node2", "node3"],
|
||||
"scheduler": ["controller1", "node1", "node2", "node3"],
|
||||
"network": ["controller1", "node1", "node2", "node3"],
|
||||
"cert": ["controller1"],
|
||||
"console": ["controller1"],
|
||||
"l3_agent.node1": ["node1"],
|
||||
"consoleauth": ["controller1"]}
|
||||
self.matcher = matchmaker_redis.MatchMakerRedis()
|
||||
self.populate()
|
||||
|
||||
def tearDown(self):
|
||||
super(RedisMatchMakerTest, self).tearDown()
|
||||
c = redis.StrictRedis()
|
||||
c.flushdb()
|
||||
|
||||
def populate(self):
|
||||
for k, hosts in self.ring_data.items():
|
||||
for h in hosts:
|
||||
self.matcher.register(k, h)
|
||||
|
||||
def test_direct(self):
|
||||
self.assertEqual(
|
||||
self.matcher.queues('cert.controller1'),
|
||||
[('cert.controller1', 'controller1')])
|
||||
|
||||
def test_register(self):
|
||||
self.matcher.register('cert', 'keymaster')
|
||||
self.assertEqual(
|
||||
sorted(self.matcher.redis.smembers('cert')),
|
||||
[b'cert.controller1', b'cert.keymaster'])
|
||||
self.matcher.register('l3_agent.node1', 'node1')
|
||||
self.assertEqual(
|
||||
sorted(self.matcher.redis.smembers('l3_agent.node1')),
|
||||
[b'l3_agent.node1.node1'])
|
||||
|
||||
def test_unregister(self):
|
||||
self.matcher.unregister('conductor', 'controller1')
|
||||
self.assertEqual(
|
||||
sorted(self.matcher.redis.smembers('conductor')),
|
||||
[b'conductor.node1', b'conductor.node2', b'conductor.node3'])
|
||||
|
||||
def test_ack_alive(self):
|
||||
self.matcher.ack_alive('ack_alive', 'controller1')
|
||||
self.assertEqual(
|
||||
sorted(self.matcher.redis.smembers('ack_alive')),
|
||||
[b'ack_alive.controller1'])
|
||||
|
||||
def test_is_alive(self):
|
||||
self.assertEqual(
|
||||
self.matcher.is_alive('conductor', 'conductor.controller1'),
|
||||
True)
|
||||
self.assertEqual(
|
||||
self.matcher.is_alive('conductor', 'conductor.controller2'),
|
||||
False)
|
@ -1,67 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import re
|
||||
|
||||
from oslo_messaging._drivers.common import RPCException
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_serializer
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
|
||||
class TestZmqSerializer(test_utils.BaseTestCase):
|
||||
|
||||
def test_message_without_topic_raises_RPCException(self):
|
||||
# The topic is the 4th element of the message.
|
||||
msg_without_topic = ['only', 'three', 'parts']
|
||||
|
||||
expected = "Message did not contain a topic: %s" % msg_without_topic
|
||||
with self.assertRaisesRegexp(RPCException, re.escape(expected)):
|
||||
zmq_serializer.get_topic_from_call_message(msg_without_topic)
|
||||
|
||||
def test_invalid_topic_format_raises_RPCException(self):
|
||||
invalid_topic = "no dots to split on, so not index-able".encode('utf8')
|
||||
bad_message = ['', '', '', invalid_topic]
|
||||
|
||||
expected_msg = "Topic was not formatted correctly: %s"
|
||||
expected_msg = expected_msg % invalid_topic.decode('utf8')
|
||||
with self.assertRaisesRegexp(RPCException, expected_msg):
|
||||
zmq_serializer.get_topic_from_call_message(bad_message)
|
||||
|
||||
def test_py3_decodes_bytes_correctly(self):
|
||||
message = ['', '', '', b'topic.ipaddress']
|
||||
|
||||
actual, _ = zmq_serializer.get_topic_from_call_message(message)
|
||||
|
||||
self.assertEqual('topic', actual)
|
||||
|
||||
def test_bad_characters_in_topic_raise_RPCException(self):
|
||||
# handle unexpected os path separators:
|
||||
unexpected_evil = '<'
|
||||
os.path.sep = unexpected_evil
|
||||
|
||||
unexpected_alt_evil = '>'
|
||||
os.path.altsep = unexpected_alt_evil
|
||||
|
||||
evil_chars = [unexpected_evil, unexpected_alt_evil, '\\', '/']
|
||||
|
||||
for evil_char in evil_chars:
|
||||
evil_topic = '%s%s%s' % ('trust.me', evil_char, 'please')
|
||||
evil_topic = evil_topic.encode('utf8')
|
||||
evil_message = ['', '', '', evil_topic]
|
||||
|
||||
expected_msg = "Topic contained dangerous characters: %s"
|
||||
expected_msg = expected_msg % evil_topic.decode('utf8')
|
||||
expected_msg = re.escape(expected_msg)
|
||||
|
||||
with self.assertRaisesRegexp(RPCException, expected_msg):
|
||||
zmq_serializer.get_topic_from_call_message(evil_message)
|
Loading…
x
Reference in New Issue
Block a user