Fix oslo messaging connection leakage

Story: 2004993
Task: 29464

Change-Id: I8e9cca7c0a7eb82b2a029a3ead2486dd1742b65f
This commit is contained in:
Erik Olof Gunnar Andersson 2019-02-12 07:13:26 -08:00 committed by Adam Harwell
parent fae5b05980
commit ad7e627185
11 changed files with 139 additions and 50 deletions

View File

@ -14,7 +14,6 @@
from jsonschema import exceptions as js_exceptions from jsonschema import exceptions as js_exceptions
from jsonschema import validate from jsonschema import validate
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging as messaging import oslo_messaging as messaging
@ -27,6 +26,7 @@ from octavia.api.drivers import provider_base as driver_base
from octavia.api.drivers import utils as driver_utils from octavia.api.drivers import utils as driver_utils
from octavia.common import constants as consts from octavia.common import constants as consts
from octavia.common import data_models from octavia.common import data_models
from octavia.common import rpc
from octavia.common import utils from octavia.common import utils
from octavia.db import api as db_apis from octavia.db import api as db_apis
from octavia.db import repositories from octavia.db import repositories
@ -41,11 +41,10 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
def __init__(self): def __init__(self):
super(AmphoraProviderDriver, self).__init__() super(AmphoraProviderDriver, self).__init__()
topic = cfg.CONF.oslo_messaging.topic topic = cfg.CONF.oslo_messaging.topic
self.transport = messaging.get_rpc_transport(cfg.CONF)
self.target = messaging.Target( self.target = messaging.Target(
namespace=consts.RPC_NAMESPACE_CONTROLLER_AGENT, namespace=consts.RPC_NAMESPACE_CONTROLLER_AGENT,
topic=topic, version="1.0", fanout=False) topic=topic, version="1.0", fanout=False)
self.client = messaging.RPCClient(self.transport, target=self.target) self.client = rpc.get_client(self.target)
self.repositories = repositories.Repositories() self.repositories = repositories.Repositories()
# Load Balancer # Load Balancer

View File

@ -32,6 +32,8 @@ import six
from octavia.api.handlers import abstract_handler from octavia.api.handlers import abstract_handler
from octavia.common import constants from octavia.common import constants
from octavia.common import rpc
cfg.CONF.import_group('oslo_messaging', 'octavia.common.config') cfg.CONF.import_group('oslo_messaging', 'octavia.common.config')
@ -46,11 +48,10 @@ class BaseProducer(abstract_handler.BaseObjectHandler):
def __init__(self): def __init__(self):
topic = cfg.CONF.oslo_messaging.topic topic = cfg.CONF.oslo_messaging.topic
self.transport = messaging.get_rpc_transport(cfg.CONF)
self.target = messaging.Target( self.target = messaging.Target(
namespace=constants.RPC_NAMESPACE_CONTROLLER_AGENT, namespace=constants.RPC_NAMESPACE_CONTROLLER_AGENT,
topic=topic, version="1.0", fanout=False) topic=topic, version="1.0", fanout=False)
self.client = messaging.RPCClient(self.transport, target=self.target) self.client = rpc.get_client(self.target)
def create(self, model): def create(self, model):
"""Sends a create message to the controller via oslo.messaging """Sends a create message to the controller via oslo.messaging
@ -229,12 +230,12 @@ class ProducerHandler(abstract_handler.BaseHandler):
used to send messages via the Class variables load_balancer, listener, used to send messages via the Class variables load_balancer, listener,
health_monitor, member, l7policy and l7rule. health_monitor, member, l7policy and l7rule.
""" """
def __init__(self):
load_balancer = LoadBalancerProducer() self.load_balancer = LoadBalancerProducer()
listener = ListenerProducer() self.listener = ListenerProducer()
pool = PoolProducer() self.pool = PoolProducer()
health_monitor = HealthMonitorProducer() self.health_monitor = HealthMonitorProducer()
member = MemberProducer() self.member = MemberProducer()
l7policy = L7PolicyProducer() self.l7policy = L7PolicyProducer()
l7rule = L7RuleProducer() self.l7rule = L7RuleProducer()
amphora = AmphoraProducer() self.amphora = AmphoraProducer()

