Postgresql: Backup and restore

Change-Id: Icf08b7dc82ce501d82b45cf5412256a43716b6ae
This commit is contained in:
Lingxian Kong 2020-09-02 10:10:23 +12:00
parent 768ec34dfe
commit 4fb41b5198
22 changed files with 673 additions and 191 deletions

View File

@ -4,8 +4,9 @@ LABEL maintainer="anlin.kong@gmail.com"
ARG DATASTORE="mysql" ARG DATASTORE="mysql"
ARG APTOPTS="-y -qq --no-install-recommends --allow-unauthenticated" ARG APTOPTS="-y -qq --no-install-recommends --allow-unauthenticated"
ARG PERCONA_XTRABACKUP_VERSION=24 ARG PERCONA_XTRABACKUP_VERSION=24
ENV DEBIAN_FRONTEND noninteractive \
APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=1 RUN export DEBIAN_FRONTEND="noninteractive" \
&& export APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=1
RUN apt-get update \ RUN apt-get update \
&& apt-get install $APTOPTS gnupg2 lsb-release apt-utils apt-transport-https ca-certificates software-properties-common curl \ && apt-get install $APTOPTS gnupg2 lsb-release apt-utils apt-transport-https ca-certificates software-properties-common curl \

View File

@ -27,12 +27,11 @@ class BaseRunner(object):
"""Base class for Backup Strategy implementations.""" """Base class for Backup Strategy implementations."""
# Subclass should provide the commands. # Subclass should provide the commands.
cmd = None cmd = ''
restore_cmd = None restore_cmd = ''
prepare_cmd = None prepare_cmd = ''
encrypt_key = CONF.backup_encryption_key encrypt_key = CONF.backup_encryption_key
default_data_dir = '/var/lib/mysql/data'
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.process = None self.process = None
@ -43,8 +42,9 @@ class BaseRunner(object):
self.checksum = kwargs.pop('checksum', '') self.checksum = kwargs.pop('checksum', '')
if 'restore_location' not in kwargs: if 'restore_location' not in kwargs:
kwargs['restore_location'] = self.default_data_dir kwargs['restore_location'] = self.datadir
self.restore_location = kwargs['restore_location'] self.restore_location = kwargs['restore_location']
self.restore_content_length = 0
self.command = self.cmd % kwargs self.command = self.cmd % kwargs
self.restore_command = (self.decrypt_cmd + self.restore_command = (self.decrypt_cmd +

View File

@ -102,7 +102,6 @@ class InnoBackupExIncremental(InnoBackupEx):
raise AttributeError('lsn attribute missing') raise AttributeError('lsn attribute missing')
self.parent_location = kwargs.pop('parent_location', '') self.parent_location = kwargs.pop('parent_location', '')
self.parent_checksum = kwargs.pop('parent_checksum', '') self.parent_checksum = kwargs.pop('parent_checksum', '')
self.restore_content_length = 0
super(InnoBackupExIncremental, self).__init__(*args, **kwargs) super(InnoBackupExIncremental, self).__init__(*args, **kwargs)

View File

@ -56,7 +56,6 @@ class MariaBackupIncremental(MariaBackup):
raise AttributeError('lsn attribute missing') raise AttributeError('lsn attribute missing')
self.parent_location = kwargs.pop('parent_location', '') self.parent_location = kwargs.pop('parent_location', '')
self.parent_checksum = kwargs.pop('parent_checksum', '') self.parent_checksum = kwargs.pop('parent_checksum', '')
self.restore_content_length = 0
super(MariaBackupIncremental, self).__init__(*args, **kwargs) super(MariaBackupIncremental, self).__init__(*args, **kwargs)

View File

@ -27,6 +27,8 @@ LOG = logging.getLogger(__name__)
class MySQLBaseRunner(base.BaseRunner): class MySQLBaseRunner(base.BaseRunner):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.datadir = kwargs.pop('db_datadir', '/var/lib/mysql/data')
super(MySQLBaseRunner, self).__init__(*args, **kwargs) super(MySQLBaseRunner, self).__init__(*args, **kwargs)
@property @property
@ -113,8 +115,8 @@ class MySQLBaseRunner(base.BaseRunner):
incremental_dir = None incremental_dir = None
if 'parent_location' in metadata: if 'parent_location' in metadata:
LOG.info("Restoring parent: %(parent_location)s" LOG.info("Restoring parent: %(parent_location)s, "
" checksum: %(parent_checksum)s.", metadata) "checksum: %(parent_checksum)s.", metadata)
parent_location = metadata['parent_location'] parent_location = metadata['parent_location']
parent_checksum = metadata['parent_checksum'] parent_checksum = metadata['parent_checksum']
@ -129,6 +131,7 @@ class MySQLBaseRunner(base.BaseRunner):
else: else:
# The parent (full backup) use the same command from InnobackupEx # The parent (full backup) use the same command from InnobackupEx
# super class and do not set an incremental_dir. # super class and do not set an incremental_dir.
LOG.info("Restoring back to full backup.")
command = self.restore_command command = self.restore_command
self.restore_content_length += self.unpack(location, checksum, command) self.restore_content_length += self.unpack(location, checksum, command)

249
backup/drivers/postgres.py Normal file
View File

@ -0,0 +1,249 @@
# Copyright 2020 Catalyst Cloud
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
from oslo_log import log as logging
from backup import utils
from backup.drivers import base
from backup.utils import postgresql as psql_util
LOG = logging.getLogger(__name__)
class PgBasebackup(base.BaseRunner):
def __init__(self, *args, **kwargs):
if not kwargs.get('wal_archive_dir'):
raise AttributeError('wal_archive_dir attribute missing')
self.wal_archive_dir = kwargs.pop('wal_archive_dir')
self.datadir = kwargs.pop(
'db_datadir', '/var/lib/postgresql/data/pgdata')
self.label = None
self.stop_segment = None
self.start_segment = None
self.start_wal_file = None
self.stop_wal_file = None
self.checkpoint_location = None
self.metadata = {}
super(PgBasebackup, self).__init__(*args, **kwargs)
self.restore_command = (f"{self.decrypt_cmd}tar xzf - -C "
f"{self.datadir}")
@property
def cmd(self):
cmd = (f"pg_basebackup -U postgres -Ft -z --wal-method=fetch "
f"--label={self.filename} --pgdata=-")
return cmd + self.encrypt_cmd
@property
def manifest(self):
"""Target file name."""
return "%s.tar.gz%s" % (self.filename, self.encrypt_manifest)
def get_wal_files(self, backup_pos=0):
"""Return the WAL files since the provided last backup.
pg_archivebackup depends on alphanumeric sorting to decide wal order,
so we'll do so too:
https://github.com/postgres/postgres/blob/REL9_4_STABLE/contrib
/pg_archivecleanup/pg_archivecleanup.c#L122
"""
backup_file = self.get_backup_file(backup_pos=backup_pos)
last_wal = backup_file.split('.')[0]
wal_re = re.compile("^[0-9A-F]{24}$")
wal_files = [wal_file for wal_file in os.listdir(self.wal_archive_dir)
if wal_re.search(wal_file) and wal_file >= last_wal]
return wal_files
def get_backup_file(self, backup_pos=0):
"""Look for the most recent .backup file that basebackup creates
:return: a string like 000000010000000000000006.00000168.backup
"""
backup_re = re.compile("[0-9A-F]{24}.*.backup")
wal_files = [wal_file for wal_file in os.listdir(self.wal_archive_dir)
if backup_re.search(wal_file)]
wal_files = sorted(wal_files, reverse=True)
if not wal_files:
return None
return wal_files[backup_pos]
def get_backup_metadata(self, metadata_file):
"""Parse the contents of the .backup file"""
metadata = {}
start_re = re.compile(r"START WAL LOCATION: (.*) \(file (.*)\)")
stop_re = re.compile(r"STOP WAL LOCATION: (.*) \(file (.*)\)")
checkpt_re = re.compile("CHECKPOINT LOCATION: (.*)")
label_re = re.compile("LABEL: (.*)")
with open(metadata_file, 'r') as file:
metadata_contents = file.read()
match = start_re.search(metadata_contents)
if match:
self.start_segment = match.group(1)
metadata['start-segment'] = self.start_segment
self.start_wal_file = match.group(2)
metadata['start-wal-file'] = self.start_wal_file
match = stop_re.search(metadata_contents)
if match:
self.stop_segment = match.group(1)
metadata['stop-segment'] = self.stop_segment
self.stop_wal_file = match.group(2)
metadata['stop-wal-file'] = self.stop_wal_file
match = checkpt_re.search(metadata_contents)
if match:
self.checkpoint_location = match.group(1)
metadata['checkpoint-location'] = self.checkpoint_location
match = label_re.search(metadata_contents)
if match:
self.label = match.group(1)
metadata['label'] = self.label
return metadata
def get_metadata(self):
"""Get metadata.
pg_basebackup may complete, and we arrive here before the
history file is written to the wal archive. So we need to
handle two possibilities:
- this is the first backup, and no history file exists yet
- this isn't the first backup, and so the history file we retrieve
isn't the one we just ran!
"""
def _metadata_found():
backup_file = self.get_backup_file()
if not backup_file:
return False
self.metadata = self.get_backup_metadata(
os.path.join(self.wal_archive_dir, backup_file))
LOG.info("Metadata for backup: %s.", self.metadata)
return self.metadata['label'] == self.filename
try:
LOG.debug("Polling for backup metadata... ")
utils.poll_until(_metadata_found, sleep_time=5, time_out=60)
except Exception as e:
raise RuntimeError(f"Failed to get backup metadata for backup "
f"{self.filename}: {str(e)}")
return self.metadata
def check_process(self):
# If any of the below variables were not set by either metadata()
# or direct retrieval from the pgsql backup commands, then something
# has gone wrong
if not self.start_segment or not self.start_wal_file:
LOG.error("Unable to determine starting WAL file/segment")
return False
if not self.stop_segment or not self.stop_wal_file:
LOG.error("Unable to determine ending WAL file/segment")
return False
if not self.label:
LOG.error("No backup label found")
return False
return True
class PgBasebackupIncremental(PgBasebackup):
"""Incremental backup/restore for PostgreSQL.
To restore an incremental backup from a previous backup, in PostgreSQL,
is effectively to replay the WAL entries to a designated point in time.
All that is required is the most recent base backup, and all WAL files
"""
def __init__(self, *args, **kwargs):
self.parent_location = kwargs.pop('parent_location', '')
self.parent_checksum = kwargs.pop('parent_checksum', '')
super(PgBasebackupIncremental, self).__init__(*args, **kwargs)
self.incr_restore_cmd = f'tar -xzf - -C {self.wal_archive_dir}'
def pre_backup(self):
with psql_util.PostgresConnection('postgres') as conn:
self.start_segment = conn.query(
f"SELECT pg_start_backup('{self.filename}', false, false)"
)[0][0]
self.start_wal_file = conn.query(
f"SELECT pg_walfile_name('{self.start_segment}')")[0][0]
self.stop_segment = conn.query(
"SELECT * FROM pg_stop_backup(false, true)")[0][0]
# We have to hack this because self.command is
# initialized in the base class before we get here, which is
# when we will know exactly what WAL files we want to archive
self.command = self._cmd()
def _cmd(self):
wal_file_list = self.get_wal_files(backup_pos=1)
cmd = (f'tar -czf - -C {self.wal_archive_dir} '
f'{" ".join(wal_file_list)}')
return cmd + self.encrypt_cmd
def get_metadata(self):
_meta = super(PgBasebackupIncremental, self).get_metadata()
_meta.update({
'parent_location': self.parent_location,
'parent_checksum': self.parent_checksum,
})
return _meta
def incremental_restore_cmd(self, incr=False):
cmd = self.restore_command
if incr:
cmd = self.incr_restore_cmd
return self.decrypt_cmd + cmd
def incremental_restore(self, location, checksum):
"""Perform incremental restore.
For the child backups, restore the wal files to wal archive dir.
For the base backup, restore to datadir.
"""
metadata = self.storage.load_metadata(location, checksum)
if 'parent_location' in metadata:
LOG.info("Restoring parent: %(parent_location)s, "
"checksum: %(parent_checksum)s.", metadata)
parent_location = metadata['parent_location']
parent_checksum = metadata['parent_checksum']
# Restore parents recursively so backup are applied sequentially
self.incremental_restore(parent_location, parent_checksum)
command = self.incremental_restore_cmd(incr=True)
else:
# For the parent base backup, revert to the default restore cmd
LOG.info("Restoring back to full backup.")
command = self.incremental_restore_cmd(incr=False)
self.restore_content_length += self.unpack(location, checksum, command)
def run_restore(self):
"""Run incremental restore."""
LOG.debug('Running incremental restore')
self.incremental_restore(self.location, self.checksum)
return self.restore_content_length

View File

@ -1,6 +1,7 @@
#!/usr/bin/env bash #!/usr/bin/env bash
set -e set -e
export APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=1
APTOPTS="-y -qq --no-install-recommends --allow-unauthenticated" APTOPTS="-y -qq --no-install-recommends --allow-unauthenticated"
case "$1" in case "$1" in
@ -8,17 +9,21 @@ case "$1" in
curl -sSL https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb -o percona-release.deb curl -sSL https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb -o percona-release.deb
dpkg -i percona-release.deb dpkg -i percona-release.deb
percona-release enable-only tools release percona-release enable-only tools release
apt-get update
apt-get install $APTOPTS percona-xtrabackup-$2 apt-get install $APTOPTS percona-xtrabackup-$2
apt-get clean apt-get clean
;; ;;
"mariadb") "mariadb")
apt-key adv --fetch-keys 'https://mariadb.org/mariadb_release_signing_key.asc' apt-key adv --fetch-keys 'https://mariadb.org/mariadb_release_signing_key.asc'
add-apt-repository "deb [arch=amd64] http://mirror2.hs-esslingen.de/mariadb/repo/10.4/ubuntu $(lsb_release -cs) main" add-apt-repository "deb [arch=amd64] http://mirror2.hs-esslingen.de/mariadb/repo/10.4/ubuntu $(lsb_release -cs) main"
apt-get update
apt-get install $APTOPTS mariadb-backup apt-get install $APTOPTS mariadb-backup
apt-get clean apt-get clean
;; ;;
"postgresql")
apt-key adv --fetch-keys 'https://www.postgresql.org/media/keys/ACCC4CF8.asc'
add-apt-repository "deb [arch=amd64] http://apt.postgresql.org/pub/repos/apt/ $(lsb_release -cs)-pgdg main"
apt-get install $APTOPTS postgresql-client-12
apt-get clean
;;
*) *)
echo "datastore $1 not supported" echo "datastore $1 not supported"
exit 1 exit 1

