Add WaitEvent to the API
The ability to wait on an event to complete is a useful construct. Instead of just having a one-off class to do it in the functional tests, add it to the formal API. There were some pylint issues with unused variables that are also fixed in this patch. Change-Id: Ie6d7c2ab384e152776b3d5f000c4ee405a8a3974
This commit is contained in:
parent
03a79cb37b
commit
0e5904ac9e
@ -39,3 +39,7 @@ class RowEvent(ovsdb_event.RowEvent): # pylint: disable=abstract-method
|
||||
LOG.debug("%s : Matched %s, %s, %s %s", self.event_name, self.table,
|
||||
self.events, self.conditions, self.old_conditions)
|
||||
return True
|
||||
|
||||
|
||||
class WaitEvent(RowEvent, ovsdb_event.WaitEvent):
|
||||
pass
|
||||
|
@ -68,6 +68,26 @@ class RowEvent(object):
|
||||
"""Method to run when the event matches"""
|
||||
|
||||
|
||||
class WaitEvent(RowEvent):
|
||||
event_name = 'WaitEvent'
|
||||
ONETIME = True
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.event = threading.Event()
|
||||
self.timeout = kwargs.pop('timeout', None)
|
||||
super(WaitEvent, self).__init__(*args, **kwargs)
|
||||
|
||||
@abc.abstractmethod
|
||||
def matches(self, event, row, old=None):
|
||||
"""Test that `event on `row` matches watched events. See: RowEvent"""
|
||||
|
||||
def run(self, event, row, old):
|
||||
self.event.set()
|
||||
|
||||
def wait(self):
|
||||
return self.event.wait(self.timeout)
|
||||
|
||||
|
||||
class RowEventHandler(object):
|
||||
def __init__(self):
|
||||
self.__watched_events = set()
|
||||
|
@ -10,8 +10,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import threading
|
||||
|
||||
from ovsdbapp.backend.ovs_idl import event
|
||||
from ovsdbapp.backend.ovs_idl import idlutils
|
||||
from ovsdbapp import event as ovsdb_event
|
||||
@ -22,21 +20,13 @@ from ovsdbapp.tests.functional.schema.ovn_southbound import fixtures
|
||||
from ovsdbapp.tests import utils
|
||||
|
||||
|
||||
class WaitForPortBindingEvent(event.RowEvent):
|
||||
class WaitForPortBindingEvent(event.WaitEvent):
|
||||
event_name = 'WaitForPortBindingEvent'
|
||||
ONETIME = True
|
||||
|
||||
def __init__(self, port, timeout=5):
|
||||
self.event = threading.Event()
|
||||
self.timeout = timeout
|
||||
super(WaitForPortBindingEvent, self).__init__(
|
||||
(self.ROW_CREATE,), 'Port_Binding', (('logical_port', '=', port),))
|
||||
|
||||
def run(self, event, row, old):
|
||||
self.event.set()
|
||||
|
||||
def wait(self):
|
||||
self.event.wait(self.timeout)
|
||||
(self.ROW_CREATE,), 'Port_Binding', (('logical_port', '=', port),),
|
||||
timeout=timeout)
|
||||
|
||||
|
||||
class OvnSouthboundTest(base.FunctionalTestCase):
|
||||
@ -109,7 +99,7 @@ class OvnSouthboundTest(base.FunctionalTestCase):
|
||||
with self.nbapi.transaction(check_error=True) as txn:
|
||||
switch = txn.add(self.nbapi.ls_add(sname))
|
||||
port = txn.add(self.nbapi.lsp_add(sname, pname))
|
||||
row_event.wait()
|
||||
self.assertTrue(row_event.wait())
|
||||
return chassis, switch.result, port.result
|
||||
|
||||
def test_lsp_bind(self):
|
||||
@ -121,12 +111,12 @@ class OvnSouthboundTest(base.FunctionalTestCase):
|
||||
return chassis, switch, port
|
||||
|
||||
def test_lsp_bind_exists(self):
|
||||
chassis, switch, port = self.test_lsp_bind()
|
||||
chassis, _switch, port = self.test_lsp_bind()
|
||||
cmd = self.api.lsp_bind(port.name, chassis.name)
|
||||
self.assertRaises(RuntimeError, cmd.execute, check_error=True)
|
||||
|
||||
def test_lsp_bind_may_exist(self):
|
||||
chassis, switch, port = self.test_lsp_bind()
|
||||
chassis, _switch, port = self.test_lsp_bind()
|
||||
other = self._chassis_add(['vxlan'], '192.0.2.2',
|
||||
chassis=utils.get_rand_device_name())
|
||||
self.api.lsp_bind(port.name, other.name, may_exist=True).execute(
|
||||
@ -137,7 +127,7 @@ class OvnSouthboundTest(base.FunctionalTestCase):
|
||||
self.assertIn(chassis, binding.chassis)
|
||||
|
||||
def test_lsp_unbind(self):
|
||||
chassis, switch, port = self.test_lsp_bind()
|
||||
_chassis, _switch, port = self.test_lsp_bind()
|
||||
self.api.lsp_unbind(port.name).execute(check_error=True)
|
||||
binding = idlutils.row_by_value(self.api.idl, 'Port_Binding',
|
||||
'logical_port', port.name)
|
||||
|
Loading…
x
Reference in New Issue
Block a user