View File

@ -25,7 +25,7 @@ from octavia.api.v2.controllers import base
from octavia.api.v2.types import amphora as amp_types from octavia.api.v2.types import amphora as amp_types
from octavia.common import constants from octavia.common import constants
from octavia.common import exceptions from octavia.common import exceptions
from octavia.common import rpc
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -96,11 +96,10 @@ class FailoverController(base.BaseController):
def __init__(self, amp_id): def __init__(self, amp_id):
super(FailoverController, self).__init__() super(FailoverController, self).__init__()
topic = cfg.CONF.oslo_messaging.topic topic = cfg.CONF.oslo_messaging.topic
self.transport = messaging.get_rpc_transport(cfg.CONF)
self.target = messaging.Target( self.target = messaging.Target(
namespace=constants.RPC_NAMESPACE_CONTROLLER_AGENT, namespace=constants.RPC_NAMESPACE_CONTROLLER_AGENT,
topic=topic, version="1.0", fanout=False) topic=topic, version="1.0", fanout=False)
self.client = messaging.RPCClient(self.transport, target=self.target) self.client = rpc.get_client(self.target)
self.amp_id = amp_id self.amp_id = amp_id
@wsme_pecan.wsexpose(None, wtypes.text, status_code=202) @wsme_pecan.wsexpose(None, wtypes.text, status_code=202)

66
octavia/common/rpc.py Normal file
View File

@ -0,0 +1,66 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
LOG = logging.getLogger(__name__)
TRANSPORT = None
def init():
global TRANSPORT
TRANSPORT = create_transport(get_transport_url())
def cleanup():
global TRANSPORT
if TRANSPORT is not None:
TRANSPORT.cleanup()
TRANSPORT = None
def get_transport_url(url_str=None):
return messaging.TransportURL.parse(cfg.CONF, url_str)
def get_client(target, version_cap=None, serializer=None,
call_monitor_timeout=None):
if TRANSPORT is None:
init()
return messaging.RPCClient(TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer,
call_monitor_timeout=call_monitor_timeout)
def get_server(target, endpoints, executor='threading',
access_policy=dispatcher.DefaultRPCAccessPolicy,
serializer=None):
if TRANSPORT is None:
init()
return messaging.get_rpc_server(TRANSPORT,
target,
endpoints,
executor=executor,
serializer=serializer,
access_policy=access_policy)
def create_transport(url):
return messaging.get_rpc_transport(cfg.CONF, url=url)

View File

@ -16,6 +16,7 @@ from oslo_config import cfg
from oslo_log import log from oslo_log import log
from octavia.common import config from octavia.common import config
from octavia.common import rpc
def prepare_service(argv=None): def prepare_service(argv=None):
@ -24,3 +25,4 @@ def prepare_service(argv=None):
config.init(argv[1:]) config.init(argv[1:])
log.set_defaults() log.set_defaults()
config.setup_logging(cfg.CONF) config.setup_logging(cfg.CONF)
rpc.init()

View File

@ -17,6 +17,7 @@ from oslo_log import log as logging
import oslo_messaging as messaging import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher from oslo_messaging.rpc import dispatcher
from octavia.common import rpc
from octavia.controller.queue import endpoint from octavia.controller.queue import endpoint
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -35,13 +36,14 @@ class ConsumerService(cotyledon.Service):
def run(self): def run(self):
LOG.info('Starting consumer...') LOG.info('Starting consumer...')
transport = messaging.get_rpc_transport(self.conf)
target = messaging.Target(topic=self.topic, server=self.server, target = messaging.Target(topic=self.topic, server=self.server,
fanout=False) fanout=False)
self.endpoints = [endpoint.Endpoint()] self.endpoints = [endpoint.Endpoint()]
self.message_listener = messaging.get_rpc_server( self.message_listener = rpc.get_server(
transport, target, self.endpoints, target, self.endpoints,
executor='threading', access_policy=self.access_policy) executor='threading',
access_policy=self.access_policy
)
self.message_listener.start() self.message_listener.start()
def terminate(self, graceful=False): def terminate(self, graceful=False):

