Merge "Make the notifier max_workers configurable"
This commit is contained in:
commit
5d1964d96b
@ -58,6 +58,8 @@ _NOTIFICATION_OPTIONS = (
|
|||||||
cfg.StrOpt('smtp_command', default='/usr/sbin/sendmail -t -oi',
|
cfg.StrOpt('smtp_command', default='/usr/sbin/sendmail -t -oi',
|
||||||
help=('The command of smtp to send email. The format is '
|
help=('The command of smtp to send email. The format is '
|
||||||
'"command_name arg1 arg2".')),
|
'"command_name arg1 arg2".')),
|
||||||
|
cfg.IntOpt('max_notifier_workers', default=10,
|
||||||
|
help='The max amount of the notification workers.')
|
||||||
)
|
)
|
||||||
|
|
||||||
_NOTIFICATION_GROUP = 'notification'
|
_NOTIFICATION_GROUP = 'notification'
|
||||||
|
@ -32,8 +32,8 @@ class NotifierDriver(object):
|
|||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
self.subscription_controller = kwargs.get('subscription_controller')
|
self.subscription_controller = kwargs.get('subscription_controller')
|
||||||
# TODO(flwang): Make the max_workers configurable
|
max_workers = kwargs.get('max_notifier_workers', 10)
|
||||||
self.executor = futurist.ThreadPoolExecutor(max_workers=10)
|
self.executor = futurist.ThreadPoolExecutor(max_workers=max_workers)
|
||||||
|
|
||||||
def post(self, queue_name, messages, client_uuid, project=None):
|
def post(self, queue_name, messages, client_uuid, project=None):
|
||||||
"""Send messages to the subscribers."""
|
"""Send messages to the subscribers."""
|
||||||
|
@ -148,7 +148,9 @@ class DataDriver(base.DataDriverBase):
|
|||||||
stages = _get_builtin_entry_points('message', self._storage,
|
stages = _get_builtin_entry_points('message', self._storage,
|
||||||
self.control_driver)
|
self.control_driver)
|
||||||
kwargs = {'subscription_controller':
|
kwargs = {'subscription_controller':
|
||||||
self._storage.subscription_controller}
|
self._storage.subscription_controller,
|
||||||
|
'max_notifier_workers':
|
||||||
|
self.conf.notification.max_notifier_workers}
|
||||||
stages.extend(_get_storage_pipeline('message', self.conf, **kwargs))
|
stages.extend(_get_storage_pipeline('message', self.conf, **kwargs))
|
||||||
stages.append(self._storage.message_controller)
|
stages.append(self._storage.message_controller)
|
||||||
return common.Pipeline(stages)
|
return common.Pipeline(stages)
|
||||||
|
@ -56,6 +56,8 @@ class TestBase(testtools.TestCase):
|
|||||||
self.conf.register_opts(configs._GENERAL_OPTIONS)
|
self.conf.register_opts(configs._GENERAL_OPTIONS)
|
||||||
self.conf.register_opts(configs._DRIVER_OPTIONS,
|
self.conf.register_opts(configs._DRIVER_OPTIONS,
|
||||||
group=configs._DRIVER_GROUP)
|
group=configs._DRIVER_GROUP)
|
||||||
|
self.conf.register_opts(configs._NOTIFICATION_OPTIONS,
|
||||||
|
group=configs._NOTIFICATION_GROUP)
|
||||||
|
|
||||||
self.mongodb_url = os.environ.get('ZAQAR_TEST_MONGODB_URL',
|
self.mongodb_url = os.environ.get('ZAQAR_TEST_MONGODB_URL',
|
||||||
'mongodb://127.0.0.1:27017')
|
'mongodb://127.0.0.1:27017')
|
||||||
|
Loading…
x
Reference in New Issue
Block a user