Support for Jobboard etcd backend
- add a new etcd_taskflow_driver for the octavia worker - bump taskflow (Etcd support) and etcd3gw (important fixes for the backend) - enable etcd in devstack if requested - add experimental jobs for jobboard/etcd Change-Id: Ib557a9cf938fcf4257d2c2848fff78a62d82109a
This commit is contained in:
@@ -10,7 +10,7 @@ GET_PIP_CACHE_LOCATION=/opt/stack/cache/files/get-pip.py
|
|||||||
|
|
||||||
function octavia_install {
|
function octavia_install {
|
||||||
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD} == True ]]; then
|
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD} == True ]]; then
|
||||||
setup_develop $OCTAVIA_DIR redis
|
setup_develop $OCTAVIA_DIR ${OCTAVIA_JOBBOARD_BACKEND}
|
||||||
else
|
else
|
||||||
setup_develop $OCTAVIA_DIR
|
setup_develop $OCTAVIA_DIR
|
||||||
fi
|
fi
|
||||||
@@ -299,6 +299,11 @@ function octavia_configure {
|
|||||||
iniset $OCTAVIA_CONF task_flow persistence_connection "mysql+pymysql://${DATABASE_USER}:${DATABASE_PASSWORD}@${DATABASE_HOST}:3306/octavia_persistence"
|
iniset $OCTAVIA_CONF task_flow persistence_connection "mysql+pymysql://${DATABASE_USER}:${DATABASE_PASSWORD}@${DATABASE_HOST}:3306/octavia_persistence"
|
||||||
iniset $OCTAVIA_CONF task_flow jobboard_expiration_time ${OCTAVIA_JOBBOARD_EXPIRATION_TIME}
|
iniset $OCTAVIA_CONF task_flow jobboard_expiration_time ${OCTAVIA_JOBBOARD_EXPIRATION_TIME}
|
||||||
iniset $OCTAVIA_CONF task_flow jobboard_enabled True
|
iniset $OCTAVIA_CONF task_flow jobboard_enabled True
|
||||||
|
if [[ ${OCTAVIA_JOBBOARD_BACKEND} == "etcd" ]]; then
|
||||||
|
iniset $OCTAVIA_CONF task_flow jobboard_backend_driver etcd_taskflow_driver
|
||||||
|
iniset $OCTAVIA_CONF task_flow jobboard_backend_port 2379
|
||||||
|
iniset $OCTAVIA_CONF task_flow jobboard_backend_hosts ${SERVICE_HOST}
|
||||||
|
fi
|
||||||
fi
|
fi
|
||||||
# Configure keystone auth_token for all users
|
# Configure keystone auth_token for all users
|
||||||
configure_keystone_authtoken_middleware $OCTAVIA_CONF octavia
|
configure_keystone_authtoken_middleware $OCTAVIA_CONF octavia
|
||||||
|
@@ -32,6 +32,7 @@ OCTAVIA_HM_LISTEN_PORT=${OCTAVIA_HM_LISTEN_PORT:-"5555"}
|
|||||||
|
|
||||||
OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD=${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD:-False}
|
OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD=${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD:-False}
|
||||||
OCTAVIA_JOBBOARD_EXPIRATION_TIME=${OCTAVIA_JOBBOARD_EXPIRATION_TIME:-30}
|
OCTAVIA_JOBBOARD_EXPIRATION_TIME=${OCTAVIA_JOBBOARD_EXPIRATION_TIME:-30}
|
||||||
|
OCTAVIA_JOBBOARD_BACKEND=${OCTAVIA_JOBBOARD_BACKEND:-redis}
|
||||||
|
|
||||||
OCTAVIA_MGMT_SUBNET=${OCTAVIA_MGMT_SUBNET:-"192.168.0.0/24"}
|
OCTAVIA_MGMT_SUBNET=${OCTAVIA_MGMT_SUBNET:-"192.168.0.0/24"}
|
||||||
OCTAVIA_MGMT_SUBNET_START=${OCTAVIA_MGMT_SUBNET_START:-"192.168.0.2"}
|
OCTAVIA_MGMT_SUBNET_START=${OCTAVIA_MGMT_SUBNET_START:-"192.168.0.2"}
|
||||||
|
@@ -156,7 +156,7 @@ class DynamicLoggingConductor(impl_blocking.BlockingConductor):
|
|||||||
job.name)
|
job.name)
|
||||||
|
|
||||||
|
|
||||||
class RedisDynamicLoggingConductor(DynamicLoggingConductor):
|
class ExtendExpiryDynamicLoggingConductor(DynamicLoggingConductor):
|
||||||
|
|
||||||
def _listeners_from_job(self, job, engine):
|
def _listeners_from_job(self, job, engine):
|
||||||
listeners = super()._listeners_from_job(job, engine)
|
listeners = super()._listeners_from_job(job, engine)
|
||||||
@@ -206,20 +206,29 @@ class TaskFlowServiceController:
|
|||||||
def run_conductor(self, name):
|
def run_conductor(self, name):
|
||||||
with self.driver.persistence_driver.get_persistence() as persistence:
|
with self.driver.persistence_driver.get_persistence() as persistence:
|
||||||
with self.driver.job_board(persistence) as board:
|
with self.driver.job_board(persistence) as board:
|
||||||
# Redis do not expire jobs by default, so jobs won't be resumed
|
# Redis and etcd do not expire jobs by default, so jobs won't
|
||||||
# with restart of controller. Add expiry for board and use
|
# be resumed with restart of controller. Add expiry for board
|
||||||
# special listener.
|
# and use special listener.
|
||||||
if (CONF.task_flow.jobboard_backend_driver ==
|
if (CONF.task_flow.jobboard_backend_driver in (
|
||||||
'redis_taskflow_driver'):
|
'etcd_taskflow_driver',
|
||||||
conductor = RedisDynamicLoggingConductor(
|
'redis_taskflow_driver')):
|
||||||
|
conductor = ExtendExpiryDynamicLoggingConductor(
|
||||||
name, board, persistence=persistence,
|
name, board, persistence=persistence,
|
||||||
engine=CONF.task_flow.engine,
|
engine=CONF.task_flow.engine,
|
||||||
engine_options={
|
engine_options={
|
||||||
'max_workers': CONF.task_flow.max_workers
|
'max_workers': CONF.task_flow.max_workers
|
||||||
})
|
})
|
||||||
board.claim = functools.partial(
|
if (CONF.task_flow.jobboard_backend_driver ==
|
||||||
board.claim,
|
'redis_taskflow_driver'):
|
||||||
expiry=CONF.task_flow.jobboard_expiration_time)
|
# Hack for redis only:
|
||||||
|
# The TTL of the jobs of the Redis Jobboard driver can
|
||||||
|
# be only overriden by using the 'expiry' parameter of
|
||||||
|
# the 'claim' function
|
||||||
|
# For the Etcd driver, the default TTL for all the
|
||||||
|
# locks can be configured while creating the backend
|
||||||
|
board.claim = functools.partial(
|
||||||
|
board.claim,
|
||||||
|
expiry=CONF.task_flow.jobboard_expiration_time)
|
||||||
else:
|
else:
|
||||||
conductor = DynamicLoggingConductor(
|
conductor = DynamicLoggingConductor(
|
||||||
name, board, persistence=persistence,
|
name, board, persistence=persistence,
|
||||||
|
@@ -555,8 +555,10 @@ task_flow_opts = [
|
|||||||
choices=[('redis_taskflow_driver',
|
choices=[('redis_taskflow_driver',
|
||||||
'Driver that will use Redis to store job states.'),
|
'Driver that will use Redis to store job states.'),
|
||||||
('zookeeper_taskflow_driver',
|
('zookeeper_taskflow_driver',
|
||||||
'Driver that will use Zookeeper to store job states.')
|
'Driver that will use Zookeeper to store job '
|
||||||
],
|
'states.'),
|
||||||
|
('etcd_taskflow_driver',
|
||||||
|
'Driver that will user Etcd to store job states.')],
|
||||||
help='Jobboard backend driver that will monitor job state.'),
|
help='Jobboard backend driver that will monitor job state.'),
|
||||||
cfg.ListOpt('jobboard_backend_hosts', default=['127.0.0.1'],
|
cfg.ListOpt('jobboard_backend_hosts', default=['127.0.0.1'],
|
||||||
help='Jobboard backend server host(s).'),
|
help='Jobboard backend server host(s).'),
|
||||||
@@ -596,6 +598,16 @@ task_flow_opts = [
|
|||||||
'keyfile_password': None,
|
'keyfile_password': None,
|
||||||
'certfile': None,
|
'certfile': None,
|
||||||
'verify_certs': True}),
|
'verify_certs': True}),
|
||||||
|
cfg.DictOpt('jobboard_etcd_ssl_options',
|
||||||
|
help='Etcd jobboard backend ssl configuration options.',
|
||||||
|
default={'use_ssl': False,
|
||||||
|
'ca_cert': None,
|
||||||
|
'cert_key': None,
|
||||||
|
'cert_cert': None}),
|
||||||
|
cfg.IntOpt('jobboard_etcd_timeout', default=None,
|
||||||
|
help='Timeout when communicating with the Etcd backend.'),
|
||||||
|
cfg.StrOpt('jobboard_etcd_api_path', default=None,
|
||||||
|
help='API Path of the Etcd server.'),
|
||||||
cfg.IntOpt('jobboard_expiration_time', default=30,
|
cfg.IntOpt('jobboard_expiration_time', default=30,
|
||||||
help='For backends like redis claiming jobs requiring setting '
|
help='For backends like redis claiming jobs requiring setting '
|
||||||
'the expiry - how many seconds the claim should be '
|
'the expiry - how many seconds the claim should be '
|
||||||
|
@@ -131,3 +131,33 @@ class RedisTaskFlowDriver(JobboardTaskFlowDriver):
|
|||||||
CONF.task_flow.jobboard_backend_namespace,
|
CONF.task_flow.jobboard_backend_namespace,
|
||||||
jobboard_backend_conf,
|
jobboard_backend_conf,
|
||||||
persistence=persistence)
|
persistence=persistence)
|
||||||
|
|
||||||
|
|
||||||
|
class EtcdTaskFlowDriver(JobboardTaskFlowDriver):
|
||||||
|
|
||||||
|
def __init__(self, persistence_driver):
|
||||||
|
self.persistence_driver = persistence_driver
|
||||||
|
|
||||||
|
def job_board(self, persistence):
|
||||||
|
jobboard_backend_conf = {
|
||||||
|
'board': 'etcd',
|
||||||
|
'host': CONF.task_flow.jobboard_backend_hosts[0],
|
||||||
|
'port': CONF.task_flow.jobboard_backend_port,
|
||||||
|
'path': CONF.task_flow.jobboard_backend_namespace,
|
||||||
|
'ttl': CONF.task_flow.jobboard_expiration_time,
|
||||||
|
}
|
||||||
|
if CONF.task_flow.jobboard_etcd_ssl_options['use_ssl']:
|
||||||
|
jobboard_backend_conf.update(
|
||||||
|
CONF.task_flow.jobboard_etcd_ssl_options)
|
||||||
|
jobboard_backend_conf.pop('use_ssl')
|
||||||
|
jobboard_backend_conf['protocol'] = 'https'
|
||||||
|
if CONF.task_flow.jobboard_etcd_timeout is not None:
|
||||||
|
jobboard_backend_conf['timeout'] = (
|
||||||
|
CONF.task_flow.jobboard_etcd_timeout)
|
||||||
|
if CONF.task_flow.jobboard_etcd_api_path is not None:
|
||||||
|
jobboard_backend_conf['api_path'] = (
|
||||||
|
CONF.task_flow.jobboard_etcd_api_path)
|
||||||
|
|
||||||
|
return job_backends.backend(CONF.task_flow.jobboard_backend_namespace,
|
||||||
|
jobboard_backend_conf,
|
||||||
|
persistence=persistence)
|
||||||
|
@@ -130,12 +130,13 @@ class TestTaskFlowServiceController(base.TestCase):
|
|||||||
job1.wait.assert_called_once()
|
job1.wait.assert_called_once()
|
||||||
job2.wait.assert_called_once()
|
job2.wait.assert_called_once()
|
||||||
|
|
||||||
@mock.patch('octavia.common.base_taskflow.RedisDynamicLoggingConductor')
|
@mock.patch('octavia.common.base_taskflow.'
|
||||||
|
'ExtendExpiryDynamicLoggingConductor')
|
||||||
@mock.patch('octavia.common.base_taskflow.DynamicLoggingConductor')
|
@mock.patch('octavia.common.base_taskflow.DynamicLoggingConductor')
|
||||||
@mock.patch('concurrent.futures.ThreadPoolExecutor')
|
@mock.patch('concurrent.futures.ThreadPoolExecutor')
|
||||||
def test_run_conductor(self, mock_threadpoolexec, dynamiccond, rediscond):
|
def test_run_conductor(self, mock_threadpoolexec, dynamiccond, expirycond):
|
||||||
self.service_controller.run_conductor("test")
|
self.service_controller.run_conductor("test")
|
||||||
rediscond.assert_called_once_with(
|
expirycond.assert_called_once_with(
|
||||||
"test", self.jobboard_mock.__enter__(),
|
"test", self.jobboard_mock.__enter__(),
|
||||||
persistence=self.persistence_mock.__enter__(),
|
persistence=self.persistence_mock.__enter__(),
|
||||||
engine='parallel',
|
engine='parallel',
|
||||||
|
@@ -0,0 +1,4 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
Added support for the Jobboard Etcd backend in Taskflow.
|
@@ -40,7 +40,7 @@ python-novaclient>=9.1.0 # Apache-2.0
|
|||||||
python-cinderclient>=3.3.0 # Apache-2.0
|
python-cinderclient>=3.3.0 # Apache-2.0
|
||||||
WSME>=0.8.0 # MIT
|
WSME>=0.8.0 # MIT
|
||||||
Jinja2>=2.10 # BSD License (3 clause)
|
Jinja2>=2.10 # BSD License (3 clause)
|
||||||
taskflow>=5.5.0 # Apache-2.0
|
taskflow>=5.9.0 # Apache-2.0
|
||||||
castellan>=0.16.0 # Apache-2.0
|
castellan>=0.16.0 # Apache-2.0
|
||||||
tenacity>=5.0.4 # Apache-2.0
|
tenacity>=5.0.4 # Apache-2.0
|
||||||
distro>=1.2.0 # Apache-2.0
|
distro>=1.2.0 # Apache-2.0
|
||||||
|
@@ -99,6 +99,7 @@ octavia.plugins =
|
|||||||
octavia.worker.jobboard_driver =
|
octavia.worker.jobboard_driver =
|
||||||
redis_taskflow_driver = octavia.controller.worker.v2.taskflow_jobboard_driver:RedisTaskFlowDriver
|
redis_taskflow_driver = octavia.controller.worker.v2.taskflow_jobboard_driver:RedisTaskFlowDriver
|
||||||
zookeeper_taskflow_driver = octavia.controller.worker.v2.taskflow_jobboard_driver:ZookeeperTaskFlowDriver
|
zookeeper_taskflow_driver = octavia.controller.worker.v2.taskflow_jobboard_driver:ZookeeperTaskFlowDriver
|
||||||
|
etcd_taskflow_driver = octavia.controller.worker.v2.taskflow_jobboard_driver:EtcdTaskFlowDriver
|
||||||
oslo.config.opts =
|
oslo.config.opts =
|
||||||
octavia = octavia.opts:list_opts
|
octavia = octavia.opts:list_opts
|
||||||
oslo.config.opts.defaults =
|
oslo.config.opts.defaults =
|
||||||
@@ -118,3 +119,6 @@ redis =
|
|||||||
zookeeper =
|
zookeeper =
|
||||||
kazoo>=2.6.0 # Apache-2.0
|
kazoo>=2.6.0 # Apache-2.0
|
||||||
zake>=0.1.6 # Apache-2.0
|
zake>=0.1.6 # Apache-2.0
|
||||||
|
# Required by Etcd jobboard
|
||||||
|
etcd =
|
||||||
|
etcd3gw>=2.4.1 # Apache-2.0
|
||||||
|
@@ -108,6 +108,26 @@
|
|||||||
devstack_localrc:
|
devstack_localrc:
|
||||||
OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD: True
|
OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD: True
|
||||||
|
|
||||||
|
- job:
|
||||||
|
name: octavia-v2-dsvm-scenario-traffic-ops-jobboard-etcd
|
||||||
|
parent: octavia-v2-dsvm-scenario-traffic-ops
|
||||||
|
vars:
|
||||||
|
devstack_localrc:
|
||||||
|
OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD: True
|
||||||
|
OCTAVIA_JOBBOARD_BACKEND: etcd
|
||||||
|
required-projects:
|
||||||
|
- openstack/taskflow
|
||||||
|
|
||||||
|
- job:
|
||||||
|
name: octavia-v2-dsvm-scenario-non-traffic-ops-jobboard-etcd
|
||||||
|
parent: octavia-v2-dsvm-scenario-non-traffic-ops
|
||||||
|
vars:
|
||||||
|
devstack_localrc:
|
||||||
|
OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD: True
|
||||||
|
OCTAVIA_JOBBOARD_BACKEND: etcd
|
||||||
|
required-projects:
|
||||||
|
- openstack/taskflow
|
||||||
|
|
||||||
- project-template:
|
- project-template:
|
||||||
name: octavia-tox-tips
|
name: octavia-tox-tips
|
||||||
check:
|
check:
|
||||||
|
@@ -127,3 +127,5 @@
|
|||||||
experimental:
|
experimental:
|
||||||
jobs:
|
jobs:
|
||||||
- octavia-v2-dsvm-scenario-nftables
|
- octavia-v2-dsvm-scenario-nftables
|
||||||
|
- octavia-v2-dsvm-scenario-traffic-ops-jobboard-etcd
|
||||||
|
- octavia-v2-dsvm-scenario-non-traffic-ops-jobboard-etcd
|
||||||
|
Reference in New Issue
Block a user