View File

@ -11,7 +11,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import mock import mock
from octavia.api.drivers.amphora_driver import driver from octavia.api.drivers.amphora_driver import driver
@ -23,7 +22,7 @@ from octavia.tests.unit.api.drivers import sample_data_models
from octavia.tests.unit import base from octavia.tests.unit import base
class TestAmphoraDriver(base.TestCase): class TestAmphoraDriver(base.TestRpc):
def setUp(self): def setUp(self):
super(TestAmphoraDriver, self).setUp() super(TestAmphoraDriver, self).setUp()
self.amp_driver = driver.AmphoraProviderDriver() self.amp_driver = driver.AmphoraProviderDriver()

View File

@ -41,26 +41,23 @@ from octavia.common import data_models
from octavia.tests.unit import base from octavia.tests.unit import base
class TestProducer(base.TestCase): class TestProducer(base.TestRpc):
def setUp(self): def setUp(self):
super(TestProducer, self).setUp() super(TestProducer, self).setUp()
self.mck_model = mock.Mock() self.mck_model = mock.Mock()
self.mck_model.id = '10' self.mck_model.id = '10'
conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
conf.config(group="oslo_messaging", topic='OCTAVIA_PROV') conf.config(group="oslo_messaging", topic='OCTAVIA_PROV')
self.mck_client = mock.create_autospec(messaging.RPCClient)
mck_target = mock.patch( mck_target = mock.patch(
'octavia.api.handlers.queue.producer.messaging.Target') 'octavia.api.handlers.queue.producer.messaging.Target')
mck_transport = mock.patch(
'octavia.api.handlers.queue.producer.messaging.get_transport')
self.mck_client = mock.create_autospec(messaging.RPCClient) self.mck_client = mock.create_autospec(messaging.RPCClient)
mck_client = mock.patch( mck_client = mock.patch(
'octavia.api.handlers.queue.producer.messaging.RPCClient', 'octavia.api.handlers.queue.producer.messaging.RPCClient',
return_value=self.mck_client) return_value=self.mck_client)
mck_target.start() mck_target.start()
mck_transport.start()
mck_client.start() mck_client.start()
self.addCleanup(mck_target.stop) self.addCleanup(mck_target.stop)
self.addCleanup(mck_transport.stop)
self.addCleanup(mck_client.stop) self.addCleanup(mck_client.stop)
def test_create_loadbalancer(self): def test_create_loadbalancer(self):

View File

@ -11,11 +11,16 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import fixtures
import mock import mock
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging import conffixture as messaging_conffixture
import testtools import testtools
from octavia.common import clients from octavia.common import clients
from octavia.common import rpc
# needed for tests to function when run independently: # needed for tests to function when run independently:
from octavia.common import config # noqa: F401 from octavia.common import config # noqa: F401
@ -30,3 +35,29 @@ class TestCase(testtools.TestCase):
def clean_caches(self): def clean_caches(self):
clients.NovaAuth.nova_client = None clients.NovaAuth.nova_client = None
clients.NeutronAuth.neutron_client = None clients.NeutronAuth.neutron_client = None
class TestRpc(testtools.TestCase):
def __init__(self, *args, **kwargs):
super(TestRpc, self).__init__(*args, **kwargs)
self._buses = {}
def _fake_create_transport(self, url):
if url not in self._buses:
self._buses[url] = messaging.get_rpc_transport(
cfg.CONF,
url=url)
return self._buses[url]
def setUp(self):
super(TestRpc, self).setUp()
self.addCleanup(rpc.cleanup)
self.messaging_conf = messaging_conffixture.ConfFixture(cfg.CONF)
self.messaging_conf.transport_url = 'fake:/'
self.useFixture(self.messaging_conf)
self.useFixture(fixtures.MonkeyPatch(
'octavia.common.rpc.create_transport',
self._fake_create_transport))
with mock.patch('octavia.common.rpc.get_transport_url') as mock_gtu:
mock_gtu.return_value = None
rpc.init()

