Make the reference LBaaS implementation a pluggable driver

implements blueprint multi-vendor-support-for-lbaas-step1

This patch implements the following changes:
 * merge lbaas_plugin.py and plugin.py into 'plugin.py'
   After that the default 'reference' implementation is available again.
 * move all code related to reference implementation from plugin.py to
   drivers/haproxy/plugin_driver.py
 * Inherit HaproxyOnHostPluginDriver from abstract driver and implement
   its interface.
 * modify tests accordingly

Change-Id: Ib4bfe286826acdedeadbeeff4713448c073378d2
This commit is contained in:
Eugene Nikanorov 2013-05-05 06:34:44 +04:00
parent 72936dc5ce
commit 7176de17bd
15 changed files with 509 additions and 519 deletions
bin
quantum
common
db/loadbalancer
plugins/services/agent_loadbalancer
tests/unit

@ -20,7 +20,7 @@ import os
import sys
sys.path.insert(0, os.getcwd())
from quantum.plugins.services.agent_loadbalancer.agent import main
from quantum.plugins.services.agent_loadbalancer.drivers.haproxy.agent import main
main()

@ -25,11 +25,9 @@ UPDATE = 'update'
AGENT = 'q-agent-notifier'
PLUGIN = 'q-plugin'
DHCP = 'q-dhcp-notifer'
LOADBALANCER_PLUGIN = 'q-loadbalancer-plugin'
L3_AGENT = 'l3_agent'
DHCP_AGENT = 'dhcp_agent'
LOADBALANCER_AGENT = 'loadbalancer_agent'
def get_topic_name(prefix, table, operation):

@ -426,7 +426,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase,
context.session.delete(vip)
if vip.port: # this is a Quantum port
self._core_plugin.delete_port(context, vip.port.id)
context.session.flush()
def get_vip(self, context, id, fields=None):
vip = self._get_resource(context, Vip, id)

