Conductor proxies host db access for guests
Previously, instances updated their status by updating the database on the host directly. Necessarily, each instance would need access to the database to stay updated. Trove's new conductor service eliminates that need by working as a proxy for those instances. By sending a heartbeat to conductor via RPC, conductor updates the database on the host on behalf of the instance. As backups also made use of the host database, the backup code has been refactored to take richer inputs to remove the need to query the host database, and now conductor is also used to submit updates to backup states. Implements: blueprint trove-conductor Change-Id: I4cb34baedd0e3a50051f9e66de95c9028c66e4b5
This commit is contained in:
parent
5755ede7a2
commit
384576675f
62
bin/trove-conductor
Executable file
62
bin/trove-conductor
Executable file
@ -0,0 +1,62 @@
|
||||
#!/usr/bin/env python
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Rackspace Hosting
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch(all=True, thread=False)
|
||||
|
||||
import gettext
|
||||
import traceback
|
||||
import sys
|
||||
|
||||
|
||||
gettext.install('trove', unicode=1)
|
||||
|
||||
|
||||
from trove.common import cfg
|
||||
from trove.common import debug_utils
|
||||
from trove.common.rpc import service as rpc_service
|
||||
from trove.db import get_db_api
|
||||
from trove.openstack.common import log as logging
|
||||
from trove.openstack.common import service as openstack_service
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def main():
|
||||
get_db_api().configure_db(CONF)
|
||||
manager = 'trove.conductor.manager.Manager'
|
||||
topic = CONF.conductor_queue
|
||||
server = rpc_service.RpcService(manager=manager, topic=topic)
|
||||
launcher = openstack_service.launch(server,
|
||||
workers=CONF.trove_conductor_workers)
|
||||
launcher.wait()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
cfg.parse_args(sys.argv)
|
||||
logging.setup(None)
|
||||
|
||||
debug_utils.setup()
|
||||
|
||||
if not debug_utils.enabled():
|
||||
eventlet.monkey_patch(thread=True)
|
||||
try:
|
||||
main()
|
||||
except RuntimeError as error:
|
||||
print traceback.format_exc()
|
||||
sys.exit("ERROR: %s" % error)
|
@ -91,4 +91,37 @@ bus and performs the requested operation.
|
||||
* Actual handling is usually done in the dbaas.py module
|
||||
|
||||
|
||||
Trove-conductor
|
||||
===============
|
||||
|
||||
Conductor is a service that runs on the host, responsible for recieving
|
||||
messages from guest instances to update information on the host.
|
||||
For example, instance statuses and the current status of a backup.
|
||||
With conductor, guest instances do not need a direct connection to the
|
||||
host's database. Conductor listens for RPC messages through the message
|
||||
bus and performs the relevant operation.
|
||||
|
||||
* Similar to guest-agent in that it is a service that listens to a
|
||||
RabbitMQ topic. The difference is conductor lives on the host, not
|
||||
the guest.
|
||||
* Guest agents communicate to conductor by putting messages on the
|
||||
topic defined in cfg as conductor_queue. By default this is
|
||||
"trove-conductor".
|
||||
* Entry point - Trove/bin/trove-conductor
|
||||
* Runs as RpcService configured by
|
||||
Trove/etc/trove/trove-conductor.conf.sample which defines
|
||||
trove.conductor.manager.Manager as the manager. This is the entry
|
||||
point for requests arriving on the queue.
|
||||
* As guestagent above, requests are pushed to MQ from another component
|
||||
using _cast() (synchronous), generally of the form
|
||||
{"method": "<method_name>", "args": {<arguments>}}
|
||||
* Actual database update work is done by trove/conductor/manager.py
|
||||
* The "heartbeat" method updates the status of an instance. This is
|
||||
used to report that instance has changed from NEW to BUILDING to
|
||||
ACTIVE and so on.
|
||||
* The "update_backup" method changes the details of a backup, including
|
||||
its current status, size of the backup, type, and checksum.
|
||||
|
||||
|
||||
|
||||
.. Trove - Database as a Service: https://wiki.openstack.org/wiki/Trove
|
||||
|
8
etc/trove/trove-conductor.conf.sample
Normal file
8
etc/trove/trove-conductor.conf.sample
Normal file
@ -0,0 +1,8 @@
|
||||
[DEFAULT]
|
||||
control_exchange = trove
|
||||
trove_auth_url = http://0.0.0.0:5000/v2.0
|
||||
nova_proxy_admin_pass = 3de4922d8b6ac5a1aad9
|
||||
nova_proxy_admin_tenant_name = admin
|
||||
nova_proxy_admin_user = admin
|
||||
sql_connection = mysql://root:e1a2c042c828d3566d0a@localhost/trove
|
||||
rabbit_password = f7999d1955c5014aa32c
|
@ -14,34 +14,12 @@ bind_port = 8778
|
||||
# AMQP Connection info
|
||||
rabbit_password=f7999d1955c5014aa32c
|
||||
|
||||
# SQLAlchemy connection string for the reference implementation
|
||||
# registry server. Any valid SQLAlchemy connection string is fine.
|
||||
# See: http://www.sqlalchemy.org/docs/05/reference/sqlalchemy/connections.html#sqlalchemy.create_engine
|
||||
#sql_connection = sqlite:///trove_test.sqlite
|
||||
sql_connection = mysql://root:e1a2c042c828d3566d0a@10.0.0.1/trove?charset=utf8
|
||||
#sql_connection = postgresql://trove:trove@10.0.0.1/trove
|
||||
|
||||
#Allow SQLAlchemy to log all queries
|
||||
sql_query_logging = False
|
||||
|
||||
# Period in seconds after which SQLAlchemy should reestablish its connection
|
||||
# to the database.
|
||||
#
|
||||
# MySQL uses a default `wait_timeout` of 8 hours, after which it will drop
|
||||
# idle connections. This can result in 'MySQL Gone Away' exceptions. If you
|
||||
# notice this, you can lower this value to ensure that SQLAlchemy reconnects
|
||||
# before MySQL can drop the connection.
|
||||
sql_idle_timeout = 3600
|
||||
|
||||
#DB Api Implementation
|
||||
db_api_implementation = "trove.db.sqlalchemy.api"
|
||||
|
||||
# Path to the extensions
|
||||
api_extensions_path = trove/extensions/routes
|
||||
|
||||
# Configuration options for talking to nova via the novaclient.
|
||||
# These options are for an admin user in your keystone config.
|
||||
# It proxy's the token received from the user to send to nova via this admin users creds,
|
||||
# It proxies the token received from the user to send to nova via this admin users creds,
|
||||
# basically acting like the client via that proxy token.
|
||||
nova_proxy_admin_user = admin
|
||||
nova_proxy_admin_pass = 3de4922d8b6ac5a1aad9
|
||||
@ -62,6 +40,9 @@ root_grant_option = True
|
||||
# used by passlib to generate root password
|
||||
#default_password_length = 36
|
||||
|
||||
# For communicating with trove-conductor
|
||||
control_exchange = trove
|
||||
|
||||
# ============ kombu connection options ========================
|
||||
|
||||
rabbit_host=10.0.0.1
|
||||
|
@ -24,6 +24,7 @@ packages =
|
||||
scripts =
|
||||
bin/trove-api
|
||||
bin/trove-fake-mode
|
||||
bin/trove-conductor
|
||||
bin/trove-manage
|
||||
bin/trove-mgmt-taskmanager
|
||||
bin/trove-taskmanager
|
||||
|
2
tox.ini
2
tox.ini
@ -36,4 +36,4 @@ show-source = True
|
||||
ignore = F401,F403,F821,H301,H306,H401,H402,H403,H404,H702
|
||||
builtins = _
|
||||
exclude=.venv,.tox,dist,doc,openstack,*egg,rsdns,tools,etc,build
|
||||
filename=*.py,trove-*
|
||||
filename=*.py,trove-*
|
||||
|
@ -58,7 +58,7 @@ class Backup(object):
|
||||
# parse the ID from the Ref
|
||||
instance_id = utils.get_id_from_href(instance)
|
||||
|
||||
# verify that the instance exist and can perform actions
|
||||
# verify that the instance exists and can perform actions
|
||||
from trove.instance.models import Instance
|
||||
instance_model = Instance.load(context, instance_id)
|
||||
instance_model.validate_can_perform_action()
|
||||
@ -76,7 +76,14 @@ class Backup(object):
|
||||
LOG.exception("Unable to create Backup record:")
|
||||
raise exception.BackupCreationError(str(ex))
|
||||
|
||||
api.API(context).create_backup(db_info.id, instance_id)
|
||||
backup_info = {'id': db_info.id,
|
||||
'name': name,
|
||||
'description': description,
|
||||
'instance_id': instance_id,
|
||||
'backup_type': db_info.backup_type,
|
||||
'checksum': db_info.checksum,
|
||||
}
|
||||
api.API(context).create_backup(backup_info, instance_id)
|
||||
return db_info
|
||||
|
||||
return run_with_quotas(context.tenant,
|
||||
|
@ -108,6 +108,8 @@ common_opts = [
|
||||
default='trove.quota.quota.DbQuotaDriver',
|
||||
help='default driver to use for quota checks'),
|
||||
cfg.StrOpt('taskmanager_queue', default='taskmanager'),
|
||||
cfg.StrOpt('conductor_queue', default='trove-conductor'),
|
||||
cfg.IntOpt('trove_conductor_workers', default=1),
|
||||
cfg.BoolOpt('use_nova_server_volume', default=False),
|
||||
cfg.BoolOpt('use_heat', default=False),
|
||||
cfg.StrOpt('device_path', default='/dev/vdb'),
|
||||
|
@ -67,7 +67,8 @@ class ServiceStatus(object):
|
||||
@staticmethod
|
||||
def from_description(desc):
|
||||
all_items = ServiceStatus._lookup.items()
|
||||
status_codes = [code for (code, status) in all_items if status == desc]
|
||||
status_codes = [code for (code, status) in all_items
|
||||
if status.description == desc]
|
||||
if not status_codes:
|
||||
msg = 'Status description %s is not a valid ServiceStatus.'
|
||||
raise ValueError(msg % desc)
|
||||
|
16
trove/conductor/__init__.py
Normal file
16
trove/conductor/__init__.py
Normal file
@ -0,0 +1,16 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Rackspace Hosting
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
52
trove/conductor/api.py
Normal file
52
trove/conductor/api.py
Normal file
@ -0,0 +1,52 @@
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import traceback
|
||||
import sys
|
||||
|
||||
from trove.common import cfg
|
||||
from trove.openstack.common.rpc import proxy
|
||||
from trove.openstack.common import log as logging
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
RPC_API_VERSION = "1.0"
|
||||
|
||||
|
||||
class API(proxy.RpcProxy):
|
||||
"""API for interacting with trove conductor."""
|
||||
|
||||
def __init__(self, context):
|
||||
self.context = context
|
||||
super(API, self).__init__(self._get_routing_key(), RPC_API_VERSION)
|
||||
|
||||
def _get_routing_key(self):
|
||||
"""Create the routing key for conductor."""
|
||||
return CONF.conductor_queue
|
||||
|
||||
def heartbeat(self, instance_id, payload):
|
||||
LOG.debug("Making async call to cast heartbeat for instance: %s"
|
||||
% instance_id)
|
||||
self.cast(self.context, self.make_msg("heartbeat",
|
||||
instance_id=instance_id,
|
||||
payload=payload))
|
||||
|
||||
def update_backup(self, instance_id, backup_id, **backup_fields):
|
||||
LOG.debug("Making async call to cast update_backup for instance: %s"
|
||||
% instance_id)
|
||||
self.cast(self.context, self.make_msg("update_backup",
|
||||
instance_id=instance_id,
|
||||
backup_id=backup_id,
|
||||
**backup_fields))
|
68
trove/conductor/manager.py
Normal file
68
trove/conductor/manager.py
Normal file
@ -0,0 +1,68 @@
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from trove.backup import models as bkup_models
|
||||
from trove.common.context import TroveContext
|
||||
from trove.common.instance import ServiceStatus
|
||||
from trove.instance import models as t_models
|
||||
from trove.openstack.common import periodic_task
|
||||
from trove.openstack.common import log as logging
|
||||
from trove.common import cfg
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
RPC_API_VERSION = "1.0"
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class Manager(periodic_task.PeriodicTasks):
|
||||
|
||||
def __init__(self):
|
||||
super(Manager, self).__init__()
|
||||
self.admin_context = TroveContext(
|
||||
user=CONF.nova_proxy_admin_user,
|
||||
auth_token=CONF.nova_proxy_admin_pass,
|
||||
tenant=CONF.nova_proxy_admin_tenant_name)
|
||||
|
||||
def heartbeat(self, context, instance_id, payload):
|
||||
LOG.debug("Instance ID: %s" % str(instance_id))
|
||||
LOG.debug("Payload: %s" % str(payload))
|
||||
status = t_models.InstanceServiceStatus.find_by(
|
||||
instance_id=instance_id)
|
||||
if payload.get('service_status') is not None:
|
||||
status.set_status(ServiceStatus.from_description(
|
||||
payload['service_status']))
|
||||
status.save()
|
||||
|
||||
def update_backup(self, context, instance_id, backup_id,
|
||||
**backup_fields):
|
||||
LOG.debug("Instance ID: %s" % str(instance_id))
|
||||
LOG.debug("Backup ID: %s" % str(backup_id))
|
||||
backup = bkup_models.DBBackup.find_by(id=backup_id)
|
||||
# TODO(datsun180b): use context to verify tenant matches
|
||||
|
||||
# Some verification based on IDs
|
||||
if backup_id != backup.id:
|
||||
LOG.error("Backup IDs mismatch! Expected %s, found %s" %
|
||||
(backup_id, backup.id))
|
||||
return
|
||||
if instance_id != backup.instance_id:
|
||||
LOG.error("Backup instance IDs mismatch! Expected %s, found %s" %
|
||||
(instance_id, backup.instance_id))
|
||||
return
|
||||
|
||||
for k, v in backup_fields.items():
|
||||
if hasattr(backup, k):
|
||||
LOG.debug("Backup %s: %s" % (k, v))
|
||||
setattr(backup, k, v)
|
||||
backup.save()
|
@ -29,15 +29,17 @@ class DatabaseModelBase(models.ModelBase):
|
||||
|
||||
@classmethod
|
||||
def create(cls, **values):
|
||||
if 'id' not in values:
|
||||
values['id'] = utils.generate_uuid()
|
||||
if hasattr(cls, 'deleted') and 'deleted' not in values:
|
||||
values['deleted'] = False
|
||||
values['created'] = utils.utcnow()
|
||||
instance = cls(**values).save()
|
||||
init_vals = {
|
||||
'id': utils.generate_uuid(),
|
||||
'created': utils.utcnow(),
|
||||
}
|
||||
if hasattr(cls, 'deleted'):
|
||||
init_vals['deleted'] = False
|
||||
init_vals.update(values)
|
||||
instance = cls(**init_vals)
|
||||
if not instance.is_valid():
|
||||
raise exception.InvalidModelError(errors=instance.errors)
|
||||
return instance
|
||||
return instance.save()
|
||||
|
||||
@property
|
||||
def db_api(self):
|
||||
|
@ -214,7 +214,7 @@ class API(proxy.RpcProxy):
|
||||
|
||||
def prepare(self, memory_mb, packages, databases, users,
|
||||
device_path='/dev/vdb', mount_point='/mnt/volume',
|
||||
backup_id=None, config_contents=None, root_password=None):
|
||||
backup_info=None, config_contents=None, root_password=None):
|
||||
"""Make an asynchronous call to prepare the guest
|
||||
as a database container optionally includes a backup id for restores
|
||||
"""
|
||||
@ -222,7 +222,7 @@ class API(proxy.RpcProxy):
|
||||
self._cast_with_consumer(
|
||||
"prepare", packages=packages, databases=databases,
|
||||
memory_mb=memory_mb, users=users, device_path=device_path,
|
||||
mount_point=mount_point, backup_id=backup_id,
|
||||
mount_point=mount_point, backup_info=backup_info,
|
||||
config_contents=config_contents, root_password=root_password)
|
||||
|
||||
def restart(self):
|
||||
@ -267,9 +267,9 @@ class API(proxy.RpcProxy):
|
||||
"""Make a synchronous call to update the guest agent."""
|
||||
self._call("update_guest", AGENT_HIGH_TIMEOUT)
|
||||
|
||||
def create_backup(self, backup_id):
|
||||
def create_backup(self, backup_info):
|
||||
"""Make async call to create a full backup of this instance"""
|
||||
LOG.debug(_("Create Backup %(backup_id)s "
|
||||
"for Instance %(instance_id)s") % {'backup_id': backup_id,
|
||||
'instance_id': self.id})
|
||||
self._cast("create_backup", backup_id=backup_id)
|
||||
"for Instance %(instance_id)s") %
|
||||
{'backup_id': backup_info['id'], 'instance_id': self.id})
|
||||
self._cast("create_backup", backup_info=backup_info)
|
||||
|
@ -3,7 +3,7 @@ from trove.guestagent.backup.backupagent import BackupAgent
|
||||
AGENT = BackupAgent()
|
||||
|
||||
|
||||
def backup(context, backup_id):
|
||||
def backup(context, backup_info):
|
||||
"""
|
||||
Main entry point for starting a backup based on the given backup id. This
|
||||
will create a backup for this DB instance and will then store the backup
|
||||
@ -12,10 +12,10 @@ def backup(context, backup_id):
|
||||
:param context: the context token which contains the users details
|
||||
:param backup_id: the id of the persisted backup object
|
||||
"""
|
||||
return AGENT.execute_backup(context, backup_id)
|
||||
return AGENT.execute_backup(context, backup_info)
|
||||
|
||||
|
||||
def restore(context, backup_id, restore_location):
|
||||
def restore(context, backup_info, restore_location):
|
||||
"""
|
||||
Main entry point for restoring a backup based on the given backup id. This
|
||||
will transfer backup data to this instance an will carry out the
|
||||
@ -24,4 +24,4 @@ def restore(context, backup_id, restore_location):
|
||||
:param context: the context token which contains the users details
|
||||
:param backup_id: the id of the persisted backup object
|
||||
"""
|
||||
return AGENT.execute_restore(context, backup_id, restore_location)
|
||||
return AGENT.execute_restore(context, backup_info, restore_location)
|
||||
|
@ -15,9 +15,11 @@
|
||||
#
|
||||
|
||||
import logging
|
||||
from trove.backup.models import DBBackup
|
||||
from trove.backup.models import BackupState
|
||||
from trove.common import cfg, utils
|
||||
from trove.common import cfg
|
||||
from trove.common import context as trove_context
|
||||
from trove.common import utils
|
||||
from trove.conductor import api as conductor_api
|
||||
from trove.guestagent.dbaas import get_filesystem_volume_stats
|
||||
from trove.guestagent.datastore.mysql.service import ADMIN_USER_NAME
|
||||
from trove.guestagent.datastore.mysql.service import get_auth_password
|
||||
@ -45,15 +47,14 @@ class BackupAgent(object):
|
||||
raise UnknownBackupType("Unknown Backup type: %s" % backup_type)
|
||||
return runner
|
||||
|
||||
def execute_backup(self, context, backup_id, runner=RUNNER):
|
||||
LOG.debug("Searching for backup instance %s", backup_id)
|
||||
backup = DBBackup.find_by(id=backup_id)
|
||||
LOG.info("Setting task state to %s for instance %s",
|
||||
BackupState.NEW, backup.instance_id)
|
||||
backup.state = BackupState.NEW
|
||||
backup.save()
|
||||
def execute_backup(self, context, backup_info, runner=RUNNER):
|
||||
LOG.debug("Searching for backup instance %s", backup_info['id'])
|
||||
ctxt = trove_context.TroveContext(
|
||||
user=CONF.nova_proxy_admin_user,
|
||||
auth_token=CONF.nova_proxy_admin_pass)
|
||||
conductor = conductor_api.API(ctxt)
|
||||
|
||||
LOG.info("Running backup %s", backup_id)
|
||||
LOG.info("Running backup %s", backup_info['id'])
|
||||
user = ADMIN_USER_NAME
|
||||
password = get_auth_password()
|
||||
swiftStorage = get_storage_strategy(
|
||||
@ -62,43 +63,50 @@ class BackupAgent(object):
|
||||
|
||||
# Store the size of the filesystem before the backup.
|
||||
stats = get_filesystem_volume_stats(CONF.mount_point)
|
||||
backup.size = stats.get('used', 0.0)
|
||||
backup.state = BackupState.BUILDING
|
||||
backup.save()
|
||||
conductor.update_backup(CONF.guest_id,
|
||||
backup_id=backup_info['id'],
|
||||
size=stats.get('used', 0.0),
|
||||
state=BackupState.BUILDING)
|
||||
|
||||
try:
|
||||
with runner(filename=backup_id, user=user, password=password)\
|
||||
as bkup:
|
||||
LOG.info("Starting Backup %s", backup_id)
|
||||
with runner(filename=backup_info['id'],
|
||||
user=user, password=password) as bkup:
|
||||
try:
|
||||
LOG.info("Starting Backup %s", backup_info['id'])
|
||||
success, note, checksum, location = swiftStorage.save(
|
||||
BACKUP_CONTAINER,
|
||||
bkup)
|
||||
|
||||
LOG.info("Backup %s completed status: %s", backup_id, success)
|
||||
LOG.info("Backup %s file size: %s", backup_id, bkup.content_length)
|
||||
LOG.info('Backup %s swift checksum: %s', backup_id, checksum)
|
||||
LOG.info('Backup %s location: %s', backup_id, location)
|
||||
LOG.info("Backup %s completed status: %s", backup_info['id'],
|
||||
success)
|
||||
LOG.info("Backup %s file size: %s", backup_info['id'],
|
||||
bkup.content_length)
|
||||
LOG.info('Backup %s file swift checksum: %s',
|
||||
backup_info['id'], checksum)
|
||||
LOG.info('Backup %s location: %s', backup_info['id'],
|
||||
location)
|
||||
|
||||
if not success:
|
||||
raise BackupError(backup.note)
|
||||
if not success:
|
||||
raise BackupError(note)
|
||||
|
||||
except Exception as e:
|
||||
LOG.error(e)
|
||||
LOG.error("Error saving %s Backup", backup_id)
|
||||
backup.state = BackupState.FAILED
|
||||
backup.save()
|
||||
raise
|
||||
except Exception as e:
|
||||
LOG.error(e)
|
||||
LOG.error("Error saving %s Backup", backup_info['id'])
|
||||
conductor.update_backup(CONF.guest_id,
|
||||
backup_id=backup_info['id'],
|
||||
state=BackupState.FAILED)
|
||||
raise
|
||||
|
||||
else:
|
||||
LOG.info("Saving %s Backup Info to model", backup_id)
|
||||
backup.state = BackupState.COMPLETED
|
||||
backup.checksum = checksum
|
||||
backup.location = location
|
||||
backup.note = note
|
||||
backup.backup_type = bkup.backup_type
|
||||
backup.save()
|
||||
else:
|
||||
LOG.info("Saving %s Backup Info to model", backup_info['id'])
|
||||
conductor.update_backup(CONF.guest_id,
|
||||
backup_id=backup_info['id'],
|
||||
checksum=checksum,
|
||||
location=location,
|
||||
note=note,
|
||||
backup_type=bkup.backup_type,
|
||||
state=BackupState.COMPLETED)
|
||||
|
||||
def execute_restore(self, context, backup_id, restore_location):
|
||||
def execute_restore(self, context, backup_info, restore_location):
|
||||
|
||||
try:
|
||||
LOG.debug("Cleaning out restore location: %s", restore_location)
|
||||
@ -106,11 +114,8 @@ class BackupAgent(object):
|
||||
"0777", restore_location)
|
||||
utils.clean_out(restore_location)
|
||||
|
||||
LOG.debug("Finding backup %s to restore", backup_id)
|
||||
backup = DBBackup.find_by(id=backup_id)
|
||||
|
||||
LOG.debug("Getting Restore Runner of type %s", backup.backup_type)
|
||||
restore_runner = self._get_restore_runner(backup.backup_type)
|
||||
LOG.debug("Getting Restore Runner of type %s", backup_info['type'])
|
||||
restore_runner = self._get_restore_runner(backup_info['type'])
|
||||
|
||||
LOG.debug("Getting Storage Strategy")
|
||||
storage_strategy = get_storage_strategy(
|
||||
@ -119,17 +124,17 @@ class BackupAgent(object):
|
||||
|
||||
LOG.debug("Preparing storage to download stream.")
|
||||
download_stream = storage_strategy.load(context,
|
||||
backup.location,
|
||||
backup_info['location'],
|
||||
restore_runner.is_zipped,
|
||||
backup.checksum)
|
||||
backup_info['checksum'])
|
||||
|
||||
with restore_runner(restore_stream=download_stream,
|
||||
restore_location=restore_location) as runner:
|
||||
LOG.debug("Restoring instance from backup %s to %s",
|
||||
backup_id, restore_location)
|
||||
backup_info['id'], restore_location)
|
||||
content_size = runner.restore()
|
||||
LOG.info("Restore from backup %s completed successfully to %s",
|
||||
backup_id, restore_location)
|
||||
backup_info['id'], restore_location)
|
||||
LOG.info("Restore size: %s", content_size)
|
||||
|
||||
utils.execute_with_timeout("sudo", "chown", "-R",
|
||||
@ -137,8 +142,8 @@ class BackupAgent(object):
|
||||
|
||||
except Exception as e:
|
||||
LOG.error(e)
|
||||
LOG.error("Error restoring backup %s", backup_id)
|
||||
LOG.error("Error restoring backup %s", backup_info['id'])
|
||||
raise
|
||||
|
||||
else:
|
||||
LOG.info("Restored Backup %s", backup_id)
|
||||
LOG.info("Restored Backup %s", backup_info['id'])
|
||||
|
@ -73,19 +73,20 @@ class Manager(periodic_task.PeriodicTasks):
|
||||
def is_root_enabled(self, context):
|
||||
return MySqlAdmin().is_root_enabled()
|
||||
|
||||
def _perform_restore(self, backup_id, context, restore_location, app):
|
||||
LOG.info(_("Restoring database from backup %s") % backup_id)
|
||||
def _perform_restore(self, backup_info, context, restore_location, app):
|
||||
LOG.info(_("Restoring database from backup %s") % backup_info['id'])
|
||||
try:
|
||||
backup.restore(context, backup_id, restore_location)
|
||||
backup.restore(context, backup_info, restore_location)
|
||||
except Exception as e:
|
||||
LOG.error(e)
|
||||
LOG.error("Error performing restore from backup %s", backup_id)
|
||||
LOG.error("Error performing restore from backup %s",
|
||||
backup_info['id'])
|
||||
app.status.set_status(rd_instance.ServiceStatuses.FAILED)
|
||||
raise
|
||||
LOG.info(_("Restored database successfully"))
|
||||
|
||||
def prepare(self, context, packages, databases, memory_mb, users,
|
||||
device_path=None, mount_point=None, backup_id=None,
|
||||
device_path=None, mount_point=None, backup_info=None,
|
||||
config_contents=None, root_password=None):
|
||||
"""Makes ready DBAAS on a Guest container."""
|
||||
MySqlAppStatus.get().begin_install()
|
||||
@ -104,12 +105,14 @@ class Manager(periodic_task.PeriodicTasks):
|
||||
device.mount(mount_point)
|
||||
LOG.debug(_("Mounted the volume."))
|
||||
app.start_mysql()
|
||||
if backup_id:
|
||||
self._perform_restore(backup_id, context, CONF.mount_point, app)
|
||||
if backup_info:
|
||||
self._perform_restore(backup_info, context,
|
||||
CONF.mount_point, app)
|
||||
LOG.info(_("Securing mysql now."))
|
||||
app.secure(config_contents)
|
||||
enable_root_on_restore = (backup_id and MySqlAdmin().is_root_enabled())
|
||||
if root_password and not backup_id:
|
||||
enable_root_on_restore = (backup_info and
|
||||
MySqlAdmin().is_root_enabled())
|
||||
if root_password and not backup_info:
|
||||
app.secure_root(secure_remote_root=True)
|
||||
MySqlAdmin().enable_root(root_password)
|
||||
MySqlAdmin().report_root_enabled(context)
|
||||
@ -145,13 +148,14 @@ class Manager(periodic_task.PeriodicTasks):
|
||||
""" Gets the filesystem stats for the path given """
|
||||
return dbaas.get_filesystem_volume_stats(fs_path)
|
||||
|
||||
def create_backup(self, context, backup_id):
|
||||
def create_backup(self, context, backup_info):
|
||||
"""
|
||||
Entry point for initiating a backup for this guest agents db instance.
|
||||
The call currently blocks until the backup is complete or errors. If
|
||||
device_path is specified, it will be mounted based to a point specified
|
||||
in configuration.
|
||||
|
||||
:param backup_id: the db instance id of the backup task
|
||||
:param backup_info: a dictionary containing the db instance id of the
|
||||
backup task, location, type, and other data.
|
||||
"""
|
||||
backup.backup(context, backup_id)
|
||||
backup.backup(context, backup_info)
|
||||
|
@ -19,7 +19,9 @@
|
||||
import time
|
||||
|
||||
from trove.common import cfg
|
||||
from trove.common import context
|
||||
from trove.common import instance as rd_instance
|
||||
from trove.conductor import api as conductor_api
|
||||
from trove.instance import models as rd_models
|
||||
from trove.openstack.common import log as logging
|
||||
|
||||
@ -55,7 +57,9 @@ class BaseDbStatus(object):
|
||||
def __init__(self):
|
||||
if self._instance is not None:
|
||||
raise RuntimeError("Cannot instantiate twice.")
|
||||
self.status = self._load_status().status
|
||||
self.status = rd_models.InstanceServiceStatus(
|
||||
instance_id=CONF.guest_id,
|
||||
status=rd_instance.ServiceStatuses.NEW)
|
||||
self.restart_mode = False
|
||||
|
||||
def begin_install(self):
|
||||
@ -101,17 +105,17 @@ class BaseDbStatus(object):
|
||||
return (self.status is not None and
|
||||
self.status == rd_instance.ServiceStatuses.RUNNING)
|
||||
|
||||
@staticmethod
|
||||
def _load_status():
|
||||
"""Loads the status from the database."""
|
||||
inst_id = CONF.guest_id
|
||||
return rd_models.InstanceServiceStatus.find_by(instance_id=inst_id)
|
||||
|
||||
def set_status(self, status):
|
||||
"""Changes the status of the DB app in the database."""
|
||||
db_status = self._load_status()
|
||||
db_status.status = status
|
||||
db_status.save()
|
||||
"""Use conductor to update the DB app status."""
|
||||
LOG.debug("Casting set_status message to conductor.")
|
||||
ctxt = context.TroveContext(user=CONF.nova_proxy_admin_user,
|
||||
auth_token=CONF.nova_proxy_admin_pass)
|
||||
|
||||
heartbeat = {
|
||||
'service_status': status.description,
|
||||
}
|
||||
conductor_api.API(ctxt).heartbeat(CONF.guest_id, heartbeat)
|
||||
LOG.debug("Successfully cast set_status.")
|
||||
self.status = status
|
||||
|
||||
def update(self):
|
||||
|
@ -87,11 +87,11 @@ class API(proxy.RpcProxy):
|
||||
self.cast(self.context,
|
||||
self.make_msg("delete_instance", instance_id=instance_id))
|
||||
|
||||
def create_backup(self, backup_id, instance_id):
|
||||
def create_backup(self, backup_info, instance_id):
|
||||
LOG.debug("Making async call to create a backup for instance: %s" %
|
||||
instance_id)
|
||||
self.cast(self.context, self.make_msg("create_backup",
|
||||
backup_id=backup_id,
|
||||
backup_info=backup_info,
|
||||
instance_id=instance_id))
|
||||
|
||||
def delete_backup(self, backup_id):
|
||||
|
@ -76,9 +76,9 @@ class Manager(periodic_task.PeriodicTasks):
|
||||
def delete_backup(self, context, backup_id):
|
||||
models.BackupTasks.delete_backup(context, backup_id)
|
||||
|
||||
def create_backup(self, context, backup_id, instance_id):
|
||||
def create_backup(self, context, backup_info, instance_id):
|
||||
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
|
||||
instance_tasks.create_backup(backup_id)
|
||||
instance_tasks.create_backup(backup_info)
|
||||
|
||||
def create_instance(self, context, instance_id, name, flavor,
|
||||
image_id, databases, users, datastore_manager,
|
||||
|
@ -17,6 +17,7 @@ import os.path
|
||||
from cinderclient import exceptions as cinder_exceptions
|
||||
from eventlet import greenthread
|
||||
from novaclient import exceptions as nova_exceptions
|
||||
from trove.backup import models as bkup_models
|
||||
from trove.common import cfg
|
||||
from trove.common import template
|
||||
from trove.common import utils
|
||||
@ -47,7 +48,6 @@ from trove.openstack.common.gettextutils import _
|
||||
from trove.openstack.common.notifier import api as notifier
|
||||
from trove.openstack.common import timeutils
|
||||
import trove.common.remote as remote
|
||||
import trove.backup.models
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
@ -193,8 +193,16 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
|
||||
config = self._render_config(datastore_manager, flavor, self.id)
|
||||
|
||||
if server:
|
||||
backup_info = None
|
||||
if backup_id is not None:
|
||||
backup = bkup_models.Backup.get_by_id(self.context, backup_id)
|
||||
backup_info = {'id': backup_id,
|
||||
'location': backup.location,
|
||||
'type': backup.backup_type,
|
||||
'checksum': backup.checksum,
|
||||
}
|
||||
self._guest_prepare(server, flavor['ram'], volume_info,
|
||||
packages, databases, users, backup_id,
|
||||
packages, databases, users, backup_info,
|
||||
config.config_contents, root_password)
|
||||
|
||||
if not self.db_info.task_status.is_error:
|
||||
@ -507,14 +515,14 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
|
||||
return server
|
||||
|
||||
def _guest_prepare(self, server, flavor_ram, volume_info,
|
||||
packages, databases, users, backup_id=None,
|
||||
packages, databases, users, backup_info=None,
|
||||
config_contents=None, root_password=None):
|
||||
LOG.info(_("Entering guest_prepare"))
|
||||
# Now wait for the response from the create to do additional work
|
||||
self.guest.prepare(flavor_ram, packages, databases, users,
|
||||
device_path=volume_info['device_path'],
|
||||
mount_point=volume_info['mount_point'],
|
||||
backup_id=backup_id,
|
||||
backup_info=backup_info,
|
||||
config_contents=config_contents,
|
||||
root_password=root_password)
|
||||
|
||||
@ -733,9 +741,9 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
|
||||
action = MigrateAction(self, host)
|
||||
action.execute()
|
||||
|
||||
def create_backup(self, backup_id):
|
||||
def create_backup(self, backup_info):
|
||||
LOG.debug(_("Calling create_backup %s ") % self.id)
|
||||
self.guest.create_backup(backup_id)
|
||||
self.guest.create_backup(backup_info)
|
||||
|
||||
def reboot(self):
|
||||
try:
|
||||
@ -836,7 +844,7 @@ class BackupTasks(object):
|
||||
@classmethod
|
||||
def delete_backup(cls, context, backup_id):
|
||||
#delete backup from swift
|
||||
backup = trove.backup.models.Backup.get_by_id(context, backup_id)
|
||||
backup = bkup_models.Backup.get_by_id(context, backup_id)
|
||||
try:
|
||||
filename = backup.filename
|
||||
if filename:
|
||||
@ -849,8 +857,8 @@ class BackupTasks(object):
|
||||
backup.delete()
|
||||
else:
|
||||
LOG.exception(_("Exception deleting from swift. "
|
||||
"Details: %s") % e)
|
||||
backup.state = trove.backup.models.BackupState.DELETE_FAILED
|
||||
"Details: %s") % e)
|
||||
backup.state = bkup_models.BackupState.DELETE_FAILED
|
||||
backup.save()
|
||||
raise TroveError("Failed to delete swift objects")
|
||||
else:
|
||||
|
@ -74,7 +74,8 @@ class ResizeTestBase(TestCase):
|
||||
try:
|
||||
self.instance.update_db(task_status=InstanceTasks.NONE)
|
||||
self.mock.ReplayAll()
|
||||
self.assertRaises(Exception, self.action.execute)
|
||||
excs = (Exception)
|
||||
self.assertRaises(excs, self.action.execute)
|
||||
self.mock.VerifyAll()
|
||||
finally:
|
||||
self.mock.UnsetStubs()
|
||||
|
@ -208,7 +208,7 @@ class FakeGuest(object):
|
||||
return self.users.get((username, hostname), None)
|
||||
|
||||
def prepare(self, memory_mb, packages, databases, users, device_path=None,
|
||||
mount_point=None, backup_id=None, config_contents=None,
|
||||
mount_point=None, backup_info=None, config_contents=None,
|
||||
root_password=None):
|
||||
from trove.instance.models import DBInstance
|
||||
from trove.instance.models import InstanceServiceStatus
|
||||
@ -296,9 +296,10 @@ class FakeGuest(object):
|
||||
} for db in current_grants]
|
||||
return dbs
|
||||
|
||||
def create_backup(self, backup_id):
|
||||
def create_backup(self, backup_info):
|
||||
from trove.backup.models import Backup, BackupState
|
||||
backup = Backup.get_by_id(context=None, backup_id=backup_id)
|
||||
backup = Backup.get_by_id(context=None,
|
||||
backup_id=backup_info['id'])
|
||||
|
||||
def finish_create_backup():
|
||||
backup.state = BackupState.COMPLETED
|
||||
|
@ -12,16 +12,18 @@
|
||||
#See the License for the specific language governing permissions and
|
||||
#limitations under the License.
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
import testtools
|
||||
from mock import Mock
|
||||
from mockito import when, verify, unstub, mock, any, contains
|
||||
from testtools.matchers import Equals, Is
|
||||
from webob.exc import HTTPNotFound
|
||||
from mockito import when, verify, unstub, mock, any, contains
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import testtools
|
||||
|
||||
from trove.common import utils
|
||||
from trove.common.context import TroveContext
|
||||
from trove.conductor import api as conductor_api
|
||||
from trove.guestagent.strategies.backup import mysql_impl
|
||||
from trove.guestagent.strategies.restore.base import RestoreRunner
|
||||
from trove.backup.models import DBBackup
|
||||
@ -33,6 +35,8 @@ from trove.guestagent.strategies.backup.base import BackupRunner
|
||||
from trove.guestagent.strategies.backup.base import UnknownBackupType
|
||||
from trove.guestagent.strategies.storage.base import Storage
|
||||
|
||||
conductor_api.API.update_backup = Mock()
|
||||
|
||||
|
||||
def create_fake_data():
|
||||
from random import choice
|
||||
@ -216,48 +220,60 @@ class BackupAgentTest(testtools.TestCase):
|
||||
starts storage
|
||||
reports status
|
||||
"""
|
||||
backup = mock(DBBackup)
|
||||
when(DatabaseModelBase).find_by(id='123').thenReturn(backup)
|
||||
when(backup).save().thenReturn(backup)
|
||||
|
||||
agent = backupagent.BackupAgent()
|
||||
agent.execute_backup(context=None, backup_id='123', runner=MockBackup)
|
||||
backup_info = {'id': '123',
|
||||
'location': 'fake-location',
|
||||
'type': 'InnoBackupEx',
|
||||
'checksum': 'fake-checksum',
|
||||
}
|
||||
agent.execute_backup(context=None, backup_info=backup_info,
|
||||
runner=MockBackup)
|
||||
|
||||
verify(DatabaseModelBase).find_by(id='123')
|
||||
self.assertThat(backup.state, Is(BackupState.COMPLETED))
|
||||
self.assertThat(backup.location,
|
||||
Equals('http://mockswift/v1/database_backups/123'))
|
||||
verify(backup, times=3).save()
|
||||
self.assertTrue(
|
||||
conductor_api.API.update_backup.called_once_with(
|
||||
any(),
|
||||
backup_id=backup_info['id'],
|
||||
state=BackupState.NEW))
|
||||
|
||||
self.assertTrue(
|
||||
conductor_api.API.update_backup.called_once_with(
|
||||
any(),
|
||||
backup_id=backup_info['id'],
|
||||
size=any(),
|
||||
state=BackupState.BUILDING))
|
||||
|
||||
self.assertTrue(
|
||||
conductor_api.API.update_backup.called_once_with(
|
||||
any(),
|
||||
backup_id=backup_info['id'],
|
||||
checksum=any(),
|
||||
location=any(),
|
||||
note=any(),
|
||||
backup_type=backup_info['type'],
|
||||
state=BackupState.COMPLETED))
|
||||
|
||||
def test_execute_lossy_backup(self):
|
||||
"""This test verifies that incomplete writes to swift will fail."""
|
||||
backup = mock(DBBackup)
|
||||
when(backupagent).get_auth_password().thenReturn('secret')
|
||||
when(DatabaseModelBase).find_by(id='123').thenReturn(backup)
|
||||
when(backup).save().thenReturn(backup)
|
||||
when(MockSwift).save(any(), any()).thenReturn((False, 'Error', 'y',
|
||||
'z'))
|
||||
agent = backupagent.BackupAgent()
|
||||
|
||||
backup_info = {'id': '123',
|
||||
'location': 'fake-location',
|
||||
'type': 'InnoBackupEx',
|
||||
'checksum': 'fake-checksum',
|
||||
}
|
||||
self.assertRaises(backupagent.BackupError, agent.execute_backup,
|
||||
context=None, backup_id='123',
|
||||
context=None, backup_info=backup_info,
|
||||
runner=MockLossyBackup)
|
||||
|
||||
self.assertThat(backup.state, Is(BackupState.FAILED))
|
||||
verify(backup, times=3).save()
|
||||
|
||||
def test_execute_backup_model_exception(self):
|
||||
"""This test should ensure backup agent
|
||||
properly handles condition where backup model is not found
|
||||
"""
|
||||
when(DatabaseModelBase).find_by(id='123').thenRaise(ModelNotFoundError)
|
||||
|
||||
agent = backupagent.BackupAgent()
|
||||
# probably should catch this exception and return a backup exception
|
||||
# also note that since the model is not found there is no way to report
|
||||
# this error
|
||||
self.assertRaises(ModelNotFoundError, agent.execute_backup,
|
||||
context=None, backup_id='123')
|
||||
#self.assertThat(backup.state, Is(BackupState.FAILED))
|
||||
self.assertTrue(
|
||||
conductor_api.API.update_backup.called_once_with(
|
||||
any(),
|
||||
backup_id=backup_info['id'],
|
||||
state=BackupState.FAILED))
|
||||
|
||||
def test_execute_restore(self):
|
||||
"""This test should ensure backup agent
|
||||
@ -282,7 +298,12 @@ class BackupAgentTest(testtools.TestCase):
|
||||
|
||||
agent = backupagent.BackupAgent()
|
||||
|
||||
agent.execute_restore(TroveContext(), '123', '/var/lib/mysql')
|
||||
bkup_info = {'id': '123',
|
||||
'location': 'fake-location',
|
||||
'type': 'InnoBackupEx',
|
||||
'checksum': 'fake-checksum',
|
||||
}
|
||||
agent.execute_restore(TroveContext(), bkup_info, '/var/lib/mysql')
|
||||
|
||||
def test_restore_unknown(self):
|
||||
backup = mock(DBBackup)
|
||||
@ -296,6 +317,11 @@ class BackupAgentTest(testtools.TestCase):
|
||||
|
||||
agent = backupagent.BackupAgent()
|
||||
|
||||
bkup_info = {'id': '123',
|
||||
'location': backup.location,
|
||||
'type': backup.backup_type,
|
||||
'checksum': 'fake-checksum',
|
||||
}
|
||||
self.assertRaises(UnknownBackupType, agent.execute_restore,
|
||||
context=None, backup_id='123',
|
||||
context=None, backup_info=bkup_info,
|
||||
restore_location='/var/lib/mysql')
|
||||
|
13
trove/tests/unittests/conductor/__init__.py
Normal file
13
trove/tests/unittests/conductor/__init__.py
Normal file
@ -0,0 +1,13 @@
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
148
trove/tests/unittests/conductor/test_methods.py
Normal file
148
trove/tests/unittests/conductor/test_methods.py
Normal file
@ -0,0 +1,148 @@
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import testtools
|
||||
from datetime import datetime
|
||||
from mockito import unstub
|
||||
from trove.backup import models as bkup_models
|
||||
from trove.common import context
|
||||
from trove.common import exception as t_exception
|
||||
from trove.common import instance as t_instance
|
||||
from trove.conductor import api as conductor_api
|
||||
from trove.conductor import manager as conductor_manager
|
||||
from trove.instance import models as t_models
|
||||
from trove.instance.tasks import InstanceTasks
|
||||
from trove.tests.unittests.util import util
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
# See LP bug #1255178
|
||||
OLD_DBB_SAVE = bkup_models.DBBackup.save
|
||||
|
||||
|
||||
def generate_uuid(length=16):
|
||||
uuid = []
|
||||
while len(''.join(uuid)) < length:
|
||||
uuid.append(str(uuid4()))
|
||||
return (''.join(uuid))[:length]
|
||||
|
||||
|
||||
class ConductorMethodTests(testtools.TestCase):
|
||||
def setUp(self):
|
||||
# See LP bug #1255178
|
||||
bkup_models.DBBackup.save = OLD_DBB_SAVE
|
||||
super(ConductorMethodTests, self).setUp()
|
||||
util.init_db()
|
||||
self.cond_mgr = conductor_manager.Manager()
|
||||
self.instance_id = generate_uuid()
|
||||
|
||||
def tearDown(self):
|
||||
super(ConductorMethodTests, self).tearDown()
|
||||
unstub()
|
||||
|
||||
def _create_iss(self):
|
||||
new_id = generate_uuid()
|
||||
iss = t_models.InstanceServiceStatus(
|
||||
id=new_id,
|
||||
instance_id=self.instance_id,
|
||||
status=t_instance.ServiceStatuses.NEW)
|
||||
iss.save()
|
||||
return new_id
|
||||
|
||||
def _get_iss(self, id):
|
||||
return t_models.InstanceServiceStatus.find_by(id=id)
|
||||
|
||||
def _create_backup(self, name='fake backup'):
|
||||
new_id = generate_uuid()
|
||||
backup = bkup_models.DBBackup.create(
|
||||
id=new_id,
|
||||
name=name,
|
||||
description='This is a fake backup object.',
|
||||
tenant_id=generate_uuid(),
|
||||
state=bkup_models.BackupState.NEW,
|
||||
instance_id=self.instance_id)
|
||||
backup.save()
|
||||
return new_id
|
||||
|
||||
def _get_backup(self, id):
|
||||
return bkup_models.DBBackup.find_by(id=id)
|
||||
|
||||
# --- Tests for heartbeat ---
|
||||
|
||||
def test_heartbeat_instance_not_found(self):
|
||||
new_id = generate_uuid()
|
||||
self.assertRaises(t_exception.ModelNotFoundError,
|
||||
self.cond_mgr.heartbeat, None, new_id, {})
|
||||
|
||||
def test_heartbeat_instance_no_changes(self):
|
||||
iss_id = self._create_iss()
|
||||
old_iss = self._get_iss(iss_id)
|
||||
self.cond_mgr.heartbeat(None, self.instance_id, {})
|
||||
new_iss = self._get_iss(iss_id)
|
||||
self.assertEqual(old_iss.status_id, new_iss.status_id)
|
||||
self.assertEqual(old_iss.status_description,
|
||||
new_iss.status_description)
|
||||
|
||||
def test_heartbeat_instance_status_bogus_change(self):
|
||||
iss_id = self._create_iss()
|
||||
old_iss = self._get_iss(iss_id)
|
||||
new_status = 'potato salad'
|
||||
payload = {
|
||||
'service_status': new_status,
|
||||
}
|
||||
self.assertRaises(ValueError, self.cond_mgr.heartbeat,
|
||||
None, self.instance_id, payload)
|
||||
new_iss = self._get_iss(iss_id)
|
||||
self.assertEqual(old_iss.status_id, new_iss.status_id)
|
||||
self.assertEqual(old_iss.status_description,
|
||||
new_iss.status_description)
|
||||
|
||||
def test_heartbeat_instance_status_changed(self):
|
||||
iss_id = self._create_iss()
|
||||
payload = {'service_status': 'building'}
|
||||
self.cond_mgr.heartbeat(None, self.instance_id, payload)
|
||||
iss = self._get_iss(iss_id)
|
||||
self.assertEqual(t_instance.ServiceStatuses.BUILDING, iss.status)
|
||||
|
||||
# --- Tests for update_backup ---
|
||||
|
||||
def test_backup_not_found(self):
|
||||
new_bkup_id = generate_uuid()
|
||||
self.assertRaises(t_exception.ModelNotFoundError,
|
||||
self.cond_mgr.update_backup,
|
||||
None, self.instance_id, new_bkup_id)
|
||||
|
||||
def test_backup_instance_id_nomatch(self):
|
||||
new_iid = generate_uuid()
|
||||
bkup_id = self._create_backup('nomatch')
|
||||
old_name = self._get_backup(bkup_id).name
|
||||
self.cond_mgr.update_backup(None, new_iid, bkup_id,
|
||||
name="remains unchanged")
|
||||
bkup = self._get_backup(bkup_id)
|
||||
self.assertEqual(old_name, bkup.name)
|
||||
|
||||
def test_backup_bogus_fields_not_changed(self):
|
||||
bkup_id = self._create_backup('bogus')
|
||||
self.cond_mgr.update_backup(None, self.instance_id, bkup_id,
|
||||
not_a_valid_field="INVALID")
|
||||
bkup = self._get_backup(bkup_id)
|
||||
self.assertFalse(hasattr(bkup, 'not_a_valid_field'))
|
||||
|
||||
def test_backup_real_fields_changed(self):
|
||||
bkup_id = self._create_backup('realrenamed')
|
||||
new_name = "recently renamed"
|
||||
self.cond_mgr.update_backup(None, self.instance_id, bkup_id,
|
||||
name=new_name)
|
||||
bkup = self._get_backup(bkup_id)
|
||||
self.assertEqual(new_name, bkup.name)
|
@ -237,9 +237,9 @@ class ApiTest(testtools.TestCase):
|
||||
self._verify_rpc_call(exp_msg)
|
||||
|
||||
def test_create_backup(self):
|
||||
exp_msg = RpcMsgMatcher('create_backup', 'backup_id')
|
||||
exp_msg = RpcMsgMatcher('create_backup', 'backup_info')
|
||||
self._mock_rpc_cast(exp_msg)
|
||||
self.api.create_backup('123')
|
||||
self.api.create_backup({'id': '123'})
|
||||
self._verify_rpc_cast(exp_msg)
|
||||
|
||||
def _verify_rpc_connection_and_cast(self, rpc, mock_conn, exp_msg):
|
||||
@ -254,8 +254,8 @@ class ApiTest(testtools.TestCase):
|
||||
when(mock_conn).create_consumer(any(), any(), any()).thenReturn(None)
|
||||
exp_msg = RpcMsgMatcher('prepare', 'memory_mb', 'packages',
|
||||
'databases', 'users', 'device_path',
|
||||
'mount_point', 'backup_id', 'config_contents',
|
||||
'root_password')
|
||||
'mount_point', 'backup_info',
|
||||
'config_contents', 'root_password')
|
||||
|
||||
when(rpc).cast(any(), any(), exp_msg).thenReturn(None)
|
||||
|
||||
@ -270,12 +270,13 @@ class ApiTest(testtools.TestCase):
|
||||
when(mock_conn).create_consumer(any(), any(), any()).thenReturn(None)
|
||||
exp_msg = RpcMsgMatcher('prepare', 'memory_mb', 'packages',
|
||||
'databases', 'users', 'device_path',
|
||||
'mount_point', 'backup_id', 'config_contents',
|
||||
'root_password')
|
||||
'mount_point', 'backup_info',
|
||||
'config_contents', 'root_password')
|
||||
when(rpc).cast(any(), any(), exp_msg).thenReturn(None)
|
||||
bkup = {'id': 'backup_id_123'}
|
||||
|
||||
self.api.prepare('2048', 'package1', 'db1', 'user1', '/dev/vdt',
|
||||
'/mnt/opt', 'backup_id_123', 'cont', '1-2-3-4')
|
||||
'/mnt/opt', bkup, 'cont', '1-2-3-4')
|
||||
|
||||
self._verify_rpc_connection_and_cast(rpc, mock_conn, exp_msg)
|
||||
|
||||
|
@ -13,7 +13,7 @@
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
from random import randint
|
||||
from uuid import uuid4
|
||||
import time
|
||||
from mock import Mock
|
||||
from mock import MagicMock
|
||||
@ -37,6 +37,7 @@ import trove
|
||||
from trove.common.context import TroveContext
|
||||
from trove.common import utils
|
||||
from trove.common import instance as rd_instance
|
||||
from trove.conductor import api as conductor_api
|
||||
import trove.guestagent.datastore.mysql.service as dbaas
|
||||
from trove.guestagent import dbaas as dbaas_sr
|
||||
from trove.guestagent import pkg
|
||||
@ -65,6 +66,9 @@ FAKE_USER = [{"_name": "random", "_password": "guesswhat",
|
||||
"_databases": [FAKE_DB]}]
|
||||
|
||||
|
||||
conductor_api.API.heartbeat = Mock()
|
||||
|
||||
|
||||
class FakeAppStatus(MySqlAppStatus):
|
||||
|
||||
def __init__(self, id, status):
|
||||
@ -74,9 +78,6 @@ class FakeAppStatus(MySqlAppStatus):
|
||||
def _get_actual_db_status(self):
|
||||
return self.next_fake_status
|
||||
|
||||
def _load_status(self):
|
||||
return InstanceServiceStatus.find_by(instance_id=self.id)
|
||||
|
||||
def set_next_status(self, next_status):
|
||||
self.next_fake_status = next_status
|
||||
|
||||
@ -455,7 +456,7 @@ class MySqlAppTest(testtools.TestCase):
|
||||
self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout
|
||||
self.orig_time_sleep = dbaas.time.sleep
|
||||
util.init_db()
|
||||
self.FAKE_ID = randint(1, 10000)
|
||||
self.FAKE_ID = str(uuid4())
|
||||
InstanceServiceStatus.create(instance_id=self.FAKE_ID,
|
||||
status=rd_instance.ServiceStatuses.NEW)
|
||||
self.appStatus = FakeAppStatus(self.FAKE_ID,
|
||||
@ -523,7 +524,8 @@ class MySqlAppTest(testtools.TestCase):
|
||||
rd_instance.ServiceStatuses.SHUTDOWN)
|
||||
|
||||
self.mySqlApp.stop_db(True)
|
||||
self.assert_reported_status(rd_instance.ServiceStatuses.SHUTDOWN)
|
||||
self.assertTrue(conductor_api.API.heartbeat.called_once_with(
|
||||
self.FAKE_ID, {'service_status': 'shutdown'}))
|
||||
|
||||
def test_stop_mysql_error(self):
|
||||
|
||||
@ -543,7 +545,8 @@ class MySqlAppTest(testtools.TestCase):
|
||||
|
||||
self.assertTrue(self.mySqlApp.stop_db.called)
|
||||
self.assertTrue(self.mySqlApp.start_mysql.called)
|
||||
self.assert_reported_status(rd_instance.ServiceStatuses.RUNNING)
|
||||
self.assertTrue(conductor_api.API.heartbeat.called_once_with(
|
||||
self.FAKE_ID, {'service_status': 'running'}))
|
||||
|
||||
def test_restart_mysql_wont_start_up(self):
|
||||
|
||||
@ -589,8 +592,9 @@ class MySqlAppTest(testtools.TestCase):
|
||||
self.mySqlApp._enable_mysql_on_boot = Mock()
|
||||
self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING)
|
||||
|
||||
self.mySqlApp.start_mysql(True)
|
||||
self.assert_reported_status(rd_instance.ServiceStatuses.RUNNING)
|
||||
self.mySqlApp.start_mysql(update_db=True)
|
||||
self.assertTrue(conductor_api.API.heartbeat.called_once_with(
|
||||
self.FAKE_ID, {'service_status': 'running'}))
|
||||
|
||||
def test_start_mysql_runs_forever(self):
|
||||
|
||||
@ -600,7 +604,8 @@ class MySqlAppTest(testtools.TestCase):
|
||||
self.appStatus.set_next_status(rd_instance.ServiceStatuses.SHUTDOWN)
|
||||
|
||||
self.assertRaises(RuntimeError, self.mySqlApp.start_mysql)
|
||||
self.assert_reported_status(rd_instance.ServiceStatuses.SHUTDOWN)
|
||||
self.assertTrue(conductor_api.API.heartbeat.called_once_with(
|
||||
self.FAKE_ID, {'service_status': 'shutdown'}))
|
||||
|
||||
def test_start_mysql_error(self):
|
||||
|
||||
@ -1003,7 +1008,7 @@ class BaseDbStatusTest(testtools.TestCase):
|
||||
super(BaseDbStatusTest, self).setUp()
|
||||
util.init_db()
|
||||
self.orig_dbaas_time_sleep = dbaas.time.sleep
|
||||
self.FAKE_ID = randint(1, 10000)
|
||||
self.FAKE_ID = str(uuid4())
|
||||
InstanceServiceStatus.create(instance_id=self.FAKE_ID,
|
||||
status=rd_instance.ServiceStatuses.NEW)
|
||||
dbaas.CONF.guest_id = self.FAKE_ID
|
||||
@ -1122,7 +1127,7 @@ class MySqlAppStatusTest(testtools.TestCase):
|
||||
self.orig_load_mysqld_options = dbaas.load_mysqld_options
|
||||
self.orig_dbaas_os_path_exists = dbaas.os.path.exists
|
||||
self.orig_dbaas_time_sleep = dbaas.time.sleep
|
||||
self.FAKE_ID = randint(1, 10000)
|
||||
self.FAKE_ID = str(uuid4())
|
||||
InstanceServiceStatus.create(instance_id=self.FAKE_ID,
|
||||
status=rd_instance.ServiceStatuses.NEW)
|
||||
dbaas.CONF.guest_id = self.FAKE_ID
|
||||
|
@ -140,6 +140,13 @@ class GuestAgentManagerTest(testtools.TestCase):
|
||||
|
||||
# covering all outcomes is starting to cause trouble here
|
||||
COUNT = 1 if device_path else 0
|
||||
backup_info = None
|
||||
if backup_id is not None:
|
||||
backup_info = {'id': backup_id,
|
||||
'location': 'fake-location',
|
||||
'type': 'InnoBackupEx',
|
||||
'checksum': 'fake-checksum',
|
||||
}
|
||||
|
||||
# TODO(juice): this should stub an instance of the MySqlAppStatus
|
||||
mock_status = mock()
|
||||
@ -150,8 +157,10 @@ class GuestAgentManagerTest(testtools.TestCase):
|
||||
when(VolumeDevice).mount().thenReturn(None)
|
||||
when(dbaas.MySqlApp).stop_db().thenReturn(None)
|
||||
when(dbaas.MySqlApp).start_mysql().thenReturn(None)
|
||||
when(dbaas.MySqlApp).install_if_needed().thenReturn(None)
|
||||
when(backup).restore(self.context, backup_id).thenReturn(None)
|
||||
when(dbaas.MySqlApp).install_if_needed(any()).thenReturn(None)
|
||||
when(backup).restore(self.context,
|
||||
backup_info,
|
||||
'/var/lib/mysql').thenReturn(None)
|
||||
when(dbaas.MySqlApp).secure(any()).thenReturn(None)
|
||||
when(dbaas.MySqlApp).secure_root(any()).thenReturn(None)
|
||||
(when(pkg.Package).pkg_is_installed(any()).
|
||||
@ -164,12 +173,14 @@ class GuestAgentManagerTest(testtools.TestCase):
|
||||
|
||||
when(os.path).exists(any()).thenReturn(True)
|
||||
# invocation
|
||||
self.manager.prepare(context=self.context, packages=None,
|
||||
self.manager.prepare(context=self.context,
|
||||
packages=None,
|
||||
memory_mb='2048',
|
||||
databases=None,
|
||||
memory_mb='2048', users=None,
|
||||
users=None,
|
||||
device_path=device_path,
|
||||
mount_point='/var/lib/mysql',
|
||||
backup_id=backup_id)
|
||||
backup_info=backup_info)
|
||||
# verification/assertion
|
||||
verify(mock_status).begin_install()
|
||||
|
||||
@ -177,8 +188,9 @@ class GuestAgentManagerTest(testtools.TestCase):
|
||||
verify(dbaas.MySqlApp, times=COUNT).stop_db()
|
||||
verify(VolumeDevice, times=COUNT).migrate_data(
|
||||
any())
|
||||
if backup_id:
|
||||
verify(backup).restore(self.context, backup_id, '/var/lib/mysql')
|
||||
if backup_info:
|
||||
verify(backup).restore(self.context, backup_info, '/var/lib/mysql')
|
||||
verify(dbaas.MySqlApp).install_if_needed(any())
|
||||
# We dont need to make sure the exact contents are there
|
||||
verify(dbaas.MySqlApp).secure(any())
|
||||
verify(dbaas.MySqlAdmin, never).create_database()
|
||||
|
@ -68,9 +68,9 @@ class Run_with_quotasTest(testtools.TestCase):
|
||||
|
||||
def test_run_with_quotas_error(self):
|
||||
|
||||
f = Mock(side_effect=Exception())
|
||||
f = Mock(side_effect=exception.TroveError())
|
||||
|
||||
self.assertRaises(Exception, run_with_quotas, FAKE_TENANT1,
|
||||
self.assertRaises(exception.TroveError, run_with_quotas, FAKE_TENANT1,
|
||||
{'instances': 1, 'volumes': 5}, f)
|
||||
self.assertTrue(QUOTAS.reserve.called)
|
||||
self.assertTrue(QUOTAS.rollback.called)
|
||||
|
@ -17,12 +17,12 @@ from testtools.matchers import Equals
|
||||
from mockito import mock, when, unstub, any, verify, never
|
||||
from trove.datastore import models as datastore_models
|
||||
from trove.taskmanager import models as taskmanager_models
|
||||
import trove.common.remote as remote
|
||||
from trove.backup import models as backup_models
|
||||
from trove.common import remote
|
||||
from trove.common.instance import ServiceStatuses
|
||||
from trove.instance.models import InstanceServiceStatus
|
||||
from trove.instance.models import DBInstance
|
||||
from trove.instance.tasks import InstanceTasks
|
||||
import trove.backup.models as backup_models
|
||||
from trove.common.exception import TroveError
|
||||
from swiftclient.client import ClientException
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
Loading…
x
Reference in New Issue
Block a user