From 4fb41b5198c865b46a02dd72501d12e60ec10dd6 Mon Sep 17 00:00:00 2001
From: Lingxian Kong <anlin.kong@gmail.com>
Date: Wed, 2 Sep 2020 10:10:23 +1200
Subject: [PATCH] Postgresql: Backup and restore

Change-Id: Icf08b7dc82ce501d82b45cf5412256a43716b6ae
---
 backup/Dockerfile                             |   5 +-
 backup/drivers/base.py                        |  10 +-
 backup/drivers/innobackupex.py                |   1 -
 backup/drivers/mariabackup.py                 |   1 -
 backup/drivers/mysql_base.py                  |   7 +-
 backup/drivers/postgres.py                    | 249 ++++++++++++++++++
 backup/install.sh                             |   9 +-
 backup/main.py                                |  28 +-
 backup/requirements.txt                       |   2 +
 backup/storage/swift.py                       |   8 +-
 backup/utils/__init__.py                      |  46 ++++
 backup/utils/postgresql.py                    |  53 ++++
 trove/common/cfg.py                           |   7 +-
 trove/guestagent/common/operating_system.py   |  44 ++--
 trove/guestagent/datastore/manager.py         |  19 +-
 .../datastore/mysql_common/manager.py         |  33 +--
 .../datastore/mysql_common/service.py         | 104 --------
 .../guestagent/datastore/postgres/manager.py  |  49 +++-
 trove/guestagent/datastore/postgres/query.py  |   2 +-
 .../guestagent/datastore/postgres/service.py  |  66 ++++-
 trove/guestagent/datastore/service.py         | 112 +++++++-
 trove/templates/postgresql/config.template    |   9 +-
 22 files changed, 673 insertions(+), 191 deletions(-)
 create mode 100644 backup/drivers/postgres.py
 create mode 100644 backup/utils/__init__.py
 create mode 100644 backup/utils/postgresql.py

diff --git a/backup/Dockerfile b/backup/Dockerfile
index 86c19ede4a..38ebb14ad6 100644
--- a/backup/Dockerfile
+++ b/backup/Dockerfile
@@ -4,8 +4,10 @@ LABEL maintainer="anlin.kong@gmail.com"
 ARG DATASTORE="mysql"
 ARG APTOPTS="-y -qq --no-install-recommends --allow-unauthenticated"
 ARG PERCONA_XTRABACKUP_VERSION=24
-ENV DEBIAN_FRONTEND noninteractive \
-    APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=1
+
+# ENV (not RUN export) so the settings apply to all later RUN steps.
+ENV DEBIAN_FRONTEND=noninteractive
+ENV APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=1
 
 RUN apt-get update \
     && apt-get install $APTOPTS gnupg2 lsb-release apt-utils apt-transport-https ca-certificates software-properties-common curl \
diff --git a/backup/drivers/base.py b/backup/drivers/base.py
index 033553bcf9..20ed75cfcc 100644
--- a/backup/drivers/base.py
+++ b/backup/drivers/base.py
@@ -27,12 +27,11 @@ class BaseRunner(object):
     """Base class for Backup Strategy implementations."""
 
     # Subclass should provide the commands.
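+    # Note: these command templates are %-interpolated with the runner's
+    # kwargs in BaseRunner.__init__ (self.command = self.cmd % kwargs) to
+    # build the actual shell pipeline.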
- cmd = None - restore_cmd = None - prepare_cmd = None + cmd = '' + restore_cmd = '' + prepare_cmd = '' encrypt_key = CONF.backup_encryption_key - default_data_dir = '/var/lib/mysql/data' def __init__(self, *args, **kwargs): self.process = None @@ -43,8 +42,9 @@ class BaseRunner(object): self.checksum = kwargs.pop('checksum', '') if 'restore_location' not in kwargs: - kwargs['restore_location'] = self.default_data_dir + kwargs['restore_location'] = self.datadir self.restore_location = kwargs['restore_location'] + self.restore_content_length = 0 self.command = self.cmd % kwargs self.restore_command = (self.decrypt_cmd + diff --git a/backup/drivers/innobackupex.py b/backup/drivers/innobackupex.py index e077d49714..9bbebc3a88 100644 --- a/backup/drivers/innobackupex.py +++ b/backup/drivers/innobackupex.py @@ -102,7 +102,6 @@ class InnoBackupExIncremental(InnoBackupEx): raise AttributeError('lsn attribute missing') self.parent_location = kwargs.pop('parent_location', '') self.parent_checksum = kwargs.pop('parent_checksum', '') - self.restore_content_length = 0 super(InnoBackupExIncremental, self).__init__(*args, **kwargs) diff --git a/backup/drivers/mariabackup.py b/backup/drivers/mariabackup.py index e10cca30b9..dbf3bd0752 100644 --- a/backup/drivers/mariabackup.py +++ b/backup/drivers/mariabackup.py @@ -56,7 +56,6 @@ class MariaBackupIncremental(MariaBackup): raise AttributeError('lsn attribute missing') self.parent_location = kwargs.pop('parent_location', '') self.parent_checksum = kwargs.pop('parent_checksum', '') - self.restore_content_length = 0 super(MariaBackupIncremental, self).__init__(*args, **kwargs) diff --git a/backup/drivers/mysql_base.py b/backup/drivers/mysql_base.py index 2450daf03c..6389cdb9be 100644 --- a/backup/drivers/mysql_base.py +++ b/backup/drivers/mysql_base.py @@ -27,6 +27,8 @@ LOG = logging.getLogger(__name__) class MySQLBaseRunner(base.BaseRunner): def __init__(self, *args, **kwargs): + self.datadir = kwargs.pop('db_datadir', '/var/lib/mysql/data') + super(MySQLBaseRunner, self).__init__(*args, **kwargs) @property @@ -113,8 +115,8 @@ class MySQLBaseRunner(base.BaseRunner): incremental_dir = None if 'parent_location' in metadata: - LOG.info("Restoring parent: %(parent_location)s" - " checksum: %(parent_checksum)s.", metadata) + LOG.info("Restoring parent: %(parent_location)s, " + "checksum: %(parent_checksum)s.", metadata) parent_location = metadata['parent_location'] parent_checksum = metadata['parent_checksum'] @@ -129,6 +131,7 @@ class MySQLBaseRunner(base.BaseRunner): else: # The parent (full backup) use the same command from InnobackupEx # super class and do not set an incremental_dir. + LOG.info("Restoring back to full backup.") command = self.restore_command self.restore_content_length += self.unpack(location, checksum, command) diff --git a/backup/drivers/postgres.py b/backup/drivers/postgres.py new file mode 100644 index 0000000000..0b6538bb75 --- /dev/null +++ b/backup/drivers/postgres.py @@ -0,0 +1,249 @@ +# Copyright 2020 Catalyst Cloud +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import re
+
+from oslo_log import log as logging
+
+from backup import utils
+from backup.drivers import base
+from backup.utils import postgresql as psql_util
+
+LOG = logging.getLogger(__name__)
+
+
+class PgBasebackup(base.BaseRunner):
+    def __init__(self, *args, **kwargs):
+        if not kwargs.get('wal_archive_dir'):
+            raise AttributeError('wal_archive_dir attribute missing')
+        self.wal_archive_dir = kwargs.pop('wal_archive_dir')
+        self.datadir = kwargs.pop(
+            'db_datadir', '/var/lib/postgresql/data/pgdata')
+
+        self.label = None
+        self.stop_segment = None
+        self.start_segment = None
+        self.start_wal_file = None
+        self.stop_wal_file = None
+        self.checkpoint_location = None
+        self.metadata = {}
+
+        super(PgBasebackup, self).__init__(*args, **kwargs)
+
+        self.restore_command = (f"{self.decrypt_cmd}tar xzf - -C "
+                                f"{self.datadir}")
+
+    @property
+    def cmd(self):
+        cmd = (f"pg_basebackup -U postgres -Ft -z --wal-method=fetch "
+               f"--label={self.filename} --pgdata=-")
+        return cmd + self.encrypt_cmd
+
+    @property
+    def manifest(self):
+        """Target file name."""
+        return "%s.tar.gz%s" % (self.filename, self.encrypt_manifest)
+
+    def get_wal_files(self, backup_pos=0):
+        """Return the WAL files since the provided last backup.
+
+        pg_archivecleanup depends on alphanumeric sorting to decide wal
+        order, so we'll do so too:
+        https://github.com/postgres/postgres/blob/REL9_4_STABLE/contrib
+        /pg_archivecleanup/pg_archivecleanup.c#L122
+        """
+        backup_file = self.get_backup_file(backup_pos=backup_pos)
+        last_wal = backup_file.split('.')[0]
+        wal_re = re.compile("^[0-9A-F]{24}$")
+        wal_files = [wal_file for wal_file in os.listdir(self.wal_archive_dir)
+                     if wal_re.search(wal_file) and wal_file >= last_wal]
+        return wal_files
+
+    def get_backup_file(self, backup_pos=0):
+        """Look for the most recent .backup file that basebackup creates.
+
+        :return: a string like 000000010000000000000006.00000168.backup
+        """
+        backup_re = re.compile(r"[0-9A-F]{24}.*\.backup")
+        wal_files = [wal_file for wal_file in os.listdir(self.wal_archive_dir)
+                     if backup_re.search(wal_file)]
+        wal_files = sorted(wal_files, reverse=True)
+        if not wal_files:
+            return None
+        return wal_files[backup_pos]
+
+    def get_backup_metadata(self, metadata_file):
+        """Parse the contents of the .backup file."""
+        metadata = {}
+
+        start_re = re.compile(r"START WAL LOCATION: (.*) \(file (.*)\)")
+        stop_re = re.compile(r"STOP WAL LOCATION: (.*) \(file (.*)\)")
+        checkpt_re = re.compile("CHECKPOINT LOCATION: (.*)")
+        label_re = re.compile("LABEL: (.*)")
+
+        with open(metadata_file, 'r') as file:
+            metadata_contents = file.read()
+
+        match = start_re.search(metadata_contents)
+        if match:
+            self.start_segment = match.group(1)
+            metadata['start-segment'] = self.start_segment
+            self.start_wal_file = match.group(2)
+            metadata['start-wal-file'] = self.start_wal_file
+
+        match = stop_re.search(metadata_contents)
+        if match:
+            self.stop_segment = match.group(1)
+            metadata['stop-segment'] = self.stop_segment
+            self.stop_wal_file = match.group(2)
+            metadata['stop-wal-file'] = self.stop_wal_file
+
+        match = checkpt_re.search(metadata_contents)
+        if match:
+            self.checkpoint_location = match.group(1)
+            metadata['checkpoint-location'] = self.checkpoint_location
+
+        match = label_re.search(metadata_contents)
+        if match:
+            self.label = match.group(1)
+            metadata['label'] = self.label
+
+        return metadata
+
+    def get_metadata(self):
+        """Get metadata.
+
+        pg_basebackup may complete, and we arrive here before the
+        history file is written to the wal archive. So we need to
+        handle two possibilities:
+        - this is the first backup, and no history file exists yet
+        - this isn't the first backup, so the history file we retrieve
+          isn't the one for the backup we just ran!
+        """
+        def _metadata_found():
+            backup_file = self.get_backup_file()
+            if not backup_file:
+                return False
+
+            self.metadata = self.get_backup_metadata(
+                os.path.join(self.wal_archive_dir, backup_file))
+            LOG.info("Metadata for backup: %s.", self.metadata)
+            return self.metadata['label'] == self.filename
+
+        try:
+            LOG.debug("Polling for backup metadata... ")
+            utils.poll_until(_metadata_found, sleep_time=5, time_out=60)
+        except Exception as e:
+            raise RuntimeError(f"Failed to get backup metadata for backup "
+                               f"{self.filename}: {str(e)}")
+
+        return self.metadata
+
+    def check_process(self):
+        # If any of the below variables were not set by either get_metadata()
+        # or direct retrieval from the pgsql backup commands, then something
+        # has gone wrong.
+        if not self.start_segment or not self.start_wal_file:
+            LOG.error("Unable to determine starting WAL file/segment")
+            return False
+        if not self.stop_segment or not self.stop_wal_file:
+            LOG.error("Unable to determine ending WAL file/segment")
+            return False
+        if not self.label:
+            LOG.error("No backup label found")
+            return False
+        return True
+
+
+class PgBasebackupIncremental(PgBasebackup):
+    """Incremental backup/restore for PostgreSQL.
+
+    Restoring an incremental backup in PostgreSQL effectively means
+    replaying the WAL entries up to a designated point in time. All that
+    is required is the most recent base backup and all WAL files archived
+    since then.
+    """
+
+    def __init__(self, *args, **kwargs):
+        self.parent_location = kwargs.pop('parent_location', '')
+        self.parent_checksum = kwargs.pop('parent_checksum', '')
+
+        super(PgBasebackupIncremental, self).__init__(*args, **kwargs)
+
+        self.incr_restore_cmd = f'tar -xzf - -C {self.wal_archive_dir}'
+
+    def pre_backup(self):
+        with psql_util.PostgresConnection('postgres') as conn:
+            self.start_segment = conn.query(
+                f"SELECT pg_start_backup('{self.filename}', false, false)"
+            )[0][0]
+            self.start_wal_file = conn.query(
+                f"SELECT pg_walfile_name('{self.start_segment}')")[0][0]
+            self.stop_segment = conn.query(
+                "SELECT * FROM pg_stop_backup(false, true)")[0][0]
+
+        # We have to hack this here because self.command is initialized
+        # in the base class before we get here, but only now do we know
+        # exactly which WAL files we want to archive.
+        self.command = self._cmd()
+
+    def _cmd(self):
+        wal_file_list = self.get_wal_files(backup_pos=1)
+        cmd = (f'tar -czf - -C {self.wal_archive_dir} '
+               f'{" ".join(wal_file_list)}')
+        return cmd + self.encrypt_cmd
+
+    def get_metadata(self):
+        _meta = super(PgBasebackupIncremental, self).get_metadata()
+        _meta.update({
+            'parent_location': self.parent_location,
+            'parent_checksum': self.parent_checksum,
+        })
+        return _meta
+
+    def incremental_restore_cmd(self, incr=False):
+        if incr:
+            return self.decrypt_cmd + self.incr_restore_cmd
+        # restore_command already has decrypt_cmd prepended (see
+        # PgBasebackup.__init__), so don't prepend it a second time here.
+        return self.restore_command
+
+    def incremental_restore(self, location, checksum):
+        """Perform incremental restore.
+
+        For the child backups, restore the wal files to wal archive dir.
+        For the base backup, restore to datadir.
+        """
+        metadata = self.storage.load_metadata(location, checksum)
+        if 'parent_location' in metadata:
+            LOG.info("Restoring parent: %(parent_location)s, "
+                     "checksum: %(parent_checksum)s.", metadata)
+
+            parent_location = metadata['parent_location']
+            parent_checksum = metadata['parent_checksum']
+
+            # Restore parents recursively so backups are applied sequentially
+            self.incremental_restore(parent_location, parent_checksum)
+
+            command = self.incremental_restore_cmd(incr=True)
+        else:
+            # For the parent base backup, revert to the default restore cmd
+            LOG.info("Restoring back to full backup.")
+            command = self.incremental_restore_cmd(incr=False)
+
+        self.restore_content_length += self.unpack(location, checksum,
+                                                   command)
+
+    def run_restore(self):
+        """Run incremental restore."""
+        LOG.debug('Running incremental restore')
+        self.incremental_restore(self.location, self.checksum)
+        return self.restore_content_length
diff --git a/backup/install.sh b/backup/install.sh
index ad1c2e4a05..19177bafd6 100755
--- a/backup/install.sh
+++ b/backup/install.sh
@@ -1,6 +1,7 @@
 #!/usr/bin/env bash
 set -e
 
+export APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=1
 APTOPTS="-y -qq --no-install-recommends --allow-unauthenticated"
 
 case "$1" in
@@ -8,17 +9,21 @@ case "$1" in
     curl -sSL https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb -o percona-release.deb
     dpkg -i percona-release.deb
     percona-release enable-only tools release
-    apt-get update
     apt-get install $APTOPTS percona-xtrabackup-$2
     apt-get clean
     ;;
 "mariadb")
     apt-key adv --fetch-keys 'https://mariadb.org/mariadb_release_signing_key.asc'
     add-apt-repository "deb [arch=amd64] http://mirror2.hs-esslingen.de/mariadb/repo/10.4/ubuntu $(lsb_release -cs) main"
-    apt-get update
     apt-get install $APTOPTS mariadb-backup
     apt-get clean
     ;;
+"postgresql")
+    apt-key adv --fetch-keys 'https://www.postgresql.org/media/keys/ACCC4CF8.asc'
+    add-apt-repository "deb [arch=amd64] http://apt.postgresql.org/pub/repos/apt/ $(lsb_release -cs)-pgdg main"
+    apt-get install $APTOPTS postgresql-client-12
+    apt-get clean
+    ;;
 *)
     echo "datastore $1 not supported"
     exit 1
diff --git a/backup/main.py b/backup/main.py
index c52becbfd9..a42dc4bad3 100644
--- a/backup/main.py
+++ b/backup/main.py
@@ -36,13 +36,14 @@ cli_opts = [
     cfg.StrOpt(
         'driver',
         default='innobackupex',
-        choices=['innobackupex', 'xtrabackup', 'mariabackup']
+        choices=['innobackupex', 'mariabackup', 'pg_basebackup']
     ),
     cfg.BoolOpt('backup'),
     cfg.StrOpt('backup-encryption-key'),
     cfg.StrOpt('db-user'),
     cfg.StrOpt('db-password'),
     cfg.StrOpt('db-host'),
+    cfg.StrOpt('db-datadir'),
     cfg.StrOpt('os-token'),
     cfg.StrOpt('os-auth-url'),
     cfg.StrOpt('os-tenant-id'),
@@ -57,6 +58,7 @@ cli_opts = [
         help='It is up to the storage driver to decide to validate the '
              'checksum or not. '
     ),
+    cfg.StrOpt('pg-wal-archive-dir'),
 ]
 
 driver_mapping = {
@@ -64,6 +66,8 @@ driver_mapping = {
     'innobackupex_inc': 'backup.drivers.innobackupex.InnoBackupExIncremental',
     'mariabackup': 'backup.drivers.mariabackup.MariaBackup',
     'mariabackup_inc': 'backup.drivers.mariabackup.MariaBackupIncremental',
+    'pg_basebackup': 'backup.drivers.postgres.PgBasebackup',
+    'pg_basebackup_inc': 'backup.drivers.postgres.PgBasebackupIncremental',
 }
 storage_mapping = {
     'swift': 'backup.storage.swift.SwiftStorage',
@@ -72,6 +76,7 @@ storage_mapping = {
 
 def stream_backup_to_storage(runner_cls, storage):
     parent_metadata = {}
+    extra_params = {}
 
     if CONF.incremental:
         if not CONF.parent_location:
@@ -88,8 +93,13 @@ def stream_backup_to_storage(runner_cls, storage):
             }
         )
 
+    if CONF.pg_wal_archive_dir:
+        extra_params['wal_archive_dir'] = CONF.pg_wal_archive_dir
+
+    extra_params.update(parent_metadata)
+
     try:
-        with runner_cls(filename=CONF.backup_id, **parent_metadata) as bkup:
+        with runner_cls(filename=CONF.backup_id, **extra_params) as bkup:
             checksum, location = storage.save(
                 bkup,
                 metadata=CONF.swift_extra_metadata,
@@ -103,13 +113,19 @@ def stream_backup_to_storage(runner_cls, storage):
 
 
 def stream_restore_from_storage(runner_cls, storage):
-    lsn = ""
+    params = {
+        'storage': storage,
+        'location': CONF.restore_from,
+        'checksum': CONF.restore_checksum,
+        'wal_archive_dir': CONF.pg_wal_archive_dir,
+        'lsn': None
+    }
+
     if storage.is_incremental_backup(CONF.restore_from):
-        lsn = storage.get_backup_lsn(CONF.restore_from)
+        params['lsn'] = storage.get_backup_lsn(CONF.restore_from)
 
     try:
-        runner = runner_cls(storage=storage, location=CONF.restore_from,
-                            checksum=CONF.restore_checksum, lsn=lsn)
+        runner = runner_cls(**params)
         restore_size = runner.restore()
         LOG.info('Restore successfully, restore_size: %s', restore_size)
     except Exception as err:
diff --git a/backup/requirements.txt b/backup/requirements.txt
index 38358bd3c1..34b906147f 100644
--- a/backup/requirements.txt
+++ b/backup/requirements.txt
@@ -2,5 +2,7 @@ oslo.config!=4.3.0,!=4.4.0;python_version>='3.0' # Apache-2.0
 oslo.log;python_version>='3.0' # Apache-2.0
 oslo.utils!=3.39.1,!=3.40.0,!=3.40.1;python_version>='3.0' # Apache-2.0
 oslo.concurrency;python_version>='3.0' # Apache-2.0
+oslo.service!=1.28.1 # Apache-2.0
 keystoneauth1 # Apache-2.0
 python-swiftclient # Apache-2.0
+psycopg2-binary>=2.6.2 # LGPL/ZPL
diff --git a/backup/storage/swift.py b/backup/storage/swift.py
index 3930e68a3e..8c60cb566c 100644
--- a/backup/storage/swift.py
+++ b/backup/storage/swift.py
@@ -185,7 +185,7 @@ class SwiftStorage(base.Storage):
         for key, value in metadata.items():
             headers[_set_attr(key)] = value
 
-        LOG.debug('Metadata headers: %s', headers)
+        LOG.info('Metadata headers: %s', headers)
         if large_object:
             manifest_data = json.dumps(segment_results)
             LOG.info('Creating the SLO manifest file, manifest content: %s',
@@ -212,8 +212,8 @@ class SwiftStorage(base.Storage):
                                       headers=headers)
 
         # Delete the old segment file that was copied
-        LOG.debug('Deleting the old segment file %s.',
-                  stream_reader.first_segment)
+        LOG.info('Deleting the old segment file %s.',
+                 stream_reader.first_segment)
         self.client.delete_object(container,
                                   stream_reader.first_segment)
 
@@ -288,7 +288,7 @@ class SwiftStorage(base.Storage):
         return False
 
     def get_backup_lsn(self, location):
-        """Get the backup LSN."""
+        """Get the backup LSN if it exists."""
         _, container, filename = self._explodeLocation(location)
         headers = self.client.head_object(container, filename)
         return headers.get('x-object-meta-lsn')
diff --git a/backup/utils/__init__.py b/backup/utils/__init__.py
new file mode 100644
index 0000000000..6c942335cb
--- /dev/null
+++ b/backup/utils/__init__.py
@@ -0,0 +1,46 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from oslo_service import loopingcall
+
+
+def build_polling_task(retriever, condition=lambda value: value,
+                       sleep_time=1, time_out=0, initial_delay=0):
+    """Run a function in a loop, backing off between attempts.
+
+    The condition function is evaluated against the retriever's result;
+    polling stops once it returns a true value.
+    """
+
+    def poll_and_check():
+        obj = retriever()
+        if condition(obj):
+            raise loopingcall.LoopingCallDone(retvalue=obj)
+
+    call = loopingcall.BackOffLoopingCall(f=poll_and_check)
+    return call.start(initial_delay=initial_delay,
+                      starting_interval=sleep_time,
+                      max_interval=30, timeout=time_out)
+
+
+def poll_until(retriever, condition=lambda value: value,
+               sleep_time=3, time_out=0, initial_delay=0):
+    """Retrieve an object until it passes the condition, then return it.
+
+    If time_out is passed in, oslo.service's LoopingCallTimeOut will be
+    raised once that amount of time has elapsed.
+
+    """
+    task = build_polling_task(retriever, condition=condition,
+                              sleep_time=sleep_time, time_out=time_out,
+                              initial_delay=initial_delay)
+    return task.wait()
diff --git a/backup/utils/postgresql.py b/backup/utils/postgresql.py
new file mode 100644
index 0000000000..033652f068
--- /dev/null
+++ b/backup/utils/postgresql.py
@@ -0,0 +1,53 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
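+# Note: a thin psycopg2 wrapper used by the pg_basebackup drivers to run
+# SQL (e.g. pg_start_backup/pg_stop_backup in PgBasebackupIncremental)
+# against the local PostgreSQL server.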
+import psycopg2
+
+
+class PostgresConnection(object):
+    def __init__(self, user, password='', host='localhost', port=5432,
+                 autocommit=False):
+        self.user = user
+        self.password = password
+        self.host = host
+        self.port = port
+        # autocommit is taken here rather than in __enter__, because the
+        # 'with' statement calls __enter__ without arguments.
+        self.autocommit = autocommit
+
+        self.connect_str = (f"user='{self.user}' password='{self.password}' "
+                            f"host='{self.host}' port='{self.port}'")
+
+    def __enter__(self):
+        self.conn = psycopg2.connect(self.connect_str)
+        self.conn.autocommit = self.autocommit
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.conn.close()
+
+    def execute(self, statement, identifiers=None, data_values=None):
+        """Execute a non-returning statement."""
+        self._execute_stmt(statement, identifiers, data_values, False)
+
+    def query(self, query, identifiers=None, data_values=None):
+        """Execute a query and return the result set."""
+        return self._execute_stmt(query, identifiers, data_values, True)
+
+    def _execute_stmt(self, statement, identifiers, data_values, fetch):
+        cmd = self._bind(statement, identifiers)
+        with self.conn.cursor() as cursor:
+            cursor.execute(cmd, data_values)
+            if fetch:
+                return cursor.fetchall()
+
+    def _bind(self, statement, identifiers):
+        if identifiers:
+            return statement.format(*identifiers)
+        return statement
diff --git a/trove/common/cfg.py b/trove/common/cfg.py
index 03f3caa220..46d0b2fc40 100644
--- a/trove/common/cfg.py
+++ b/trove/common/cfg.py
@@ -1056,6 +1056,11 @@ postgresql_opts = [
         'docker_image', default='postgres',
         help='Database docker image.'
     ),
+    cfg.StrOpt(
+        'backup_docker_image',
+        default='openstacktrove/db-backup-postgresql:1.0.0',
+        help='The docker image used for backup and restore.'
+    ),
     cfg.BoolOpt('icmp', default=False,
                 help='Whether to permit ICMP.',
                 deprecated_for_removal=True),
@@ -1069,7 +1074,7 @@ postgresql_opts = [
                 'if trove_security_groups_support is True).'),
     cfg.PortOpt('postgresql_port', default=5432,
                 help='The TCP port the server listens on.'),
-    cfg.StrOpt('backup_strategy', default='PgBaseBackup',
+    cfg.StrOpt('backup_strategy', default='pg_basebackup',
                help='Default strategy to perform backups.'),
     cfg.StrOpt('replication_strategy',
                default='PostgresqlReplicationStreaming',
diff --git a/trove/guestagent/common/operating_system.py b/trove/guestagent/common/operating_system.py
index 93eab9b24e..433ed603ed 100644
--- a/trove/guestagent/common/operating_system.py
+++ b/trove/guestagent/common/operating_system.py
@@ -480,7 +480,7 @@ def service_discovery(service_candidates):
     return result
 
 
-def _execute_shell_cmd(cmd, options, *args, **kwargs):
+def execute_shell_cmd(cmd, options, *args, **kwargs):
     """Execute a given shell command passing it
     given options (flags) and arguments.
 
@@ -519,7 +519,7 @@ def ensure_directory(dir_path, user=None, group=None, force=True, **kwargs):
     """Create a given directory and update its ownership
     (recursively) to the given user and group if any.
 
-    seealso:: _execute_shell_cmd for valid optional keyword arguments.
+    seealso:: execute_shell_cmd for valid optional keyword arguments.
 
     :param dir_path: Path to the created directory.
     :type dir_path: string
@@ -549,7 +549,7 @@ def chown(path, user, group, recursive=True, force=False, **kwargs):
     """Changes the owner and group of a given file.
 
-    seealso:: _execute_shell_cmd for valid optional keyword arguments.
+    seealso:: execute_shell_cmd for valid optional keyword arguments.
 
     :param path: Path to the modified file.
:type path: string @@ -579,7 +579,7 @@ def chown(path, user, group, recursive=True, force=False, **kwargs): owner_group_modifier = _build_user_group_pair(user, group) options = (('f', force), ('R', recursive)) - _execute_shell_cmd('chown', options, owner_group_modifier, path, **kwargs) + execute_shell_cmd('chown', options, owner_group_modifier, path, **kwargs) def _build_user_group_pair(user, group): @@ -599,14 +599,14 @@ def _create_directory(dir_path, force=True, **kwargs): """ options = (('p', force),) - _execute_shell_cmd('mkdir', options, dir_path, **kwargs) + execute_shell_cmd('mkdir', options, dir_path, **kwargs) def chmod(path, mode, recursive=True, force=False, **kwargs): """Changes the mode of a given file. :seealso: Modes for more information on the representation of modes. - :seealso: _execute_shell_cmd for valid optional keyword arguments. + :seealso: execute_shell_cmd for valid optional keyword arguments. :param path: Path to the modified file. :type path: string @@ -629,7 +629,7 @@ def chmod(path, mode, recursive=True, force=False, **kwargs): if path: options = (('f', force), ('R', recursive)) shell_modes = _build_shell_chmod_mode(mode) - _execute_shell_cmd('chmod', options, shell_modes, path, **kwargs) + execute_shell_cmd('chmod', options, shell_modes, path, **kwargs) else: raise exception.UnprocessableEntity( _("Cannot change mode of a blank file.")) @@ -639,7 +639,7 @@ def change_user_group(user, group, append=True, add_group=True, **kwargs): """Adds a user to groups by using the usermod linux command with -a and -G options. - seealso:: _execute_shell_cmd for valid optional keyword arguments. + seealso:: execute_shell_cmd for valid optional keyword arguments. :param user: Username. :type user: string @@ -668,7 +668,7 @@ def change_user_group(user, group, append=True, add_group=True, **kwargs): raise exception.UnprocessableEntity(_("Missing group.")) options = (('a', append), ('G', add_group)) - _execute_shell_cmd('usermod', options, group, user, **kwargs) + execute_shell_cmd('usermod', options, group, user, **kwargs) def _build_shell_chmod_mode(mode): @@ -704,7 +704,7 @@ def _build_shell_chmod_mode(mode): def remove(path, force=False, recursive=True, **kwargs): """Remove a given file or directory. - :seealso: _execute_shell_cmd for valid optional keyword arguments. + :seealso: execute_shell_cmd for valid optional keyword arguments. :param path: Path to the removed file. :type path: string @@ -720,7 +720,7 @@ def remove(path, force=False, recursive=True, **kwargs): if path: options = (('f', force), ('R', recursive)) - _execute_shell_cmd('rm', options, path, **kwargs) + execute_shell_cmd('rm', options, path, **kwargs) else: raise exception.UnprocessableEntity(_("Cannot remove a blank file.")) @@ -730,7 +730,7 @@ def move(source, destination, force=False, **kwargs): Move attempts to preserve the original ownership, permissions and timestamps. - :seealso: _execute_shell_cmd for valid optional keyword arguments. + :seealso: execute_shell_cmd for valid optional keyword arguments. :param source: Path to the source location. 
:type source: string @@ -751,7 +751,7 @@ def move(source, destination, force=False, **kwargs): raise exception.UnprocessableEntity(_("Missing destination path.")) options = (('f', force),) - _execute_shell_cmd('mv', options, source, destination, **kwargs) + execute_shell_cmd('mv', options, source, destination, **kwargs) def copy(source, destination, force=False, preserve=False, recursive=True, @@ -761,7 +761,7 @@ def copy(source, destination, force=False, preserve=False, recursive=True, Copy does NOT attempt to preserve ownership, permissions and timestamps unless the 'preserve' option is enabled. - :seealso: _execute_shell_cmd for valid optional keyword arguments. + :seealso: execute_shell_cmd for valid optional keyword arguments. :param source: Path to the source location. :type source: string @@ -793,7 +793,7 @@ def copy(source, destination, force=False, preserve=False, recursive=True, options = (('f', force), ('p', preserve), ('R', recursive), ('L', dereference)) - _execute_shell_cmd('cp', options, source, destination, **kwargs) + execute_shell_cmd('cp', options, source, destination, **kwargs) def get_bytes_free_on_fs(path): @@ -830,7 +830,7 @@ def list_files_in_directory(root_dir, recursive=False, pattern=None, if pattern: cmd_args.extend(['-regextype', 'posix-extended', '-regex', os.path.join('.*', pattern) + '$']) - files = _execute_shell_cmd('find', [], *cmd_args, as_root=True) + files = execute_shell_cmd('find', [], *cmd_args, as_root=True) return {fp for fp in files.splitlines()} return {os.path.abspath(os.path.join(root, name)) @@ -851,7 +851,7 @@ def _build_command_options(options): def get_device(path, as_root=False): """Get the device that a given path exists on.""" - stdout = _execute_shell_cmd('df', [], path, as_root=as_root) + stdout = execute_shell_cmd('df', [], path, as_root=as_root) return stdout.splitlines()[1].split()[0] @@ -879,8 +879,8 @@ def create_user(user_name, user_id, group_name=None, group_id=None): group_id = group_id or user_id try: - _execute_shell_cmd('groupadd', [], '--gid', group_id, group_name, - as_root=True) + execute_shell_cmd('groupadd', [], '--gid', group_id, group_name, + as_root=True) except exception.ProcessExecutionError as err: if 'already exists' not in err.stderr: raise exception.UnprocessableEntity( @@ -888,8 +888,8 @@ def create_user(user_name, user_id, group_name=None, group_id=None): ) try: - _execute_shell_cmd('useradd', [], '--uid', user_id, '--gid', group_id, - '-M', user_name, as_root=True) + execute_shell_cmd('useradd', [], '--uid', user_id, '--gid', group_id, + '-M', user_name, as_root=True) except exception.ProcessExecutionError as err: if 'already exists' not in err.stderr: raise exception.UnprocessableEntity( @@ -903,4 +903,4 @@ def remove_dir_contents(folder): Use shell=True here because shell=False doesn't support '*' """ path = os.path.join(folder, '*') - _execute_shell_cmd(f'rm -rf {path}', [], shell=True, as_root=True) + execute_shell_cmd(f'rm -rf {path}', [], shell=True, as_root=True) diff --git a/trove/guestagent/datastore/manager.py b/trove/guestagent/datastore/manager.py index b6d57a0545..60020f72af 100644 --- a/trove/guestagent/datastore/manager.py +++ b/trove/guestagent/datastore/manager.py @@ -303,6 +303,9 @@ class Manager(periodic_task.PeriodicTasks): LOG.info('No post_prepare work has been defined.') pass + def stop_db(self, context): + self.app.stop_db() + def restart(self, context): self.app.restart() @@ -736,12 +739,20 @@ class Manager(periodic_task.PeriodicTasks): :param backup_info: a dictionary containing 
the db instance id of the backup task, location, type, and other data.
         """
-        with EndNotification(context):
-            self.app.create_backup(context, backup_info)
+        pass
 
     def perform_restore(self, context, restore_location, backup_info):
-        raise exception.DatastoreOperationNotSupported(
-            operation='_perform_restore', datastore=self.manager)
+        LOG.info("Starting to restore database from backup %s, "
+                 "backup_info: %s", backup_info['id'], backup_info)
+
+        try:
+            self.app.restore_backup(context, backup_info, restore_location)
+        except Exception:
+            LOG.error("Failed to restore from backup %s.", backup_info['id'])
+            self.status.set_status(service_status.ServiceStatuses.FAILED)
+            raise
+
+        LOG.info("Finished restoring data from backup %s", backup_info['id'])
 
     ################
     # Database and user management
diff --git a/trove/guestagent/datastore/mysql_common/manager.py b/trove/guestagent/datastore/mysql_common/manager.py
index 832be1655b..935894082e 100644
--- a/trove/guestagent/datastore/mysql_common/manager.py
+++ b/trove/guestagent/datastore/mysql_common/manager.py
@@ -23,6 +23,7 @@ from trove.common import cfg
 from trove.common import configurations
 from trove.common import exception
 from trove.common import utils
+from trove.common.notification import EndNotification
 from trove.guestagent import guest_log
 from trove.guestagent.common import operating_system
 from trove.guestagent.datastore import manager
@@ -119,12 +120,25 @@ class MySqlManager(manager.Manager):
         # This instance is a replication slave
         self.attach_replica(context, snapshot, snapshot['config'])
 
-    def stop_db(self, context):
-        self.app.stop_db()
-
     def start_db_with_conf_changes(self, context, config_contents,
                                    ds_version):
         self.app.start_db_with_conf_changes(config_contents, ds_version)
 
+    def create_backup(self, context, backup_info):
+        """Create backup for the database.
+
+        :param context: User context object.
+        :param backup_info: a dictionary containing the db instance id of the
+                            backup task, location, type, and other data.
+        """
+        LOG.info(f"Creating backup {backup_info['id']}")
+        with EndNotification(context):
+            volumes_mapping = {
+                '/var/lib/mysql': {'bind': '/var/lib/mysql', 'mode': 'rw'}
+            }
+            self.app.create_backup(context, backup_info,
+                                   volumes_mapping=volumes_mapping,
+                                   need_dbuser=True)
+
     def get_datastore_log_defs(self):
         owner = cfg.get_configuration_property('database_service_uid')
         datastore_dir = self.app.get_data_dir()
@@ -189,19 +203,6 @@ class MySqlManager(manager.Manager):
         LOG.info("Applying overrides (%s).", overrides)
         self.app.apply_overrides(overrides)
 
-    def perform_restore(self, context, restore_location, backup_info):
-        LOG.info("Starting to restore database from backup %s, "
-                 "backup_info: %s", backup_info['id'], backup_info)
-
-        try:
-            self.app.restore_backup(context, backup_info, restore_location)
-        except Exception:
-            LOG.error("Failed to restore from backup %s.", backup_info['id'])
-            self.status.set_status(service_status.ServiceStatuses.FAILED)
-            raise
-
-        LOG.info("Finished restore data from backup %s", backup_info['id'])
-
     def reset_password_for_restore(self, ds_version=None,
                                    data_dir='/var/lib/mysql/data'):
         """Reset the root password after restore the db data.
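Note (not part of the patch): with create_backup now dispatched per datastore, the runner on the backup-container side is still chosen by backup/main.py from the driver_mapping dict shown earlier, switching to the *_inc variant for incremental jobs. A minimal sketch of that resolution, assuming the oslo_utils import helper (main.py's exact wiring may differ):

    # Sketch: 'pg_basebackup' resolves to PgBasebackup; with --incremental
    # set, 'pg_basebackup_inc' resolves to PgBasebackupIncremental.
    # driver_mapping is the dict defined in backup/main.py.
    from oslo_utils import importutils

    def resolve_runner(driver_name, incremental=False):
        key = f'{driver_name}_inc' if incremental else driver_name
        return importutils.import_class(driver_mapping[key])

    runner_cls = resolve_runner(CONF.driver, CONF.incremental)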
diff --git a/trove/guestagent/datastore/mysql_common/service.py b/trove/guestagent/datastore/mysql_common/service.py index d9b569ecef..67b2145405 100644 --- a/trove/guestagent/datastore/mysql_common/service.py +++ b/trove/guestagent/datastore/mysql_common/service.py @@ -17,21 +17,18 @@ import re from oslo_log import log as logging from oslo_utils import encodeutils -from oslo_utils import timeutils import six from six.moves import urllib import sqlalchemy from sqlalchemy import exc from sqlalchemy.sql.expression import text -from trove.backup.state import BackupState from trove.common import cfg from trove.common import exception from trove.common import utils from trove.common.configurations import MySQLConfParser from trove.common.db.mysql import models from trove.common.i18n import _ -from trove.conductor import api as conductor_api from trove.guestagent.common import guestagent_utils from trove.guestagent.common import operating_system from trove.guestagent.common import sql_query @@ -663,107 +660,6 @@ class BaseMySqlApp(service.BaseDbApp): LOG.info("Finished restarting mysql") - def create_backup(self, context, backup_info): - storage_driver = CONF.storage_strategy - backup_driver = cfg.get_configuration_property('backup_strategy') - incremental = '' - backup_type = 'full' - if backup_info.get('parent'): - incremental = ( - f'--incremental ' - f'--parent-location={backup_info["parent"]["location"]} ' - f'--parent-checksum={backup_info["parent"]["checksum"]}') - backup_type = 'incremental' - - backup_id = backup_info["id"] - image = cfg.get_configuration_property('backup_docker_image') - name = 'db_backup' - volumes = {'/var/lib/mysql': {'bind': '/var/lib/mysql', 'mode': 'rw'}} - admin_pass = self.get_auth_password() - user_token = context.auth_token - auth_url = CONF.service_credentials.auth_url - user_tenant = context.project_id - - swift_metadata = ( - f'datastore:{backup_info["datastore"]},' - f'datastore_version:{backup_info["datastore_version"]}' - ) - swift_container = backup_info.get('swift_container', - CONF.backup_swift_container) - swift_params = (f'--swift-extra-metadata={swift_metadata} ' - f'--swift-container {swift_container}') - - command = ( - f'/usr/bin/python3 main.py --backup --backup-id={backup_id} ' - f'--storage-driver={storage_driver} --driver={backup_driver} ' - f'--db-user=os_admin --db-password={admin_pass} ' - f'--db-host=127.0.0.1 ' - f'--os-token={user_token} --os-auth-url={auth_url} ' - f'--os-tenant-id={user_tenant} ' - f'{swift_params} ' - f'{incremental}' - ) - - # Update backup status in db - conductor = conductor_api.API(context) - mount_point = CONF.get(CONF.datastore_manager).mount_point - stats = guestagent_utils.get_filesystem_volume_stats(mount_point) - backup_state = { - 'backup_id': backup_id, - 'size': stats.get('used', 0.0), - 'state': BackupState.BUILDING, - 'backup_type': backup_type - } - conductor.update_backup(CONF.guest_id, - sent=timeutils.utcnow_ts(microsecond=True), - **backup_state) - LOG.debug("Updated state for %s to %s.", backup_id, backup_state) - - # Start to run backup inside a separate docker container - try: - LOG.info('Starting to create backup %s, command: %s', backup_id, - command) - output, ret = docker_util.run_container( - self.docker_client, image, name, - volumes=volumes, command=command) - result = output[-1] - if not ret: - msg = f'Failed to run backup container, error: {result}' - LOG.error(msg) - raise Exception(msg) - - backup_result = BACKUP_LOG.match(result) - if backup_result: - backup_state.update({ - 
'checksum': backup_result.group('checksum'), - 'location': backup_result.group('location'), - 'success': True, - 'state': BackupState.COMPLETED, - }) - else: - LOG.error(f'Cannot parse backup output: {result}') - backup_state.update({ - 'success': False, - 'state': BackupState.FAILED, - }) - except Exception as err: - LOG.error("Failed to create backup %s", backup_id) - backup_state.update({ - 'success': False, - 'state': BackupState.FAILED, - }) - raise exception.TroveError( - "Failed to create backup %s, error: %s" % - (backup_id, str(err)) - ) - finally: - LOG.info("Completed backup %s.", backup_id) - conductor.update_backup(CONF.guest_id, - sent=timeutils.utcnow_ts( - microsecond=True), - **backup_state) - LOG.debug("Updated state for %s to %s.", backup_id, backup_state) - def restore_backup(self, context, backup_info, restore_location): backup_id = backup_info['id'] storage_driver = CONF.storage_strategy diff --git a/trove/guestagent/datastore/postgres/manager.py b/trove/guestagent/datastore/postgres/manager.py index 1226b785ae..0169deeb1d 100644 --- a/trove/guestagent/datastore/postgres/manager.py +++ b/trove/guestagent/datastore/postgres/manager.py @@ -16,10 +16,11 @@ import os from oslo_log import log as logging from trove.common import cfg -from trove.guestagent.common import operating_system -from trove.guestagent.datastore.postgres import service -from trove.guestagent.datastore import manager +from trove.common.notification import EndNotification from trove.guestagent import guest_log +from trove.guestagent.common import operating_system +from trove.guestagent.datastore import manager +from trove.guestagent.datastore.postgres import service LOG = logging.getLogger(__name__) CONF = cfg.CONF @@ -45,17 +46,26 @@ class PostgresManager(manager.Manager): user=CONF.database_service_uid, group=CONF.database_service_uid, as_root=True) + operating_system.ensure_directory(service.WAL_ARCHIVE_DIR, + user=CONF.database_service_uid, + group=CONF.database_service_uid, + as_root=True) LOG.info('Preparing database config files') self.app.configuration_manager.save_configuration(config_contents) self.app.set_data_dir(self.app.datadir) self.app.update_overrides(overrides) - # # Restore data from backup and reset root password - # if backup_info: - # self.perform_restore(context, data_dir, backup_info) - # self.reset_password_for_restore(ds_version=ds_version, - # data_dir=data_dir) + # Restore data from backup and reset root password + if backup_info: + self.perform_restore(context, self.app.datadir, backup_info) + + signal_file = f"{self.app.datadir}/recovery.signal" + operating_system.execute_shell_cmd( + f"touch {signal_file}", [], shell=True, as_root=True) + operating_system.chown(signal_file, CONF.database_service_uid, + CONF.database_service_uid, force=True, + as_root=True) # config_file can only be set on the postgres command line command = f"postgres -c config_file={service.CONFIG_FILE}" @@ -101,3 +111,26 @@ class PostgresManager(manager.Manager): def is_log_enabled(self, logname): return self.configuration_manager.get_value('logging_collector', False) + + def create_backup(self, context, backup_info): + """Create backup for the database. + + :param context: User context object. + :param backup_info: a dictionary containing the db instance id of the + backup task, location, type, and other data. 
+        """
+        LOG.info(f"Creating backup {backup_info['id']}")
+        with EndNotification(context):
+            volumes_mapping = {
+                '/var/lib/postgresql/data': {
+                    'bind': '/var/lib/postgresql/data', 'mode': 'rw'
+                },
+                "/var/run/postgresql": {"bind": "/var/run/postgresql",
+                                        "mode": "ro"},
+            }
+            extra_params = f"--pg-wal-archive-dir {service.WAL_ARCHIVE_DIR}"
+
+            self.app.create_backup(context, backup_info,
+                                   volumes_mapping=volumes_mapping,
+                                   need_dbuser=False,
+                                   extra_params=extra_params)
diff --git a/trove/guestagent/datastore/postgres/query.py b/trove/guestagent/datastore/postgres/query.py
index 96f3bc441c..8634dc974e 100644
--- a/trove/guestagent/datastore/postgres/query.py
+++ b/trove/guestagent/datastore/postgres/query.py
@@ -138,7 +138,7 @@ class UserQuery(object):
     @classmethod
     def drop(cls, name):
         """Query to drop a user."""
-        return f'DROP USER "{name}"'
+        return f'DROP USER IF EXISTS "{name}"'
 
 
 class AccessQuery(object):
diff --git a/trove/guestagent/datastore/postgres/service.py b/trove/guestagent/datastore/postgres/service.py
index 589d7e1803..f7bb5db39d 100644
--- a/trove/guestagent/datastore/postgres/service.py
+++ b/trove/guestagent/datastore/postgres/service.py
@@ -39,6 +39,8 @@ CNF_EXT = 'conf'
 # The same with include_dir config option
 CNF_INCLUDE_DIR = '/etc/postgresql/conf.d'
 HBA_CONFIG_FILE = '/etc/postgresql/pg_hba.conf'
+# Must match the path used in the archive_command config option.
+WAL_ARCHIVE_DIR = '/var/lib/postgresql/data/wal_archive'
 
 
 class PgSqlAppStatus(service.BaseDbStatus):
@@ -113,6 +115,8 @@ class PgSqlApp(service.BaseDbApp):
 
         admin_password = utils.generate_random_password()
         os_admin = models.PostgreSQLUser(ADMIN_USER_NAME, admin_password)
+        # Drop the os_admin user if it exists; this is needed for restore.
+        PgSqlAdmin(SUPER_USER_NAME).delete_user({'_name': ADMIN_USER_NAME})
         PgSqlAdmin(SUPER_USER_NAME).create_admin_user(os_admin,
                                                       encrypt_password=True)
         self.save_password(ADMIN_USER_NAME, admin_password)
@@ -176,9 +180,9 @@ class PgSqlApp(service.BaseDbApp):
         command = command if command else ''
 
         try:
-            root_pass = self.get_auth_password(file="root.cnf")
+            postgres_pass = self.get_auth_password(file="postgres.cnf")
         except exception.UnprocessableEntity:
-            root_pass = utils.generate_random_password()
+            postgres_pass = utils.generate_random_password()
 
         # Get uid and gid
         user = "%s:%s" % (CONF.database_service_uid,
                           CONF.database_service_uid)
@@ -211,7 +215,7 @@ class PgSqlApp(service.BaseDbApp):
                 network_mode="host",
                 user=user,
                 environment={
-                    "POSTGRES_PASSWORD": root_pass,
+                    "POSTGRES_PASSWORD": postgres_pass,
                     "PGDATA": self.datadir,
                 },
                 command=command
@@ -219,7 +223,7 @@ class PgSqlApp(service.BaseDbApp):
 
             # Save root password
             LOG.debug("Saving root credentials to local host.")
-            self.save_password('postgres', root_pass)
+            self.save_password('postgres', postgres_pass)
         except Exception:
             LOG.exception("Failed to start database service")
             raise exception.TroveError("Failed to start database service")
@@ -254,6 +258,55 @@ class PgSqlApp(service.BaseDbApp):
 
         LOG.info("Finished restarting database")
 
+    def restore_backup(self, context, backup_info, restore_location):
+        backup_id = backup_info['id']
+        storage_driver = CONF.storage_strategy
+        backup_driver = cfg.get_configuration_property('backup_strategy')
+        image = cfg.get_configuration_property('backup_docker_image')
+        name = 'db_restore'
+        volumes = {
+            '/var/lib/postgresql/data': {
+                'bind': '/var/lib/postgresql/data',
+                'mode': 'rw'
+            }
+        }
+
+        os_cred = (f"--os-token={context.auth_token} "
+                   f"--os-auth-url={CONF.service_credentials.auth_url} "
+                   f"--os-tenant-id={context.project_id}")
+
+        command = (
+            f'/usr/bin/python3 main.py --nobackup '
+            f'--storage-driver={storage_driver} --driver={backup_driver} '
+            f'{os_cred} '
+            f'--restore-from={backup_info["location"]} '
+            f'--restore-checksum={backup_info["checksum"]} '
+            f'--pg-wal-archive-dir {WAL_ARCHIVE_DIR}'
+        )
+
+        LOG.debug('Stopping the database and cleaning up the data before '
+                  'restoring from %s', backup_id)
+        self.stop_db()
+        for directory in [WAL_ARCHIVE_DIR, self.datadir]:
+            operating_system.remove_dir_contents(directory)
+
+        # Start to run restore inside a separate docker container
+        LOG.info('Starting to restore backup %s, command: %s', backup_id,
+                 command)
+        output, ret = docker_util.run_container(
+            self.docker_client, image, name,
+            volumes=volumes, command=command)
+        result = output[-1]
+        if not ret:
+            msg = f'Failed to run restore container, error: {result}'
+            LOG.error(msg)
+            raise Exception(msg)
+
+        for directory in [WAL_ARCHIVE_DIR, self.datadir]:
+            operating_system.chown(directory, CONF.database_service_uid,
+                                   CONF.database_service_uid, force=True,
+                                   as_root=True)
+
 
 class PgSqlAdmin(object):
     # Default set of options of an administrative account.
@@ -352,10 +405,7 @@ class PgSqlAdmin(object):
         Return a list of serialized Postgres databases.
         """
         user = self._find_user(username)
-        if user is not None:
-            return user.databases
-
-        raise exception.UserNotFound(username)
+        return user.databases if user is not None else []
 
     def create_databases(self, databases):
         """Create the list of specified databases.
diff --git a/trove/guestagent/datastore/service.py b/trove/guestagent/datastore/service.py
index 38f4c2fa3b..8f4c6bd6f7 100644
--- a/trove/guestagent/datastore/service.py
+++ b/trove/guestagent/datastore/service.py
@@ -13,16 +13,18 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
import os +import re import time from oslo_log import log as logging from oslo_utils import timeutils +from trove.backup.state import BackupState from trove.common import cfg from trove.common import context as trove_context from trove.common import exception -from trove.common.i18n import _ from trove.common import stream_codecs +from trove.common.i18n import _ from trove.conductor import api as conductor_api from trove.guestagent.common import guestagent_utils from trove.guestagent.common import operating_system @@ -31,6 +33,8 @@ from trove.instance import service_status LOG = logging.getLogger(__name__) CONF = cfg.CONF +BACKUP_LOG_RE = re.compile(r'.*Backup successfully, checksum: ' + r'(?P.*), location: (?P.*)') class BaseDbStatus(object): @@ -401,3 +405,109 @@ class BaseDbApp(object): self.reset_configuration(config_contents) self.start_db(update_db=True, ds_version=ds_version) + + def create_backup(self, context, backup_info, volumes_mapping={}, + need_dbuser=True, extra_params=''): + storage_driver = CONF.storage_strategy + backup_driver = cfg.get_configuration_property('backup_strategy') + incremental = '' + backup_type = 'full' + if backup_info.get('parent'): + incremental = ( + f'--incremental ' + f'--parent-location={backup_info["parent"]["location"]} ' + f'--parent-checksum={backup_info["parent"]["checksum"]}') + backup_type = 'incremental' + + backup_id = backup_info["id"] + image = cfg.get_configuration_property('backup_docker_image') + name = 'db_backup' + + os_cred = (f"--os-token={context.auth_token} " + f"--os-auth-url={CONF.service_credentials.auth_url} " + f"--os-tenant-id={context.project_id}") + + db_userinfo = '' + if need_dbuser: + admin_pass = self.get_auth_password() + db_userinfo = (f"--db-host=127.0.0.1 --db-user=os_admin " + f"--db-password={admin_pass}") + + swift_metadata = ( + f'datastore:{backup_info["datastore"]},' + f'datastore_version:{backup_info["datastore_version"]}' + ) + swift_container = (backup_info.get('swift_container') or + CONF.backup_swift_container) + swift_params = (f'--swift-extra-metadata={swift_metadata} ' + f'--swift-container {swift_container}') + + command = ( + f'/usr/bin/python3 main.py --backup --backup-id={backup_id} ' + f'--storage-driver={storage_driver} --driver={backup_driver} ' + f'{os_cred} ' + f'{db_userinfo} ' + f'{swift_params} ' + f'{incremental} ' + f'{extra_params} ' + ) + + # Update backup status in db + conductor = conductor_api.API(context) + mount_point = cfg.get_configuration_property('mount_point') + stats = guestagent_utils.get_filesystem_volume_stats(mount_point) + backup_state = { + 'backup_id': backup_id, + 'size': stats.get('used', 0.0), + 'state': BackupState.BUILDING, + 'backup_type': backup_type + } + conductor.update_backup(CONF.guest_id, + sent=timeutils.utcnow_ts(microsecond=True), + **backup_state) + LOG.debug(f"Updated state for backup {backup_id} to {backup_state}") + + # Start to run backup inside a separate docker container + try: + LOG.info(f'Starting to create backup {backup_id}, ' + f'command: {command}') + output, ret = docker_util.run_container( + self.docker_client, image, name, + volumes=volumes_mapping, command=command) + result = output[-1] + if not ret: + msg = f'Failed to run backup container, error: {result}' + LOG.error(msg) + raise Exception(msg) + + backup_result = BACKUP_LOG_RE.match(result) + if backup_result: + backup_state.update({ + 'checksum': backup_result.group('checksum'), + 'location': backup_result.group('location'), + 'success': True, + 'state': BackupState.COMPLETED, + 
}) + else: + LOG.error(f'Cannot parse backup output: {result}') + backup_state.update({ + 'success': False, + 'state': BackupState.FAILED, + }) + except Exception as err: + LOG.error("Failed to create backup %s", backup_id) + backup_state.update({ + 'success': False, + 'state': BackupState.FAILED, + }) + raise exception.TroveError( + "Failed to create backup %s, error: %s" % + (backup_id, str(err)) + ) + finally: + LOG.info("Completed backup %s.", backup_id) + conductor.update_backup( + CONF.guest_id, + sent=timeutils.utcnow_ts(microsecond=True), + **backup_state) + LOG.debug("Updated state for %s to %s.", backup_id, backup_state) diff --git a/trove/templates/postgresql/config.template b/trove/templates/postgresql/config.template index cf23ebedbe..0d374b489e 100644 --- a/trove/templates/postgresql/config.template +++ b/trove/templates/postgresql/config.template @@ -240,10 +240,11 @@ min_wal_size = 80MB archive_mode = on # enables archiving; off, on, or always # (change requires restart) # (Trove default) -#archive_command = '' # command to use to archive a logfile segment +archive_command = 'test ! -f /var/lib/postgresql/data/wal_archive/%f && cp %p /var/lib/postgresql/data/wal_archive/%f' # command to use to archive a logfile segment # placeholders: %p = path of file to archive # %f = file name only # e.g. 'test ! -f /mnt/server/archivedir/%f && cp %p /mnt/server/archivedir/%f' + # (Trove default) #archive_timeout = 0 # force a logfile segment switch after this # number of seconds; 0 disables @@ -251,11 +252,12 @@ archive_mode = on # enables archiving; off, on, or always # These are only used in recovery mode. -#restore_command = '' # command to use to restore an archived logfile segment +restore_command = 'cp /var/lib/postgresql/data/wal_archive/%f "%p"' # command to use to restore an archived logfile segment # placeholders: %p = path of file to restore # %f = file name only # e.g. 'cp /mnt/server/archivedir/%f %p' # (change requires restart) + # (Trove default) #archive_cleanup_command = '' # command to execute at every restartpoint #recovery_end_command = '' # command to execute at completion of recovery @@ -294,7 +296,8 @@ archive_mode = on # enables archiving; off, on, or always #max_wal_senders = 10 # max number of walsender processes # (change requires restart) -#wal_keep_segments = 0 # in logfile segments; 0 disables +wal_keep_segments = 5 # in logfile segments; 0 disables + # (Trove default) #wal_sender_timeout = 60s # in milliseconds; 0 disables #max_replication_slots = 10 # max number of replication slots
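Note (illustration, not part of the patch): the .backup history file that PgBasebackup.get_backup_metadata() parses is plain text written by pg_basebackup into the WAL archive directory configured above. A self-contained sketch using the same regexes as the driver; the sample content below is fabricated for demonstration:

    import re

    # Fabricated example of a <WAL>.<offset>.backup history file.
    sample = (
        "START WAL LOCATION: 0/2000028 (file 000000010000000000000002)\n"
        "STOP WAL LOCATION: 0/2000138 (file 000000010000000000000002)\n"
        "CHECKPOINT LOCATION: 0/2000060\n"
        "LABEL: 0b24bb9b-3333-4444-5555-666677778888\n"
    )

    start_re = re.compile(r"START WAL LOCATION: (.*) \(file (.*)\)")
    label_re = re.compile("LABEL: (.*)")

    m = start_re.search(sample)
    print(m.group(1))  # '0/2000028' -> stored as 'start-segment'
    print(m.group(2))  # '000000010000000000000002' -> 'start-wal-file'
    print(label_re.search(sample).group(1))  # compared against the backup filename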