@ -21,11 +21,12 @@ from oslo.config import cfg
from quantum.agent.common import config
from quantum.agent.linux import interface
from quantum.common import topics
from quantum.openstack.common.rpc import service as rpc_service
from quantum.openstack.common import service
from quantum.plugins.services.agent_loadbalancer.agent import manager
from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
agent_manager as manager,
plugin_driver
)
OPTS = [
cfg.IntOpt(
@ -61,7 +62,7 @@ def main():
mgr = manager.LbaasAgentManager(cfg.CONF)
svc = LbaasAgentService(
host=cfg.CONF.host,
topic=topics.LOADBALANCER_AGENT,
topic=plugin_driver.TOPIC_LOADBALANCER_AGENT,
manager=mgr
)
service.launch(svc).wait()

@ -21,12 +21,14 @@ import weakref
from oslo.config import cfg
from quantum.agent.common import config
from quantum.common import topics
from quantum import context
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import periodic_task
from quantum.plugins.services.agent_loadbalancer.agent import api
from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
agent_api,
plugin_driver
)
LOG = logging.getLogger(__name__)
NS_PREFIX = 'qlbaas-'
@ -128,8 +130,8 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
msg = _('Error importing loadbalancer device driver: %s')
raise SystemExit(msg % conf.device_driver)
ctx = context.get_admin_context_without_session()
self.plugin_rpc = api.LbaasAgentApi(
topics.LOADBALANCER_PLUGIN,
self.plugin_rpc = agent_api.LbaasAgentApi(
plugin_driver.TOPIC_PROCESS_ON_HOST,
ctx,
conf.host
)

@ -0,0 +1,300 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
import uuid
from oslo.config import cfg
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.db.loadbalancer import loadbalancer_db
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
from quantum.plugins.common import constants
from quantum.plugins.services.agent_loadbalancer.drivers import (
abstract_driver
)
LOG = logging.getLogger(__name__)
ACTIVE_PENDING = (
constants.ACTIVE,
constants.PENDING_CREATE,
constants.PENDING_UPDATE
)
# topic name for this particular agent implementation
TOPIC_PROCESS_ON_HOST = 'q-lbaas-process-on-host'
TOPIC_LOADBALANCER_AGENT = 'lbaas_process_on_host_agent'
class LoadBalancerCallbacks(object):
    """RPC callbacks the plugin exposes to the haproxy agent.

    The agent invokes these methods (over the TOPIC_PROCESS_ON_HOST
    topic) to fetch work and to report device/port state back to the
    plugin.
    """

    RPC_API_VERSION = '1.0'

    def __init__(self, plugin):
        # plugin: the LoadBalancerPlugin instance; used for DB serializers
        # and for reaching the core plugin (ports, subnets).
        self.plugin = plugin

    def create_rpc_dispatcher(self):
        return q_rpc.PluginRpcDispatcher([self])

    def get_ready_devices(self, context, host=None):
        """Return ids of pools that are deployable by the agent.

        A pool qualifies only if it has a vip (inner join), both pool and
        vip are in an active/pending state, and both are admin-up.
        """
        with context.session.begin(subtransactions=True):
            qry = (context.session.query(loadbalancer_db.Pool.id).
                   join(loadbalancer_db.Vip))

            qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING))
            qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
            up = True  # makes pep8 and sqlalchemy happy
            qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
            qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
            return [id for id, in qry]

    def get_logical_device(self, context, pool_id=None, activate=True,
                           **kwargs):
        """Serialize the full logical description of a pool for the agent.

        When *activate* is true, every pending resource in the pool graph
        (pool, vip, members, health monitors) is flipped to ACTIVE before
        serializing; otherwise pool and vip must already be ACTIVE or
        q_exc.Invalid is raised.
        """
        with context.session.begin(subtransactions=True):
            qry = context.session.query(loadbalancer_db.Pool)
            qry = qry.filter_by(id=pool_id)
            pool = qry.one()

            if activate:
                # set all resources to active
                if pool.status in ACTIVE_PENDING:
                    pool.status = constants.ACTIVE

                if pool.vip.status in ACTIVE_PENDING:
                    pool.vip.status = constants.ACTIVE

                for m in pool.members:
                    if m.status in ACTIVE_PENDING:
                        m.status = constants.ACTIVE

                for hm in pool.monitors:
                    if hm.healthmonitor.status in ACTIVE_PENDING:
                        hm.healthmonitor.status = constants.ACTIVE

            if (pool.status != constants.ACTIVE
                or pool.vip.status != constants.ACTIVE):
                raise q_exc.Invalid(_('Expected active pool and vip'))

            retval = {}
            retval['pool'] = self.plugin._make_pool_dict(pool)
            retval['vip'] = self.plugin._make_vip_dict(pool.vip)
            retval['vip']['port'] = (
                self.plugin._core_plugin._make_port_dict(pool.vip.port)
            )
            # inline each fixed ip's subnet so the agent does not need
            # extra round-trips to resolve addressing information
            for fixed_ip in retval['vip']['port']['fixed_ips']:
                fixed_ip['subnet'] = (
                    self.plugin._core_plugin.get_subnet(
                        context,
                        fixed_ip['subnet_id']
                    )
                )

            # only ACTIVE members/monitors are handed to the agent
            retval['members'] = [
                self.plugin._make_member_dict(m)
                for m in pool.members if m.status == constants.ACTIVE
            ]
            retval['healthmonitors'] = [
                self.plugin._make_health_monitor_dict(hm.healthmonitor)
                for hm in pool.monitors
                if hm.healthmonitor.status == constants.ACTIVE
            ]

            return retval

    def pool_destroyed(self, context, pool_id=None, host=None):
        """Agent confirmation hook that a pool has been destroyed.

        This method exists for subclasses to change the deletion
        behavior.
        """
        pass

    def plug_vip_port(self, context, port_id=None, host=None):
        """Mark a vip port as up and owned by the loadbalancer on *host*."""
        if not port_id:
            return

        try:
            port = self.plugin._core_plugin.get_port(
                context,
                port_id
            )
        except q_exc.PortNotFound:
            msg = _('Unable to find port %s to plug.')
            LOG.debug(msg, port_id)
            return

        port['admin_state_up'] = True
        port['device_owner'] = 'quantum:' + constants.LOADBALANCER
        # derive a deterministic device id from the host name
        port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))

        self.plugin._core_plugin.update_port(
            context,
            port_id,
            {'port': port}
        )

    def unplug_vip_port(self, context, port_id=None, host=None):
        """Clear ownership of a vip port; tolerate a racing vip delete."""
        if not port_id:
            return

        try:
            port = self.plugin._core_plugin.get_port(
                context,
                port_id
            )
        except q_exc.PortNotFound:
            msg = _('Unable to find port %s to unplug. This can occur when '
                    'the Vip has been deleted first.')
            LOG.debug(msg, port_id)
            return

        port['admin_state_up'] = False
        port['device_owner'] = ''
        port['device_id'] = ''

        # the port can disappear between get_port and update_port, so the
        # same PortNotFound is handled a second time
        try:
            self.plugin._core_plugin.update_port(
                context,
                port_id,
                {'port': port}
            )
        except q_exc.PortNotFound:
            msg = _('Unable to find port %s to unplug. This can occur when '
                    'the Vip has been deleted first.')
            LOG.debug(msg, port_id)

    def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
        # TODO(markmcclain): add stats collection
        pass
class LoadBalancerAgentApi(proxy.RpcProxy):
    """Plugin side of plugin to agent RPC API."""

    API_VERSION = '1.0'

    def __init__(self, topic, host):
        super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
        self.host = host

    def _cast_pool_op(self, context, method, pool_id):
        # Every agent notification has the same shape: a cast carrying the
        # pool id and the host that issued the request.
        return self.cast(
            context,
            self.make_msg(method, pool_id=pool_id, host=self.host),
            topic=self.topic
        )

    def reload_pool(self, context, pool_id):
        """Ask the agent to (re)deploy the pool's configuration."""
        return self._cast_pool_op(context, 'reload_pool', pool_id)

    def destroy_pool(self, context, pool_id):
        """Ask the agent to tear the pool down."""
        return self._cast_pool_op(context, 'destroy_pool', pool_id)

    def modify_pool(self, context, pool_id):
        """Ask the agent to refresh the pool's configuration in place."""
        return self._cast_pool_op(context, 'modify_pool', pool_id)
