Add the config option for Oslo Messaging executor type

* With Pymysql driver and "eventlet" Oslo Messaging executor type
  Mistral seems to work fine. Just in case there's a regression in
  comparison with using "blocking" executor this patch makes a
  a change in a form of a config option that defines an RPC
  executor rather making it hardcoded.

Change-Id: Id73364eee29f2113fc983718b9891a496ca32ee4
This commit is contained in:
Renat Akhmerov 2018-07-11 17:42:23 +07:00
parent 38b28db168
commit 0448383d20
6 changed files with 15 additions and 8 deletions

View File

@ -111,6 +111,14 @@ rpc_response_timeout_opt = cfg.IntOpt(
help=_('Seconds to wait for a response from a call.') help=_('Seconds to wait for a response from a call.')
) )
oslo_rpc_executor = cfg.StrOpt(
'oslo_rpc_executor',
default='eventlet',
choices=['eventlet', 'blocking', 'threading'],
help=_('Executor type used by Oslo Messaging framework. Defines how '
'Oslo Messaging based RPC subsystem processes incoming calls.')
)
expiration_token_duration = cfg.IntOpt( expiration_token_duration = cfg.IntOpt(
'expiration_token_duration', 'expiration_token_duration',
default=30, default=30,
@ -559,6 +567,7 @@ CONF.register_opt(auth_type_opt)
CONF.register_opt(js_impl_opt) CONF.register_opt(js_impl_opt)
CONF.register_opt(rpc_impl_opt) CONF.register_opt(rpc_impl_opt)
CONF.register_opt(rpc_response_timeout_opt) CONF.register_opt(rpc_response_timeout_opt)
CONF.register_opt(oslo_rpc_executor)
CONF.register_opt(expiration_token_duration) CONF.register_opt(expiration_token_duration)
CONF.register_opts(api_opts, group=API_GROUP) CONF.register_opts(api_opts, group=API_GROUP)
@ -596,6 +605,7 @@ default_group_opts = itertools.chain(
js_impl_opt, js_impl_opt,
rpc_impl_opt, rpc_impl_opt,
rpc_response_timeout_opt, rpc_response_timeout_opt,
oslo_rpc_executor,
expiration_token_duration expiration_token_duration
] ]
) )

View File

@ -61,10 +61,7 @@ class EngineServer(service_base.MistralService):
self._rpc_server = rpc.get_rpc_server_driver()(cfg.CONF.engine) self._rpc_server = rpc.get_rpc_server_driver()(cfg.CONF.engine)
self._rpc_server.register_endpoint(self) self._rpc_server.register_endpoint(self)
# Note(ddeja): Engine needs to be run in default (blocking) mode self._rpc_server.run(executor=cfg.CONF.oslo_rpc_executor)
# since using another mode may lead to a deadlock.
# See https://review.openstack.org/#/c/356343 for more info.
self._rpc_server.run(executor='blocking')
self._notify_started('Engine server started.') self._notify_started('Engine server started.')

View File

@ -112,7 +112,7 @@ def _check_and_complete(wf_ex_id):
else 4 else 4
) )
# Rescheduling this check may not happen if erros are # Rescheduling this check may not happen if errors are
# raised in the business logic. If the error is DB related # raised in the business logic. If the error is DB related
# and not considered fatal (e.g. disconnect, deadlock), the # and not considered fatal (e.g. disconnect, deadlock), the
# retry annotation around the method will ensure that the # retry annotation around the method will ensure that the

View File

@ -168,7 +168,7 @@ class RPCServer(object):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def run(self, executor='blocking'): def run(self, executor='eventlet'):
"""Runs the RPC server. """Runs the RPC server.
:param executor: Executor used to process incoming requests. Different :param executor: Executor used to process incoming requests. Different

View File

@ -83,7 +83,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
"""Return whether server is running.""" """Return whether server is running."""
return self._running.is_set() return self._running.is_set()
def run(self, executor='blocking'): def run(self, executor='eventlet'):
if self._thread is None: if self._thread is None:
self._thread = threading.Thread(target=self._run, args=(executor,)) self._thread = threading.Thread(target=self._run, args=(executor,))
self._thread.daemon = True self._thread.daemon = True

View File

@ -35,7 +35,7 @@ class OsloRPCServer(rpc.RPCServer):
def register_endpoint(self, endpoint): def register_endpoint(self, endpoint):
self.endpoints.append(endpoint) self.endpoints.append(endpoint)
def run(self, executor='blocking'): def run(self, executor='eventlet'):
target = messaging.Target( target = messaging.Target(
topic=self.topic, topic=self.topic,
server=self.server_id server=self.server_id