View File

@ -16,18 +16,13 @@ import mock
from oslo_config import cfg from oslo_config import cfg
from oslo_config import fixture as oslo_fixture from oslo_config import fixture as oslo_fixture
import oslo_messaging as messaging import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
from octavia.controller.queue import consumer from octavia.controller.queue import consumer
from octavia.controller.queue import endpoint from octavia.controller.queue import endpoint
from octavia.tests.unit import base from octavia.tests.unit import base
@mock.patch.object(messaging, 'get_rpc_transport') class TestConsumer(base.TestRpc):
@mock.patch.object(messaging, 'Target')
@mock.patch.object(endpoint, 'Endpoint')
@mock.patch.object(messaging, 'get_rpc_server')
class TestConsumer(base.TestCase):
def setUp(self): def setUp(self):
super(TestConsumer, self).setUp() super(TestConsumer, self).setUp()
@ -36,10 +31,10 @@ class TestConsumer(base.TestCase):
conf.config(host='test-hostname') conf.config(host='test-hostname')
self.conf = conf.conf self.conf = conf.conf
def test_consumer_run(self, mock_rpc_server, mock_endpoint, mock_target, @mock.patch.object(messaging, 'Target')
mock_get_transport): @mock.patch.object(endpoint, 'Endpoint')
mock_get_transport_rv = mock.Mock() @mock.patch.object(messaging, 'get_rpc_server')
mock_get_transport.return_value = mock_get_transport_rv def test_consumer_run(self, mock_rpc_server, mock_endpoint, mock_target):
mock_rpc_server_rv = mock.Mock() mock_rpc_server_rv = mock.Mock()
mock_rpc_server.return_value = mock_rpc_server_rv mock_rpc_server.return_value = mock_rpc_server_rv
mock_endpoint_rv = mock.Mock() mock_endpoint_rv = mock.Mock()
@ -49,20 +44,13 @@ class TestConsumer(base.TestCase):
consumer.ConsumerService(1, self.conf).run() consumer.ConsumerService(1, self.conf).run()
mock_get_transport.assert_called_once_with(cfg.CONF)
mock_target.assert_called_once_with(topic='foo_topic', mock_target.assert_called_once_with(topic='foo_topic',
server='test-hostname', server='test-hostname',
fanout=False) fanout=False)
mock_endpoint.assert_called_once_with() mock_endpoint.assert_called_once_with()
access_policy = dispatcher.DefaultRPCAccessPolicy
mock_rpc_server.assert_called_once_with(mock_get_transport_rv,
mock_target_rv,
[mock_endpoint_rv],
executor='threading',
access_policy=access_policy)
def test_consumer_terminate(self, mock_rpc_server, mock_endpoint, @mock.patch.object(messaging, 'get_rpc_server')
mock_target, mock_get_transport): def test_consumer_terminate(self, mock_rpc_server):
mock_rpc_server_rv = mock.Mock() mock_rpc_server_rv = mock.Mock()
mock_rpc_server.return_value = mock_rpc_server_rv mock_rpc_server.return_value = mock_rpc_server_rv
@ -72,8 +60,8 @@ class TestConsumer(base.TestCase):
mock_rpc_server_rv.stop.assert_called_once_with() mock_rpc_server_rv.stop.assert_called_once_with()
self.assertFalse(mock_rpc_server_rv.wait.called) self.assertFalse(mock_rpc_server_rv.wait.called)
def test_consumer_graceful_terminate(self, mock_rpc_server, mock_endpoint, @mock.patch.object(messaging, 'get_rpc_server')
mock_target, mock_get_transport): def test_consumer_graceful_terminate(self, mock_rpc_server):
mock_rpc_server_rv = mock.Mock() mock_rpc_server_rv = mock.Mock()
mock_rpc_server.return_value = mock_rpc_server_rv mock_rpc_server.return_value = mock_rpc_server_rv

View File

@ -0,0 +1,5 @@
---
fixes:
- |
Fixed a bug that caused an excessive number of RabbitMQ connections to be
opened.