class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
    """Reference (haproxy-on-host) plugin driver.

    Translates plugin events into RPC casts to the haproxy agent, and
    sets up the RPC consumer through which the agent calls back into
    LoadBalancerCallbacks.
    """

    def __init__(self, plugin):
        # plugin -> agent notifications
        self.agent_rpc = LoadBalancerAgentApi(
            TOPIC_LOADBALANCER_AGENT,
            cfg.CONF.host
        )
        # agent -> plugin callbacks, consumed on TOPIC_PROCESS_ON_HOST
        self.callbacks = LoadBalancerCallbacks(plugin)

        self.conn = rpc.create_connection(new=True)
        self.conn.create_consumer(
            TOPIC_PROCESS_ON_HOST,
            self.callbacks.create_rpc_dispatcher(),
            fanout=False)
        self.conn.consume_in_thread()
        self.plugin = plugin

    def create_vip(self, context, vip):
        # a new vip makes its pool deployable; ask the agent to (re)load it
        self.agent_rpc.reload_pool(context, vip['pool_id'])

    def update_vip(self, context, old_vip, vip):
        # an inactive vip means the whole pool deployment comes down
        if vip['status'] in ACTIVE_PENDING:
            self.agent_rpc.reload_pool(context, vip['pool_id'])
        else:
            self.agent_rpc.destroy_pool(context, vip['pool_id'])

    def delete_vip(self, context, vip):
        # remove the DB record first, then undeploy on the agent
        self.plugin._delete_db_vip(context, vip['id'])
        self.agent_rpc.destroy_pool(context, vip['pool_id'])

    def create_pool(self, context, pool):
        # don't notify here because a pool needs a vip to be useful
        pass

    def update_pool(self, context, old_pool, pool):
        if pool['status'] in ACTIVE_PENDING:
            # only deployed pools (those with a vip) need a reload
            if pool['vip_id'] is not None:
                self.agent_rpc.reload_pool(context, pool['id'])
        else:
            self.agent_rpc.destroy_pool(context, pool['id'])

    def delete_pool(self, context, pool):
        self.plugin._delete_db_pool(context, pool['id'])
        self.agent_rpc.destroy_pool(context, pool['id'])

    def create_member(self, context, member):
        self.agent_rpc.modify_pool(context, member['pool_id'])

    def update_member(self, context, old_member, member):
        # member may change pool id
        if member['pool_id'] != old_member['pool_id']:
            self.agent_rpc.modify_pool(context, old_member['pool_id'])
        self.agent_rpc.modify_pool(context, member['pool_id'])

    def delete_member(self, context, member):
        self.plugin._delete_db_member(context, member['id'])
        self.agent_rpc.modify_pool(context, member['pool_id'])

    def update_health_monitor(self, context, healthmon, pool_id):
        # healthmon is unused here because agent will fetch what is necessary
        self.agent_rpc.modify_pool(context, pool_id)

    def delete_health_monitor(self, context, healthmon_id, pool_id):
        # healthmon_id is not used in this driver
        self.agent_rpc.modify_pool(context, pool_id)

    def create_pool_health_monitor(self, context, healthmon, pool_id):
        # healthmon is not used here
        self.agent_rpc.modify_pool(context, pool_id)

    def delete_pool_health_monitor(self, context, health_monitor, pool_id):
        self.plugin._delete_db_pool_health_monitor(
            context, health_monitor['id'], pool_id
        )
        # healthmon_id is not used here
        self.agent_rpc.modify_pool(context, pool_id)

    def create_health_monitor(self, context, health_monitor):
        # nothing to deploy: a monitor only matters once associated
        # with a pool (see create_pool_health_monitor)
        pass

    def stats(self, context, pool_id):
        # no stats collected by this driver; presumably the plugin falls
        # back to DB values when nothing is returned — TODO confirm
        pass

