Merge "Limit spares pool to the spare_amphora_pool_size"
This commit is contained in:
commit
a635dd6bc9
@ -17,6 +17,7 @@ import datetime
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import timeutils
|
||||
from sqlalchemy.orm import exc as sqlalchemy_exceptions
|
||||
|
||||
from octavia.controller.worker import controller_worker as cw
|
||||
@ -30,6 +31,7 @@ CONF = cfg.CONF
|
||||
class SpareAmphora(object):
|
||||
def __init__(self):
|
||||
self.amp_repo = repo.AmphoraRepository()
|
||||
self.spares_repo = repo.SparesPoolRepository()
|
||||
self.cw = cw.ControllerWorker()
|
||||
|
||||
def spare_check(self):
|
||||
@ -37,26 +39,42 @@ class SpareAmphora(object):
|
||||
|
||||
If it's less than the requirement, starts new amphora.
|
||||
"""
|
||||
lock_session = db_api.get_session(autocommit=False)
|
||||
session = db_api.get_session()
|
||||
conf_spare_cnt = CONF.house_keeping.spare_amphora_pool_size
|
||||
curr_spare_cnt = self.amp_repo.get_spare_amphora_count(session)
|
||||
LOG.debug("Required Spare Amphora count : %d", conf_spare_cnt)
|
||||
LOG.debug("Current Spare Amphora count : %d", curr_spare_cnt)
|
||||
diff_count = conf_spare_cnt - curr_spare_cnt
|
||||
try:
|
||||
# Lock the spares_pool record for read and write
|
||||
spare_amp_row = self.spares_repo.get_for_update(lock_session)
|
||||
|
||||
# When the current spare amphora is less than required
|
||||
if diff_count > 0:
|
||||
LOG.info("Initiating creation of %d spare amphora.", diff_count)
|
||||
conf_spare_cnt = CONF.house_keeping.spare_amphora_pool_size
|
||||
curr_spare_cnt = self.amp_repo.get_spare_amphora_count(session)
|
||||
LOG.debug("Required Spare Amphora count : %d", conf_spare_cnt)
|
||||
LOG.debug("Current Spare Amphora count : %d", curr_spare_cnt)
|
||||
diff_count = conf_spare_cnt - curr_spare_cnt
|
||||
|
||||
# Call Amphora Create Flow diff_count times
|
||||
with futures.ThreadPoolExecutor(
|
||||
max_workers=CONF.house_keeping.spare_amphora_pool_size
|
||||
) as executor:
|
||||
for i in range(1, diff_count + 1):
|
||||
LOG.debug("Starting amphorae number %d ...", i)
|
||||
executor.submit(self.cw.create_amphora)
|
||||
else:
|
||||
LOG.debug("Current spare amphora count satisfies the requirement")
|
||||
# When the current spare amphora is less than required
|
||||
amp_booting = []
|
||||
if diff_count > 0:
|
||||
LOG.info("Initiating creation of %d spare amphora.",
|
||||
diff_count)
|
||||
|
||||
# Call Amphora Create Flow diff_count times
|
||||
with futures.ThreadPoolExecutor(
|
||||
max_workers=CONF.house_keeping.spare_amphora_pool_size
|
||||
) as executor:
|
||||
for i in range(1, diff_count + 1):
|
||||
LOG.debug("Starting amphorae number %d ...", i)
|
||||
amp_booting.append(
|
||||
executor.submit(self.cw.create_amphora))
|
||||
else:
|
||||
LOG.debug("Current spare amphora count satisfies the "
|
||||
"requirement")
|
||||
|
||||
# Wait for the amphora boot threads to finish
|
||||
futures.wait(amp_booting)
|
||||
spare_amp_row.updated_at = timeutils.utcnow()
|
||||
lock_session.commit()
|
||||
except Exception:
|
||||
lock_session.rollback()
|
||||
|
||||
|
||||
class DatabaseCleanup(object):
|
||||
|
@ -0,0 +1,35 @@
|
||||
# Copyright 2019 Michael Johnson
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Spares pool table
|
||||
|
||||
Revision ID: 6ffc710674ef
|
||||
Revises: 7432f1d4ea83
|
||||
Create Date: 2019-03-11 10:45:43.296236
|
||||
|
||||
"""
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '6ffc710674ef'
|
||||
down_revision = '7432f1d4ea83'
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
u'spares_pool',
|
||||
sa.Column(u'updated_at', sa.DateTime(), nullable=True,
|
||||
server_default=sa.func.current_timestamp()))
|
@ -782,3 +782,10 @@ class ClientAuthenticationMode(base_models.BASE):
|
||||
__tablename__ = "client_authentication_mode"
|
||||
|
||||
name = sa.Column(sa.String(10), primary_key=True, nullable=False)
|
||||
|
||||
|
||||
class SparesPool(base_models.BASE):
|
||||
|
||||
__tablename__ = "spares_pool"
|
||||
|
||||
updated_at = sa.Column(sa.DateTime, primary_key=True, nullable=True)
|
||||
|
@ -226,6 +226,7 @@ class Repositories(object):
|
||||
self.quotas = QuotasRepository()
|
||||
self.flavor = FlavorRepository()
|
||||
self.flavor_profile = FlavorProfileRepository()
|
||||
self.spares_pool = SparesPoolRepository()
|
||||
|
||||
def create_load_balancer_and_vip(self, session, lb_dict, vip_dict):
|
||||
"""Inserts load balancer and vip entities into the database.
|
||||
@ -1785,3 +1786,17 @@ class FlavorRepository(BaseRepository):
|
||||
|
||||
class FlavorProfileRepository(BaseRepository):
|
||||
model_class = models.FlavorProfile
|
||||
|
||||
|
||||
class SparesPoolRepository(BaseRepository):
|
||||
model_class = models.SparesPool
|
||||
|
||||
def get_for_update(self, lock_session):
|
||||
"""Queries and locks the SparesPool record.
|
||||
|
||||
This call will query for the SparesPool table record and lock it
|
||||
so that other processes cannot read or write it.
|
||||
:returns: expected_spares_count, updated_at
|
||||
"""
|
||||
row = lock_session.query(models.SparesPool).with_for_update().one()
|
||||
return row
|
||||
|
@ -119,7 +119,7 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
|
||||
'listener_stats', 'amphora', 'sni',
|
||||
'amphorahealth', 'vrrpgroup', 'l7rule', 'l7policy',
|
||||
'amp_build_slots', 'amp_build_req', 'quotas',
|
||||
'flavor', 'flavor_profile')
|
||||
'flavor', 'flavor_profile', 'spares_pool')
|
||||
for repo_attr in repo_attr_names:
|
||||
single_repo = getattr(self.repos, repo_attr, None)
|
||||
message = ("Class Repositories should have %s instance"
|
||||
|
@ -83,6 +83,21 @@ class TestSpareCheck(base.TestCase):
|
||||
self.assertEqual(0, DIFF_CNT)
|
||||
self.assertEqual(DIFF_CNT, self.cw.create_amphora.call_count)
|
||||
|
||||
@mock.patch('octavia.db.repositories.SparesPoolRepository.get_for_update')
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
def test_spare_check_rollback(self, mock_session, mock_update):
|
||||
"""When spare amphora count meets the requirement."""
|
||||
lock_session = mock.MagicMock()
|
||||
session = mock.MagicMock()
|
||||
mock_session.side_effect = [lock_session, session]
|
||||
mock_update.side_effect = [Exception('boom')]
|
||||
# self.CONF.config(group="house_keeping",
|
||||
# spare_amphora_pool_size=self.FAKE_CNF_SPAR2)
|
||||
# self.amp_repo.get_spare_amphora_count.return_value = (
|
||||
# self.FAKE_CUR_SPAR2)
|
||||
self.spare_amp.spare_check()
|
||||
lock_session.rollback.assert_called_once()
|
||||
|
||||
|
||||
class TestDatabaseCleanup(base.TestCase):
|
||||
FAKE_IP = "10.0.0.1"
|
||||
|
Loading…
x
Reference in New Issue
Block a user