View File

@ -36,13 +36,14 @@ cli_opts = [
cfg.StrOpt( cfg.StrOpt(
'driver', 'driver',
default='innobackupex', default='innobackupex',
choices=['innobackupex', 'xtrabackup', 'mariabackup'] choices=['innobackupex', 'mariabackup', 'pg_basebackup']
), ),
cfg.BoolOpt('backup'), cfg.BoolOpt('backup'),
cfg.StrOpt('backup-encryption-key'), cfg.StrOpt('backup-encryption-key'),
cfg.StrOpt('db-user'), cfg.StrOpt('db-user'),
cfg.StrOpt('db-password'), cfg.StrOpt('db-password'),
cfg.StrOpt('db-host'), cfg.StrOpt('db-host'),
cfg.StrOpt('db-datadir'),
cfg.StrOpt('os-token'), cfg.StrOpt('os-token'),
cfg.StrOpt('os-auth-url'), cfg.StrOpt('os-auth-url'),
cfg.StrOpt('os-tenant-id'), cfg.StrOpt('os-tenant-id'),
@ -57,6 +58,7 @@ cli_opts = [
help='It is up to the storage driver to decide to validate the ' help='It is up to the storage driver to decide to validate the '
'checksum or not. ' 'checksum or not. '
), ),
cfg.StrOpt('pg-wal-archive-dir'),
] ]
driver_mapping = { driver_mapping = {
@ -64,6 +66,8 @@ driver_mapping = {
'innobackupex_inc': 'backup.drivers.innobackupex.InnoBackupExIncremental', 'innobackupex_inc': 'backup.drivers.innobackupex.InnoBackupExIncremental',
'mariabackup': 'backup.drivers.mariabackup.MariaBackup', 'mariabackup': 'backup.drivers.mariabackup.MariaBackup',
'mariabackup_inc': 'backup.drivers.mariabackup.MariaBackupIncremental', 'mariabackup_inc': 'backup.drivers.mariabackup.MariaBackupIncremental',
'pg_basebackup': 'backup.drivers.postgres.PgBasebackup',
'pg_basebackup_inc': 'backup.drivers.postgres.PgBasebackupIncremental',
} }
storage_mapping = { storage_mapping = {
'swift': 'backup.storage.swift.SwiftStorage', 'swift': 'backup.storage.swift.SwiftStorage',
@ -72,6 +76,7 @@ storage_mapping = {
def stream_backup_to_storage(runner_cls, storage): def stream_backup_to_storage(runner_cls, storage):
parent_metadata = {} parent_metadata = {}
extra_params = {}
if CONF.incremental: if CONF.incremental:
if not CONF.parent_location: if not CONF.parent_location:
@ -88,8 +93,13 @@ def stream_backup_to_storage(runner_cls, storage):
} }
) )
if CONF.pg_wal_archive_dir:
extra_params['wal_archive_dir'] = CONF.pg_wal_archive_dir
extra_params.update(parent_metadata)
try: try:
with runner_cls(filename=CONF.backup_id, **parent_metadata) as bkup: with runner_cls(filename=CONF.backup_id, **extra_params) as bkup:
checksum, location = storage.save( checksum, location = storage.save(
bkup, bkup,
metadata=CONF.swift_extra_metadata, metadata=CONF.swift_extra_metadata,
@ -103,13 +113,19 @@ def stream_backup_to_storage(runner_cls, storage):
def stream_restore_from_storage(runner_cls, storage): def stream_restore_from_storage(runner_cls, storage):
lsn = "" params = {
'storage': storage,
'location': CONF.restore_from,
'checksum': CONF.restore_checksum,
'wal_archive_dir': CONF.pg_wal_archive_dir,
'lsn': None
}
if storage.is_incremental_backup(CONF.restore_from): if storage.is_incremental_backup(CONF.restore_from):
lsn = storage.get_backup_lsn(CONF.restore_from) params['lsn'] = storage.get_backup_lsn(CONF.restore_from)
try: try:
runner = runner_cls(storage=storage, location=CONF.restore_from, runner = runner_cls(**params)
checksum=CONF.restore_checksum, lsn=lsn)
restore_size = runner.restore() restore_size = runner.restore()
LOG.info('Restore successfully, restore_size: %s', restore_size) LOG.info('Restore successfully, restore_size: %s', restore_size)
except Exception as err: except Exception as err:

View File

@ -2,5 +2,7 @@ oslo.config!=4.3.0,!=4.4.0;python_version>='3.0' # Apache-2.0
oslo.log;python_version>='3.0' # Apache-2.0 oslo.log;python_version>='3.0' # Apache-2.0
oslo.utils!=3.39.1,!=3.40.0,!=3.40.1;python_version>='3.0' # Apache-2.0 oslo.utils!=3.39.1,!=3.40.0,!=3.40.1;python_version>='3.0' # Apache-2.0
oslo.concurrency;python_version>='3.0' # Apache-2.0 oslo.concurrency;python_version>='3.0' # Apache-2.0
oslo.service!=1.28.1 # Apache-2.0
keystoneauth1 # Apache-2.0 keystoneauth1 # Apache-2.0
python-swiftclient # Apache-2.0 python-swiftclient # Apache-2.0
psycopg2-binary>=2.6.2 # LGPL/ZPL

View File

@ -185,7 +185,7 @@ class SwiftStorage(base.Storage):
for key, value in metadata.items(): for key, value in metadata.items():
headers[_set_attr(key)] = value headers[_set_attr(key)] = value
LOG.debug('Metadata headers: %s', headers) LOG.info('Metadata headers: %s', headers)
if large_object: if large_object:
manifest_data = json.dumps(segment_results) manifest_data = json.dumps(segment_results)
LOG.info('Creating the SLO manifest file, manifest content: %s', LOG.info('Creating the SLO manifest file, manifest content: %s',
@ -212,8 +212,8 @@ class SwiftStorage(base.Storage):
headers=headers) headers=headers)
# Delete the old segment file that was copied # Delete the old segment file that was copied
LOG.debug('Deleting the old segment file %s.', LOG.info('Deleting the old segment file %s.',
stream_reader.first_segment) stream_reader.first_segment)
self.client.delete_object(container, self.client.delete_object(container,
stream_reader.first_segment) stream_reader.first_segment)
@ -288,7 +288,7 @@ class SwiftStorage(base.Storage):
return False return False
def get_backup_lsn(self, location): def get_backup_lsn(self, location):
"""Get the backup LSN.""" """Get the backup LSN if exists."""
_, container, filename = self._explodeLocation(location) _, container, filename = self._explodeLocation(location)
headers = self.client.head_object(container, filename) headers = self.client.head_object(container, filename)
return headers.get('x-object-meta-lsn') return headers.get('x-object-meta-lsn')

46
backup/utils/__init__.py Normal file
View File

@ -0,0 +1,46 @@
# Copyright 2020 Catalyst Cloud
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_service import loopingcall
def build_polling_task(retriever, condition=lambda value: value,
sleep_time=1, time_out=0, initial_delay=0):
"""Run a function in a loop with backoff on error.
The condition function runs based on the retriever function result.
"""
def poll_and_check():
obj = retriever()
if condition(obj):
raise loopingcall.LoopingCallDone(retvalue=obj)
call = loopingcall.BackOffLoopingCall(f=poll_and_check)
return call.start(initial_delay=initial_delay,
starting_interval=sleep_time,
max_interval=30, timeout=time_out)
def poll_until(retriever, condition=lambda value: value,
sleep_time=3, time_out=0, initial_delay=0):
"""Retrieves object until it passes condition, then returns it.
If time_out_limit is passed in, PollTimeOut will be raised once that
amount of time is eclipsed.
"""
task = build_polling_task(retriever, condition=condition,
sleep_time=sleep_time, time_out=time_out,
initial_delay=initial_delay)
return task.wait()

View File

@ -0,0 +1,53 @@
# Copyright 2020 Catalyst Cloud
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import psycopg2
class PostgresConnection(object):
def __init__(self, user, password='', host='localhost', port=5432):
self.user = user
self.password = password
self.host = host
self.port = port
self.connect_str = (f"user='{self.user}' password='{self.password}' "
f"host='{self.host}' port='{self.port}'")
def __enter__(self, autocommit=False):
self.conn = psycopg2.connect(self.connect_str)
self.conn.autocommit = autocommit
return self
def __exit__(self, exc_type, exc_value, traceback):
self.conn.close()
def execute(self, statement, identifiers=None, data_values=None):
"""Execute a non-returning statement."""
self._execute_stmt(statement, identifiers, data_values, False)
def query(self, query, identifiers=None, data_values=None):
"""Execute a query and return the result set."""
return self._execute_stmt(query, identifiers, data_values, True)
def _execute_stmt(self, statement, identifiers, data_values, fetch):
cmd = self._bind(statement, identifiers)
with self.conn.cursor() as cursor:
cursor.execute(cmd, data_values)
if fetch:
return cursor.fetchall()
def _bind(self, statement, identifiers):
if identifiers:
return statement.format(*identifiers)
return statement

View File

@ -1056,6 +1056,11 @@ postgresql_opts = [
'docker_image', default='postgres', 'docker_image', default='postgres',
help='Database docker image.' help='Database docker image.'
), ),
cfg.StrOpt(
'backup_docker_image',
default='openstacktrove/db-backup-postgresql:1.0.0',
help='The docker image used for backup and restore.'
),
cfg.BoolOpt('icmp', default=False, cfg.BoolOpt('icmp', default=False,
help='Whether to permit ICMP.', help='Whether to permit ICMP.',
deprecated_for_removal=True), deprecated_for_removal=True),
@ -1069,7 +1074,7 @@ postgresql_opts = [
'if trove_security_groups_support is True).'), 'if trove_security_groups_support is True).'),
cfg.PortOpt('postgresql_port', default=5432, cfg.PortOpt('postgresql_port', default=5432,
help='The TCP port the server listens on.'), help='The TCP port the server listens on.'),
cfg.StrOpt('backup_strategy', default='PgBaseBackup', cfg.StrOpt('backup_strategy', default='pg_basebackup',
help='Default strategy to perform backups.'), help='Default strategy to perform backups.'),
cfg.StrOpt('replication_strategy', cfg.StrOpt('replication_strategy',
default='PostgresqlReplicationStreaming', default='PostgresqlReplicationStreaming',

View File

@ -480,7 +480,7 @@ def service_discovery(service_candidates):
return result return result
def _execute_shell_cmd(cmd, options, *args, **kwargs): def execute_shell_cmd(cmd, options, *args, **kwargs):
"""Execute a given shell command passing it """Execute a given shell command passing it
given options (flags) and arguments. given options (flags) and arguments.
@ -519,7 +519,7 @@ def ensure_directory(dir_path, user=None, group=None, force=True, **kwargs):
"""Create a given directory and update its ownership """Create a given directory and update its ownership
(recursively) to the given user and group if any. (recursively) to the given user and group if any.
seealso:: _execute_shell_cmd for valid optional keyword arguments. seealso:: execute_shell_cmd for valid optional keyword arguments.
:param dir_path: Path to the created directory. :param dir_path: Path to the created directory.
:type dir_path: string :type dir_path: string
@ -549,7 +549,7 @@ def ensure_directory(dir_path, user=None, group=None, force=True, **kwargs):
def chown(path, user, group, recursive=True, force=False, **kwargs): def chown(path, user, group, recursive=True, force=False, **kwargs):
"""Changes the owner and group of a given file. """Changes the owner and group of a given file.
seealso:: _execute_shell_cmd for valid optional keyword arguments. seealso:: execute_shell_cmd for valid optional keyword arguments.
:param path: Path to the modified file. :param path: Path to the modified file.
:type path: string :type path: string
@ -579,7 +579,7 @@ def chown(path, user, group, recursive=True, force=False, **kwargs):
owner_group_modifier = _build_user_group_pair(user, group) owner_group_modifier = _build_user_group_pair(user, group)
options = (('f', force), ('R', recursive)) options = (('f', force), ('R', recursive))
_execute_shell_cmd('chown', options, owner_group_modifier, path, **kwargs) execute_shell_cmd('chown', options, owner_group_modifier, path, **kwargs)
def _build_user_group_pair(user, group): def _build_user_group_pair(user, group):
@ -599,14 +599,14 @@ def _create_directory(dir_path, force=True, **kwargs):
""" """
options = (('p', force),) options = (('p', force),)
_execute_shell_cmd('mkdir', options, dir_path, **kwargs) execute_shell_cmd('mkdir', options, dir_path, **kwargs)
def chmod(path, mode, recursive=True, force=False, **kwargs): def chmod(path, mode, recursive=True, force=False, **kwargs):
"""Changes the mode of a given file. """Changes the mode of a given file.
:seealso: Modes for more information on the representation of modes. :seealso: Modes for more information on the representation of modes.
:seealso: _execute_shell_cmd for valid optional keyword arguments. :seealso: execute_shell_cmd for valid optional keyword arguments.
:param path: Path to the modified file. :param path: Path to the modified file.
:type path: string :type path: string
@ -629,7 +629,7 @@ def chmod(path, mode, recursive=True, force=False, **kwargs):
if path: if path:
options = (('f', force), ('R', recursive)) options = (('f', force), ('R', recursive))
shell_modes = _build_shell_chmod_mode(mode) shell_modes = _build_shell_chmod_mode(mode)
_execute_shell_cmd('chmod', options, shell_modes, path, **kwargs) execute_shell_cmd('chmod', options, shell_modes, path, **kwargs)
else: else:
raise exception.UnprocessableEntity( raise exception.UnprocessableEntity(
_("Cannot change mode of a blank file.")) _("Cannot change mode of a blank file."))
@ -639,7 +639,7 @@ def change_user_group(user, group, append=True, add_group=True, **kwargs):
"""Adds a user to groups by using the usermod linux command with -a and """Adds a user to groups by using the usermod linux command with -a and
-G options. -G options.
seealso:: _execute_shell_cmd for valid optional keyword arguments. seealso:: execute_shell_cmd for valid optional keyword arguments.
:param user: Username. :param user: Username.
:type user: string :type user: string
@ -668,7 +668,7 @@ def change_user_group(user, group, append=True, add_group=True, **kwargs):
raise exception.UnprocessableEntity(_("Missing group.")) raise exception.UnprocessableEntity(_("Missing group."))
options = (('a', append), ('G', add_group)) options = (('a', append), ('G', add_group))
_execute_shell_cmd('usermod', options, group, user, **kwargs) execute_shell_cmd('usermod', options, group, user, **kwargs)
def _build_shell_chmod_mode(mode): def _build_shell_chmod_mode(mode):
@ -704,7 +704,7 @@ def _build_shell_chmod_mode(mode):
def remove(path, force=False, recursive=True, **kwargs): def remove(path, force=False, recursive=True, **kwargs):
"""Remove a given file or directory. """Remove a given file or directory.
:seealso: _execute_shell_cmd for valid optional keyword arguments. :seealso: execute_shell_cmd for valid optional keyword arguments.
:param path: Path to the removed file. :param path: Path to the removed file.
:type path: string :type path: string
@ -720,7 +720,7 @@ def remove(path, force=False, recursive=True, **kwargs):
if path: if path:
options = (('f', force), ('R', recursive)) options = (('f', force), ('R', recursive))
_execute_shell_cmd('rm', options, path, **kwargs) execute_shell_cmd('rm', options, path, **kwargs)
else: else:
raise exception.UnprocessableEntity(_("Cannot remove a blank file.")) raise exception.UnprocessableEntity(_("Cannot remove a blank file."))
@ -730,7 +730,7 @@ def move(source, destination, force=False, **kwargs):
Move attempts to preserve the original ownership, permissions and Move attempts to preserve the original ownership, permissions and
timestamps. timestamps.
:seealso: _execute_shell_cmd for valid optional keyword arguments. :seealso: execute_shell_cmd for valid optional keyword arguments.
:param source: Path to the source location. :param source: Path to the source location.
:type source: string :type source: string
@ -751,7 +751,7 @@ def move(source, destination, force=False, **kwargs):
raise exception.UnprocessableEntity(_("Missing destination path.")) raise exception.UnprocessableEntity(_("Missing destination path."))
options = (('f', force),) options = (('f', force),)
_execute_shell_cmd('mv', options, source, destination, **kwargs) execute_shell_cmd('mv', options, source, destination, **kwargs)
def copy(source, destination, force=False, preserve=False, recursive=True, def copy(source, destination, force=False, preserve=False, recursive=True,
@ -761,7 +761,7 @@ def copy(source, destination, force=False, preserve=False, recursive=True,
Copy does NOT attempt to preserve ownership, permissions and timestamps Copy does NOT attempt to preserve ownership, permissions and timestamps
unless the 'preserve' option is enabled. unless the 'preserve' option is enabled.
:seealso: _execute_shell_cmd for valid optional keyword arguments. :seealso: execute_shell_cmd for valid optional keyword arguments.
:param source: Path to the source location. :param source: Path to the source location.
:type source: string :type source: string
@ -793,7 +793,7 @@ def copy(source, destination, force=False, preserve=False, recursive=True,
options = (('f', force), ('p', preserve), ('R', recursive), options = (('f', force), ('p', preserve), ('R', recursive),
('L', dereference)) ('L', dereference))
_execute_shell_cmd('cp', options, source, destination, **kwargs) execute_shell_cmd('cp', options, source, destination, **kwargs)
def get_bytes_free_on_fs(path): def get_bytes_free_on_fs(path):
@ -830,7 +830,7 @@ def list_files_in_directory(root_dir, recursive=False, pattern=None,
if pattern: if pattern:
cmd_args.extend(['-regextype', 'posix-extended', cmd_args.extend(['-regextype', 'posix-extended',
'-regex', os.path.join('.*', pattern) + '$']) '-regex', os.path.join('.*', pattern) + '$'])
files = _execute_shell_cmd('find', [], *cmd_args, as_root=True) files = execute_shell_cmd('find', [], *cmd_args, as_root=True)
return {fp for fp in files.splitlines()} return {fp for fp in files.splitlines()}
return {os.path.abspath(os.path.join(root, name)) return {os.path.abspath(os.path.join(root, name))
@ -851,7 +851,7 @@ def _build_command_options(options):
def get_device(path, as_root=False): def get_device(path, as_root=False):
"""Get the device that a given path exists on.""" """Get the device that a given path exists on."""
stdout = _execute_shell_cmd('df', [], path, as_root=as_root) stdout = execute_shell_cmd('df', [], path, as_root=as_root)
return stdout.splitlines()[1].split()[0] return stdout.splitlines()[1].split()[0]
@ -879,8 +879,8 @@ def create_user(user_name, user_id, group_name=None, group_id=None):
group_id = group_id or user_id group_id = group_id or user_id
try: try:
_execute_shell_cmd('groupadd', [], '--gid', group_id, group_name, execute_shell_cmd('groupadd', [], '--gid', group_id, group_name,
as_root=True) as_root=True)
except exception.ProcessExecutionError as err: except exception.ProcessExecutionError as err:
if 'already exists' not in err.stderr: if 'already exists' not in err.stderr:
raise exception.UnprocessableEntity( raise exception.UnprocessableEntity(
@ -888,8 +888,8 @@ def create_user(user_name, user_id, group_name=None, group_id=None):
) )
try: try:
_execute_shell_cmd('useradd', [], '--uid', user_id, '--gid', group_id, execute_shell_cmd('useradd', [], '--uid', user_id, '--gid', group_id,
'-M', user_name, as_root=True) '-M', user_name, as_root=True)
except exception.ProcessExecutionError as err: except exception.ProcessExecutionError as err:
if 'already exists' not in err.stderr: if 'already exists' not in err.stderr:
raise exception.UnprocessableEntity( raise exception.UnprocessableEntity(
@ -903,4 +903,4 @@ def remove_dir_contents(folder):
Use shell=True here because shell=False doesn't support '*' Use shell=True here because shell=False doesn't support '*'
""" """
path = os.path.join(folder, '*') path = os.path.join(folder, '*')
_execute_shell_cmd(f'rm -rf {path}', [], shell=True, as_root=True) execute_shell_cmd(f'rm -rf {path}', [], shell=True, as_root=True)

View File

@ -303,6 +303,9 @@ class Manager(periodic_task.PeriodicTasks):
LOG.info('No post_prepare work has been defined.') LOG.info('No post_prepare work has been defined.')
pass pass
def stop_db(self, context):
self.app.stop_db()
def restart(self, context): def restart(self, context):
self.app.restart() self.app.restart()
@ -736,12 +739,20 @@ class Manager(periodic_task.PeriodicTasks):
:param backup_info: a dictionary containing the db instance id of the :param backup_info: a dictionary containing the db instance id of the
backup task, location, type, and other data. backup task, location, type, and other data.
""" """
with EndNotification(context): pass
self.app.create_backup(context, backup_info)
def perform_restore(self, context, restore_location, backup_info): def perform_restore(self, context, restore_location, backup_info):
raise exception.DatastoreOperationNotSupported( LOG.info("Starting to restore database from backup %s, "
operation='_perform_restore', datastore=self.manager) "backup_info: %s", backup_info['id'], backup_info)
try:
self.app.restore_backup(context, backup_info, restore_location)
except Exception:
LOG.error("Failed to restore from backup %s.", backup_info['id'])
self.status.set_status(service_status.ServiceStatuses.FAILED)
raise
LOG.info("Finished restore data from backup %s", backup_info['id'])
################ ################
# Database and user management # Database and user management

View File

@ -23,6 +23,7 @@ from trove.common import cfg
from trove.common import configurations from trove.common import configurations
from trove.common import exception from trove.common import exception
from trove.common import utils from trove.common import utils
from trove.common.notification import EndNotification
from trove.guestagent import guest_log from trove.guestagent import guest_log
from trove.guestagent.common import operating_system from trove.guestagent.common import operating_system
from trove.guestagent.datastore import manager from trove.guestagent.datastore import manager
@ -119,12 +120,25 @@ class MySqlManager(manager.Manager):
# This instance is a replication slave # This instance is a replication slave
self.attach_replica(context, snapshot, snapshot['config']) self.attach_replica(context, snapshot, snapshot['config'])
def stop_db(self, context):
self.app.stop_db()
def start_db_with_conf_changes(self, context, config_contents, ds_version): def start_db_with_conf_changes(self, context, config_contents, ds_version):
self.app.start_db_with_conf_changes(config_contents, ds_version) self.app.start_db_with_conf_changes(config_contents, ds_version)
def create_backup(self, context, backup_info):
"""Create backup for the database.
:param context: User context object.
:param backup_info: a dictionary containing the db instance id of the
backup task, location, type, and other data.
"""
LOG.info(f"Creating backup {backup_info['id']}")
with EndNotification(context):
volumes_mapping = {
'/var/lib/mysql': {'bind': '/var/lib/mysql', 'mode': 'rw'}
}
self.app.create_backup(context, backup_info,
volumes_mapping=volumes_mapping,
need_dbuser=True)
def get_datastore_log_defs(self): def get_datastore_log_defs(self):
owner = cfg.get_configuration_property('database_service_uid') owner = cfg.get_configuration_property('database_service_uid')
datastore_dir = self.app.get_data_dir() datastore_dir = self.app.get_data_dir()
@ -189,19 +203,6 @@ class MySqlManager(manager.Manager):
LOG.info("Applying overrides (%s).", overrides) LOG.info("Applying overrides (%s).", overrides)
self.app.apply_overrides(overrides) self.app.apply_overrides(overrides)
def perform_restore(self, context, restore_location, backup_info):
LOG.info("Starting to restore database from backup %s, "
"backup_info: %s", backup_info['id'], backup_info)
try:
self.app.restore_backup(context, backup_info, restore_location)
except Exception:
LOG.error("Failed to restore from backup %s.", backup_info['id'])
self.status.set_status(service_status.ServiceStatuses.FAILED)
raise
LOG.info("Finished restore data from backup %s", backup_info['id'])
def reset_password_for_restore(self, ds_version=None, def reset_password_for_restore(self, ds_version=None,
data_dir='/var/lib/mysql/data'): data_dir='/var/lib/mysql/data'):
"""Reset the root password after restore the db data. """Reset the root password after restore the db data.

View File

@ -17,21 +17,18 @@ import re
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import encodeutils from oslo_utils import encodeutils
from oslo_utils import timeutils
import six import six
from six.moves import urllib from six.moves import urllib
import sqlalchemy import sqlalchemy
from sqlalchemy import exc from sqlalchemy import exc
from sqlalchemy.sql.expression import text from sqlalchemy.sql.expression import text
from trove.backup.state import BackupState
from trove.common import cfg from trove.common import cfg
from trove.common import exception from trove.common import exception
from trove.common import utils from trove.common import utils
from trove.common.configurations import MySQLConfParser from trove.common.configurations import MySQLConfParser
from trove.common.db.mysql import models from trove.common.db.mysql import models
from trove.common.i18n import _ from trove.common.i18n import _
from trove.conductor import api as conductor_api
from trove.guestagent.common import guestagent_utils from trove.guestagent.common import guestagent_utils
from trove.guestagent.common import operating_system from trove.guestagent.common import operating_system
from trove.guestagent.common import sql_query from trove.guestagent.common import sql_query
@ -663,107 +660,6 @@ class BaseMySqlApp(service.BaseDbApp):
LOG.info("Finished restarting mysql") LOG.info("Finished restarting mysql")
def create_backup(self, context, backup_info):
storage_driver = CONF.storage_strategy
backup_driver = cfg.get_configuration_property('backup_strategy')
incremental = ''
backup_type = 'full'
if backup_info.get('parent'):
incremental = (
f'--incremental '
f'--parent-location={backup_info["parent"]["location"]} '
f'--parent-checksum={backup_info["parent"]["checksum"]}')
backup_type = 'incremental'
backup_id = backup_info["id"]
image = cfg.get_configuration_property('backup_docker_image')
name = 'db_backup'
volumes = {'/var/lib/mysql': {'bind': '/var/lib/mysql', 'mode': 'rw'}}
admin_pass = self.get_auth_password()
user_token = context.auth_token
auth_url = CONF.service_credentials.auth_url
user_tenant = context.project_id
swift_metadata = (
f'datastore:{backup_info["datastore"]},'
f'datastore_version:{backup_info["datastore_version"]}'
)
swift_container = backup_info.get('swift_container',
CONF.backup_swift_container)
swift_params = (f'--swift-extra-metadata={swift_metadata} '
f'--swift-container {swift_container}')
command = (
f'/usr/bin/python3 main.py --backup --backup-id={backup_id} '
f'--storage-driver={storage_driver} --driver={backup_driver} '
f'--db-user=os_admin --db-password={admin_pass} '
f'--db-host=127.0.0.1 '
f'--os-token={user_token} --os-auth-url={auth_url} '
f'--os-tenant-id={user_tenant} '
f'{swift_params} '
f'{incremental}'
)
# Update backup status in db
conductor = conductor_api.API(context)
mount_point = CONF.get(CONF.datastore_manager).mount_point
stats = guestagent_utils.get_filesystem_volume_stats(mount_point)
backup_state = {
'backup_id': backup_id,
'size': stats.get('used', 0.0),
'state': BackupState.BUILDING,
'backup_type': backup_type
}
conductor.update_backup(CONF.guest_id,
sent=timeutils.utcnow_ts(microsecond=True),
**backup_state)
LOG.debug("Updated state for %s to %s.", backup_id, backup_state)
# Start to run backup inside a separate docker container
try:
LOG.info('Starting to create backup %s, command: %s', backup_id,
command)
output, ret = docker_util.run_container(
self.docker_client, image, name,
volumes=volumes, command=command)
result = output[-1]
if not ret:
msg = f'Failed to run backup container, error: {result}'
LOG.error(msg)
raise Exception(msg)
backup_result = BACKUP_LOG.match(result)
if backup_result:
backup_state.update({
'checksum': backup_result.group('checksum'),
'location': backup_result.group('location'),
'success': True,
'state': BackupState.COMPLETED,
})
else:
LOG.error(f'Cannot parse backup output: {result}')
backup_state.update({
'success': False,
'state': BackupState.FAILED,
})
except Exception as err:
LOG.error("Failed to create backup %s", backup_id)
backup_state.update({
'success': False,
'state': BackupState.FAILED,
})
raise exception.TroveError(
"Failed to create backup %s, error: %s" %
(backup_id, str(err))
)
finally:
LOG.info("Completed backup %s.", backup_id)
conductor.update_backup(CONF.guest_id,
sent=timeutils.utcnow_ts(
microsecond=True),
**backup_state)
LOG.debug("Updated state for %s to %s.", backup_id, backup_state)
def restore_backup(self, context, backup_info, restore_location): def restore_backup(self, context, backup_info, restore_location):
backup_id = backup_info['id'] backup_id = backup_info['id']
storage_driver = CONF.storage_strategy storage_driver = CONF.storage_strategy

View File

@ -16,10 +16,11 @@ import os
from oslo_log import log as logging from oslo_log import log as logging
from trove.common import cfg from trove.common import cfg
from trove.guestagent.common import operating_system from trove.common.notification import EndNotification
from trove.guestagent.datastore.postgres import service
from trove.guestagent.datastore import manager
from trove.guestagent import guest_log from trove.guestagent import guest_log
from trove.guestagent.common import operating_system
from trove.guestagent.datastore import manager
from trove.guestagent.datastore.postgres import service
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
@ -45,17 +46,26 @@ class PostgresManager(manager.Manager):
user=CONF.database_service_uid, user=CONF.database_service_uid,
group=CONF.database_service_uid, group=CONF.database_service_uid,
as_root=True) as_root=True)
operating_system.ensure_directory(service.WAL_ARCHIVE_DIR,
user=CONF.database_service_uid,
group=CONF.database_service_uid,
as_root=True)
LOG.info('Preparing database config files') LOG.info('Preparing database config files')
self.app.configuration_manager.save_configuration(config_contents) self.app.configuration_manager.save_configuration(config_contents)
self.app.set_data_dir(self.app.datadir) self.app.set_data_dir(self.app.datadir)
self.app.update_overrides(overrides) self.app.update_overrides(overrides)
# # Restore data from backup and reset root password # Restore data from backup and reset root password
# if backup_info: if backup_info:
# self.perform_restore(context, data_dir, backup_info) self.perform_restore(context, self.app.datadir, backup_info)
# self.reset_password_for_restore(ds_version=ds_version,
# data_dir=data_dir) signal_file = f"{self.app.datadir}/recovery.signal"
operating_system.execute_shell_cmd(
f"touch {signal_file}", [], shell=True, as_root=True)
operating_system.chown(signal_file, CONF.database_service_uid,
CONF.database_service_uid, force=True,
as_root=True)
# config_file can only be set on the postgres command line # config_file can only be set on the postgres command line
command = f"postgres -c config_file={service.CONFIG_FILE}" command = f"postgres -c config_file={service.CONFIG_FILE}"
@ -101,3 +111,26 @@ class PostgresManager(manager.Manager):
def is_log_enabled(self, logname): def is_log_enabled(self, logname):
return self.configuration_manager.get_value('logging_collector', False) return self.configuration_manager.get_value('logging_collector', False)
def create_backup(self, context, backup_info):
"""Create backup for the database.
:param context: User context object.
:param backup_info: a dictionary containing the db instance id of the
backup task, location, type, and other data.
"""
LOG.info(f"Creating backup {backup_info['id']}")
with EndNotification(context):
volumes_mapping = {
'/var/lib/postgresql/data': {
'bind': '/var/lib/postgresql/data', 'mode': 'rw'
},
"/var/run/postgresql": {"bind": "/var/run/postgresql",
"mode": "ro"},
}
extra_params = f"--pg-wal-archive-dir {service.WAL_ARCHIVE_DIR}"
self.app.create_backup(context, backup_info,
volumes_mapping=volumes_mapping,
need_dbuser=False,
extra_params=extra_params)

View File

@ -138,7 +138,7 @@ class UserQuery(object):
@classmethod @classmethod
def drop(cls, name): def drop(cls, name):
"""Query to drop a user.""" """Query to drop a user."""
return f'DROP USER "{name}"' return f'DROP USER IF EXISTS "{name}"'
class AccessQuery(object): class AccessQuery(object):

View File

@ -39,6 +39,8 @@ CNF_EXT = 'conf'
# The same with include_dir config option # The same with include_dir config option
CNF_INCLUDE_DIR = '/etc/postgresql/conf.d' CNF_INCLUDE_DIR = '/etc/postgresql/conf.d'
HBA_CONFIG_FILE = '/etc/postgresql/pg_hba.conf' HBA_CONFIG_FILE = '/etc/postgresql/pg_hba.conf'
# The same with the path in archive_command config option.
WAL_ARCHIVE_DIR = '/var/lib/postgresql/data/wal_archive'
class PgSqlAppStatus(service.BaseDbStatus): class PgSqlAppStatus(service.BaseDbStatus):
@ -113,6 +115,8 @@ class PgSqlApp(service.BaseDbApp):
admin_password = utils.generate_random_password() admin_password = utils.generate_random_password()
os_admin = models.PostgreSQLUser(ADMIN_USER_NAME, admin_password) os_admin = models.PostgreSQLUser(ADMIN_USER_NAME, admin_password)
# Drop os_admin user if exists, this is needed for restore.
PgSqlAdmin(SUPER_USER_NAME).delete_user({'_name': ADMIN_USER_NAME})
PgSqlAdmin(SUPER_USER_NAME).create_admin_user(os_admin, PgSqlAdmin(SUPER_USER_NAME).create_admin_user(os_admin,
encrypt_password=True) encrypt_password=True)
self.save_password(ADMIN_USER_NAME, admin_password) self.save_password(ADMIN_USER_NAME, admin_password)
@ -176,9 +180,9 @@ class PgSqlApp(service.BaseDbApp):
command = command if command else '' command = command if command else ''
try: try:
root_pass = self.get_auth_password(file="root.cnf") postgres_pass = self.get_auth_password(file="postgres.cnf")
except exception.UnprocessableEntity: except exception.UnprocessableEntity:
root_pass = utils.generate_random_password() postgres_pass = utils.generate_random_password()
# Get uid and gid # Get uid and gid
user = "%s:%s" % (CONF.database_service_uid, CONF.database_service_uid) user = "%s:%s" % (CONF.database_service_uid, CONF.database_service_uid)
@ -211,7 +215,7 @@ class PgSqlApp(service.BaseDbApp):
network_mode="host", network_mode="host",
user=user, user=user,
environment={ environment={
"POSTGRES_PASSWORD": root_pass, "POSTGRES_PASSWORD": postgres_pass,
"PGDATA": self.datadir, "PGDATA": self.datadir,
}, },
command=command command=command
@ -219,7 +223,7 @@ class PgSqlApp(service.BaseDbApp):
# Save root password # Save root password
LOG.debug("Saving root credentials to local host.") LOG.debug("Saving root credentials to local host.")
self.save_password('postgres', root_pass) self.save_password('postgres', postgres_pass)
except Exception: except Exception:
LOG.exception("Failed to start database service") LOG.exception("Failed to start database service")
raise exception.TroveError("Failed to start database service") raise exception.TroveError("Failed to start database service")
@ -254,6 +258,55 @@ class PgSqlApp(service.BaseDbApp):
LOG.info("Finished restarting database") LOG.info("Finished restarting database")
def restore_backup(self, context, backup_info, restore_location):
backup_id = backup_info['id']
storage_driver = CONF.storage_strategy
backup_driver = cfg.get_configuration_property('backup_strategy')
image = cfg.get_configuration_property('backup_docker_image')
name = 'db_restore'
volumes = {
'/var/lib/postgresql/data': {
'bind': '/var/lib/postgresql/data',
'mode': 'rw'
}
}
os_cred = (f"--os-token={context.auth_token} "
f"--os-auth-url={CONF.service_credentials.auth_url} "
f"--os-tenant-id={context.project_id}")
command = (
f'/usr/bin/python3 main.py --nobackup '
f'--storage-driver={storage_driver} --driver={backup_driver} '
f'{os_cred} '
f'--restore-from={backup_info["location"]} '
f'--restore-checksum={backup_info["checksum"]} '
f'--pg-wal-archive-dir {WAL_ARCHIVE_DIR}'
)
LOG.debug('Stop the database and clean up the data before restore '
'from %s', backup_id)
self.stop_db()
for dir in [WAL_ARCHIVE_DIR, self.datadir]:
operating_system.remove_dir_contents(dir)
# Start to run restore inside a separate docker container
LOG.info('Starting to restore backup %s, command: %s', backup_id,
command)
output, ret = docker_util.run_container(
self.docker_client, image, name,
volumes=volumes, command=command)
result = output[-1]
if not ret:
msg = f'Failed to run restore container, error: {result}'
LOG.error(msg)
raise Exception(msg)
for dir in [WAL_ARCHIVE_DIR, self.datadir]:
operating_system.chown(dir, CONF.database_service_uid,
CONF.database_service_uid, force=True,
as_root=True)
class PgSqlAdmin(object): class PgSqlAdmin(object):
# Default set of options of an administrative account. # Default set of options of an administrative account.
@ -352,10 +405,7 @@ class PgSqlAdmin(object):
Return a list of serialized Postgres databases. Return a list of serialized Postgres databases.
""" """
user = self._find_user(username) user = self._find_user(username)
if user is not None: return user.databases if user is not None else []
return user.databases
raise exception.UserNotFound(username)
def create_databases(self, databases): def create_databases(self, databases):
"""Create the list of specified databases. """Create the list of specified databases.

View File

@ -13,16 +13,18 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os import os
import re
import time import time
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import timeutils from oslo_utils import timeutils
from trove.backup.state import BackupState
from trove.common import cfg from trove.common import cfg
from trove.common import context as trove_context from trove.common import context as trove_context
from trove.common import exception from trove.common import exception
from trove.common.i18n import _
from trove.common import stream_codecs from trove.common import stream_codecs
from trove.common.i18n import _
from trove.conductor import api as conductor_api from trove.conductor import api as conductor_api
from trove.guestagent.common import guestagent_utils from trove.guestagent.common import guestagent_utils
from trove.guestagent.common import operating_system from trove.guestagent.common import operating_system
@ -31,6 +33,8 @@ from trove.instance import service_status
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
BACKUP_LOG_RE = re.compile(r'.*Backup successfully, checksum: '
r'(?P<checksum>.*), location: (?P<location>.*)')
class BaseDbStatus(object): class BaseDbStatus(object):
@ -401,3 +405,109 @@ class BaseDbApp(object):
self.reset_configuration(config_contents) self.reset_configuration(config_contents)
self.start_db(update_db=True, ds_version=ds_version) self.start_db(update_db=True, ds_version=ds_version)
def create_backup(self, context, backup_info, volumes_mapping={},
need_dbuser=True, extra_params=''):
storage_driver = CONF.storage_strategy
backup_driver = cfg.get_configuration_property('backup_strategy')
incremental = ''
backup_type = 'full'
if backup_info.get('parent'):
incremental = (
f'--incremental '
f'--parent-location={backup_info["parent"]["location"]} '
f'--parent-checksum={backup_info["parent"]["checksum"]}')
backup_type = 'incremental'
backup_id = backup_info["id"]
image = cfg.get_configuration_property('backup_docker_image')
name = 'db_backup'
os_cred = (f"--os-token={context.auth_token} "
f"--os-auth-url={CONF.service_credentials.auth_url} "
f"--os-tenant-id={context.project_id}")
db_userinfo = ''
if need_dbuser:
admin_pass = self.get_auth_password()
db_userinfo = (f"--db-host=127.0.0.1 --db-user=os_admin "
f"--db-password={admin_pass}")
swift_metadata = (
f'datastore:{backup_info["datastore"]},'
f'datastore_version:{backup_info["datastore_version"]}'
)
swift_container = (backup_info.get('swift_container') or
CONF.backup_swift_container)
swift_params = (f'--swift-extra-metadata={swift_metadata} '
f'--swift-container {swift_container}')
command = (
f'/usr/bin/python3 main.py --backup --backup-id={backup_id} '
f'--storage-driver={storage_driver} --driver={backup_driver} '
f'{os_cred} '
f'{db_userinfo} '
f'{swift_params} '
f'{incremental} '
f'{extra_params} '
)
# Update backup status in db
conductor = conductor_api.API(context)
mount_point = cfg.get_configuration_property('mount_point')
stats = guestagent_utils.get_filesystem_volume_stats(mount_point)
backup_state = {
'backup_id': backup_id,
'size': stats.get('used', 0.0),
'state': BackupState.BUILDING,
'backup_type': backup_type
}
conductor.update_backup(CONF.guest_id,
sent=timeutils.utcnow_ts(microsecond=True),
**backup_state)
LOG.debug(f"Updated state for backup {backup_id} to {backup_state}")
# Start to run backup inside a separate docker container
try:
LOG.info(f'Starting to create backup {backup_id}, '
f'command: {command}')
output, ret = docker_util.run_container(
self.docker_client, image, name,
volumes=volumes_mapping, command=command)
result = output[-1]
if not ret:
msg = f'Failed to run backup container, error: {result}'
LOG.error(msg)
raise Exception(msg)
backup_result = BACKUP_LOG_RE.match(result)
if backup_result:
backup_state.update({
'checksum': backup_result.group('checksum'),
'location': backup_result.group('location'),
'success': True,
'state': BackupState.COMPLETED,
})
else:
LOG.error(f'Cannot parse backup output: {result}')
backup_state.update({
'success': False,
'state': BackupState.FAILED,
})
except Exception as err:
LOG.error("Failed to create backup %s", backup_id)
backup_state.update({
'success': False,
'state': BackupState.FAILED,
})
raise exception.TroveError(
"Failed to create backup %s, error: %s" %
(backup_id, str(err))
)
finally:
LOG.info("Completed backup %s.", backup_id)
conductor.update_backup(
CONF.guest_id,
sent=timeutils.utcnow_ts(microsecond=True),
**backup_state)
LOG.debug("Updated state for %s to %s.", backup_id, backup_state)

View File

@ -240,10 +240,11 @@ min_wal_size = 80MB
archive_mode = on # enables archiving; off, on, or always archive_mode = on # enables archiving; off, on, or always
# (change requires restart) # (change requires restart)
# (Trove default) # (Trove default)
#archive_command = '' # command to use to archive a logfile segment archive_command = 'test ! -f /var/lib/postgresql/data/wal_archive/%f && cp %p /var/lib/postgresql/data/wal_archive/%f' # command to use to archive a logfile segment
# placeholders: %p = path of file to archive # placeholders: %p = path of file to archive
# %f = file name only # %f = file name only
# e.g. 'test ! -f /mnt/server/archivedir/%f && cp %p /mnt/server/archivedir/%f' # e.g. 'test ! -f /mnt/server/archivedir/%f && cp %p /mnt/server/archivedir/%f'
# (Trove default)
#archive_timeout = 0 # force a logfile segment switch after this #archive_timeout = 0 # force a logfile segment switch after this
# number of seconds; 0 disables # number of seconds; 0 disables
@ -251,11 +252,12 @@ archive_mode = on # enables archiving; off, on, or always
# These are only used in recovery mode. # These are only used in recovery mode.
#restore_command = '' # command to use to restore an archived logfile segment restore_command = 'cp /var/lib/postgresql/data/wal_archive/%f "%p"' # command to use to restore an archived logfile segment
# placeholders: %p = path of file to restore # placeholders: %p = path of file to restore
# %f = file name only # %f = file name only
# e.g. 'cp /mnt/server/archivedir/%f %p' # e.g. 'cp /mnt/server/archivedir/%f %p'
# (change requires restart) # (change requires restart)
# (Trove default)
#archive_cleanup_command = '' # command to execute at every restartpoint #archive_cleanup_command = '' # command to execute at every restartpoint
#recovery_end_command = '' # command to execute at completion of recovery #recovery_end_command = '' # command to execute at completion of recovery
@ -294,7 +296,8 @@ archive_mode = on # enables archiving; off, on, or always
#max_wal_senders = 10 # max number of walsender processes #max_wal_senders = 10 # max number of walsender processes
# (change requires restart) # (change requires restart)
#wal_keep_segments = 0 # in logfile segments; 0 disables wal_keep_segments = 5 # in logfile segments; 0 disables
# (Trove default)
#wal_sender_timeout = 60s # in milliseconds; 0 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables
#max_replication_slots = 10 # max number of replication slots #max_replication_slots = 10 # max number of replication slots