@ -1,221 +0,0 @@
#
# Copyright 2013 Radware LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Avishay Balderman, Radware
from oslo.config import cfg
from quantum.db import api as qdbapi
from quantum.db.loadbalancer import loadbalancer_db
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.plugins.common import constants
LOG = logging.getLogger(__name__)
DEFAULT_DRIVER = ("quantum.plugins.services.agent_loadbalancer"
".drivers.noop"
".noop_driver.NoopLbaaSDriver")
lbaas_plugin_opts = [
cfg.StrOpt('driver_fqn',
default=DEFAULT_DRIVER,
help=_('LBaaS driver Fully Qualified Name'))
]
cfg.CONF.register_opts(lbaas_plugin_opts, "LBAAS")
class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):

    """Implementation of the Quantum Loadbalancer Service Plugin.

    This class manages the workflow of LBaaS request/response.
    Most DB related works are implemented in class
    loadbalancer_db.LoadBalancerPluginDb.
    """
    supported_extension_aliases = ["lbaas"]

    def __init__(self):
        """Initialization for the loadbalancer service plugin."""
        qdbapi.register_models()
        # the driver performs the vendor-specific deployment work; which
        # driver to load comes from the [LBAAS] driver_fqn option
        self.driver = importutils.import_object(
            cfg.CONF.LBAAS.driver_fqn, self)

    def get_plugin_type(self):
        return constants.LOADBALANCER

    def get_plugin_description(self):
        return "Quantum LoadBalancer Service Plugin"

    def create_vip(self, context, vip):
        """Persist a vip, then hand it to the driver for deployment."""
        v = super(LoadBalancerPlugin, self).create_vip(context, vip)
        self.driver.create_vip(context, v)
        return v

    def update_vip(self, context, id, vip):
        # default to PENDING_UPDATE unless the caller set a status
        if 'status' not in vip['vip']:
            vip['vip']['status'] = constants.PENDING_UPDATE
        old_vip = self.get_vip(context, id)
        v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
        self.driver.update_vip(context, old_vip, v)
        return v

    def _delete_db_vip(self, context, id):
        # called back by the driver once the vip is undeployed
        super(LoadBalancerPlugin, self).delete_vip(context, id)

    def delete_vip(self, context, id):
        # mark PENDING_DELETE and let the driver drive the removal; the
        # driver calls _delete_db_vip when done
        self.update_status(context, loadbalancer_db.Vip,
                           id, constants.PENDING_DELETE)
        v = self.get_vip(context, id)
        self.driver.delete_vip(context, v)

    def create_pool(self, context, pool):
        p = super(LoadBalancerPlugin, self).create_pool(context, pool)
        self.driver.create_pool(context, p)
        return p

    def update_pool(self, context, id, pool):
        if 'status' not in pool['pool']:
            pool['pool']['status'] = constants.PENDING_UPDATE
        old_pool = self.get_pool(context, id)
        p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
        self.driver.update_pool(context, old_pool, p)
        return p

    def _delete_db_pool(self, context, id):
        # called back by the driver once the pool is undeployed
        super(LoadBalancerPlugin, self).delete_pool(context, id)

    def delete_pool(self, context, id):
        self.update_status(context, loadbalancer_db.Pool,
                           id, constants.PENDING_DELETE)
        p = self.get_pool(context, id)
        self.driver.delete_pool(context, p)

    def create_member(self, context, member):
        m = super(LoadBalancerPlugin, self).create_member(context, member)
        self.driver.create_member(context, m)
        return m

    def update_member(self, context, id, member):
        if 'status' not in member['member']:
            member['member']['status'] = constants.PENDING_UPDATE
        old_member = self.get_member(context, id)
        m = super(LoadBalancerPlugin, self).update_member(context, id, member)
        self.driver.update_member(context, old_member, m)
        return m

    def _delete_db_member(self, context, id):
        # called back by the driver once the member is undeployed
        super(LoadBalancerPlugin, self).delete_member(context, id)

    def delete_member(self, context, id):
        self.update_status(context, loadbalancer_db.Member,
                           id, constants.PENDING_DELETE)
        m = self.get_member(context, id)
        self.driver.delete_member(context, m)

    def create_health_monitor(self, context, health_monitor):
        hm = super(LoadBalancerPlugin, self).create_health_monitor(
            context,
            health_monitor
        )
        self.driver.create_health_monitor(context, hm)
        return hm

    def update_health_monitor(self, context, id, health_monitor):
        if 'status' not in health_monitor['health_monitor']:
            health_monitor['health_monitor']['status'] = (
                constants.PENDING_UPDATE
            )
        old_hm = self.get_health_monitor(context, id)
        hm = super(LoadBalancerPlugin, self).update_health_monitor(
            context,
            id,
            health_monitor
        )

        # a monitor is shared; notify the driver once per pool association
        with context.session.begin(subtransactions=True):
            qry = context.session.query(
                loadbalancer_db.PoolMonitorAssociation
            )
            qry = qry.filter_by(monitor_id=hm['id'])
            assocs = qry.all()
            for assoc in assocs:
                # NOTE(review): four arguments (old_hm, hm, assoc) are
                # passed here, but the haproxy plugin driver declares
                # update_health_monitor(self, context, healthmon, pool_id)
                # — verify against the abstract driver interface.
                self.driver.update_health_monitor(context, old_hm, hm, assoc)
        return hm

    def _delete_db_pool_health_monitor(self, context, hm_id, pool_id):
        # called back by the driver once the association is undeployed
        super(LoadBalancerPlugin, self).delete_pool_health_monitor(context,
                                                                   hm_id,
                                                                   pool_id)

    def delete_health_monitor(self, context, id):
        """Delete a monitor by undeploying each of its pool associations."""
        with context.session.begin(subtransactions=True):
            qry = context.session.query(
                loadbalancer_db.PoolMonitorAssociation
            )
            qry = qry.filter_by(monitor_id=id)
            assocs = qry.all()
            hm = self.get_health_monitor(context, id)
            for assoc in assocs:
                self.driver.delete_pool_health_monitor(context,
                                                       hm,
                                                       assoc['pool_id'])

    def create_pool_health_monitor(self, context, health_monitor, pool_id):
        retval = super(LoadBalancerPlugin, self).create_pool_health_monitor(
            context,
            health_monitor,
            pool_id
        )
        # open issue: PoolMonitorAssociation has no status field
        # so we cant set the status to pending and let the driver
        # set the real status of the association
        self.driver.create_pool_health_monitor(
            context, health_monitor, pool_id)
        return retval

    def delete_pool_health_monitor(self, context, id, pool_id):
        # the driver calls _delete_db_pool_health_monitor when done
        hm = self.get_health_monitor(context, id)
        self.driver.delete_pool_health_monitor(
            context, hm, pool_id)

    def stats(self, context, pool_id):
        """Return pool stats, refreshing the DB from the driver first
        if the driver supplies data."""
        stats_data = self.driver.stats(context, pool_id)
        # if we get something from the driver -
        # update the db and return the value from db
        # else - return what we have in db
        if stats_data:
            super(LoadBalancerPlugin, self)._update_pool_stats(
                context,
                pool_id,
                stats_data
            )
        return super(LoadBalancerPlugin, self).stats(context,
                                                     pool_id)

    def populate_vip_graph(self, context, vip):
        """Populate the vip with: pool, members, healthmonitors."""
        pool = self.get_pool(context, vip['pool_id'])
        vip['pool'] = pool
        vip['members'] = [
            self.get_member(context, member_id)
            for member_id in pool['members']]
        vip['health_monitors'] = [
            self.get_health_monitor(context, hm_id)
            for hm_id in pool['health_monitors']]
        return vip

