diff --git a/neutron/agent/common/ovs_lib.py b/neutron/agent/common/ovs_lib.py
index b1c15677cd2..f96e31c92a8 100644
--- a/neutron/agent/common/ovs_lib.py
+++ b/neutron/agent/common/ovs_lib.py
@@ -30,7 +30,7 @@ import tenacity
 from neutron._i18n import _, _LE, _LI, _LW
 from neutron.agent.common import ip_lib
 from neutron.agent.common import utils
-from neutron.agent.ovsdb import api as ovsdb
+from neutron.agent.ovsdb import api as ovsdb_api
 from neutron.conf.agent import ovs_conf
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.ml2.drivers.openvswitch.agent.common \
@@ -106,7 +106,7 @@ class BaseOVS(object):
 
     def __init__(self):
         self.vsctl_timeout = cfg.CONF.ovs_vsctl_timeout
-        self.ovsdb = ovsdb.API.get(self)
+        self.ovsdb = ovsdb_api.from_config(self)
 
     def add_manager(self, connection_uri, timeout=_SENTINEL):
         """Have ovsdb-server listen for manager connections
diff --git a/neutron/agent/ovsdb/api.py b/neutron/agent/ovsdb/api.py
index 31f87ca1906..81fc59cbc4f 100644
--- a/neutron/agent/ovsdb/api.py
+++ b/neutron/agent/ovsdb/api.py
@@ -12,20 +12,26 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import abc
 import collections
-import contextlib
 import uuid
 
+from debtcollector import moves
 from oslo_config import cfg
 from oslo_utils import importutils
-import six
+from ovsdbapp import api
+from ovsdbapp import exceptions
 
 from neutron._i18n import _
 
+API = moves.moved_class(api.API, 'API', __name__)
+Command = moves.moved_class(api.Command, 'Command', __name__)
+Transaction = moves.moved_class(api.Transaction, 'Transaction', __name__)
+TimeoutException = moves.moved_class(exceptions.TimeoutException,
+                                     'TimeoutException', __name__)
+
 interface_map = {
-    'vsctl': 'neutron.agent.ovsdb.impl_vsctl.OvsdbVsctl',
-    'native': 'neutron.agent.ovsdb.impl_idl.NeutronOvsdbIdl',
+    'vsctl': 'neutron.agent.ovsdb.impl_vsctl',
+    'native': 'neutron.agent.ovsdb.impl_idl',
 }
 
 OPTS = [
@@ -44,400 +50,11 @@ OPTS = [
 cfg.CONF.register_opts(OPTS, 'OVS')
 
 
-@six.add_metaclass(abc.ABCMeta)
-class Command(object):
-    """An OVSDB command that can be executed in a transaction
-
-    :attr result: The result of executing the command in a transaction
-    """
-
-    @abc.abstractmethod
-    def execute(self, **transaction_options):
-        """Immediately execute an OVSDB command
-
-        This implicitly creates a transaction with the passed options and then
-        executes it, returning the value of the executed transaction
-
-        :param transaction_options: Options to pass to the transaction
-        """
-
-
-@six.add_metaclass(abc.ABCMeta)
-class Transaction(object):
-    @abc.abstractmethod
-    def commit(self):
-        """Commit the transaction to OVSDB"""
-
-    @abc.abstractmethod
-    def add(self, command):
-        """Append an OVSDB operation to the transaction"""
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, tb):
-        if exc_type is None:
-            self.result = self.commit()
-
-
-@six.add_metaclass(abc.ABCMeta)
-class API(object):
-    def __init__(self, context):
-        self.context = context
-        self._nested_txn = None
-
-    @staticmethod
-    def get(context, iface_name=None):
-        """Return the configured OVSDB API implementation"""
-        iface = importutils.import_class(
-            interface_map[iface_name or cfg.CONF.OVS.ovsdb_interface])
-        return iface(context)
-
-    @abc.abstractmethod
-    def create_transaction(self, check_error=False, log_errors=True, **kwargs):
-        """Create a transaction
-
-        :param check_error: Allow the transaction to raise an exception?
-        :type check_error:  bool
-        :param log_errors:  Log an error if the transaction fails?
-        :type log_errors:   bool
-        :returns: A new transaction
-        :rtype: :class:`Transaction`
-        """
-
-    @contextlib.contextmanager
-    def transaction(self, check_error=False, log_errors=True, **kwargs):
-        """Create a transaction context.
-
-        :param check_error: Allow the transaction to raise an exception?
-        :type check_error:  bool
-        :param log_errors:  Log an error if the transaction fails?
-        :type log_errors:   bool
-        :returns: Either a new transaction or an existing one.
-        :rtype: :class:`Transaction`
-        """
-        if self._nested_txn:
-            yield self._nested_txn
-        else:
-            with self.create_transaction(
-                    check_error, log_errors, **kwargs) as txn:
-                self._nested_txn = txn
-                try:
-                    yield txn
-                finally:
-                    self._nested_txn = None
-
-    @abc.abstractmethod
-    def add_manager(self, connection_uri):
-        """Create a command to add a Manager to the OVS switch
-
-        This API will add a new manager without overriding the existing ones.
-
-        :param connection_uri: target to which manager needs to be set
-        :type connection_uri: string, see ovs-vsctl manpage for format
-        :returns:           :class:`Command` with no result
-        """
-
-    @abc.abstractmethod
-    def get_manager(self):
-        """Create a command to get Manager list from the OVS switch
-
-        :returns: :class:`Command` with list of Manager names result
-        """
-
-    @abc.abstractmethod
-    def remove_manager(self, connection_uri):
-        """Create a command to remove a Manager from the OVS switch
-
-        This API will remove the manager configured on the OVS switch.
-
-        :param connection_uri: target identifying the manager uri that
-                               needs to be removed.
-        :type connection_uri: string, see ovs-vsctl manpage for format
-        :returns:           :class:`Command` with no result
-        """
-
-    @abc.abstractmethod
-    def add_br(self, name, may_exist=True, datapath_type=None):
-        """Create a command to add an OVS bridge
-
-        :param name:            The name of the bridge
-        :type name:             string
-        :param may_exist:       Do not fail if bridge already exists
-        :type may_exist:        bool
-        :param datapath_type:   The datapath_type of the bridge
-        :type datapath_type:    string
-        :returns:               :class:`Command` with no result
-        """
-
-    @abc.abstractmethod
-    def del_br(self, name, if_exists=True):
-        """Create a command to delete an OVS bridge
-
-        :param name:      The name of the bridge
-        :type name:       string
-        :param if_exists: Do not fail if the bridge does not exist
-        :type if_exists:  bool
-        :returns:        :class:`Command` with no result
-        """
-
-    @abc.abstractmethod
-    def br_exists(self, name):
-        """Create a command to check if an OVS bridge exists
-
-        :param name: The name of the bridge
-        :type name:  string
-        :returns:    :class:`Command` with bool result
-        """
-
-    @abc.abstractmethod
-    def port_to_br(self, name):
-        """Create a command to return the name of the bridge with the port
-
-        :param name: The name of the OVS port
-        :type name:  string
-        :returns:    :class:`Command` with bridge name result
-        """
-
-    @abc.abstractmethod
-    def iface_to_br(self, name):
-        """Create a command to return the name of the bridge with the interface
-
-        :param name: The name of the OVS interface
-        :type name:  string
-        :returns:    :class:`Command` with bridge name result
-        """
-
-    @abc.abstractmethod
-    def list_br(self):
-        """Create a command to return the current list of OVS bridge names
-
-        :returns: :class:`Command` with list of bridge names result
-        """
-
-    @abc.abstractmethod
-    def br_get_external_id(self, name, field):
-        """Create a command to return a field from the Bridge's external_ids
-
-        :param name:  The name of the OVS Bridge
-        :type name:   string
-        :param field: The external_ids field to return
-        :type field:  string
-        :returns:     :class:`Command` with field value result
-        """
-
-    @abc.abstractmethod
-    def db_create(self, table, **col_values):
-        """Create a command to create new record
-
-        :param table:      The OVS table containing the record to be created
-        :type table:       string
-        :param col_values: The columns and their associated values
-                           to be set after create
-        :type col_values:  Dictionary of columns id's and values
-        :returns:          :class:`Command` with no result
-        """
-
-    @abc.abstractmethod
-    def db_destroy(self, table, record):
-        """Create a command to destroy a record
-
-        :param table:      The OVS table containing the record to be destroyed
-        :type table:       string
-        :param record:     The record id (name/uuid) to be destroyed
-        :type record:      uuid/string
-        :returns:          :class:`Command` with no result
-        """
-
-    @abc.abstractmethod
-    def db_set(self, table, record, *col_values):
-        """Create a command to set fields in a record
-
-        :param table:      The OVS table containing the record to be modified
-        :type table:       string
-        :param record:     The record id (name/uuid) to be modified
-        :type table:       string
-        :param col_values: The columns and their associated values
-        :type col_values:  Tuples of (column, value). Values may be atomic
-                           values or unnested sequences/mappings
-        :returns:          :class:`Command` with no result
-        """
-        # TODO(twilson) Consider handling kwargs for arguments where order
-        # doesn't matter. Though that would break the assert_called_once_with
-        # unit tests
-
-    @abc.abstractmethod
-    def db_add(self, table, record, column, *values):
-        """Create a command to add a value to a record
-
-        Adds each value or key-value pair to column in record in table. If
-        column is a map, then each value will be a dict, otherwise a base type.
-        If key already exists in a map column, then the current value is not
-        replaced (use the set command to replace an existing value).
-
-        :param table:  The OVS table containing the record to be modified
-        :type table:   string
-        :param record: The record id (name/uuid) to modified
-        :type record:  string
-        :param column: The column name to be modified
-        :type column:  string
-        :param values: The values to be added to the column
-        :type values:  The base type of the column. If column is a map, then
-                       a dict containing the key name and the map's value type
-        :returns:     :class:`Command` with no result
-        """
-
-    @abc.abstractmethod
-    def db_clear(self, table, record, column):
-        """Create a command to clear a field's value in a record
-
-        :param table:  The OVS table containing the record to be modified
-        :type table:   string
-        :param record: The record id (name/uuid) to be modified
-        :type record:  string
-        :param column: The column whose value should be cleared
-        :type column:  string
-        :returns:      :class:`Command` with no result
-        """
-
-    @abc.abstractmethod
-    def db_get(self, table, record, column):
-        """Create a command to return a field's value in a record
-
-        :param table:  The OVS table containing the record to be queried
-        :type table:   string
-        :param record: The record id (name/uuid) to be queried
-        :type record:  string
-        :param column: The column whose value should be returned
-        :type column:  string
-        :returns:      :class:`Command` with the field's value result
-        """
-
-    @abc.abstractmethod
-    def db_list(self, table, records=None, columns=None, if_exists=False):
-        """Create a command to return a list of OVSDB records
-
-        :param table:     The OVS table to query
-        :type table:      string
-        :param records:   The records to return values from
-        :type records:    list of record ids (names/uuids)
-        :param columns:   Limit results to only columns, None means all columns
-        :type columns:    list of column names or None
-        :param if_exists: Do not fail if the record does not exist
-        :type if_exists:  bool
-        :returns:         :class:`Command` with [{'column', value}, ...] result
-        """
-
-    @abc.abstractmethod
-    def db_find(self, table, *conditions, **kwargs):
-        """Create a command to return find OVSDB records matching conditions
-
-        :param table:     The OVS table to query
-        :type table:      string
-        :param conditions:The conditions to satisfy the query
-        :type conditions: 3-tuples containing (column, operation, match)
-                          Type of 'match' parameter MUST be identical to column
-                          type
-                          Examples:
-                              atomic: ('tag', '=', 7)
-                              map: ('external_ids' '=', {'iface-id': 'xxx'})
-                              field exists?
-                                  ('external_ids', '!=', {'iface-id', ''})
-                              set contains?:
-                                  ('protocols', '{>=}', 'OpenFlow13')
-                          See the ovs-vsctl man page for more operations
-        :param columns:   Limit results to only columns, None means all columns
-        :type columns:    list of column names or None
-        :returns:         :class:`Command` with [{'column', value}, ...] result
-        """
-
-    @abc.abstractmethod
-    def set_controller(self, bridge, controllers):
-        """Create a command to set an OVS bridge's OpenFlow controllers
-
-        :param bridge:      The name of the bridge
-        :type bridge:       string
-        :param controllers: The controller strings
-        :type controllers:  list of strings, see ovs-vsctl manpage for format
-        :returns:           :class:`Command` with no result
-        """
-
-    @abc.abstractmethod
-    def del_controller(self, bridge):
-        """Create a command to clear an OVS bridge's OpenFlow controllers
-
-        :param bridge: The name of the bridge
-        :type bridge:  string
-        :returns:      :class:`Command` with no result
-        """
-
-    @abc.abstractmethod
-    def get_controller(self, bridge):
-        """Create a command to return an OVS bridge's OpenFlow controllers
-
-        :param bridge: The name of the bridge
-        :type bridge:  string
-        :returns:      :class:`Command` with list of controller strings result
-        """
-
-    @abc.abstractmethod
-    def set_fail_mode(self, bridge, mode):
-        """Create a command to set an OVS bridge's failure mode
-
-        :param bridge: The name of the bridge
-        :type bridge:  string
-        :param mode:   The failure mode
-        :type mode:    "secure" or "standalone"
-        :returns:      :class:`Command` with no result
-        """
-
-    @abc.abstractmethod
-    def add_port(self, bridge, port, may_exist=True):
-        """Create a command to add a port to an OVS bridge
-
-        :param bridge:    The name of the bridge
-        :type bridge:     string
-        :param port:      The name of the port
-        :type port:       string
-        :param may_exist: Do not fail if the port already exists
-        :type may_exist:  bool
-        :returns:         :class:`Command` with no result
-        """
-
-    @abc.abstractmethod
-    def del_port(self, port, bridge=None, if_exists=True):
-        """Create a command to delete a port an OVS port
-
-        :param port:      The name of the port
-        :type port:       string
-        :param bridge:    Only delete port if it is attached to this bridge
-        :type bridge:     string
-        :param if_exists: Do not fail if the port does not exist
-        :type if_exists:  bool
-        :returns:         :class:`Command` with no result
-        """
-
-    @abc.abstractmethod
-    def list_ports(self, bridge):
-        """Create a command to list the names of ports on a bridge
-
-        :param bridge: The name of the bridge
-        :type bridge:  string
-        :returns:      :class:`Command` with list of port names result
-        """
-
-    @abc.abstractmethod
-    def list_ifaces(self, bridge):
-        """Create a command to list the names of interfaces on a bridge
-
-        :param bridge: The name of the bridge
-        :type bridge:  string
-        :returns:      :class:`Command` with list of interfaces names result
-        """
-
-
-class TimeoutException(Exception):
-    pass
+def from_config(context, iface_name=None):
+    """Return the configured OVSDB API implementation"""
+    iface = importutils.import_module(
+        interface_map[iface_name or cfg.CONF.OVS.ovsdb_interface])
+    return iface.api_factory(context)
 
 
 def val_to_py(val):
diff --git a/neutron/agent/ovsdb/impl_idl.py b/neutron/agent/ovsdb/impl_idl.py
index fcae77d8880..824f4418e0b 100644
--- a/neutron/agent/ovsdb/impl_idl.py
+++ b/neutron/agent/ovsdb/impl_idl.py
@@ -12,289 +12,32 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import time
-
-from neutron_lib import exceptions
+from debtcollector import moves
 from oslo_config import cfg
-from oslo_log import log as logging
-from oslo_utils import excutils
-from ovs.db import idl
-from six.moves import queue as Queue
+from ovsdbapp.schema.open_vswitch import impl_idl
 
-from neutron._i18n import _, _LE
-from neutron.agent.ovsdb import api
-from neutron.agent.ovsdb.native import commands as cmd
 from neutron.agent.ovsdb.native import connection
-from neutron.agent.ovsdb.native import idlutils
 from neutron.agent.ovsdb.native import vlog
 
+NeutronOVSDBTransaction = moves.moved_class(
+    impl_idl.OvsVsctlTransaction,
+    'NeutronOVSDBTransaction',
+    __name__)
 
-cfg.CONF.import_opt('ovs_vsctl_timeout', 'neutron.agent.common.ovs_lib')
+VswitchdInterfaceAddException = moves.moved_class(
+    impl_idl.VswitchdInterfaceAddException,
+    'VswitchdInterfaceAddException',
+    __name__)
 
-LOG = logging.getLogger(__name__)
+_connection = connection.Connection(idl_factory=connection.idl_factory,
+                                    timeout=cfg.CONF.ovs_vsctl_timeout)
 
 
-class VswitchdInterfaceAddException(exceptions.NeutronException):
-    message = _("Failed to add interfaces: %(ifaces)s")
+def api_factory(context):
+    return NeutronOvsdbIdl(_connection)
 
 
-class Transaction(api.Transaction):
-    def __init__(self, api, ovsdb_connection, timeout,
-                 check_error=False, log_errors=True):
-        self.api = api
-        self.check_error = check_error
-        self.log_errors = log_errors
-        self.commands = []
-        self.results = Queue.Queue(1)
-        self.ovsdb_connection = ovsdb_connection
-        self.timeout = timeout
-        self.expected_ifaces = set()
-
-    def __str__(self):
-        return ", ".join(str(cmd) for cmd in self.commands)
-
-    def add(self, command):
-        """Add a command to the transaction
-
-        returns The command passed as a convenience
-        """
-
-        self.commands.append(command)
-        return command
-
-    def commit(self):
-        self.ovsdb_connection.queue_txn(self)
-        try:
-            result = self.results.get(timeout=self.timeout)
-        except Queue.Empty:
-            raise api.TimeoutException(
-                _("Commands %(commands)s exceeded timeout %(timeout)d "
-                  "seconds") % {'commands': self.commands,
-                                'timeout': self.timeout})
-        if isinstance(result, idlutils.ExceptionResult):
-            if self.log_errors:
-                LOG.error(result.tb)
-            if self.check_error:
-                raise result.ex
-        return result
-
-    def pre_commit(self, txn):
-        pass
-
-    def post_commit(self, txn):
-        for command in self.commands:
-            command.post_commit(txn)
-
-    def do_commit(self):
-        self.start_time = time.time()
-        attempts = 0
-        while True:
-            if attempts > 0 and self.timeout_exceeded():
-                raise RuntimeError(_("OVS transaction timed out"))
-            attempts += 1
-            # TODO(twilson) Make sure we don't loop longer than vsctl_timeout
-            txn = idl.Transaction(self.api.idl)
-            self.pre_commit(txn)
-            for i, command in enumerate(self.commands):
-                LOG.debug("Running txn command(idx=%(idx)s): %(cmd)s",
-                          {'idx': i, 'cmd': command})
-                try:
-                    command.run_idl(txn)
-                except Exception:
-                    with excutils.save_and_reraise_exception() as ctx:
-                        txn.abort()
-                        if not self.check_error:
-                            ctx.reraise = False
-            seqno = self.api.idl.change_seqno
-            status = txn.commit_block()
-            if status == txn.TRY_AGAIN:
-                LOG.debug("OVSDB transaction returned TRY_AGAIN, retrying")
-                idlutils.wait_for_change(self.api.idl, self.time_remaining(),
-                                         seqno)
-                continue
-            elif status == txn.ERROR:
-                msg = _("OVSDB Error: %s") % txn.get_error()
-                if self.log_errors:
-                    LOG.error(msg)
-                if self.check_error:
-                    # For now, raise similar error to vsctl/utils.execute()
-                    raise RuntimeError(msg)
-                return
-            elif status == txn.ABORTED:
-                LOG.debug("Transaction aborted")
-                return
-            elif status == txn.UNCHANGED:
-                LOG.debug("Transaction caused no change")
-            elif status == txn.SUCCESS:
-                self.post_commit(txn)
-
-            return [cmd.result for cmd in self.commands]
-
-    def elapsed_time(self):
-        return time.time() - self.start_time
-
-    def time_remaining(self):
-        return self.timeout - self.elapsed_time()
-
-    def timeout_exceeded(self):
-        return self.elapsed_time() > self.timeout
-
-
-class NeutronOVSDBTransaction(Transaction):
-    def pre_commit(self, txn):
-        self.api._ovs.increment('next_cfg')
-        txn.expected_ifaces = set()
-
-    def post_commit(self, txn):
-        super(NeutronOVSDBTransaction, self).post_commit(txn)
-        # ovs-vsctl only logs these failures and does not return nonzero
-        try:
-            self.do_post_commit(txn)
-        except Exception:
-            LOG.exception(_LE("Post-commit checks failed"))
-
-    def do_post_commit(self, txn):
-        next_cfg = txn.get_increment_new_value()
-        while not self.timeout_exceeded():
-            self.api.idl.run()
-            if self.vswitchd_has_completed(next_cfg):
-                failed = self.post_commit_failed_interfaces(txn)
-                if failed:
-                    raise VswitchdInterfaceAddException(
-                        ifaces=", ".join(failed))
-                break
-            self.ovsdb_connection.poller.timer_wait(
-                self.time_remaining() * 1000)
-            self.api.idl.wait(self.ovsdb_connection.poller)
-            self.ovsdb_connection.poller.block()
-        else:
-            raise api.TimeoutException(
-                _("Commands %(commands)s exceeded timeout %(timeout)d "
-                  "seconds post-commit") % {'commands': self.commands,
-                                            'timeout': self.timeout})
-
-    def post_commit_failed_interfaces(self, txn):
-        failed = []
-        for iface_uuid in txn.expected_ifaces:
-            uuid = txn.get_insert_uuid(iface_uuid)
-            if uuid:
-                ifaces = self.api.idl.tables['Interface']
-                iface = ifaces.rows.get(uuid)
-                if iface and (not iface.ofport or iface.ofport == -1):
-                    failed.append(iface.name)
-        return failed
-
-    def vswitchd_has_completed(self, next_cfg):
-        return self.api._ovs.cur_cfg >= next_cfg
-
-
-class OvsdbIdl(api.API):
-
-    ovsdb_connection = connection.Connection(cfg.CONF.OVS.ovsdb_connection,
-                                             cfg.CONF.ovs_vsctl_timeout,
-                                             'Open_vSwitch')
-
-    def __init__(self, context):
-        super(OvsdbIdl, self).__init__(context)
-        OvsdbIdl.ovsdb_connection.start()
-        self.idl = OvsdbIdl.ovsdb_connection.idl
-
-    @property
-    def _tables(self):
-        return self.idl.tables
-
-    @property
-    def _ovs(self):
-        return list(self._tables['Open_vSwitch'].rows.values())[0]
-
-    def create_transaction(self, check_error=False, log_errors=True, **kwargs):
-        return NeutronOVSDBTransaction(self, OvsdbIdl.ovsdb_connection,
-                                       self.context.vsctl_timeout,
-                                       check_error, log_errors)
-
-    def add_manager(self, connection_uri):
-        return cmd.AddManagerCommand(self, connection_uri)
-
-    def get_manager(self):
-        return cmd.GetManagerCommand(self)
-
-    def remove_manager(self, connection_uri):
-        return cmd.RemoveManagerCommand(self, connection_uri)
-
-    def add_br(self, name, may_exist=True, datapath_type=None):
-        return cmd.AddBridgeCommand(self, name, may_exist, datapath_type)
-
-    def del_br(self, name, if_exists=True):
-        return cmd.DelBridgeCommand(self, name, if_exists)
-
-    def br_exists(self, name):
-        return cmd.BridgeExistsCommand(self, name)
-
-    def port_to_br(self, name):
-        return cmd.PortToBridgeCommand(self, name)
-
-    def iface_to_br(self, name):
-        return cmd.InterfaceToBridgeCommand(self, name)
-
-    def list_br(self):
-        return cmd.ListBridgesCommand(self)
-
-    def br_get_external_id(self, name, field):
-        return cmd.BrGetExternalIdCommand(self, name, field)
-
-    def br_set_external_id(self, name, field, value):
-        return cmd.BrSetExternalIdCommand(self, name, field, value)
-
-    def db_create(self, table, **col_values):
-        return cmd.DbCreateCommand(self, table, **col_values)
-
-    def db_destroy(self, table, record):
-        return cmd.DbDestroyCommand(self, table, record)
-
-    def db_set(self, table, record, *col_values):
-        return cmd.DbSetCommand(self, table, record, *col_values)
-
-    def db_add(self, table, record, column, *values):
-        return cmd.DbAddCommand(self, table, record, column, *values)
-
-    def db_clear(self, table, record, column):
-        return cmd.DbClearCommand(self, table, record, column)
-
-    def db_get(self, table, record, column):
-        return cmd.DbGetCommand(self, table, record, column)
-
-    def db_list(self, table, records=None, columns=None, if_exists=False):
-        return cmd.DbListCommand(self, table, records, columns, if_exists)
-
-    def db_find(self, table, *conditions, **kwargs):
-        return cmd.DbFindCommand(self, table, *conditions, **kwargs)
-
-    def set_controller(self, bridge, controllers):
-        return cmd.SetControllerCommand(self, bridge, controllers)
-
-    def del_controller(self, bridge):
-        return cmd.DelControllerCommand(self, bridge)
-
-    def get_controller(self, bridge):
-        return cmd.GetControllerCommand(self, bridge)
-
-    def set_fail_mode(self, bridge, mode):
-        return cmd.SetFailModeCommand(self, bridge, mode)
-
-    def add_port(self, bridge, port, may_exist=True):
-        return cmd.AddPortCommand(self, bridge, port, may_exist)
-
-    def del_port(self, port, bridge=None, if_exists=True):
-        return cmd.DelPortCommand(self, port, bridge, if_exists)
-
-    def list_ports(self, bridge):
-        return cmd.ListPortsCommand(self, bridge)
-
-    def list_ifaces(self, bridge):
-        return cmd.ListIfacesCommand(self, bridge)
-
-
-class NeutronOvsdbIdl(OvsdbIdl):
-    def __init__(self, context):
-        vlog.use_oslo_logger()
-        super(NeutronOvsdbIdl, self).__init__(context)
+class NeutronOvsdbIdl(impl_idl.OvsdbIdl):
+    def __init__(self, connection):
+        vlog.use_python_logger()
+        super(NeutronOvsdbIdl, self).__init__(connection)
diff --git a/neutron/agent/ovsdb/impl_vsctl.py b/neutron/agent/ovsdb/impl_vsctl.py
index e09370f095e..24dab28549d 100644
--- a/neutron/agent/ovsdb/impl_vsctl.py
+++ b/neutron/agent/ovsdb/impl_vsctl.py
@@ -29,6 +29,10 @@ from neutron.agent.ovsdb import api as ovsdb
 LOG = logging.getLogger(__name__)
 
 
+def api_factory(context):
+    return OvsdbVsctl(context)
+
+
 class Transaction(ovsdb.Transaction):
     def __init__(self, context, check_error=False, log_errors=True, opts=None):
         self.context = context
@@ -178,6 +182,10 @@ class BrExistsCommand(DbCommand):
 
 
 class OvsdbVsctl(ovsdb.API):
+    def __init__(self, context):
+        super(OvsdbVsctl, self).__init__()
+        self.context = context
+
     def create_transaction(self, check_error=False, log_errors=True, **kwargs):
         return Transaction(self.context, check_error, log_errors, **kwargs)
 
diff --git a/neutron/agent/ovsdb/native/commands.py b/neutron/agent/ovsdb/native/commands.py
index 55a17567eb7..0a2b60b4a1c 100644
--- a/neutron/agent/ovsdb/native/commands.py
+++ b/neutron/agent/ovsdb/native/commands.py
@@ -12,561 +12,8 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import collections
+from ovsdbapp.schema.open_vswitch import commands
 
-from oslo_log import log as logging
-from oslo_utils import excutils
+from neutron.common import _deprecate
 
-from neutron._i18n import _, _LE
-from neutron.agent.ovsdb import api
-from neutron.agent.ovsdb.native import idlutils
-
-LOG = logging.getLogger(__name__)
-
-
-class BaseCommand(api.Command):
-    def __init__(self, api):
-        self.api = api
-        self.result = None
-
-    def execute(self, check_error=False, log_errors=True):
-        try:
-            with self.api.transaction(check_error, log_errors) as txn:
-                txn.add(self)
-            return self.result
-        except Exception:
-            with excutils.save_and_reraise_exception() as ctx:
-                if log_errors:
-                    LOG.exception(_LE("Error executing command"))
-                if not check_error:
-                    ctx.reraise = False
-
-    def post_commit(self, txn):
-        pass
-
-    def __str__(self):
-        command_info = self.__dict__
-        return "%s(%s)" % (
-            self.__class__.__name__,
-            ", ".join("%s=%s" % (k, v) for k, v in command_info.items()
-                      if k not in ['api', 'result']))
-
-    __repr__ = __str__
-
-
-class AddManagerCommand(BaseCommand):
-    def __init__(self, api, target):
-        super(AddManagerCommand, self).__init__(api)
-        self.target = target
-
-    def run_idl(self, txn):
-        row = txn.insert(self.api._tables['Manager'])
-        row.target = self.target
-        try:
-            self.api._ovs.addvalue('manager_options', row)
-        except AttributeError:  # OVS < 2.6
-            self.api._ovs.verify('manager_options')
-            self.api._ovs.manager_options = (
-                self.api._ovs.manager_options + [row])
-
-
-class GetManagerCommand(BaseCommand):
-    def __init__(self, api):
-        super(GetManagerCommand, self).__init__(api)
-
-    def run_idl(self, txn):
-        self.result = [m.target for m in
-                       self.api._tables['Manager'].rows.values()]
-
-
-class RemoveManagerCommand(BaseCommand):
-    def __init__(self, api, target):
-        super(RemoveManagerCommand, self).__init__(api)
-        self.target = target
-
-    def run_idl(self, txn):
-        try:
-            manager = idlutils.row_by_value(self.api.idl, 'Manager', 'target',
-                                            self.target)
-        except idlutils.RowNotFound:
-            msg = _("Manager with target %s does not exist") % self.target
-            LOG.error(msg)
-            raise RuntimeError(msg)
-        try:
-            self.api._ovs.delvalue('manager_options', manager)
-        except AttributeError:  # OVS < 2.6
-            self.api._ovs.verify('manager_options')
-            manager_list = self.api._ovs.manager_options
-            manager_list.remove(manager)
-            self.api._ovs.manager_options = manager_list
-        manager.delete()
-
-
-class AddBridgeCommand(BaseCommand):
-    def __init__(self, api, name, may_exist, datapath_type):
-        super(AddBridgeCommand, self).__init__(api)
-        self.name = name
-        self.may_exist = may_exist
-        self.datapath_type = datapath_type
-
-    def run_idl(self, txn):
-        if self.may_exist:
-            br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name',
-                                       self.name, None)
-            if br:
-                if self.datapath_type:
-                    br.datapath_type = self.datapath_type
-                return
-        row = txn.insert(self.api._tables['Bridge'])
-        row.name = self.name
-        if self.datapath_type:
-            row.datapath_type = self.datapath_type
-        try:
-            self.api._ovs.addvalue('bridges', row)
-        except AttributeError:  # OVS < 2.6
-            self.api._ovs.verify('bridges')
-            self.api._ovs.bridges = self.api._ovs.bridges + [row]
-
-        # Add the internal bridge port
-        cmd = AddPortCommand(self.api, self.name, self.name, self.may_exist)
-        cmd.run_idl(txn)
-
-        cmd = DbSetCommand(self.api, 'Interface', self.name,
-                           ('type', 'internal'))
-        cmd.run_idl(txn)
-
-
-class DelBridgeCommand(BaseCommand):
-    def __init__(self, api, name, if_exists):
-        super(DelBridgeCommand, self).__init__(api)
-        self.name = name
-        self.if_exists = if_exists
-
-    def run_idl(self, txn):
-        try:
-            br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name',
-                                       self.name)
-        except idlutils.RowNotFound:
-            if self.if_exists:
-                return
-            else:
-                msg = _("Bridge %s does not exist") % self.name
-                LOG.error(msg)
-                raise RuntimeError(msg)
-        # Clean up cached ports/interfaces
-        for port in br.ports:
-            for interface in port.interfaces:
-                interface.delete()
-            port.delete()
-        try:
-            self.api._ovs.delvalue('bridges', br)
-        except AttributeError:  # OVS < 2.6
-            self.api._ovs.verify('bridges')
-            bridges = self.api._ovs.bridges
-            bridges.remove(br)
-            self.api._ovs.bridges = bridges
-        br.delete()
-
-
-class BridgeExistsCommand(BaseCommand):
-    def __init__(self, api, name):
-        super(BridgeExistsCommand, self).__init__(api)
-        self.name = name
-
-    def run_idl(self, txn):
-        self.result = bool(idlutils.row_by_value(self.api.idl, 'Bridge',
-                                                 'name', self.name, None))
-
-
-class ListBridgesCommand(BaseCommand):
-    def __init__(self, api):
-        super(ListBridgesCommand, self).__init__(api)
-
-    def run_idl(self, txn):
-        # NOTE (twilson) [x.name for x in rows.values()] if no index
-        self.result = [x.name for x in
-                       self.api._tables['Bridge'].rows.values()]
-
-
-class BrGetExternalIdCommand(BaseCommand):
-    def __init__(self, api, name, field):
-        super(BrGetExternalIdCommand, self).__init__(api)
-        self.name = name
-        self.field = field
-
-    def run_idl(self, txn):
-        br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.name)
-        self.result = br.external_ids[self.field]
-
-
-class BrSetExternalIdCommand(BaseCommand):
-    def __init__(self, api, name, field, value):
-        super(BrSetExternalIdCommand, self).__init__(api)
-        self.name = name
-        self.field = field
-        self.value = value
-
-    def run_idl(self, txn):
-        br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.name)
-        external_ids = getattr(br, 'external_ids', {})
-        external_ids[self.field] = self.value
-        br.external_ids = external_ids
-
-
-class DbCreateCommand(BaseCommand):
-    def __init__(self, api, table, **columns):
-        super(DbCreateCommand, self).__init__(api)
-        self.table = table
-        self.columns = columns
-
-    def run_idl(self, txn):
-        row = txn.insert(self.api._tables[self.table])
-        for col, val in self.columns.items():
-            setattr(row, col, idlutils.db_replace_record(val))
-        # This is a temporary row to be used within the transaction
-        self.result = row
-
-    def post_commit(self, txn):
-        # Replace the temporary row with the post-commit UUID to match vsctl
-        self.result = txn.get_insert_uuid(self.result.uuid)
-
-
-class DbDestroyCommand(BaseCommand):
-    def __init__(self, api, table, record):
-        super(DbDestroyCommand, self).__init__(api)
-        self.table = table
-        self.record = record
-
-    def run_idl(self, txn):
-        record = idlutils.row_by_record(self.api.idl, self.table, self.record)
-        record.delete()
-
-
-class DbSetCommand(BaseCommand):
-    def __init__(self, api, table, record, *col_values):
-        super(DbSetCommand, self).__init__(api)
-        self.table = table
-        self.record = record
-        self.col_values = col_values
-
-    def run_idl(self, txn):
-        record = idlutils.row_by_record(self.api.idl, self.table, self.record)
-        for col, val in self.col_values:
-            # TODO(twilson) Ugh, the OVS library doesn't like OrderedDict
-            # We're only using it to make a unit test work, so we should fix
-            # this soon.
-            if isinstance(val, collections.OrderedDict):
-                val = dict(val)
-            if isinstance(val, dict):
-                # NOTE(twilson) OVS 2.6's Python IDL has mutate methods that
-                # would make this cleaner, but it's too early to rely on them.
-                existing = getattr(record, col, {})
-                existing.update(val)
-                val = existing
-            setattr(record, col, idlutils.db_replace_record(val))
-
-
-class DbAddCommand(BaseCommand):
-    def __init__(self, api, table, record, column, *values):
-        super(DbAddCommand, self).__init__(api)
-        self.table = table
-        self.record = record
-        self.column = column
-        self.values = values
-
-    def run_idl(self, txn):
-        record = idlutils.row_by_record(self.api.idl, self.table, self.record)
-        for value in self.values:
-            if isinstance(value, collections.Mapping):
-                # We should be doing an add on a 'map' column. If the key is
-                # already set, do nothing, otherwise set the key to the value
-                # Since this operation depends on the previous value, verify()
-                # must be called.
-                field = getattr(record, self.column, {})
-                for k, v in value.items():
-                    if k in field:
-                        continue
-                    field[k] = v
-            else:
-                # We should be appending to a 'set' column.
-                try:
-                    record.addvalue(self.column,
-                                    idlutils.db_replace_record(value))
-                    continue
-                except AttributeError:  # OVS < 2.6
-                    field = getattr(record, self.column, [])
-                    field.append(value)
-            record.verify(self.column)
-            setattr(record, self.column, idlutils.db_replace_record(field))
-
-
-class DbClearCommand(BaseCommand):
-    def __init__(self, api, table, record, column):
-        super(DbClearCommand, self).__init__(api)
-        self.table = table
-        self.record = record
-        self.column = column
-
-    def run_idl(self, txn):
-        record = idlutils.row_by_record(self.api.idl, self.table, self.record)
-        # Create an empty value of the column type
-        value = type(getattr(record, self.column))()
-        setattr(record, self.column, value)
-
-
-class DbGetCommand(BaseCommand):
-    def __init__(self, api, table, record, column):
-        super(DbGetCommand, self).__init__(api)
-        self.table = table
-        self.record = record
-        self.column = column
-
-    def run_idl(self, txn):
-        record = idlutils.row_by_record(self.api.idl, self.table, self.record)
-        # TODO(twilson) This feels wrong, but ovs-vsctl returns single results
-        # on set types without the list. The IDL is returning them as lists,
-        # even if the set has the maximum number of items set to 1. Might be
-        # able to inspect the Schema and just do this conversion for that case.
-        result = idlutils.get_column_value(record, self.column)
-        if isinstance(result, list) and len(result) == 1:
-            self.result = result[0]
-        else:
-            self.result = result
-
-
-class SetControllerCommand(BaseCommand):
-    def __init__(self, api, bridge, targets):
-        super(SetControllerCommand, self).__init__(api)
-        self.bridge = bridge
-        self.targets = targets
-
-    def run_idl(self, txn):
-        br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
-        controllers = []
-        for target in self.targets:
-            controller = txn.insert(self.api._tables['Controller'])
-            controller.target = target
-            controllers.append(controller)
-        # Don't need to verify because we unconditionally overwrite
-        br.controller = controllers
-
-
-class DelControllerCommand(BaseCommand):
-    def __init__(self, api, bridge):
-        super(DelControllerCommand, self).__init__(api)
-        self.bridge = bridge
-
-    def run_idl(self, txn):
-        br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
-        br.controller = []
-
-
-class GetControllerCommand(BaseCommand):
-    def __init__(self, api, bridge):
-        super(GetControllerCommand, self).__init__(api)
-        self.bridge = bridge
-
-    def run_idl(self, txn):
-        br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
-        self.result = [c.target for c in br.controller]
-
-
-class SetFailModeCommand(BaseCommand):
-    def __init__(self, api, bridge, mode):
-        super(SetFailModeCommand, self).__init__(api)
-        self.bridge = bridge
-        self.mode = mode
-
-    def run_idl(self, txn):
-        br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
-        br.fail_mode = self.mode
-
-
-class AddPortCommand(BaseCommand):
-    def __init__(self, api, bridge, port, may_exist):
-        super(AddPortCommand, self).__init__(api)
-        self.bridge = bridge
-        self.port = port
-        self.may_exist = may_exist
-
-    def run_idl(self, txn):
-        br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
-        if self.may_exist:
-            port = idlutils.row_by_value(self.api.idl, 'Port', 'name',
-                                         self.port, None)
-            if port:
-                return
-        port = txn.insert(self.api._tables['Port'])
-        port.name = self.port
-        try:
-            br.addvalue('ports', port)
-        except AttributeError:  # OVS < 2.6
-            br.verify('ports')
-            ports = getattr(br, 'ports', [])
-            ports.append(port)
-            br.ports = ports
-
-        iface = txn.insert(self.api._tables['Interface'])
-        txn.expected_ifaces.add(iface.uuid)
-        iface.name = self.port
-        # This is a new port, so it won't have any existing interfaces
-        port.interfaces = [iface]
-
-
-class DelPortCommand(BaseCommand):
-    def __init__(self, api, port, bridge, if_exists):
-        super(DelPortCommand, self).__init__(api)
-        self.port = port
-        self.bridge = bridge
-        self.if_exists = if_exists
-
-    def run_idl(self, txn):
-        try:
-            port = idlutils.row_by_value(self.api.idl, 'Port', 'name',
-                                         self.port)
-        except idlutils.RowNotFound:
-            if self.if_exists:
-                return
-            msg = _("Port %s does not exist") % self.port
-            raise RuntimeError(msg)
-        if self.bridge:
-            br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name',
-                                       self.bridge)
-        else:
-            br = next(b for b in self.api._tables['Bridge'].rows.values()
-                      if port in b.ports)
-
-        if port not in br.ports and not self.if_exists:
-            # TODO(twilson) Make real errors across both implementations
-            msg = _("Port %(port)s does not exist on %(bridge)s!") % {
-                'port': self.port, 'bridge': self.bridge
-            }
-            LOG.error(msg)
-            raise RuntimeError(msg)
-
-        try:
-            br.delvalue('ports', port)
-        except AttributeError:  # OVS < 2.6
-            br.verify('ports')
-            ports = br.ports
-            ports.remove(port)
-            br.ports = ports
-
-        # The interface on the port will be cleaned up by ovsdb-server
-        for interface in port.interfaces:
-            interface.delete()
-        port.delete()
-
-
-class ListPortsCommand(BaseCommand):
-    def __init__(self, api, bridge):
-        super(ListPortsCommand, self).__init__(api)
-        self.bridge = bridge
-
-    def run_idl(self, txn):
-        br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
-        self.result = [p.name for p in br.ports if p.name != self.bridge]
-
-
-class ListIfacesCommand(BaseCommand):
-    def __init__(self, api, bridge):
-        super(ListIfacesCommand, self).__init__(api)
-        self.bridge = bridge
-
-    def run_idl(self, txn):
-        br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
-        self.result = [i.name for p in br.ports if p.name != self.bridge
-                       for i in p.interfaces]
-
-
-class PortToBridgeCommand(BaseCommand):
-    def __init__(self, api, name):
-        super(PortToBridgeCommand, self).__init__(api)
-        self.name = name
-
-    def run_idl(self, txn):
-        # TODO(twilson) This is expensive!
-        # This traversal of all ports could be eliminated by caching the bridge
-        # name on the Port's external_id field
-        # In fact, if we did that, the only place that uses to_br functions
-        # could just add the external_id field to the conditions passed to find
-        port = idlutils.row_by_value(self.api.idl, 'Port', 'name', self.name)
-        bridges = self.api._tables['Bridge'].rows.values()
-        self.result = next(br.name for br in bridges if port in br.ports)
-
-
-class InterfaceToBridgeCommand(BaseCommand):
-    def __init__(self, api, name):
-        super(InterfaceToBridgeCommand, self).__init__(api)
-        self.name = name
-
-    def run_idl(self, txn):
-        interface = idlutils.row_by_value(self.api.idl, 'Interface', 'name',
-                                          self.name)
-        ports = self.api._tables['Port'].rows.values()
-        pname = next(
-                port for port in ports if interface in port.interfaces)
-
-        bridges = self.api._tables['Bridge'].rows.values()
-        self.result = next(br.name for br in bridges if pname in br.ports)
-
-
-class DbListCommand(BaseCommand):
-    def __init__(self, api, table, records, columns, if_exists):
-        super(DbListCommand, self).__init__(api)
-        self.table = table
-        self.columns = columns
-        self.if_exists = if_exists
-        self.records = records
-
-    def run_idl(self, txn):
-        table_schema = self.api._tables[self.table]
-        columns = self.columns or list(table_schema.columns.keys()) + ['_uuid']
-        if self.records:
-            row_uuids = []
-            for record in self.records:
-                try:
-                    row_uuids.append(idlutils.row_by_record(
-                                     self.api.idl, self.table, record).uuid)
-                except idlutils.RowNotFound:
-                    if self.if_exists:
-                        continue
-                    # NOTE(kevinbenton): this is converted to a RuntimeError
-                    # for compat with the vsctl version. It might make more
-                    # sense to change this to a RowNotFoundError in the future.
-                    raise RuntimeError(_(
-                          "Row doesn't exist in the DB. Request info: "
-                          "Table=%(table)s. Columns=%(columns)s. "
-                          "Records=%(records)s.") % {
-                              "table": self.table,
-                              "columns": self.columns,
-                              "records": self.records,
-                          })
-        else:
-            row_uuids = table_schema.rows.keys()
-        self.result = [
-            {
-                c: idlutils.get_column_value(table_schema.rows[uuid], c)
-                for c in columns
-            }
-            for uuid in row_uuids
-        ]
-
-
-class DbFindCommand(BaseCommand):
-    def __init__(self, api, table, *conditions, **kwargs):
-        super(DbFindCommand, self).__init__(api)
-        self.table = self.api._tables[table]
-        self.conditions = conditions
-        self.columns = (kwargs.get('columns') or
-                        list(self.table.columns.keys()) + ['_uuid'])
-
-    def run_idl(self, txn):
-        self.result = [
-            {
-                c: idlutils.get_column_value(r, c)
-                for c in self.columns
-            }
-            for r in self.table.rows.values()
-            if idlutils.row_match(r, self.conditions)
-        ]
+_deprecate._MovedGlobals(commands)
diff --git a/neutron/agent/ovsdb/native/connection.py b/neutron/agent/ovsdb/native/connection.py
index 49f45581bfe..ca2ab4ede32 100644
--- a/neutron/agent/ovsdb/native/connection.py
+++ b/neutron/agent/ovsdb/native/connection.py
@@ -12,150 +12,36 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import os
-import threading
-import traceback
-
-from debtcollector import removals
+from debtcollector import moves
+from oslo_config import cfg
 from ovs.db import idl
-from ovs import poller
-import six
-from six.moves import queue as Queue
+from ovsdbapp.backend.ovs_idl import connection as _connection
+from ovsdbapp.backend.ovs_idl import idlutils
+import tenacity
 
-from neutron._i18n import _
-from neutron.agent.ovsdb.native import idlutils
+from neutron.agent.ovsdb.native import helpers
+
+TransactionQueue = moves.moved_class(_connection.TransactionQueue,
+                                     'TransactionQueue', __name__)
+Connection = moves.moved_class(_connection.Connection, 'Connection', __name__)
 
 
-class TransactionQueue(Queue.Queue, object):
-    def __init__(self, *args, **kwargs):
-        super(TransactionQueue, self).__init__(*args, **kwargs)
-        alertpipe = os.pipe()
-        # NOTE(ivasilevskaya) python 3 doesn't allow unbuffered I/O. Will get
-        # around this constraint by using binary mode.
-        self.alertin = os.fdopen(alertpipe[0], 'rb', 0)
-        self.alertout = os.fdopen(alertpipe[1], 'wb', 0)
+def idl_factory():
+    conn = cfg.CONF.OVS.ovsdb_connection
+    schema_name = 'Open_vSwitch'
+    try:
+        helper = idlutils.get_schema_helper(conn, schema_name)
+    except Exception:
+        helpers.enable_connection_uri(conn)
 
-    def get_nowait(self, *args, **kwargs):
-        try:
-            result = super(TransactionQueue, self).get_nowait(*args, **kwargs)
-        except Queue.Empty:
-            return None
-        self.alertin.read(1)
-        return result
+        @tenacity.retry(wait=tenacity.wait_exponential(multiplier=0.01),
+                        stop=tenacity.stop_after_delay(1),
+                        reraise=True)
+        def do_get_schema_helper():
+            return idlutils.get_schema_helper(conn, schema_name)
 
-    def put(self, *args, **kwargs):
-        super(TransactionQueue, self).put(*args, **kwargs)
-        self.alertout.write(six.b('X'))
-        self.alertout.flush()
+        helper = do_get_schema_helper()
 
-    @property
-    def alert_fileno(self):
-        return self.alertin.fileno()
-
-
-class Connection(object):
-    __rm_args = {'version': 'Ocata', 'removal_version': 'Pike',
-                 'message': _('Use an idl_factory function instead')}
-
-    @removals.removed_kwarg('connection', **__rm_args)
-    @removals.removed_kwarg('schema_name', **__rm_args)
-    @removals.removed_kwarg('idl_class', **__rm_args)
-    def __init__(self, connection=None, timeout=None, schema_name=None,
-                 idl_class=None, idl_factory=None):
-        """Create a connection to an OVSDB server using the OVS IDL
-
-        :param connection: (deprecated) An OVSDB connection string
-        :param timeout: The timeout value for OVSDB operations (required)
-        :param schema_name: (deprecated) The name ovs the OVSDB schema to use
-        :param idl_class: (deprecated) An Idl subclass. Defaults to idl.Idl
-        :param idl_factory: A factory function that produces an Idl instance
-
-        The signature of this class is changing. It is recommended to pass in
-        a timeout and idl_factory
-        """
-        assert timeout is not None
-        self.idl = None
-        self.timeout = timeout
-        self.txns = TransactionQueue(1)
-        self.lock = threading.Lock()
-        if idl_factory:
-            if connection or schema_name:
-                raise TypeError(_('Connection: Takes either idl_factory, or '
-                                  'connection and schema_name. Both given'))
-            self.idl_factory = idl_factory
-        else:
-            if not connection or not schema_name:
-                raise TypeError(_('Connection: Takes either idl_factory, or '
-                                  'connection and schema_name. Neither given'))
-            self.idl_factory = self._idl_factory
-            self.connection = connection
-            self.schema_name = schema_name
-            self.idl_class = idl_class or idl.Idl
-            self._schema_filter = None
-
-    @removals.remove(**__rm_args)
-    def _idl_factory(self):
-        helper = self.get_schema_helper()
-        self.update_schema_helper(helper)
-        return self.idl_class(self.connection, helper)
-
-    @removals.removed_kwarg('table_name_list', **__rm_args)
-    def start(self, table_name_list=None):
-        """
-        :param table_name_list: A list of table names for schema_helper to
-                register. When this parameter is given, schema_helper will only
-                register tables which name are in list. Otherwise,
-                schema_helper will register all tables for given schema_name as
-                default.
-        """
-        self._schema_filter = table_name_list
-        with self.lock:
-            if self.idl is not None:
-                return
-
-            self.idl = self.idl_factory()
-            idlutils.wait_for_change(self.idl, self.timeout)
-            self.poller = poller.Poller()
-            self.thread = threading.Thread(target=self.run)
-            self.thread.setDaemon(True)
-            self.thread.start()
-
-    @removals.remove(
-        version='Ocata', removal_version='Pike',
-        message=_("Use idlutils.get_schema_helper(conn, schema, retry=True)"))
-    def get_schema_helper(self):
-        """Retrieve the schema helper object from OVSDB"""
-        return idlutils.get_schema_helper(self.connection, self.schema_name,
-                                          retry=True)
-
-    @removals.remove(
-        version='Ocata', removal_version='Pike',
-        message=_("Use an idl_factory and ovs.db.SchemaHelper for filtering"))
-    def update_schema_helper(self, helper):
-        if self._schema_filter:
-            for table_name in self._schema_filter:
-                helper.register_table(table_name)
-        else:
-            helper.register_all()
-
-    def run(self):
-        while True:
-            self.idl.wait(self.poller)
-            self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN)
-            #TODO(jlibosva): Remove next line once losing connection to ovsdb
-            #                is solved.
-            self.poller.timer_wait(self.timeout * 1000)
-            self.poller.block()
-            self.idl.run()
-            txn = self.txns.get_nowait()
-            if txn is not None:
-                try:
-                    txn.results.put(txn.do_commit())
-                except Exception as ex:
-                    er = idlutils.ExceptionResult(ex=ex,
-                                                  tb=traceback.format_exc())
-                    txn.results.put(er)
-                self.txns.task_done()
-
-    def queue_txn(self, txn):
-        self.txns.put(txn)
+    # TODO(twilson) We should still select only the tables/columns we use
+    helper.register_all()
+    return idl.Idl(conn, helper)
diff --git a/neutron/agent/ovsdb/native/helpers.py b/neutron/agent/ovsdb/native/helpers.py
index 93c0c6f6fc3..b0a9d5b6175 100644
--- a/neutron/agent/ovsdb/native/helpers.py
+++ b/neutron/agent/ovsdb/native/helpers.py
@@ -12,31 +12,17 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from oslo_config import cfg
+import functools
 
-from neutron.agent.ovsdb import api as ovsdb
+from debtcollector import moves
+from ovsdbapp.schema.open_vswitch import helpers
 
-cfg.CONF.import_opt('ovs_vsctl_timeout', 'neutron.agent.common.ovs_lib')
+from neutron.agent.common import utils
 
+_connection_to_manager_uri = moves.moved_function(
+    helpers._connection_to_manager_uri,
+    '_connection_to_manager_uri', __name__)
 
-def _connection_to_manager_uri(conn_uri):
-    proto, addr = conn_uri.split(':', 1)
-    if ':' in addr:
-        ip, port = addr.split(':', 1)
-        return 'p%s:%s:%s' % (proto, port, ip)
-    else:
-        return 'p%s:%s' % (proto, addr)
-
-
-def enable_connection_uri(conn_uri, set_timeout=False):
-    class OvsdbVsctlContext(object):
-        vsctl_timeout = cfg.CONF.ovs_vsctl_timeout
-
-    manager_uri = _connection_to_manager_uri(conn_uri)
-    api = ovsdb.API.get(OvsdbVsctlContext, 'vsctl')
-    with api.transaction() as txn:
-        txn.add(api.add_manager(manager_uri))
-        if set_timeout:
-            timeout = cfg.CONF.ovs_vsctl_timeout * 1000
-            txn.add(api.db_set('Manager', manager_uri,
-                               ('inactivity_probe', timeout)))
+enable_connection_uri = functools.partial(
+    helpers.enable_connection_uri, execute=utils.execute, run_as_root=True,
+    log_fail_as_error=False, check_exit_code=False)
diff --git a/neutron/agent/ovsdb/native/idlutils.py b/neutron/agent/ovsdb/native/idlutils.py
index 87969fbc8dd..b8d140f8db1 100644
--- a/neutron/agent/ovsdb/native/idlutils.py
+++ b/neutron/agent/ovsdb/native/idlutils.py
@@ -12,275 +12,8 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import collections
-import os
-import time
-import uuid
+from ovsdbapp.backend.ovs_idl import idlutils
 
-from neutron_lib import exceptions
-from ovs.db import idl
-from ovs import jsonrpc
-from ovs import poller
-from ovs import stream
-import six
-import tenacity
+from neutron.common import _deprecate
 
-from neutron._i18n import _
-from neutron.agent.ovsdb import api
-from neutron.agent.ovsdb.native import helpers
-
-
-RowLookup = collections.namedtuple('RowLookup',
-                                   ['table', 'column', 'uuid_column'])
-
-# Tables with no index in OVSDB and special record lookup rules
-_LOOKUP_TABLE = {
-    'Controller': RowLookup('Bridge', 'name', 'controller'),
-    'Flow_Table': RowLookup('Flow_Table', 'name', None),
-    'IPFIX': RowLookup('Bridge', 'name', 'ipfix'),
-    'Mirror': RowLookup('Mirror', 'name', None),
-    'NetFlow': RowLookup('Bridge', 'name', 'netflow'),
-    'Open_vSwitch': RowLookup('Open_vSwitch', None, None),
-    'QoS': RowLookup('Port', 'name', 'qos'),
-    'Queue': RowLookup(None, None, None),
-    'sFlow': RowLookup('Bridge', 'name', 'sflow'),
-    'SSL': RowLookup('Open_vSwitch', None, 'ssl'),
-}
-
-_NO_DEFAULT = object()
-
-
-class RowNotFound(exceptions.NeutronException):
-    message = _("Cannot find %(table)s with %(col)s=%(match)s")
-
-
-def row_by_value(idl_, table, column, match, default=_NO_DEFAULT):
-    """Lookup an IDL row in a table by column/value"""
-    tab = idl_.tables[table]
-    for r in tab.rows.values():
-        if getattr(r, column) == match:
-            return r
-    if default is not _NO_DEFAULT:
-        return default
-    raise RowNotFound(table=table, col=column, match=match)
-
-
-def row_by_record(idl_, table, record):
-    t = idl_.tables[table]
-    try:
-        if isinstance(record, uuid.UUID):
-            return t.rows[record]
-        uuid_ = uuid.UUID(record)
-        return t.rows[uuid_]
-    except ValueError:
-        # Not a UUID string, continue lookup by other means
-        pass
-    except KeyError:
-        raise RowNotFound(table=table, col='uuid', match=record)
-
-    rl = _LOOKUP_TABLE.get(table, RowLookup(table, get_index_column(t), None))
-    # no table means uuid only, no column means lookup table only has one row
-    if rl.table is None:
-        raise ValueError(_("Table %s can only be queried by UUID") % table)
-    if rl.column is None:
-        return next(iter(t.rows.values()))
-    row = row_by_value(idl_, rl.table, rl.column, record)
-    if rl.uuid_column:
-        rows = getattr(row, rl.uuid_column)
-        if len(rows) != 1:
-            raise RowNotFound(table=table, col=_('record'), match=record)
-        row = rows[0]
-    return row
-
-
-class ExceptionResult(object):
-    def __init__(self, ex, tb):
-        self.ex = ex
-        self.tb = tb
-
-
-def _get_schema_helper(connection, schema_name):
-    err, strm = stream.Stream.open_block(
-        stream.Stream.open(connection))
-    if err:
-        raise Exception(_("Could not connect to %s") % connection)
-    rpc = jsonrpc.Connection(strm)
-    req = jsonrpc.Message.create_request('get_schema', [schema_name])
-    err, resp = rpc.transact_block(req)
-    rpc.close()
-    if err:
-        raise Exception(_("Could not retrieve schema from %(conn)s: "
-                          "%(err)s") % {'conn': connection,
-                                        'err': os.strerror(err)})
-    elif resp.error:
-        raise Exception(resp.error)
-    return idl.SchemaHelper(None, resp.result)
-
-
-def get_schema_helper(connection, schema_name, retry=True,
-                      try_add_manager=True):
-    try:
-        return _get_schema_helper(connection, schema_name)
-    except Exception:
-        if not retry:
-            raise
-        # We may have failed due to set-manager not being called
-        if try_add_manager:
-            helpers.enable_connection_uri(connection, set_timeout=True)
-
-        # There is a small window for a race, so retry up to a second
-        @tenacity.retry(wait=tenacity.wait_exponential(multiplier=0.01),
-                        stop=tenacity.stop_after_delay(1),
-                        reraise=True)
-        def do_get_schema_helper():
-            return _get_schema_helper(connection, schema_name)
-
-        return do_get_schema_helper()
-
-
-def wait_for_change(_idl, timeout, seqno=None):
-    if seqno is None:
-        seqno = _idl.change_seqno
-    stop = time.time() + timeout
-    while _idl.change_seqno == seqno and not _idl.run():
-        ovs_poller = poller.Poller()
-        _idl.wait(ovs_poller)
-        ovs_poller.timer_wait(timeout * 1000)
-        ovs_poller.block()
-        if time.time() > stop:
-            raise Exception(_("Timeout"))
-
-
-def get_column_value(row, col):
-    """Retrieve column value from the given row.
-
-    If column's type is optional, the value will be returned as a single
-    element instead of a list of length 1.
-    """
-    if col == '_uuid':
-        val = row.uuid
-    else:
-        val = getattr(row, col)
-
-    # Idl returns lists of Rows where ovs-vsctl returns lists of UUIDs
-    if isinstance(val, list) and len(val):
-        if isinstance(val[0], idl.Row):
-            val = [v.uuid for v in val]
-        col_type = row._table.columns[col].type
-        # ovs-vsctl treats lists of 1 as single results
-        if col_type.is_optional():
-            val = val[0]
-    return val
-
-
-def condition_match(row, condition):
-    """Return whether a condition matches a row
-
-    :param row:       An OVSDB Row
-    :param condition: A 3-tuple containing (column, operation, match)
-    """
-    col, op, match = condition
-    val = get_column_value(row, col)
-
-    # both match and val are primitive types, so type can be used for type
-    # equality here.
-    if type(match) is not type(val):
-        # Types of 'val' and 'match' arguments MUST match in all cases with 2
-        # exceptions:
-        # - 'match' is an empty list and column's type is optional;
-        # - 'value' is an empty and  column's type is optional
-        if (not all([match, val]) and
-                row._table.columns[col].type.is_optional()):
-            # utilize the single elements comparison logic
-            if match == []:
-                match = None
-            elif val == []:
-                val = None
-        else:
-            # no need to process any further
-            raise ValueError(
-                _("Column type and condition operand do not match"))
-
-    matched = True
-
-    # TODO(twilson) Implement other operators and type comparisons
-    # ovs_lib only uses dict '=' and '!=' searches for now
-    if isinstance(match, dict):
-        for key in match:
-            if op == '=':
-                if (key not in val or match[key] != val[key]):
-                    matched = False
-                    break
-            elif op == '!=':
-                if key not in val or match[key] == val[key]:
-                    matched = False
-                    break
-            else:
-                raise NotImplementedError()
-    elif isinstance(match, list):
-        # According to rfc7047, lists support '=' and '!='
-        # (both strict and relaxed). Will follow twilson's dict comparison
-        # and implement relaxed version (excludes/includes as per standard)
-        if op == "=":
-            if not all([val, match]):
-                return val == match
-            for elem in set(match):
-                if elem not in val:
-                    matched = False
-                    break
-        elif op == '!=':
-            if not all([val, match]):
-                return val != match
-            for elem in set(match):
-                if elem in val:
-                    matched = False
-                    break
-        else:
-            raise NotImplementedError()
-    else:
-        if op == '=':
-            if val != match:
-                matched = False
-        elif op == '!=':
-            if val == match:
-                matched = False
-        else:
-            raise NotImplementedError()
-    return matched
-
-
-def row_match(row, conditions):
-    """Return whether the row matches the list of conditions"""
-    return all(condition_match(row, cond) for cond in conditions)
-
-
-def get_index_column(table):
-    if len(table.indexes) == 1:
-        idx = table.indexes[0]
-        if len(idx) == 1:
-            return idx[0].name
-
-
-def db_replace_record(obj):
-    """Replace any api.Command objects with their results
-
-    This method should leave obj untouched unless the object contains an
-    api.Command object.
-    """
-    if isinstance(obj, collections.Mapping):
-        for k, v in obj.items():
-            if isinstance(v, api.Command):
-                obj[k] = v.result
-    elif (isinstance(obj, collections.Sequence)
-          and not isinstance(obj, six.string_types)):
-        for i, v in enumerate(obj):
-            if isinstance(v, api.Command):
-                try:
-                    obj[i] = v.result
-                except TypeError:
-                    # NOTE(twilson) If someone passes a tuple, then just return
-                    # a tuple with the Commands replaced with their results
-                    return type(obj)(getattr(v, "result", v) for v in obj)
-    elif isinstance(obj, api.Command):
-        obj = obj.result
-    return obj
+_deprecate._MovedGlobals(idlutils)
diff --git a/neutron/agent/ovsdb/native/vlog.py b/neutron/agent/ovsdb/native/vlog.py
index 01b30f6a46a..4d503aed371 100644
--- a/neutron/agent/ovsdb/native/vlog.py
+++ b/neutron/agent/ovsdb/native/vlog.py
@@ -12,19 +12,8 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from oslo_log import log as logging
-from ovs import vlog
+from ovsdbapp.backend.ovs_idl import vlog
 
-LOG = logging.getLogger(__name__)
+from neutron.common import _deprecate
 
-
-def use_oslo_logger():
-    """Replace the OVS IDL logger functions with our logger"""
-
-    # NOTE(twilson) Replace functions directly instead of subclassing so that
-    # debug messages contain the correct function/filename/line information
-    vlog.Vlog.emer = LOG.critical
-    vlog.Vlog.err = LOG.error
-    vlog.Vlog.warn = LOG.warning
-    vlog.Vlog.info = LOG.info
-    vlog.Vlog.dbg = LOG.debug
+_deprecate._MovedGlobals(vlog)
diff --git a/neutron/tests/functional/agent/ovsdb/test_impl_idl.py b/neutron/tests/functional/agent/ovsdb/test_impl_idl.py
index 7bc5335453c..db3bdab8c94 100644
--- a/neutron/tests/functional/agent/ovsdb/test_impl_idl.py
+++ b/neutron/tests/functional/agent/ovsdb/test_impl_idl.py
@@ -15,9 +15,10 @@
 
 import mock
 
+from ovsdbapp import exceptions as exc
+from ovsdbapp.schema.open_vswitch import impl_idl
+
 from neutron.agent.common import ovs_lib
-from neutron.agent.ovsdb import api
-from neutron.agent.ovsdb import impl_idl
 from neutron.common import utils
 from neutron.tests.common import net_helpers
 from neutron.tests.functional import base
@@ -26,7 +27,7 @@ from neutron.tests.functional import base
 # NOTE(twilson) functools.partial does not work for this
 def trpatch(*args, **kwargs):
     def wrapped(fn):
-        return mock.patch.object(impl_idl.NeutronOVSDBTransaction,
+        return mock.patch.object(impl_idl.OvsVsctlTransaction,
                                  *args, **kwargs)(fn)
     return wrapped
 
@@ -39,8 +40,8 @@ class ImplIdlTestCase(base.BaseSudoTestCase):
         self.brname = utils.get_rand_device_name(net_helpers.BR_PREFIX)
         # Make sure exceptions pass through by calling do_post_commit directly
         mock.patch.object(
-            impl_idl.NeutronOVSDBTransaction, "post_commit",
-            side_effect=impl_idl.NeutronOVSDBTransaction.do_post_commit,
+            impl_idl.OvsVsctlTransaction, "post_commit",
+            side_effect=impl_idl.OvsVsctlTransaction.do_post_commit,
             autospec=True).start()
 
     def _add_br(self):
@@ -65,11 +66,12 @@ class ImplIdlTestCase(base.BaseSudoTestCase):
     @trpatch("post_commit_failed_interfaces", return_value=["failed_if1"])
     @trpatch("timeout_exceeded", return_value=False)
     def test_post_commit_vswitchd_completed_failures(self, *args):
-        self.assertRaises(impl_idl.VswitchdInterfaceAddException, self._add_br)
+        self.assertRaises(impl_idl.VswitchdInterfaceAddException,
+                          self._add_br)
 
     @trpatch("vswitchd_has_completed", return_value=False)
     def test_post_commit_vswitchd_incomplete_timeout(self, *args):
         # Due to timing issues we may rarely hit the global timeout, which
         # raises RuntimeError to match the vsctl implementation
-        self.ovs.vsctl_timeout = 3
-        self.assertRaises((api.TimeoutException, RuntimeError), self._add_br)
+        self.ovs.ovsdb.connection.timeout = 3
+        self.assertRaises((exc.TimeoutException, RuntimeError), self._add_br)
diff --git a/neutron/tests/unit/agent/ovsdb/native/test_connection.py b/neutron/tests/unit/agent/ovsdb/native/test_connection.py
index 84ce437112f..c1ea027732a 100644
--- a/neutron/tests/unit/agent/ovsdb/native/test_connection.py
+++ b/neutron/tests/unit/agent/ovsdb/native/test_connection.py
@@ -12,79 +12,22 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import eventlet
 import mock
-from ovs.db import idl
-from ovs import poller
 
-from neutron.agent.ovsdb.native import connection
-from neutron.agent.ovsdb.native import idlutils
+from ovsdbapp.backend.ovs_idl import connection
+from ovsdbapp.backend.ovs_idl import idlutils
+
+from neutron.agent.ovsdb.native import connection as native_conn
+from neutron.agent.ovsdb.native import helpers
 from neutron.tests import base
 
 
 class TestOVSNativeConnection(base.BaseTestCase):
-
-    @mock.patch.object(connection, 'TransactionQueue')
-    @mock.patch.object(idlutils, 'get_schema_helper')
-    @mock.patch.object(idl, 'Idl')
-    @mock.patch.object(idlutils, 'wait_for_change')
-    def _test_start(self, wfc, idl, gsh, tq, table_name_list=None):
-        gsh.return_value = helper = mock.Mock()
-        self.connection = connection.Connection(
-            mock.Mock(), mock.Mock(), mock.Mock())
-        with mock.patch.object(poller, 'Poller') as poller_mock,\
-                mock.patch('threading.Thread'):
-            poller_mock.return_value.block.side_effect = eventlet.sleep
-            self.connection.start(table_name_list=table_name_list)
-        reg_all_called = table_name_list is None
-        reg_table_called = table_name_list is not None
-        self.assertEqual(reg_all_called, helper.register_all.called)
-        self.assertEqual(reg_table_called, helper.register_table.called)
-
-    def test_start_without_table_name_list(self):
-        self._test_start()
-
-    def test_start_with_table_name_list(self):
-        self._test_start(table_name_list=['fake-table1', 'fake-table2'])
-
-    @mock.patch.object(connection, 'TransactionQueue')
-    @mock.patch.object(idl, 'Idl')
-    @mock.patch.object(idlutils, 'wait_for_change')
-    def test_start_call_graph(self, wait_for_change, idl, transaction_queue):
-        self.connection = connection.Connection(
-            mock.sentinel, mock.sentinel, mock.sentinel)
-        self.connection.get_schema_helper = mock.Mock()
-        helper = self.connection.get_schema_helper.return_value
-        self.connection.update_schema_helper = mock.Mock()
-        with mock.patch.object(poller, 'Poller') as poller_mock,\
-                mock.patch('threading.Thread'):
-            poller_mock.return_value.block.side_effect = eventlet.sleep
-            self.connection.start()
-        self.connection.get_schema_helper.assert_called_once_with()
-        self.connection.update_schema_helper.assert_called_once_with(helper)
-
-    def test_transaction_queue_init(self):
-        # a test to cover py34 failure during initialization (LP Bug #1580270)
-        # make sure no ValueError: can't have unbuffered text I/O is raised
-        connection.TransactionQueue()
-
-    @mock.patch.object(connection, 'TransactionQueue')
-    @mock.patch.object(idlutils, 'get_schema_helper')
-    @mock.patch.object(idlutils, 'wait_for_change')
-    def test_start_with_idl_class(self, wait_for_change, get_schema_helper,
-                                  transaction_queue):
-        idl_class = mock.Mock()
-        self.connection = connection.Connection(
-            mock.sentinel, mock.sentinel, mock.sentinel, idl_class=idl_class)
-        idl_instance = idl_class.return_value
-        self.connection.start()
-        self.assertEqual(idl_instance, self.connection.idl)
-
     @mock.patch.object(connection, 'threading')
-    @mock.patch.object(connection.idlutils, 'wait_for_change')
-    @mock.patch.object(connection, 'idl')
-    @mock.patch.object(idlutils.helpers, 'enable_connection_uri')
-    @mock.patch.object(connection.idlutils, '_get_schema_helper')
+    @mock.patch.object(idlutils, 'wait_for_change')
+    @mock.patch.object(native_conn, 'idl')
+    @mock.patch.object(helpers, 'enable_connection_uri')
+    @mock.patch.object(idlutils, 'get_schema_helper')
     def test_do_get_schema_helper_retry(self, mock_get_schema_helper,
                                         mock_enable_conn,
                                         mock_idl,
@@ -94,8 +37,8 @@ class TestOVSNativeConnection(base.BaseTestCase):
         # raise until 3rd retry attempt
         mock_get_schema_helper.side_effect = [Exception(), Exception(),
                                               mock_helper]
-        conn = connection.Connection(
-            mock.Mock(), mock.Mock(), mock.Mock())
+        conn = connection.Connection(idl_factory=native_conn.idl_factory,
+                                     timeout=mock.Mock())
         conn.start()
         self.assertEqual(3, len(mock_get_schema_helper.mock_calls))
         mock_helper.register_all.assert_called_once_with()
diff --git a/neutron/tests/unit/agent/ovsdb/native/test_idlutils.py b/neutron/tests/unit/agent/ovsdb/native/test_idlutils.py
deleted file mode 100644
index 030486a7f24..00000000000
--- a/neutron/tests/unit/agent/ovsdb/native/test_idlutils.py
+++ /dev/null
@@ -1,204 +0,0 @@
-# Copyright 2016, Mirantis Inc.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import mock
-
-from neutron.agent.ovsdb import api
-from neutron.agent.ovsdb.native import idlutils
-from neutron.tests import base
-
-
-class MockColumn(object):
-    def __init__(self, name, type, is_optional=False, test_value=None):
-        self.name = name
-        self.type = mock.MagicMock(
-            **{"key.type.name": type,
-               "is_optional": mock.Mock(return_value=is_optional),
-               })
-        # for test purposes only to operate with some values in condition_match
-        # testcase
-        self.test_value = test_value
-
-
-class MockTable(object):
-    def __init__(self, name, *columns):
-        # columns is a list of tuples (col_name, col_type)
-        self.name = name
-        self.columns = {c.name: c for c in columns}
-
-
-class MockRow(object):
-    def __init__(self, table):
-        self._table = table
-
-    def __getattr__(self, attr):
-        if attr in self._table.columns:
-            return self._table.columns[attr].test_value
-        return super(MockRow, self).__getattr__(attr)
-
-
-class MockCommand(api.Command):
-    def __init__(self, result):
-        self.result = result
-
-    def execute(self, **kwargs):
-        pass
-
-
-class TestIdlUtils(base.BaseTestCase):
-    def test_condition_match(self):
-        """
-        Make sure that the function respects the following:
-        * if column type is_optional and value is a single element, value is
-          transformed to a length-1-list
-        * any other value is returned as it is, no type convertions
-        """
-        table = MockTable("SomeTable",
-                          MockColumn("tag", "integer", is_optional=True,
-                                     test_value=[42]),
-                          MockColumn("num", "integer", is_optional=True,
-                                     test_value=[]),
-                          MockColumn("ids", "integer", is_optional=False,
-                                     test_value=42),
-                          MockColumn("comments", "string",
-                                     test_value=["a", "b", "c"]),
-                          MockColumn("status", "string",
-                                     test_value="sorry for inconvenience"))
-        row = MockRow(table=table)
-        self.assertTrue(idlutils.condition_match(row, ("tag", "=", 42)))
-        # optional types can be compared only as single elements
-        self.assertRaises(ValueError,
-                          idlutils.condition_match, row, ("tag", "!=", [42]))
-        # empty list comparison is ok for optional types though
-        self.assertTrue(idlutils.condition_match(row, ("tag", "!=", [])))
-        self.assertTrue(idlutils.condition_match(row, ("num", "=", [])))
-        # value = [] may be compared to a single elem if optional column type
-        self.assertTrue(idlutils.condition_match(row, ("num", "!=", 42)))
-        # no type conversion for non optional types
-        self.assertTrue(idlutils.condition_match(row, ("ids", "=", 42)))
-        self.assertTrue(idlutils.condition_match(
-            row, ("status", "=", "sorry for inconvenience")))
-        self.assertFalse(idlutils.condition_match(
-            row, ("status", "=", "sorry")))
-        # bad types
-        self.assertRaises(ValueError,
-                          idlutils.condition_match, row, ("ids", "=", "42"))
-        self.assertRaises(ValueError,
-                          idlutils.condition_match, row, ("ids", "!=", "42"))
-        self.assertRaises(ValueError,
-                          idlutils.condition_match, row,
-                          ("ids", "!=", {"a": "b"}))
-        # non optional list types are kept as they are
-        self.assertTrue(idlutils.condition_match(
-            row, ("comments", "=", ["c", "b", "a"])))
-        # also true because list comparison is relaxed
-        self.assertTrue(idlutils.condition_match(
-            row, ("comments", "=", ["c", "b"])))
-        self.assertTrue(idlutils.condition_match(
-            row, ("comments", "!=", ["d"])))
-
-    def test_db_replace_record_dict(self):
-        obj = {'a': 1, 'b': 2}
-        self.assertIs(obj, idlutils.db_replace_record(obj))
-
-    def test_db_replace_record_dict_cmd(self):
-        obj = {'a': 1, 'b': MockCommand(2)}
-        res = {'a': 1, 'b': 2}
-        self.assertEqual(res, idlutils.db_replace_record(obj))
-
-    def test_db_replace_record_list(self):
-        obj = [1, 2, 3]
-        self.assertIs(obj, idlutils.db_replace_record(obj))
-
-    def test_db_replace_record_list_cmd(self):
-        obj = [1, MockCommand(2), 3]
-        res = [1, 2, 3]
-        self.assertEqual(res, idlutils.db_replace_record(obj))
-
-    def test_db_replace_record_tuple(self):
-        obj = (1, 2, 3)
-        self.assertIs(obj, idlutils.db_replace_record(obj))
-
-    def test_db_replace_record_tuple_cmd(self):
-        obj = (1, MockCommand(2), 3)
-        res = (1, 2, 3)
-        self.assertEqual(res, idlutils.db_replace_record(obj))
-
-    def test_db_replace_record(self):
-        obj = "test"
-        self.assertIs(obj, idlutils.db_replace_record(obj))
-
-    def test_db_replace_record_cmd(self):
-        obj = MockCommand("test")
-        self.assertEqual("test", idlutils.db_replace_record(obj))
-
-    def test_row_by_record(self):
-        FAKE_RECORD = 'fake_record'
-        mock_idl_ = mock.MagicMock()
-        mock_table = mock.MagicMock(
-            rows={mock.sentinel.row: mock.sentinel.row_value})
-        mock_idl_.tables = {mock.sentinel.table_name: mock_table}
-
-        res = idlutils.row_by_record(mock_idl_,
-                                     mock.sentinel.table_name,
-                                     FAKE_RECORD)
-        self.assertEqual(mock.sentinel.row_value, res)
-
-    @mock.patch.object(idlutils.helpers, 'enable_connection_uri')
-    @mock.patch.object(idlutils, '_get_schema_helper')
-    def test_get_schema_helper_succeed_once(self,
-                                            mock_get_schema_helper,
-                                            mock_enable_conn):
-        mock_get_schema_helper.return_value = mock.Mock()
-
-        idlutils.get_schema_helper(mock.Mock(), mock.Mock())
-        self.assertEqual(1, mock_get_schema_helper.call_count)
-        self.assertEqual(0, mock_enable_conn.call_count)
-
-    @mock.patch.object(idlutils.helpers, 'enable_connection_uri')
-    @mock.patch.object(idlutils, '_get_schema_helper')
-    def test_get_schema_helper_fail_and_then_succeed(self,
-                                                     mock_get_schema_helper,
-                                                     mock_enable_conn):
-        # raise until 3rd retry attempt
-        mock_get_schema_helper.side_effect = [Exception(), Exception(),
-                                              mock.Mock()]
-
-        idlutils.get_schema_helper(mock.Mock(), mock.Mock())
-        self.assertEqual(3, mock_get_schema_helper.call_count)
-        self.assertEqual(1, mock_enable_conn.call_count)
-
-    @mock.patch.object(idlutils.helpers, 'enable_connection_uri')
-    @mock.patch.object(idlutils, '_get_schema_helper')
-    def test_get_schema_helper_not_add_manager_and_timeout(
-            self, mock_get_schema_helper, mock_enable_conn):
-        # raise always
-        mock_get_schema_helper.side_effect = RuntimeError()
-
-        self.assertRaises(RuntimeError, idlutils.get_schema_helper,
-                          mock.Mock(), mock.Mock(), retry=True,
-                          try_add_manager=False)
-        self.assertEqual(8, mock_get_schema_helper.call_count)
-        self.assertEqual(0, mock_enable_conn.call_count)
-
-    @mock.patch.object(idlutils.helpers, 'enable_connection_uri')
-    @mock.patch.object(idlutils, '_get_schema_helper')
-    def test_get_schema_helper_not_retry(
-            self, mock_get_schema_helper, mock_enable_conn):
-        # raise always
-        mock_get_schema_helper.side_effect = RuntimeError()
-
-        self.assertRaises(RuntimeError, idlutils.get_schema_helper,
-                          mock.Mock(), mock.Mock(), retry=False)
-        self.assertEqual(1, mock_get_schema_helper.call_count)
diff --git a/neutron/tests/unit/agent/ovsdb/test_api.py b/neutron/tests/unit/agent/ovsdb/test_api.py
deleted file mode 100644
index 1df441b530f..00000000000
--- a/neutron/tests/unit/agent/ovsdb/test_api.py
+++ /dev/null
@@ -1,137 +0,0 @@
-# Copyright (c) 2017 Red Hat, Inc.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import mock
-import testtools
-
-from neutron.agent.ovsdb import api
-from neutron.tests import base
-
-
-class FakeTransaction(object):
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, tb):
-        self.commit()
-
-    def commit(self):
-        """Serves just for mock."""
-
-
-class TestingAPI(api.API):
-    def create_transaction(self, check_error=False, log_errors=True, **kwargs):
-        return FakeTransaction()
-
-    def add_manager(self, connection_uri):
-        pass
-
-    def get_manager(self):
-        pass
-
-    def remove_manager(self, connection_uri):
-        pass
-
-    def add_br(self, name, may_exist=True, datapath_type=None):
-        pass
-
-    def del_br(self, name, if_exists=True):
-        pass
-
-    def br_exists(self, name):
-        pass
-
-    def port_to_br(self, name):
-        pass
-
-    def iface_to_br(self, name):
-        pass
-
-    def list_br(self):
-        pass
-
-    def br_get_external_id(self, name, field):
-        pass
-
-    def db_create(self, table, **col_values):
-        pass
-
-    def db_destroy(self, table, record):
-        pass
-
-    def db_set(self, table, record, *col_values):
-        pass
-
-    def db_add(self, table, record, column, *values):
-        pass
-
-    def db_clear(self, table, record, column):
-        pass
-
-    def db_get(self, table, record, column):
-        pass
-
-    def db_list(self, table, records=None, columns=None, if_exists=False):
-        pass
-
-    def db_find(self, table, *conditions, **kwargs):
-        pass
-
-    def set_controller(self, bridge, controllers):
-        pass
-
-    def del_controller(self, bridge):
-        pass
-
-    def get_controller(self, bridge):
-        pass
-
-    def set_fail_mode(self, bridge, mode):
-        pass
-
-    def add_port(self, bridge, port, may_exist=True):
-        pass
-
-    def del_port(self, port, bridge=None, if_exists=True):
-        pass
-
-    def list_ports(self, bridge):
-        pass
-
-    def list_ifaces(self, bridge):
-        pass
-
-
-class TransactionTestCase(base.BaseTestCase):
-    def setUp(self):
-        super(TransactionTestCase, self).setUp()
-        self.api = TestingAPI(None)
-        mock.patch.object(FakeTransaction, 'commit').start()
-
-    def test_transaction_nested(self):
-        with self.api.transaction() as txn1:
-            with self.api.transaction() as txn2:
-                self.assertIs(txn1, txn2)
-        txn1.commit.assert_called_once_with()
-
-    def test_transaction_no_nested_transaction_after_error(self):
-        class TestException(Exception):
-            pass
-
-        with testtools.ExpectedException(TestException):
-            with self.api.transaction() as txn1:
-                raise TestException()
-
-        with self.api.transaction() as txn2:
-            self.assertIsNot(txn1, txn2)
diff --git a/neutron/tests/unit/agent/ovsdb/test_impl_idl.py b/neutron/tests/unit/agent/ovsdb/test_impl_idl.py
index 5d2a3e6a981..bb57e01b612 100644
--- a/neutron/tests/unit/agent/ovsdb/test_impl_idl.py
+++ b/neutron/tests/unit/agent/ovsdb/test_impl_idl.py
@@ -15,7 +15,8 @@
 import mock
 import testtools
 
-from neutron.agent.ovsdb import api
+from ovsdbapp import exceptions
+
 from neutron.agent.ovsdb import impl_idl
 from neutron.tests import base
 
@@ -25,7 +26,7 @@ class TransactionTestCase(base.BaseTestCase):
         transaction = impl_idl.NeutronOVSDBTransaction(mock.sentinel,
                                                        mock.Mock(), 1)
         with self.assert_max_execution_time(10):
-            with testtools.ExpectedException(api.TimeoutException):
+            with testtools.ExpectedException(exceptions.TimeoutException):
                 transaction.commit()
 
     def test_post_commit_does_not_raise_exception(self):
diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/native/test_br_phys.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/native/test_br_phys.py
index df682815a7f..c49a0dff4f4 100644
--- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/native/test_br_phys.py
+++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/native/test_br_phys.py
@@ -31,10 +31,10 @@ class OVSPhysicalBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase,
     dvr_process_next_table_id = ovs_const.LOCAL_VLAN_TRANSLATION
 
     def setUp(self):
-        super(OVSPhysicalBridgeTest, self).setUp()
         conn_patcher = mock.patch(
             'neutron.agent.ovsdb.native.connection.Connection.start')
         conn_patcher.start()
+        super(OVSPhysicalBridgeTest, self).setUp()
         self.addCleanup(conn_patcher.stop)
         self.setup_bridge_mock('br-phys', self.br_phys_cls)
         self.stamp = self.br.default_cookie
diff --git a/requirements.txt b/requirements.txt
index f010b12a639..13392d0a32e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -44,6 +44,7 @@ oslo.utils>=3.20.0 # Apache-2.0
 oslo.versionedobjects>=1.17.0 # Apache-2.0
 osprofiler>=1.4.0 # Apache-2.0
 ovs>=2.7.0 # Apache-2.0
+ovsdbapp>=0.3.0 # Apache-2.0
 psutil>=3.2.2 # BSD
 pyroute2>=0.4.12 # Apache-2.0 (+ dual licensed GPL2)
 weakrefmethod>=1.0.2;python_version=='2.7' # PSF