[Event-engine] Allow event_engine to work in HA
A previous patch allows to make multiple event_engines to listen to a single queue, but the RPC calls on CRUD are still synchronous This patch modifies the calls and broadcasts them on all the event engines allow them to modify each independent listeners. Closes-Bug: #1715848 Change-Id: Ia37831a03993f5a1bf980d62344d25377062788d
This commit is contained in:
parent
70c269e7d1
commit
defff08773
@ -405,6 +405,4 @@ class DefaultEventEngine(base.EventEngine):
|
||||
|
||||
return
|
||||
|
||||
security.delete_trust(trigger['trust_id'])
|
||||
|
||||
self._add_event_listener(trigger['exchange'], trigger['topic'], events)
|
||||
|
@ -137,7 +137,7 @@ class RPCClient(object):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def async_call(self, ctx, method, target=None, **kwargs):
|
||||
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
|
||||
"""Asynchronous call of RPC method.
|
||||
|
||||
Does not block the thread, just send invoking data to
|
||||
|
@ -392,26 +392,29 @@ class EventEngineClient(evt_eng.EventEngine):
|
||||
self._client = base.get_rpc_client_driver()(rpc_conf_dict)
|
||||
|
||||
def create_event_trigger(self, trigger, events):
|
||||
return self._client.sync_call(
|
||||
return self._client.async_call(
|
||||
auth_ctx.ctx(),
|
||||
'create_event_trigger',
|
||||
trigger=trigger,
|
||||
events=events
|
||||
events=events,
|
||||
fanout=True,
|
||||
)
|
||||
|
||||
def delete_event_trigger(self, trigger, events):
|
||||
return self._client.sync_call(
|
||||
return self._client.async_call(
|
||||
auth_ctx.ctx(),
|
||||
'delete_event_trigger',
|
||||
trigger=trigger,
|
||||
events=events
|
||||
events=events,
|
||||
fanout=True,
|
||||
)
|
||||
|
||||
def update_event_trigger(self, trigger):
|
||||
return self._client.sync_call(
|
||||
return self._client.async_call(
|
||||
auth_ctx.ctx(),
|
||||
'update_event_trigger',
|
||||
trigger=trigger,
|
||||
fanout=True,
|
||||
)
|
||||
|
||||
|
||||
|
@ -206,5 +206,5 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
||||
def sync_call(self, ctx, method, target=None, **kwargs):
|
||||
return self._call(ctx, method, async_=False, target=target, **kwargs)
|
||||
|
||||
def async_call(self, ctx, method, target=None, **kwargs):
|
||||
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
|
||||
return self._call(ctx, method, async_=True, target=target, **kwargs)
|
||||
|
@ -38,9 +38,7 @@ class OsloRPCClient(rpc.RPCClient):
|
||||
**kwargs
|
||||
)
|
||||
|
||||
def async_call(self, ctx, method, target=None, **kwargs):
|
||||
return self._client.prepare(topic=self.topic, server=target).cast(
|
||||
ctx,
|
||||
method,
|
||||
**kwargs
|
||||
)
|
||||
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
|
||||
return self._client.prepare(topic=self.topic,
|
||||
server=target,
|
||||
fanout=fanout).cast(ctx, method, **kwargs)
|
||||
|
@ -221,6 +221,8 @@ def delete_event_trigger(event_trigger):
|
||||
list(events)
|
||||
)
|
||||
|
||||
security.delete_trust(event_trigger['trust_id'])
|
||||
|
||||
|
||||
def update_event_trigger(id, values):
|
||||
trig = db_api.update_event_trigger(id, values)
|
||||
|
@ -21,6 +21,7 @@ import sqlalchemy as sa
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import security
|
||||
from mistral.services import triggers
|
||||
from mistral.tests.unit.api import base
|
||||
from mistral.tests.unit import base as unit_base
|
||||
@ -207,14 +208,22 @@ class TestEventTriggerController(base.APITest):
|
||||
self.assertEqual(400, resp.status_int)
|
||||
|
||||
@mock.patch('mistral.rpc.clients.get_event_engine_client')
|
||||
@mock.patch.object(db_api, "get_event_trigger", MOCK_TRIGGER)
|
||||
@mock.patch('mistral.db.v2.api.get_event_trigger')
|
||||
@mock.patch.object(db_api, "get_event_triggers",
|
||||
mock.MagicMock(return_value=[]))
|
||||
@mock.patch.object(db_api, "delete_event_trigger", MOCK_NONE)
|
||||
def test_delete(self, mock_rpc_client):
|
||||
@mock.patch.object(security, "delete_trust", MOCK_NONE)
|
||||
def test_delete(self, mock_delete, mock_rpc_client):
|
||||
client = mock.Mock()
|
||||
mock_rpc_client.return_value = client
|
||||
|
||||
DELETE_TRIGGER = models.EventTrigger()
|
||||
DELETE_TRIGGER.update(trigger_values)
|
||||
DELETE_TRIGGER.update(
|
||||
{'trust_id': 'c30e50e8-ee7d-4f8a-9515-f0530d9dc54b'}
|
||||
)
|
||||
mock_delete.return_value = DELETE_TRIGGER
|
||||
|
||||
resp = self.app.delete(
|
||||
'/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae'
|
||||
)
|
||||
@ -223,7 +232,7 @@ class TestEventTriggerController(base.APITest):
|
||||
self.assertEqual(1, client.delete_event_trigger.call_count)
|
||||
|
||||
self.assertDictEqual(
|
||||
TRIGGER_DB.to_dict(),
|
||||
DELETE_TRIGGER.to_dict(),
|
||||
client.delete_event_trigger.call_args[0][0]
|
||||
)
|
||||
self.assertListEqual(
|
||||
|
Loading…
x
Reference in New Issue
Block a user