@ -1,7 +1,5 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Copyright 2013 Radware LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -14,200 +12,30 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
#
# @author: Avishay Balderman, Radware
from oslo.config import cfg
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
from quantum.db import api as qdbapi
from quantum.db.loadbalancer import loadbalancer_db
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
from quantum.plugins.common import constants
LOG = logging.getLogger(__name__)
ACTIVE_PENDING = (
constants.ACTIVE,
constants.PENDING_CREATE,
constants.PENDING_UPDATE
)
DEFAULT_DRIVER = ("quantum.plugins.services.agent_loadbalancer"
".drivers.haproxy"
".plugin_driver.HaproxyOnHostPluginDriver")
lbaas_plugin_opts = [
cfg.StrOpt('driver_fqn',
default=DEFAULT_DRIVER,
help=_('LBaaS driver Fully Qualified Name'))
]
class LoadBalancerCallbacks(object):
RPC_API_VERSION = '1.0'
def __init__(self, plugin):
self.plugin = plugin
def create_rpc_dispatcher(self):
return q_rpc.PluginRpcDispatcher([self])
def get_ready_devices(self, context, host=None):
with context.session.begin(subtransactions=True):
qry = (context.session.query(loadbalancer_db.Pool.id).
join(loadbalancer_db.Vip))
qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING))
qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
up = True # makes pep8 and sqlalchemy happy
qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
return [id for id, in qry]
def get_logical_device(self, context, pool_id=None, activate=True,
**kwargs):
with context.session.begin(subtransactions=True):
qry = context.session.query(loadbalancer_db.Pool)
qry = qry.filter_by(id=pool_id)
pool = qry.one()
if activate:
# set all resources to active
if pool.status in ACTIVE_PENDING:
pool.status = constants.ACTIVE
if pool.vip.status in ACTIVE_PENDING:
pool.vip.status = constants.ACTIVE
for m in pool.members:
if m.status in ACTIVE_PENDING:
m.status = constants.ACTIVE
for hm in pool.monitors:
if hm.healthmonitor.status in ACTIVE_PENDING:
hm.healthmonitor.status = constants.ACTIVE
if (pool.status != constants.ACTIVE
or pool.vip.status != constants.ACTIVE):
raise q_exc.Invalid(_('Expected active pool and vip'))
retval = {}
retval['pool'] = self.plugin._make_pool_dict(pool)
retval['vip'] = self.plugin._make_vip_dict(pool.vip)
retval['vip']['port'] = (
self.plugin._core_plugin._make_port_dict(pool.vip.port)
)
for fixed_ip in retval['vip']['port']['fixed_ips']:
fixed_ip['subnet'] = (
self.plugin._core_plugin.get_subnet(
context,
fixed_ip['subnet_id']
)
)
retval['members'] = [
self.plugin._make_member_dict(m)
for m in pool.members if m.status == constants.ACTIVE
]
retval['healthmonitors'] = [
self.plugin._make_health_monitor_dict(hm.healthmonitor)
for hm in pool.monitors
if hm.healthmonitor.status == constants.ACTIVE
]
return retval
def pool_destroyed(self, context, pool_id=None, host=None):
"""Agent confirmation hook that a pool has been destroyed.
This method exists for subclasses to change the deletion
behavior.
"""
pass
def plug_vip_port(self, context, port_id=None, host=None):
if not port_id:
return
try:
port = self.plugin._core_plugin.get_port(
context,
port_id
)
except q_exc.PortNotFound:
msg = _('Unable to find port %s to plug.')
LOG.debug(msg, port_id)
return
port['admin_state_up'] = True
port['device_owner'] = 'quantum:' + constants.LOADBALANCER
port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
self.plugin._core_plugin.update_port(
context,
port_id,
{'port': port}
)
def unplug_vip_port(self, context, port_id=None, host=None):
if not port_id:
return
try:
port = self.plugin._core_plugin.get_port(
context,
port_id
)
except q_exc.PortNotFound:
msg = _('Unable to find port %s to unplug. This can occur when '
'the Vip has been deleted first.')
LOG.debug(msg, port_id)
return
port['admin_state_up'] = False
port['device_owner'] = ''
port['device_id'] = ''
try:
self.plugin._core_plugin.update_port(
context,
port_id,
{'port': port}
)
except q_exc.PortNotFound:
msg = _('Unable to find port %s to unplug. This can occur when '
'the Vip has been deleted first.')
LOG.debug(msg, port_id)
def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
# TODO(markmcclain): add stats collection
pass
class LoadBalancerAgentApi(proxy.RpcProxy):
"""Plugin side of plugin to agent RPC API."""
API_VERSION = '1.0'
def __init__(self, topic, host):
super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
self.host = host
def reload_pool(self, context, pool_id):
return self.cast(
context,
self.make_msg('reload_pool', pool_id=pool_id, host=self.host),
topic=self.topic
)
def destroy_pool(self, context, pool_id):
return self.cast(
context,
self.make_msg('destroy_pool', pool_id=pool_id, host=self.host),
topic=self.topic
)
def modify_pool(self, context, pool_id):
return self.cast(
context,
self.make_msg('modify_pool', pool_id=pool_id, host=self.host),
topic=self.topic
)
cfg.CONF.register_opts(lbaas_plugin_opts, "LBAAS")
class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
@ -221,22 +49,22 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
supported_extension_aliases = ["lbaas"]
def __init__(self):
"""Do the initialization for the loadbalancer service plugin here."""
"""Initialization for the loadbalancer service plugin."""
qdbapi.register_models()
self._load_drivers()
self.callbacks = LoadBalancerCallbacks(self)
def _load_drivers(self):
"""Loads plugin-driver from configuration.
self.conn = rpc.create_connection(new=True)
self.conn.create_consumer(
topics.LOADBALANCER_PLUGIN,
self.callbacks.create_rpc_dispatcher(),
fanout=False)
self.conn.consume_in_thread()
self.agent_rpc = LoadBalancerAgentApi(
topics.LOADBALANCER_AGENT,
cfg.CONF.host
)
That method will later leverage service type framework
"""
try:
self.driver = importutils.import_object(
cfg.CONF.LBAAS.driver_fqn, self)
except ImportError:
LOG.exception(_("Error loading LBaaS driver %s"),
cfg.CONF.LBAAS.driver_fqn)
def get_plugin_type(self):
return constants.LOADBALANCER
@ -245,68 +73,89 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
return "Quantum LoadBalancer Service Plugin"
def create_vip(self, context, vip):
vip['vip']['status'] = constants.PENDING_CREATE
v = super(LoadBalancerPlugin, self).create_vip(context, vip)
self.agent_rpc.reload_pool(context, v['pool_id'])
self.driver.create_vip(context, v)
return v
def update_vip(self, context, id, vip):
if 'status' not in vip['vip']:
vip['vip']['status'] = constants.PENDING_UPDATE
old_vip = self.get_vip(context, id)
v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
if v['status'] in ACTIVE_PENDING:
self.agent_rpc.reload_pool(context, v['pool_id'])
else:
self.agent_rpc.destroy_pool(context, v['pool_id'])
self.driver.update_vip(context, old_vip, v)
return v
def delete_vip(self, context, id):
vip = self.get_vip(context, id)
def _delete_db_vip(self, context, id):
# proxy the call until plugin inherits from DBPlugin
super(LoadBalancerPlugin, self).delete_vip(context, id)
self.agent_rpc.destroy_pool(context, vip['pool_id'])
def delete_vip(self, context, id):
self.update_status(context, loadbalancer_db.Vip,
id, constants.PENDING_DELETE)
v = self.get_vip(context, id)
self.driver.delete_vip(context, v)
def create_pool(self, context, pool):
p = super(LoadBalancerPlugin, self).create_pool(context, pool)
# don't notify here because a pool needs a vip to be useful
self.driver.create_pool(context, p)
return p
def update_pool(self, context, id, pool):
if 'status' not in pool['pool']:
pool['pool']['status'] = constants.PENDING_UPDATE
old_pool = self.get_pool(context, id)
p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
if p['status'] in ACTIVE_PENDING:
if p['vip_id'] is not None:
self.agent_rpc.reload_pool(context, p['id'])
else:
self.agent_rpc.destroy_pool(context, p['id'])
self.driver.update_pool(context, old_pool, p)
return p
def delete_pool(self, context, id):
def _delete_db_pool(self, context, id):
# proxy the call until plugin inherits from DBPlugin
super(LoadBalancerPlugin, self).delete_pool(context, id)
self.agent_rpc.destroy_pool(context, id)
def delete_pool(self, context, id):
self.update_status(context, loadbalancer_db.Pool,
id, constants.PENDING_DELETE)
p = self.get_pool(context, id)
self.driver.delete_pool(context, p)
def create_member(self, context, member):
m = super(LoadBalancerPlugin, self).create_member(context, member)
self.agent_rpc.modify_pool(context, m['pool_id'])
self.driver.create_member(context, m)
return m
def update_member(self, context, id, member):
if 'status' not in member['member']:
member['member']['status'] = constants.PENDING_UPDATE
old_member = self.get_member(context, id)
m = super(LoadBalancerPlugin, self).update_member(context, id, member)
self.agent_rpc.modify_pool(context, m['pool_id'])
self.driver.update_member(context, old_member, m)
return m
def delete_member(self, context, id):
m = self.get_member(context, id)
def _delete_db_member(self, context, id):
# proxy the call until plugin inherits from DBPlugin
super(LoadBalancerPlugin, self).delete_member(context, id)
self.agent_rpc.modify_pool(context, m['pool_id'])
def delete_member(self, context, id):
self.update_status(context, loadbalancer_db.Member,
id, constants.PENDING_DELETE)
m = self.get_member(context, id)
self.driver.delete_member(context, m)
def create_health_monitor(self, context, health_monitor):
# no PENDING_CREATE status since healthmon is a shared DB object
hm = super(LoadBalancerPlugin, self).create_health_monitor(
context,
health_monitor
)
self.driver.create_health_monitor(context, hm)
return hm
def update_health_monitor(self, context, id, health_monitor):
if 'status' not in health_monitor['health_monitor']:
health_monitor['health_monitor']['status'] = (
constants.PENDING_UPDATE
)
old_hm = self.get_health_monitor(context, id)
hm = super(LoadBalancerPlugin, self).update_health_monitor(
context,
id,
@ -316,24 +165,26 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
with context.session.begin(subtransactions=True):
qry = context.session.query(
loadbalancer_db.PoolMonitorAssociation
)
qry = qry.filter_by(monitor_id=hm['id'])
).filter_by(monitor_id=hm['id'])
for assoc in qry:
self.agent_rpc.modify_pool(context, assoc['pool_id'])
self.driver.update_health_monitor(context, old_hm, hm, assoc)
return hm
def _delete_db_pool_health_monitor(self, context, hm_id, pool_id):
super(LoadBalancerPlugin, self).delete_pool_health_monitor(context,
hm_id,
pool_id)
def delete_health_monitor(self, context, id):
with context.session.begin(subtransactions=True):
hm = self.get_health_monitor(context, id)
qry = context.session.query(
loadbalancer_db.PoolMonitorAssociation
)
qry = qry.filter_by(monitor_id=id)
pool_ids = [a['pool_id'] for a in qry]
super(LoadBalancerPlugin, self).delete_health_monitor(context, id)
for pid in pool_ids:
self.agent_rpc.modify_pool(context, pid)
).filter_by(monitor_id=id)
for assoc in qry:
self.driver.delete_pool_health_monitor(context,
hm,
assoc['pool_id'])
def create_pool_health_monitor(self, context, health_monitor, pool_id):
retval = super(LoadBalancerPlugin, self).create_pool_health_monitor(
@ -341,16 +192,41 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
health_monitor,
pool_id
)
self.agent_rpc.modify_pool(context, pool_id)
# open issue: PoolMonitorAssociation has no status field
# so we cant set the status to pending and let the driver
# set the real status of the association
self.driver.create_pool_health_monitor(
context, health_monitor, pool_id)
return retval
def delete_pool_health_monitor(self, context, id, pool_id):
retval = super(LoadBalancerPlugin, self).delete_pool_health_monitor(
context,
id,
pool_id
)
self.agent_rpc.modify_pool(context, pool_id)
hm = self.get_health_monitor(context, id)
self.driver.delete_pool_health_monitor(
context, hm, pool_id)
return retval
def stats(self, context, pool_id):
stats_data = self.driver.stats(context, pool_id)
# if we get something from the driver -
# update the db and return the value from db
# else - return what we have in db
if stats_data:
super(LoadBalancerPlugin, self)._update_pool_stats(
context,
pool_id,
stats_data
)
return super(LoadBalancerPlugin, self).stats(context,
pool_id)
def populate_vip_graph(self, context, vip):
"""Populate the vip with: pool, members, healthmonitors."""
pool = self.get_pool(context, vip['pool_id'])
vip['pool'] = pool
vip['members'] = [
self.get_member(context, member_id)
for member_id in pool['members']]
vip['health_monitors'] = [
self.get_health_monitor(context, hm_id)
for hm_id in pool['health_monitors']]
return vip

