Stop using legacy facade
Introduce get_reader_session() and get_writer_session() and replace get_session() with them. Mark get_session as depricated. Stop using get_engine from legacy facade. Use writer engine for places where it is required. Partially-Implements blueprint: enginefacade-switch Change-Id: I28b741bfa27bf04cbe273586e6e3e00e14fbe683
This commit is contained in:
parent
3987159db8
commit
4f17f70089
@ -151,7 +151,7 @@ class Context(ContextBaseWithSession):
|
||||
if hasattr(super(Context, self), 'session'):
|
||||
return super(Context, self).session
|
||||
if self._session is None:
|
||||
self._session = db_api.get_session()
|
||||
self._session = db_api.get_writer_session()
|
||||
return self._session
|
||||
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
import contextlib
|
||||
import copy
|
||||
|
||||
from debtcollector import removals
|
||||
from neutron_lib import exceptions
|
||||
from oslo_config import cfg
|
||||
from oslo_db import api as oslo_db_api
|
||||
@ -200,6 +201,9 @@ def exc_to_retry(etypes):
|
||||
|
||||
#TODO(akamyshnikova): when all places in the code, which use sessions/
|
||||
# connections will be updated, this won't be needed
|
||||
@removals.remove(version='Ocata', removal_version='Pike',
|
||||
message="Usage of legacy facade is deprecated. Use "
|
||||
"get_reader_session or get_writer_session instead.")
|
||||
def get_session(autocommit=True, expire_on_commit=False, use_slave=False):
|
||||
"""Helper method to grab session."""
|
||||
return context_manager.get_legacy_facade().get_session(
|
||||
@ -207,6 +211,16 @@ def get_session(autocommit=True, expire_on_commit=False, use_slave=False):
|
||||
use_slave=use_slave)
|
||||
|
||||
|
||||
def get_reader_session():
|
||||
"""Helper to get reader session"""
|
||||
return context_manager.reader.get_sessionmaker()()
|
||||
|
||||
|
||||
def get_writer_session():
|
||||
"""Helper to get writer session"""
|
||||
return context_manager.writer.get_sessionmaker()()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def autonested_transaction(sess):
|
||||
"""This is a convenience method to not bother with 'nested' parameter."""
|
||||
|
@ -711,7 +711,7 @@ def is_ha_router(router):
|
||||
|
||||
|
||||
def is_ha_router_port(context, device_owner, router_id):
|
||||
session = db_api.get_session()
|
||||
session = db_api.get_reader_session()
|
||||
if device_owner == constants.DEVICE_OWNER_HA_REPLICATED_INT:
|
||||
return True
|
||||
elif device_owner == constants.DEVICE_OWNER_ROUTER_SNAT:
|
||||
|
@ -74,7 +74,7 @@ class L2populationMechanismDriver(api.MechanismDriver):
|
||||
fdb_entries = self._get_agent_fdb(
|
||||
context, context.bottom_bound_segment, port, agent_host)
|
||||
if port['device_owner'] in l2pop_db.HA_ROUTER_PORTS and fdb_entries:
|
||||
session = db_api.get_session()
|
||||
session = db_api.get_reader_session()
|
||||
network_id = port['network_id']
|
||||
other_fdb_ports = self._get_ha_port_agents_fdb(
|
||||
session, network_id, port['device_id'])
|
||||
@ -111,7 +111,7 @@ class L2populationMechanismDriver(api.MechanismDriver):
|
||||
if not agent_host:
|
||||
return
|
||||
|
||||
agent_ip = l2pop_db.get_agent_ip_by_host(db_api.get_session(),
|
||||
agent_ip = l2pop_db.get_agent_ip_by_host(db_api.get_reader_session(),
|
||||
agent_host)
|
||||
|
||||
orig_mac_ip = [l2pop_rpc.PortInfo(mac_address=port['mac_address'],
|
||||
@ -255,7 +255,7 @@ class L2populationMechanismDriver(api.MechanismDriver):
|
||||
def update_port_up(self, context):
|
||||
port = context.current
|
||||
agent_host = context.host
|
||||
session = db_api.get_session()
|
||||
session = db_api.get_reader_session()
|
||||
agent = l2pop_db.get_agent_by_host(session, agent_host)
|
||||
if not agent:
|
||||
LOG.warning(_LW("Unable to retrieve active L2 agent on host %s"),
|
||||
@ -306,11 +306,12 @@ class L2populationMechanismDriver(api.MechanismDriver):
|
||||
|
||||
network_id = port['network_id']
|
||||
|
||||
session = db_api.get_session()
|
||||
session = db_api.get_reader_session()
|
||||
agent_active_ports = l2pop_db.get_agent_network_active_port_count(
|
||||
session, agent_host, network_id)
|
||||
|
||||
agent = l2pop_db.get_agent_by_host(db_api.get_session(), agent_host)
|
||||
agent = l2pop_db.get_agent_by_host(session,
|
||||
agent_host)
|
||||
if not self._validate_segment(segment, port['id'], agent):
|
||||
return
|
||||
|
||||
|
@ -28,6 +28,7 @@ from sqlalchemy import or_
|
||||
|
||||
from neutron._i18n import _, _LI, _LW
|
||||
from neutron.common import topics
|
||||
from neutron import context
|
||||
from neutron.db import api as db_api
|
||||
from neutron.plugins.common import constants as p_const
|
||||
from neutron.plugins.common import utils as plugin_utils
|
||||
@ -146,12 +147,12 @@ class _TunnelTypeDriverBase(helpers.SegmentTypeDriver):
|
||||
|
||||
tunnel_id_getter = operator.attrgetter(self.segmentation_key)
|
||||
tunnel_col = getattr(self.model, self.segmentation_key)
|
||||
session = db_api.get_session()
|
||||
with session.begin(subtransactions=True):
|
||||
ctx = context.get_admin_context()
|
||||
with db_api.context_manager.writer.using(ctx):
|
||||
# remove from table unallocated tunnels not currently allocatable
|
||||
# fetch results as list via all() because we'll be iterating
|
||||
# through them twice
|
||||
allocs = (session.query(self.model).
|
||||
allocs = (ctx.session.query(self.model).
|
||||
with_lockmode("update").all())
|
||||
|
||||
# collect those vnis that needs to be deleted from db
|
||||
@ -161,7 +162,7 @@ class _TunnelTypeDriverBase(helpers.SegmentTypeDriver):
|
||||
# Immediately delete tunnels in chunks. This leaves no work for
|
||||
# flush at the end of transaction
|
||||
for chunk in chunks(to_remove, self.BULK_SIZE):
|
||||
session.query(self.model).filter(
|
||||
ctx.session.query(self.model).filter(
|
||||
tunnel_col.in_(chunk)).delete(synchronize_session=False)
|
||||
|
||||
# collect vnis that need to be added
|
||||
@ -170,7 +171,7 @@ class _TunnelTypeDriverBase(helpers.SegmentTypeDriver):
|
||||
for chunk in chunks(missings, self.BULK_SIZE):
|
||||
bulk = [{self.segmentation_key: x, 'allocated': False}
|
||||
for x in chunk]
|
||||
session.execute(self.model.__table__.insert(), bulk)
|
||||
ctx.session.execute(self.model.__table__.insert(), bulk)
|
||||
|
||||
def is_partial_segment(self, segment):
|
||||
return segment.get(api.SEGMENTATION_ID) is None
|
||||
@ -346,40 +347,37 @@ class EndpointTunnelTypeDriver(ML2TunnelTypeDriver):
|
||||
|
||||
def get_endpoint_by_host(self, host):
|
||||
LOG.debug("get_endpoint_by_host() called for host %s", host)
|
||||
session = db_api.get_session()
|
||||
session = db_api.get_reader_session()
|
||||
return (session.query(self.endpoint_model).
|
||||
filter_by(host=host).first())
|
||||
|
||||
def get_endpoint_by_ip(self, ip):
|
||||
LOG.debug("get_endpoint_by_ip() called for ip %s", ip)
|
||||
session = db_api.get_session()
|
||||
session = db_api.get_reader_session()
|
||||
return (session.query(self.endpoint_model).
|
||||
filter_by(ip_address=ip).first())
|
||||
|
||||
def delete_endpoint(self, ip):
|
||||
LOG.debug("delete_endpoint() called for ip %s", ip)
|
||||
session = db_api.get_session()
|
||||
with session.begin(subtransactions=True):
|
||||
(session.query(self.endpoint_model).
|
||||
filter_by(ip_address=ip).delete())
|
||||
session = db_api.get_writer_session()
|
||||
session.query(self.endpoint_model).filter_by(ip_address=ip).delete()
|
||||
|
||||
def delete_endpoint_by_host_or_ip(self, host, ip):
|
||||
LOG.debug("delete_endpoint_by_host_or_ip() called for "
|
||||
"host %(host)s or %(ip)s", {'host': host, 'ip': ip})
|
||||
session = db_api.get_session()
|
||||
with session.begin(subtransactions=True):
|
||||
session.query(self.endpoint_model).filter(
|
||||
or_(self.endpoint_model.host == host,
|
||||
self.endpoint_model.ip_address == ip)).delete()
|
||||
session = db_api.get_writer_session()
|
||||
session.query(self.endpoint_model).filter(
|
||||
or_(self.endpoint_model.host == host,
|
||||
self.endpoint_model.ip_address == ip)).delete()
|
||||
|
||||
def _get_endpoints(self):
|
||||
LOG.debug("_get_endpoints() called")
|
||||
session = db_api.get_session()
|
||||
session = db_api.get_reader_session()
|
||||
return session.query(self.endpoint_model)
|
||||
|
||||
def _add_endpoint(self, ip, host, **kwargs):
|
||||
LOG.debug("_add_endpoint() called for ip %s", ip)
|
||||
session = db_api.get_session()
|
||||
session = db_api.get_writer_session()
|
||||
try:
|
||||
endpoint = self.endpoint_model(ip_address=ip, host=host, **kwargs)
|
||||
endpoint.save(session)
|
||||
|
@ -23,6 +23,7 @@ from six import moves
|
||||
from neutron._i18n import _, _LE, _LI, _LW
|
||||
from neutron.common import _deprecate
|
||||
from neutron.conf.plugins.ml2.drivers import driver_type
|
||||
from neutron import context
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db.models.plugins.ml2 import vlanallocation as vlan_alloc_model
|
||||
from neutron.plugins.common import constants as p_const
|
||||
@ -64,11 +65,11 @@ class VlanTypeDriver(helpers.SegmentTypeDriver):
|
||||
|
||||
@db_api.retry_db_errors
|
||||
def _sync_vlan_allocations(self):
|
||||
session = db_api.get_session()
|
||||
with session.begin(subtransactions=True):
|
||||
ctx = context.get_admin_context()
|
||||
with db_api.context_manager.writer.using(ctx):
|
||||
# get existing allocations for all physical networks
|
||||
allocations = dict()
|
||||
allocs = (session.query(vlan_alloc_model.VlanAllocation).
|
||||
allocs = (ctx.session.query(vlan_alloc_model.VlanAllocation).
|
||||
with_lockmode('update'))
|
||||
for alloc in allocs:
|
||||
if alloc.physical_network not in allocations:
|
||||
@ -101,7 +102,7 @@ class VlanTypeDriver(helpers.SegmentTypeDriver):
|
||||
{'vlan_id': alloc.vlan_id,
|
||||
'physical_network':
|
||||
physical_network})
|
||||
session.delete(alloc)
|
||||
ctx.session.delete(alloc)
|
||||
del allocations[physical_network]
|
||||
|
||||
# add missing allocatable vlans to table
|
||||
@ -110,7 +111,7 @@ class VlanTypeDriver(helpers.SegmentTypeDriver):
|
||||
physical_network=physical_network,
|
||||
vlan_id=vlan_id,
|
||||
allocated=False)
|
||||
session.add(alloc)
|
||||
ctx.session.add(alloc)
|
||||
|
||||
# remove from table unallocated vlans for any unconfigured
|
||||
# physical networks
|
||||
@ -122,7 +123,7 @@ class VlanTypeDriver(helpers.SegmentTypeDriver):
|
||||
{'vlan_id': alloc.vlan_id,
|
||||
'physical_network':
|
||||
alloc.physical_network})
|
||||
session.delete(alloc)
|
||||
ctx.session.delete(alloc)
|
||||
|
||||
def get_type(self):
|
||||
return p_const.TYPE_VLAN
|
||||
|
@ -590,20 +590,16 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
attributes.SUBNETS, ['_ml2_md_extend_subnet_dict'])
|
||||
|
||||
def _ml2_md_extend_network_dict(self, result, netdb):
|
||||
session = db_api.get_session()
|
||||
with session.begin(subtransactions=True):
|
||||
self.extension_manager.extend_network_dict(session, netdb, result)
|
||||
session = db_api.get_reader_session()
|
||||
self.extension_manager.extend_network_dict(session, netdb, result)
|
||||
|
||||
def _ml2_md_extend_port_dict(self, result, portdb):
|
||||
session = db_api.get_session()
|
||||
with session.begin(subtransactions=True):
|
||||
self.extension_manager.extend_port_dict(session, portdb, result)
|
||||
session = db_api.get_reader_session()
|
||||
self.extension_manager.extend_port_dict(session, portdb, result)
|
||||
|
||||
def _ml2_md_extend_subnet_dict(self, result, subnetdb):
|
||||
session = db_api.get_session()
|
||||
with session.begin(subtransactions=True):
|
||||
self.extension_manager.extend_subnet_dict(
|
||||
session, subnetdb, result)
|
||||
session = db_api.get_reader_session()
|
||||
self.extension_manager.extend_subnet_dict(session, subnetdb, result)
|
||||
|
||||
# Note - The following hook methods have "ml2" in their names so
|
||||
# that they are not called twice during unit tests due to global
|
||||
|
@ -6466,7 +6466,7 @@ class DbOperationBoundMixin(object):
|
||||
def _event_incrementer(*args, **kwargs):
|
||||
self._db_execute_count += 1
|
||||
|
||||
engine = db_api.context_manager.get_legacy_facade().get_engine()
|
||||
engine = db_api.context_manager.writer.get_engine()
|
||||
event.listen(engine, 'after_execute', _event_incrementer)
|
||||
self.addCleanup(event.remove, engine, 'after_execute',
|
||||
_event_incrementer)
|
||||
|
@ -458,7 +458,7 @@ class FlavorPluginTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
|
||||
self.service_manager.add_provider_configuration(
|
||||
provider.split(':')[0], provconf.ProviderConfiguration())
|
||||
|
||||
dbapi.context_manager.get_legacy_facade().get_engine()
|
||||
dbapi.context_manager.writer.get_engine()
|
||||
|
||||
def _create_flavor(self, description=None):
|
||||
flavor = {'flavor': {'name': 'GOLD',
|
||||
|
@ -126,7 +126,7 @@ class TestL3GwModeMixin(testlib_api.SqlTestCase):
|
||||
self.context = mock_context.get_admin_context()
|
||||
# This ensure also calls to elevated work in unit tests
|
||||
self.context.elevated.return_value = self.context
|
||||
self.context.session = db_api.get_session()
|
||||
self.context.session = db_api.get_writer_session()
|
||||
# Create sample data for tests
|
||||
self.ext_net_id = _uuid()
|
||||
self.int_net_id = _uuid()
|
||||
|
@ -1166,7 +1166,7 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
|
||||
raise db_exc.DBDuplicateEntry()
|
||||
|
||||
listener = IPAllocationsGrenade()
|
||||
engine = db_api.context_manager.get_legacy_facade().get_engine()
|
||||
engine = db_api.context_manager.writer.get_engine()
|
||||
event.listen(engine, 'before_cursor_execute', listener.execute)
|
||||
event.listen(engine, 'commit', listener.commit)
|
||||
self.addCleanup(event.remove, engine, 'before_cursor_execute',
|
||||
|
@ -60,7 +60,7 @@ class TestResource(base.DietTestCase):
|
||||
class TestTrackedResource(testlib_api.SqlTestCaseLight):
|
||||
|
||||
def _add_data(self, tenant_id=None):
|
||||
session = db_api.get_session()
|
||||
session = db_api.get_writer_session()
|
||||
with session.begin():
|
||||
tenant_id = tenant_id or self.tenant_id
|
||||
session.add(test_quota.MehModel(
|
||||
@ -71,7 +71,7 @@ class TestTrackedResource(testlib_api.SqlTestCaseLight):
|
||||
tenant_id=tenant_id))
|
||||
|
||||
def _delete_data(self):
|
||||
session = db_api.get_session()
|
||||
session = db_api.get_writer_session()
|
||||
with session.begin():
|
||||
query = session.query(test_quota.MehModel).filter_by(
|
||||
tenant_id=self.tenant_id)
|
||||
@ -79,7 +79,7 @@ class TestTrackedResource(testlib_api.SqlTestCaseLight):
|
||||
session.delete(item)
|
||||
|
||||
def _update_data(self):
|
||||
session = db_api.get_session()
|
||||
session = db_api.get_writer_session()
|
||||
with session.begin():
|
||||
query = session.query(test_quota.MehModel).filter_by(
|
||||
tenant_id=self.tenant_id)
|
||||
|
@ -67,7 +67,7 @@ class FakeContext(context.ContextBaseWithSession):
|
||||
@property
|
||||
def session(self):
|
||||
if self._session is None:
|
||||
self._session = db_api.get_session()
|
||||
self._session = db_api.get_writer_session()
|
||||
return self._session
|
||||
|
||||
|
||||
|
@ -25,7 +25,7 @@ class TestNeutronContext(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestNeutronContext, self).setUp()
|
||||
db_api = 'neutron.db.api.get_session'
|
||||
db_api = 'neutron.db.api.get_writer_session'
|
||||
self._db_api_session_patcher = mock.patch(db_api)
|
||||
self.db_api_session = self._db_api_session_patcher.start()
|
||||
|
||||
|
@ -106,7 +106,7 @@ class SqlFixture(fixtures.Fixture):
|
||||
|
||||
db_api.context_manager._root_factory = self.enginefacade_factory
|
||||
|
||||
engine = db_api.context_manager.get_legacy_facade().get_engine()
|
||||
engine = db_api.context_manager.writer.get_engine()
|
||||
|
||||
self.addCleanup(
|
||||
lambda: setattr(
|
||||
|
Loading…
x
Reference in New Issue
Block a user