@ -39,7 +39,7 @@ LOG = logging.getLogger(__name__)
DB_CORE_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2'
DB_LB_PLUGIN_KLASS = (
"quantum.plugins.services.agent_loadbalancer."
"lbaas_plugin.LoadBalancerPlugin"
"plugin.LoadBalancerPlugin"
)
ROOTDIR = os.path.dirname(__file__) + '../../../..'
ETCDIR = os.path.join(ROOTDIR, 'etc')

@ -1,17 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost

@ -20,7 +20,7 @@ import contextlib
import mock
from oslo.config import cfg
from quantum.plugins.services.agent_loadbalancer import agent
from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import agent
from quantum.tests import base

@ -20,7 +20,9 @@ import contextlib
import mock
from quantum.plugins.services.agent_loadbalancer.agent import manager
from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
agent_manager as manager
)
from quantum.tests import base
@ -145,8 +147,8 @@ class TestManager(base.BaseTestCase):
self.mock_importer = mock.patch.object(manager, 'importutils').start()
rpc_mock_cls = mock.patch(
'quantum.plugins.services.agent_loadbalancer.agent.api'
'.LbaasAgentApi'
'quantum.plugins.services.agent_loadbalancer.drivers'
'.haproxy.agent_api.LbaasAgentApi'
).start()
self.mgr = manager.LbaasAgentManager(mock_conf)

@ -18,7 +18,9 @@
import mock
from quantum.plugins.services.agent_loadbalancer.agent import api
from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
agent_api as api
)
from quantum.tests import base

@ -1,7 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -23,10 +22,11 @@ from quantum.common import exceptions
from quantum import context
from quantum.db.loadbalancer import loadbalancer_db as ldb
from quantum import manager
from quantum.openstack.common import importutils
from quantum.openstack.common import uuidutils
from quantum.plugins.common import constants
from quantum.plugins.services.agent_loadbalancer import plugin
from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
plugin_driver
)
from quantum.tests import base
from quantum.tests.unit.db.loadbalancer import test_db_loadbalancer
@ -42,25 +42,18 @@ class TestLoadBalancerPluginBase(
# we need access to loaded plugins to modify models
loaded_plugins = manager.QuantumManager().get_service_plugins()
# TODO(avishayb) - below is a little hack that helps the
# test to pass :-)
# the problem is the code below assumes the existence of 'callbacks'
# on the plugin. So the bypass is to load the plugin that has
# the callbacks as a member.The hack will be removed once we will
# have one lbaas plugin. (we currently have 2 - (Grizzly and Havana))
hack = True
if hack:
HACK_KLASS = (
"quantum.plugins.services.agent_loadbalancer."
"plugin.LoadBalancerPlugin"
)
self.plugin_instance = importutils.import_object(HACK_KLASS)
else:
self.plugin_instance = loaded_plugins[constants.LOADBALANCER]
self.callbacks = self.plugin_instance.callbacks
self.plugin_instance = loaded_plugins[constants.LOADBALANCER]
class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
def setUp(self):
super(TestLoadBalancerCallbacks, self).setUp()
self.callbacks = plugin_driver.LoadBalancerCallbacks(
self.plugin_instance
)
def test_get_ready_devices(self):
with self.vip() as vip:
ready = self.callbacks.get_ready_devices(
@ -242,7 +235,7 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
super(TestLoadBalancerAgentApi, self).setUp()
self.addCleanup(mock.patch.stopall)
self.api = plugin.LoadBalancerAgentApi('topic', 'host')
self.api = plugin_driver.LoadBalancerAgentApi('topic', 'host')
self.mock_cast = mock.patch.object(self.api, 'cast').start()
self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
@ -273,3 +266,58 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
def test_modify_pool(self):
self._call_test_helper('modify_pool')
class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
    """Verify that plugin vip CRUD operations notify the agent via RPC.

    LoadBalancerAgentApi is replaced with a mock *before* the plugin is
    instantiated, so every notification the plugin sends can be asserted
    on through self.mock_api.
    """

    def setUp(self):
        # Silence the driver's logger during these tests.
        # BUG FIX: the patcher was previously only constructed and never
        # started, so LOG was not actually patched and stopall() had
        # nothing to undo for it.  mock.patch requires start() (or use
        # as a decorator/context manager) to take effect.
        self.log = mock.patch.object(plugin_driver, 'LOG').start()
        # Patch the agent API class before super().setUp() so the plugin
        # instance created there is built with the mocked API.
        api_cls = mock.patch.object(plugin_driver,
                                    'LoadBalancerAgentApi').start()
        super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
        self.mock_api = api_cls.return_value

        self.addCleanup(mock.patch.stopall)

    def test_create_vip(self):
        with self.subnet() as subnet:
            with self.pool(subnet=subnet) as pool:
                with self.vip(pool=pool, subnet=subnet) as vip:
                    # Creating a vip must trigger a pool reload on the
                    # agent handling that pool.
                    self.mock_api.reload_pool.assert_called_once_with(
                        mock.ANY,
                        vip['vip']['pool_id']
                    )

    def test_update_vip(self):
        with self.subnet() as subnet:
            with self.pool(subnet=subnet) as pool:
                with self.vip(pool=pool, subnet=subnet) as vip:
                    # Discard creation-time notifications so only the
                    # update call is observed below.
                    self.mock_api.reset_mock()
                    ctx = context.get_admin_context()
                    # 'status' is read-only and may not be sent in an
                    # update request body.
                    vip['vip'].pop('status')
                    new_vip = self.plugin_instance.update_vip(
                        ctx,
                        vip['vip']['id'],
                        vip
                    )

                    self.mock_api.reload_pool.assert_called_once_with(
                        mock.ANY,
                        vip['vip']['pool_id']
                    )

                    # The plugin marks the vip pending until the agent
                    # confirms the update.
                    self.assertEqual(
                        new_vip['status'],
                        constants.PENDING_UPDATE
                    )

    def test_delete_vip(self):
        with self.subnet() as subnet:
            with self.pool(subnet=subnet) as pool:
                # no_delete: the vip is removed explicitly here rather
                # than by the fixture's cleanup.
                with self.vip(pool=pool, subnet=subnet,
                              no_delete=True) as vip:
                    self.mock_api.reset_mock()
                    ctx = context.get_admin_context()
                    self.plugin_instance.delete_vip(ctx, vip['vip']['id'])
                    self.mock_api.destroy_pool.assert_called_once_with(
                        mock.ANY,
                        vip['vip']['pool_id']
                    )