From 916fbd4474d3ec66b5dc4f6bc1c83e3938455d1d Mon Sep 17 00:00:00 2001
From: Alex Kavanagh <alex@ajkavanagh.co.uk>
Date: Sat, 3 Apr 2021 20:18:02 +0100
Subject: [PATCH] 21.04 libraries freeze for charms on master branch

* charm-helpers sync for classic charms
* build.lock file for reactive charms
* ensure tox.ini is from release-tools
* ensure requirements.txt files are from release-tools
* On reactive charms:
  - ensure stable/21.04 branch for charms.openstack
  - ensure stable/21.04 branch for charm-helpers

Change-Id: I6c46959aa659454d28880e375e3488058227dca7
---
 charm-helpers-hooks.yaml                      |   2 +-
 .../charmhelpers/contrib/charmsupport/nrpe.py |   4 +-
 .../contrib/openstack/amulet/utils.py         |   1 +
 .../contrib/openstack/cert_utils.py           |  27 +-
 .../charmhelpers/contrib/openstack/context.py |  16 +-
 .../contrib/openstack/deferred_events.py      | 410 ++++++++++++++++++
 .../contrib/openstack/exceptions.py           |   5 +
 .../openstack/files/policy_rc_d_script.py     | 196 +++++++++
 .../contrib/openstack/policy_rcd.py           | 173 ++++++++
 hooks/charmhelpers/contrib/openstack/utils.py | 291 ++++++++++++-
 hooks/charmhelpers/core/hookenv.py            |  20 +
 hooks/charmhelpers/core/host.py               | 236 ++++++++--
 .../charmhelpers/core/host_factory/ubuntu.py  |  12 +-
 hooks/charmhelpers/fetch/__init__.py          |   1 +
 hooks/charmhelpers/fetch/ubuntu.py            |  88 +++-
 lib/charms_ceph/utils.py                      |  22 +-
 test-requirements.txt                         |   4 +-
 17 files changed, 1401 insertions(+), 107 deletions(-)
 create mode 100644 hooks/charmhelpers/contrib/openstack/deferred_events.py
 create mode 100755 hooks/charmhelpers/contrib/openstack/files/policy_rc_d_script.py
 create mode 100644 hooks/charmhelpers/contrib/openstack/policy_rcd.py

diff --git a/charm-helpers-hooks.yaml b/charm-helpers-hooks.yaml
index fa9cd645..9d1a4980 100644
--- a/charm-helpers-hooks.yaml
+++ b/charm-helpers-hooks.yaml
@@ -1,4 +1,4 @@
-repo: https://github.com/juju/charm-helpers
+repo: https://github.com/juju/charm-helpers@stable/21.04
 destination: hooks/charmhelpers
 include:
     - core
diff --git a/hooks/charmhelpers/contrib/charmsupport/nrpe.py b/hooks/charmhelpers/contrib/charmsupport/nrpe.py
index c87cf489..e4cb06bc 100644
--- a/hooks/charmhelpers/contrib/charmsupport/nrpe.py
+++ b/hooks/charmhelpers/contrib/charmsupport/nrpe.py
@@ -337,10 +337,8 @@ class NRPE(object):
                 "command": nrpecheck.command,
             }
             # If we were passed max_check_attempts, add that to the relation data
-            try:
+            if nrpecheck.max_check_attempts is not None:
                 nrpe_monitors[nrpecheck.shortname]['max_check_attempts'] = nrpecheck.max_check_attempts
-            except AttributeError:
-                pass
 
         # update-status hooks are configured to firing every 5 minutes by
         # default. When nagios-nrpe-server is restarted, the nagios server
diff --git a/hooks/charmhelpers/contrib/openstack/amulet/utils.py b/hooks/charmhelpers/contrib/openstack/amulet/utils.py
index 63aea1e3..0a14af7e 100644
--- a/hooks/charmhelpers/contrib/openstack/amulet/utils.py
+++ b/hooks/charmhelpers/contrib/openstack/amulet/utils.py
@@ -42,6 +42,7 @@ import pika
 import swiftclient
 
 from charmhelpers.core.decorators import retry_on_exception
+
 from charmhelpers.contrib.amulet.utils import (
     AmuletUtils
 )
diff --git a/hooks/charmhelpers/contrib/openstack/cert_utils.py b/hooks/charmhelpers/contrib/openstack/cert_utils.py
index 24867497..703fc6ef 100644
--- a/hooks/charmhelpers/contrib/openstack/cert_utils.py
+++ b/hooks/charmhelpers/contrib/openstack/cert_utils.py
@@ -47,7 +47,7 @@ from charmhelpers.contrib.network.ip import (
 )
 
 from charmhelpers.core.host import (
-    CA_CERT_DIR,
+    ca_cert_absolute_path,
     install_ca_cert,
     mkdir,
     write_file,
@@ -307,6 +307,26 @@ def install_certs(ssl_dir, certs, chain=None, user='root', group='root'):
             content=bundle['key'], perms=0o640)
 
 
+def get_cert_relation_ca_name(cert_relation_id=None):
+    """Determine CA certificate name as provided by relation.
+
+    The filename on disk depends on the name chosen for the application on the
+    providing end of the certificates relation.
+
+    :param cert_relation_id: (Optional) Relation id providing the certs
+    :type cert_relation_id: str
+    :returns: CA certificate filename without path nor extension
+    :rtype: str
+    """
+    if cert_relation_id is None:
+        try:
+            cert_relation_id = relation_ids('certificates')[0]
+        except IndexError:
+            return ''
+    return '{}_juju_ca_cert'.format(
+        remote_service_name(relid=cert_relation_id))
+
+
 def _manage_ca_certs(ca, cert_relation_id):
     """Manage CA certs.
 
@@ -316,7 +336,7 @@ def _manage_ca_certs(ca, cert_relation_id):
     :type cert_relation_id: str
     """
     config_ssl_ca = config('ssl_ca')
-    config_cert_file = '{}/{}.crt'.format(CA_CERT_DIR, CONFIG_CA_CERT_FILE)
+    config_cert_file = ca_cert_absolute_path(CONFIG_CA_CERT_FILE)
     if config_ssl_ca:
         log("Installing CA certificate from charm ssl_ca config to {}".format(
             config_cert_file), INFO)
@@ -329,8 +349,7 @@ def _manage_ca_certs(ca, cert_relation_id):
     log("Installing CA certificate from certificate relation", INFO)
     install_ca_cert(
         ca.encode(),
-        name='{}_juju_ca_cert'.format(
-            remote_service_name(relid=cert_relation_id)))
+        name=get_cert_relation_ca_name(cert_relation_id))
 
 
 def process_certificates(service_name, relation_id, unit,
diff --git a/hooks/charmhelpers/contrib/openstack/context.py b/hooks/charmhelpers/contrib/openstack/context.py
index c242d18d..b67dafda 100644
--- a/hooks/charmhelpers/contrib/openstack/context.py
+++ b/hooks/charmhelpers/contrib/openstack/context.py
@@ -74,7 +74,6 @@ from charmhelpers.core.host import (
     pwgen,
     lsb_release,
     CompareHostReleases,
-    is_container,
 )
 from charmhelpers.contrib.hahelpers.cluster import (
     determine_apache_port,
@@ -1596,16 +1595,21 @@ def _calculate_workers():
 
     @returns int: number of worker processes to use
     '''
-    multiplier = config('worker-multiplier') or DEFAULT_MULTIPLIER
+    multiplier = config('worker-multiplier')
+
+    # distinguish an empty config and an explicit config as 0.0
+    if multiplier is None:
+        multiplier = DEFAULT_MULTIPLIER
+
     count = int(_num_cpus() * multiplier)
-    if multiplier > 0 and count == 0:
+    if count <= 0:
+        # assign at least one worker
         count = 1
 
-    if config('worker-multiplier') is None and is_container():
+    if config('worker-multiplier') is None:
         # NOTE(jamespage): Limit unconfigured worker-multiplier
         #                  to MAX_DEFAULT_WORKERS to avoid insane
-        #                  worker configuration in LXD containers
-        #                  on large servers
+        #                  worker configuration on large servers
         # Reference: https://pad.lv/1665270
         count = min(count, MAX_DEFAULT_WORKERS)
 
diff --git a/hooks/charmhelpers/contrib/openstack/deferred_events.py b/hooks/charmhelpers/contrib/openstack/deferred_events.py
new file mode 100644
index 00000000..fd073a04
--- /dev/null
+++ b/hooks/charmhelpers/contrib/openstack/deferred_events.py
@@ -0,0 +1,410 @@
+# Copyright 2021 Canonical Limited.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Module for managing deferred service events.
+
+This module is used to manage deferred service events from both charm actions
+and package actions.
+"""
+
+import datetime
+import glob
+import yaml
+import os
+import time
+import uuid
+
+import charmhelpers.contrib.openstack.policy_rcd as policy_rcd
+import charmhelpers.core.hookenv as hookenv
+import charmhelpers.core.host as host
+import charmhelpers.core.unitdata as unitdata
+
+import subprocess
+
+
+# Deferred events generated from the charm are stored along side those
+# generated from packaging.
+DEFERRED_EVENTS_DIR = policy_rcd.POLICY_DEFERRED_EVENTS_DIR
+
+
+class ServiceEvent():
+
+    def __init__(self, timestamp, service, reason, action,
+                 policy_requestor_name=None, policy_requestor_type=None):
+        self.timestamp = timestamp
+        self.service = service
+        self.reason = reason
+        self.action = action
+        if not policy_requestor_name:
+            self.policy_requestor_name = hookenv.service_name()
+        if not policy_requestor_type:
+            self.policy_requestor_type = 'charm'
+
+    def __eq__(self, other):
+        for attr in vars(self):
+            if getattr(self, attr) != getattr(other, attr):
+                return False
+        return True
+
+    def matching_request(self, other):
+        for attr in ['service', 'action', 'reason']:
+            if getattr(self, attr) != getattr(other, attr):
+                return False
+        return True
+
+    @classmethod
+    def from_dict(cls, data):
+        return cls(
+            data['timestamp'],
+            data['service'],
+            data['reason'],
+            data['action'],
+            data.get('policy_requestor_name'),
+            data.get('policy_requestor_type'))
+
+
+def deferred_events_files():
+    """Deferred event files
+
+    Deferred event files that were generated by service_name() policy.
+
+    :returns: Deferred event files
+    :rtype: List[str]
+    """
+    return glob.glob('{}/*.deferred'.format(DEFERRED_EVENTS_DIR))
+
+
+def read_event_file(file_name):
+    """Read a file and return the corresponding objects.
+
+    :param file_name: Name of file to read.
+    :type file_name: str
+    :returns: ServiceEvent from file.
+    :rtype: ServiceEvent
+    """
+    with open(file_name, 'r') as f:
+        contents = yaml.safe_load(f)
+    event = ServiceEvent(
+        contents['timestamp'],
+        contents['service'],
+        contents['reason'],
+        contents['action'])
+    return event
+
+
+def deferred_events():
+    """Get list of deferred events.
+
+    List of deferred events. Events are represented by dicts of the form:
+
+       {
+           action: restart,
+           policy_requestor_name: neutron-openvswitch,
+           policy_requestor_type: charm,
+           reason: 'Pkg update',
+           service: openvswitch-switch,
+           time: 1614328743}
+
+    :returns: List of deferred events.
+    :rtype: List[ServiceEvent]
+    """
+    events = []
+    for defer_file in deferred_events_files():
+        events.append((defer_file, read_event_file(defer_file)))
+    return events
+
+
+def duplicate_event_files(event):
+    """Get list of event files that have equivalent deferred events.
+
+    :param event: Event to compare
+    :type event: ServiceEvent
+    :returns: List of event files
+    :rtype: List[str]
+    """
+    duplicates = []
+    for event_file, existing_event in deferred_events():
+        if event.matching_request(existing_event):
+            duplicates.append(event_file)
+    return duplicates
+
+
+def get_event_record_file(policy_requestor_type, policy_requestor_name):
+    """Generate filename for storing a new event.
+
+    :param policy_requestor_type: System that blocked event
+    :type policy_requestor_type: str
+    :param policy_requestor_name: Name of application that blocked event
+    :type policy_requestor_name: str
+    :returns: File name
+    :rtype: str
+    """
+    file_name = '{}/{}-{}-{}.deferred'.format(
+        DEFERRED_EVENTS_DIR,
+        policy_requestor_type,
+        policy_requestor_name,
+        uuid.uuid1())
+    return file_name
+
+
+def save_event(event):
+    """Write deferred events to backend.
+
+    :param event: Event to save
+    :type event: ServiceEvent
+    """
+    requestor_name = hookenv.service_name()
+    requestor_type = 'charm'
+    init_policy_log_dir()
+    if duplicate_event_files(event):
+        hookenv.log(
+            "Not writing new event, existing event found. {} {} {}".format(
+                event.service,
+                event.action,
+                event.reason),
+            level="DEBUG")
+    else:
+        record_file = get_event_record_file(
+            policy_requestor_type=requestor_type,
+            policy_requestor_name=requestor_name)
+
+        with open(record_file, 'w') as f:
+            data = {
+                'timestamp': event.timestamp,
+                'service': event.service,
+                'action': event.action,
+                'reason': event.reason,
+                'policy_requestor_type': requestor_type,
+                'policy_requestor_name': requestor_name}
+            yaml.dump(data, f)
+
+
+def clear_deferred_events(svcs, action):
+    """Remove any outstanding deferred events.
+
+    Remove a deferred event if its service is in the services list and its
+    action matches.
+
+    :param svcs: List of services to remove.
+    :type svcs: List[str]
+    :param action: Action to remove
+    :type action: str
+    """
+    # XXX This function is not currently processing the action. It needs to
+    #     match the action and also take account of try-restart and the
+    #     equivalnce of stop-start and restart.
+    for defer_file in deferred_events_files():
+        deferred_event = read_event_file(defer_file)
+        if deferred_event.service in svcs:
+            os.remove(defer_file)
+
+
+def init_policy_log_dir():
+    """Ensure directory to store events exists."""
+    if not os.path.exists(DEFERRED_EVENTS_DIR):
+        os.mkdir(DEFERRED_EVENTS_DIR)
+
+
+def get_deferred_events():
+    """Return a list of deferred events requested by the charm and packages.
+
+    :returns: List of deferred events
+    :rtype: List[ServiceEvent]
+    """
+    events = []
+    for _, event in deferred_events():
+        events.append(event)
+    return events
+
+
+def get_deferred_restarts():
+    """List of deferred restart events requested by the charm and packages.
+
+    :returns: List of deferred restarts
+    :rtype: List[ServiceEvent]
+    """
+    return [e for e in get_deferred_events() if e.action == 'restart']
+
+
+def clear_deferred_restarts(services):
+    """Clear deferred restart events targetted at `services`.
+
+    :param services: Services with deferred actions to clear.
+    :type services: List[str]
+    """
+    clear_deferred_events(services, 'restart')
+
+
+def process_svc_restart(service):
+    """Respond to a service restart having occured.
+
+    :param service: Services that the action was performed against.
+    :type service: str
+    """
+    clear_deferred_restarts([service])
+
+
+def is_restart_permitted():
+    """Check whether restarts are permitted.
+
+    :returns: Whether restarts are permitted
+    :rtype: bool
+    """
+    if hookenv.config('enable-auto-restarts') is None:
+        return True
+    return hookenv.config('enable-auto-restarts')
+
+
+def check_and_record_restart_request(service, changed_files):
+    """Check if restarts are permitted, if they are not log the request.
+
+    :param service: Service to be restarted
+    :type service: str
+    :param changed_files: Files that have changed to trigger restarts.
+    :type changed_files: List[str]
+    :returns: Whether restarts are permitted
+    :rtype: bool
+    """
+    changed_files = sorted(list(set(changed_files)))
+    permitted = is_restart_permitted()
+    if not permitted:
+        save_event(ServiceEvent(
+            timestamp=round(time.time()),
+            service=service,
+            reason='File(s) changed: {}'.format(
+                ', '.join(changed_files)),
+            action='restart'))
+    return permitted
+
+
+def deferrable_svc_restart(service, reason=None):
+    """Restarts service if permitted, if not defer it.
+
+    :param service: Service to be restarted
+    :type service: str
+    :param reason: Reason for restart
+    :type reason: Union[str, None]
+    """
+    if is_restart_permitted():
+        host.service_restart(service)
+    else:
+        save_event(ServiceEvent(
+            timestamp=round(time.time()),
+            service=service,
+            reason=reason,
+            action='restart'))
+
+
+def configure_deferred_restarts(services):
+    """Setup deferred restarts.
+
+    :param services: Services to block restarts of.
+    :type services: List[str]
+    """
+    policy_rcd.install_policy_rcd()
+    if is_restart_permitted():
+        policy_rcd.remove_policy_file()
+    else:
+        blocked_actions = ['stop', 'restart', 'try-restart']
+        for svc in services:
+            policy_rcd.add_policy_block(svc, blocked_actions)
+
+
+def get_service_start_time(service):
+    """Find point in time when the systemd unit transitioned to active state.
+
+    :param service: Services to check timetsamp of.
+    :type service: str
+    """
+    start_time = None
+    out = subprocess.check_output(
+        [
+            'systemctl',
+            'show',
+            service,
+            '--property=ActiveEnterTimestamp'])
+    str_time = out.decode().rstrip().replace('ActiveEnterTimestamp=', '')
+    if str_time:
+        start_time = datetime.datetime.strptime(
+            str_time,
+            '%a %Y-%m-%d %H:%M:%S %Z')
+    return start_time
+
+
+def check_restart_timestamps():
+    """Check deferred restarts against systemd units start time.
+
+    Check if a service has a deferred event and clear it if it has been
+    subsequently restarted.
+    """
+    for event in get_deferred_restarts():
+        start_time = get_service_start_time(event.service)
+        deferred_restart_time = datetime.datetime.fromtimestamp(
+            event.timestamp)
+        if start_time and start_time < deferred_restart_time:
+            hookenv.log(
+                ("Restart still required, {} was started at {}, restart was "
+                 "requested after that at {}").format(
+                    event.service,
+                    start_time,
+                    deferred_restart_time),
+                level='DEBUG')
+        else:
+            clear_deferred_restarts([event.service])
+
+
+def set_deferred_hook(hookname):
+    """Record that a hook has been deferred.
+
+    :param hookname: Name of hook that was deferred.
+    :type hookname: str
+    """
+    with unitdata.HookData()() as t:
+        kv = t[0]
+        deferred_hooks = kv.get('deferred-hooks', [])
+        if hookname not in deferred_hooks:
+            deferred_hooks.append(hookname)
+            kv.set('deferred-hooks', sorted(list(set(deferred_hooks))))
+
+
+def get_deferred_hooks():
+    """Get a list of deferred hooks.
+
+    :returns: List of hook names.
+    :rtype: List[str]
+    """
+    with unitdata.HookData()() as t:
+        kv = t[0]
+        return kv.get('deferred-hooks', [])
+
+
+def clear_deferred_hooks():
+    """Clear any deferred hooks."""
+    with unitdata.HookData()() as t:
+        kv = t[0]
+        kv.set('deferred-hooks', [])
+
+
+def clear_deferred_hook(hookname):
+    """Clear a specific deferred hooks.
+
+    :param hookname: Name of hook to remove.
+    :type hookname: str
+    """
+    with unitdata.HookData()() as t:
+        kv = t[0]
+        deferred_hooks = kv.get('deferred-hooks', [])
+        if hookname in deferred_hooks:
+            deferred_hooks.remove(hookname)
+            kv.set('deferred-hooks', deferred_hooks)
diff --git a/hooks/charmhelpers/contrib/openstack/exceptions.py b/hooks/charmhelpers/contrib/openstack/exceptions.py
index f85ae4f4..b2330637 100644
--- a/hooks/charmhelpers/contrib/openstack/exceptions.py
+++ b/hooks/charmhelpers/contrib/openstack/exceptions.py
@@ -19,3 +19,8 @@ class OSContextError(Exception):
     This exception is principally used in contrib.openstack.context
     """
     pass
+
+
+class ServiceActionError(Exception):
+    """Raised when a service action (stop/start/ etc) failed."""
+    pass
diff --git a/hooks/charmhelpers/contrib/openstack/files/policy_rc_d_script.py b/hooks/charmhelpers/contrib/openstack/files/policy_rc_d_script.py
new file mode 100755
index 00000000..344a7662
--- /dev/null
+++ b/hooks/charmhelpers/contrib/openstack/files/policy_rc_d_script.py
@@ -0,0 +1,196 @@
+#!/usr/bin/env python3
+
+"""This script is an implemenation of policy-rc.d
+
+For further information on policy-rc.d see *1
+
+*1 https://people.debian.org/~hmh/invokerc.d-policyrc.d-specification.txt
+"""
+import collections
+import glob
+import os
+import logging
+import sys
+import time
+import uuid
+import yaml
+
+
+SystemPolicy = collections.namedtuple(
+    'SystemPolicy',
+    [
+        'policy_requestor_name',
+        'policy_requestor_type',
+        'service',
+        'blocked_actions'])
+
+DEFAULT_POLICY_CONFIG_DIR = '/etc/policy-rc.d'
+DEFAULT_POLICY_LOG_DIR = '/var/lib/policy-rc.d'
+
+
+def read_policy_file(policy_file):
+    """Return system policies from given file.
+
+    :param file_name: Name of file to read.
+    :type file_name: str
+    :returns: Policy
+    :rtype: List[SystemPolicy]
+    """
+    policies = []
+    if os.path.exists(policy_file):
+        with open(policy_file, 'r') as f:
+            policy = yaml.safe_load(f)
+        for service, actions in policy['blocked_actions'].items():
+            service = service.replace('.service', '')
+            policies.append(SystemPolicy(
+                policy_requestor_name=policy['policy_requestor_name'],
+                policy_requestor_type=policy['policy_requestor_type'],
+                service=service,
+                blocked_actions=actions))
+    return policies
+
+
+def get_policies(policy_config_dir):
+    """Return all system policies in policy_config_dir.
+
+    :param policy_config_dir: Name of file to read.
+    :type policy_config_dir: str
+    :returns: Policy
+    :rtype: List[SystemPolicy]
+    """
+    _policy = []
+    for f in glob.glob('{}/*.policy'.format(policy_config_dir)):
+        _policy.extend(read_policy_file(f))
+    return _policy
+
+
+def record_blocked_action(service, action, blocking_policies, policy_log_dir):
+    """Record that an action was requested but deniedl
+
+    :param service: Service that was blocked
+    :type service: str
+    :param action: Action that was blocked.
+    :type action: str
+    :param blocking_policies: Policies that blocked the action on the service.
+    :type blocking_policies: List[SystemPolicy]
+    :param policy_log_dir: Directory to place the blocking action record.
+    :type policy_log_dir: str
+    """
+    if not os.path.exists(policy_log_dir):
+        os.mkdir(policy_log_dir)
+    seconds = round(time.time())
+    for policy in blocking_policies:
+        if not os.path.exists(policy_log_dir):
+            os.mkdir(policy_log_dir)
+        file_name = '{}/{}-{}-{}.deferred'.format(
+            policy_log_dir,
+            policy.policy_requestor_type,
+            policy.policy_requestor_name,
+            uuid.uuid1())
+        with open(file_name, 'w') as f:
+            data = {
+                'timestamp': seconds,
+                'service': service,
+                'action': action,
+                'reason': 'Package update',
+                'policy_requestor_type': policy.policy_requestor_type,
+                'policy_requestor_name': policy.policy_requestor_name}
+            yaml.dump(data, f)
+
+
+def get_blocking_policies(service, action, policy_config_dir):
+    """Record that an action was requested but deniedl
+
+    :param service: Service that action is requested against.
+    :type service: str
+    :param action: Action that is requested.
+    :type action: str
+    :param policy_config_dir: Directory that stores policy files.
+    :type policy_config_dir: str
+    :returns: Policies
+    :rtype: List[SystemPolicy]
+    """
+    service = service.replace('.service', '')
+    blocking_policies = [
+        policy
+        for policy in get_policies(policy_config_dir)
+        if policy.service == service and action in policy.blocked_actions]
+    return blocking_policies
+
+
+def process_action_request(service, action, policy_config_dir, policy_log_dir):
+    """Take the requested action against service and check if it is permitted.
+
+    :param service: Service that action is requested against.
+    :type service: str
+    :param action: Action that is requested.
+    :type action: str
+    :param policy_config_dir: Directory that stores policy files.
+    :type policy_config_dir: str
+    :param policy_log_dir: Directory that stores policy files.
+    :type policy_log_dir: str
+    :returns: Tuple of whether the action is permitted and explanation.
+    :rtype: (boolean, str)
+    """
+    blocking_policies = get_blocking_policies(
+        service,
+        action,
+        policy_config_dir)
+    if blocking_policies:
+        policy_msg = [
+            '{} {}'.format(p.policy_requestor_type, p.policy_requestor_name)
+            for p in sorted(blocking_policies)]
+        message = '{} of {} blocked by {}'.format(
+            action,
+            service,
+            ', '.join(policy_msg))
+        record_blocked_action(
+            service,
+            action,
+            blocking_policies,
+            policy_log_dir)
+        action_permitted = False
+    else:
+        message = "Permitting {} {}".format(service, action)
+        action_permitted = True
+    return action_permitted, message
+
+
+def main():
+    logging.basicConfig(
+        filename='/var/log/policy-rc.d.log',
+        level=logging.DEBUG,
+        format='%(asctime)s %(message)s')
+
+    service = sys.argv[1]
+    action = sys.argv[2]
+
+    permitted, message = process_action_request(
+        service,
+        action,
+        DEFAULT_POLICY_CONFIG_DIR,
+        DEFAULT_POLICY_LOG_DIR)
+    logging.info(message)
+
+    # https://people.debian.org/~hmh/invokerc.d-policyrc.d-specification.txt
+    # Exit status codes:
+    #  0 - action allowed
+    #  1 - unknown action (therefore, undefined policy)
+    # 100 - unknown initscript id
+    # 101 - action forbidden by policy
+    # 102 - subsystem error
+    # 103 - syntax error
+    # 104 - [reserved]
+    # 105 - behaviour uncertain, policy undefined.
+    # 106 - action not allowed. Use the returned fallback actions
+    #       (which are implied to be "allowed") instead.
+
+    if permitted:
+        return 0
+    else:
+        return 101
+
+
+if __name__ == "__main__":
+    rc = main()
+    sys.exit(rc)
diff --git a/hooks/charmhelpers/contrib/openstack/policy_rcd.py b/hooks/charmhelpers/contrib/openstack/policy_rcd.py
new file mode 100644
index 00000000..ecffbc68
--- /dev/null
+++ b/hooks/charmhelpers/contrib/openstack/policy_rcd.py
@@ -0,0 +1,173 @@
+# Copyright 2021 Canonical Limited.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Module for managing policy-rc.d script and associated files.
+
+This module manages the installation of /usr/sbin/policy-rc.d, the
+policy files and the event files. When a package update occurs the
+packaging system calls:
+
+policy-rc.d [options] <initscript ID> <actions>
+
+The return code of the script determines if the packaging system
+will perform that action on the given service. The policy-rc.d
+implementation installed by this module checks if an action is
+permitted by checking policy files placed in /etc/policy-rc.d.
+If a policy file exists which denies the requested action then
+this is recorded in an event file which is placed in
+/var/lib/policy-rc.d.
+"""
+
+import os
+import shutil
+import tempfile
+import yaml
+
+import charmhelpers.contrib.openstack.files as os_files
+import charmhelpers.contrib.openstack.alternatives as alternatives
+import charmhelpers.core.hookenv as hookenv
+import charmhelpers.core.host as host
+
+POLICY_HEADER = """# Managed by juju\n"""
+POLICY_DEFERRED_EVENTS_DIR = '/var/lib/policy-rc.d'
+POLICY_CONFIG_DIR = '/etc/policy-rc.d'
+
+
+def get_policy_file_name():
+    """Get the name of the policy file for this application.
+
+    :returns: Policy file name
+    :rtype: str
+    """
+    application_name = hookenv.service_name()
+    return '{}/charm-{}.policy'.format(POLICY_CONFIG_DIR, application_name)
+
+
+def read_default_policy_file():
+    """Return the policy file.
+
+    A policy is in the form:
+        blocked_actions:
+            neutron-dhcp-agent: [restart, stop, try-restart]
+            neutron-l3-agent: [restart, stop, try-restart]
+            neutron-metadata-agent: [restart, stop, try-restart]
+            neutron-openvswitch-agent: [restart, stop, try-restart]
+            openvswitch-switch: [restart, stop, try-restart]
+            ovs-vswitchd: [restart, stop, try-restart]
+            ovs-vswitchd-dpdk: [restart, stop, try-restart]
+            ovsdb-server: [restart, stop, try-restart]
+        policy_requestor_name: neutron-openvswitch
+        policy_requestor_type: charm
+
+    :returns: Policy
+    :rtype: Dict[str, Union[str, Dict[str, List[str]]]
+    """
+    policy = {}
+    policy_file = get_policy_file_name()
+    if os.path.exists(policy_file):
+        with open(policy_file, 'r') as f:
+            policy = yaml.safe_load(f)
+    return policy
+
+
+def write_policy_file(policy_file, policy):
+    """Write policy to disk.
+
+    :param policy_file: Name of policy file
+    :type policy_file: str
+    :param policy: Policy
+    :type policy: Dict[str, Union[str, Dict[str, List[str]]]]
+    """
+    with tempfile.NamedTemporaryFile('w', delete=False) as f:
+        f.write(POLICY_HEADER)
+        yaml.dump(policy, f)
+        tmp_file_name = f.name
+    shutil.move(tmp_file_name, policy_file)
+
+
+def remove_policy_file():
+    """Remove policy file."""
+    try:
+        os.remove(get_policy_file_name())
+    except FileNotFoundError:
+        pass
+
+
+def install_policy_rcd():
+    """Install policy-rc.d components."""
+    source_file_dir = os.path.dirname(os.path.abspath(os_files.__file__))
+    policy_rcd_exec = "/var/lib/charm/{}/policy-rc.d".format(
+        hookenv.service_name())
+    host.mkdir(os.path.dirname(policy_rcd_exec))
+    shutil.copy2(
+        '{}/policy_rc_d_script.py'.format(source_file_dir),
+        policy_rcd_exec)
+    # policy-rc.d must be installed via the alternatives system:
+    # https://people.debian.org/~hmh/invokerc.d-policyrc.d-specification.txt
+    if not os.path.exists('/usr/sbin/policy-rc.d'):
+        alternatives.install_alternative(
+            'policy-rc.d',
+            '/usr/sbin/policy-rc.d',
+            policy_rcd_exec)
+    host.mkdir(POLICY_CONFIG_DIR)
+
+
+def get_default_policy():
+    """Return the default policy structure.
+
+    :returns: Policy
+    :rtype: Dict[str, Union[str, Dict[str, List[str]]]
+    """
+    policy = {
+        'policy_requestor_name': hookenv.service_name(),
+        'policy_requestor_type': 'charm',
+        'blocked_actions': {}}
+    return policy
+
+
+def add_policy_block(service, blocked_actions):
+    """Update a policy file with new list of actions.
+
+    :param service: Service name
+    :type service: str
+    :param blocked_actions: Action to block
+    :type blocked_actions: List[str]
+    """
+    policy = read_default_policy_file() or get_default_policy()
+    policy_file = get_policy_file_name()
+    if policy['blocked_actions'].get(service):
+        policy['blocked_actions'][service].extend(blocked_actions)
+    else:
+        policy['blocked_actions'][service] = blocked_actions
+    policy['blocked_actions'][service] = sorted(
+        list(set(policy['blocked_actions'][service])))
+    write_policy_file(policy_file, policy)
+
+
+def remove_policy_block(service, unblocked_actions):
+    """Remove list of actions from policy file.
+
+    :param service: Service name
+    :type service: str
+    :param unblocked_actions: Action to unblock
+    :type unblocked_actions: List[str]
+    """
+    policy_file = get_policy_file_name()
+    policy = read_default_policy_file()
+    for action in unblocked_actions:
+        try:
+            policy['blocked_actions'][service].remove(action)
+        except (KeyError, ValueError):
+            continue
+    write_policy_file(policy_file, policy)
diff --git a/hooks/charmhelpers/contrib/openstack/utils.py b/hooks/charmhelpers/contrib/openstack/utils.py
index f27aa6c9..2ad8ab94 100644
--- a/hooks/charmhelpers/contrib/openstack/utils.py
+++ b/hooks/charmhelpers/contrib/openstack/utils.py
@@ -14,7 +14,7 @@
 
 # Common python helper functions used for OpenStack charms.
 from collections import OrderedDict, namedtuple
-from functools import wraps
+from functools import partial, wraps
 
 import subprocess
 import json
@@ -36,9 +36,12 @@ from charmhelpers.contrib.network import ip
 
 from charmhelpers.core import decorators, unitdata
 
+import charmhelpers.contrib.openstack.deferred_events as deferred_events
+
 from charmhelpers.core.hookenv import (
     WORKLOAD_STATES,
     action_fail,
+    action_get,
     action_set,
     config,
     expected_peer_units,
@@ -112,7 +115,7 @@ from charmhelpers.fetch.snap import (
 
 from charmhelpers.contrib.storage.linux.utils import is_block_device, zap_disk
 from charmhelpers.contrib.storage.linux.loopback import ensure_loopback_device
-from charmhelpers.contrib.openstack.exceptions import OSContextError
+from charmhelpers.contrib.openstack.exceptions import OSContextError, ServiceActionError
 from charmhelpers.contrib.openstack.policyd import (
     policyd_status_message_prefix,
     POLICYD_CONFIG_NAME,
@@ -148,6 +151,7 @@ OPENSTACK_RELEASES = (
     'train',
     'ussuri',
     'victoria',
+    'wallaby',
 )
 
 UBUNTU_OPENSTACK_RELEASE = OrderedDict([
@@ -170,6 +174,7 @@ UBUNTU_OPENSTACK_RELEASE = OrderedDict([
     ('eoan', 'train'),
     ('focal', 'ussuri'),
     ('groovy', 'victoria'),
+    ('hirsute', 'wallaby'),
 ])
 
 
@@ -193,6 +198,7 @@ OPENSTACK_CODENAMES = OrderedDict([
     ('2019.2', 'train'),
     ('2020.1', 'ussuri'),
     ('2020.2', 'victoria'),
+    ('2021.1', 'wallaby'),
 ])
 
 # The ugly duckling - must list releases oldest to newest
@@ -301,8 +307,8 @@ PACKAGE_CODENAMES = {
         ('14', 'rocky'),
         ('15', 'stein'),
         ('16', 'train'),
-        ('18', 'ussuri'),
-        ('19', 'victoria'),
+        ('18', 'ussuri'),  # Note this was actually 17.0 - 18.3
+        ('19', 'victoria'),  # Note this is really 18.6
     ]),
     'ceilometer-common': OrderedDict([
         ('5', 'liberty'),
@@ -483,9 +489,26 @@ def get_swift_codename(version):
     return None
 
 
-@deprecate("moved to charmhelpers.contrib.openstack.utils.get_installed_os_version()", "2021-01", log=juju_log)
 def get_os_codename_package(package, fatal=True):
-    '''Derive OpenStack release codename from an installed package.'''
+    """Derive OpenStack release codename from an installed package.
+
+    Initially, see if the openstack-release pkg is available (by trying to
+    install it) and use it instead.
+
+    If it isn't then it falls back to the existing method of checking the
+    version of the package passed and then resolving the version from that
+    using lookup tables.
+
+    Note: if possible, charms should use get_installed_os_version() to
+    determine the version of the "openstack-release" pkg.
+
+    :param package: the package to test for version information.
+    :type package: str
+    :param fatal: If True (default), then die via error_out()
+    :type fatal: bool
+    :returns: the OpenStack release codename (e.g. ussuri)
+    :rtype: str
+    """
 
     codename = get_installed_os_version()
     if codename:
@@ -579,8 +602,22 @@ def get_os_version_package(pkg, fatal=True):
 
 
 def get_installed_os_version():
-    apt_install(filter_installed_packages(['openstack-release']), fatal=False)
-    print("OpenStack Release: {}".format(openstack_release()))
+    """Determine the OpenStack release code name from openstack-release pkg.
+
+    This uses the "openstack-release" pkg (if it exists) to return the
+    OpenStack release codename (e.g. usurri, mitaka, ocata, etc.)
+
+    Note, it caches the result so that it is only done once per hook.
+
+    :returns: the OpenStack release codename, if available
+    :rtype: Optional[str]
+    """
+    @cached
+    def _do_install():
+        apt_install(filter_installed_packages(['openstack-release']),
+                    fatal=False, quiet=True)
+
+    _do_install()
     return openstack_release().get('OPENSTACK_CODENAME')
 
 
@@ -1052,6 +1089,18 @@ def _determine_os_workload_status(
     try:
         if config(POLICYD_CONFIG_NAME):
             message = "{} {}".format(policyd_status_message_prefix(), message)
+        deferred_restarts = list(set(
+            [e.service for e in deferred_events.get_deferred_restarts()]))
+        if deferred_restarts:
+            svc_msg = "Services queued for restart: {}".format(
+                ', '.join(sorted(deferred_restarts)))
+            message = "{}. {}".format(message, svc_msg)
+        deferred_hooks = deferred_events.get_deferred_hooks()
+        if deferred_hooks:
+            svc_msg = "Hooks skipped due to disabled auto restarts: {}".format(
+                ', '.join(sorted(deferred_hooks)))
+            message = "{}. {}".format(message, svc_msg)
+
     except Exception:
         pass
 
@@ -1536,6 +1585,33 @@ def is_unit_paused_set():
         return False
 
 
+def is_hook_allowed(hookname, check_deferred_restarts=True):
+    """Check if hook can run.
+
+    :param hookname: Name of hook to check..
+    :type hookname: str
+    :param check_deferred_restarts: Whether to check deferred restarts.
+    :type check_deferred_restarts: bool
+    """
+    permitted = True
+    reasons = []
+    if is_unit_paused_set():
+        reasons.append(
+            "Unit is pause or upgrading. Skipping {}".format(hookname))
+        permitted = False
+
+    if check_deferred_restarts:
+        if deferred_events.is_restart_permitted():
+            permitted = True
+            deferred_events.clear_deferred_hook(hookname)
+        else:
+            if not config().changed('enable-auto-restarts'):
+                deferred_events.set_deferred_hook(hookname)
+            reasons.append("auto restarts are disabled")
+            permitted = False
+    return permitted, " and ".join(reasons)
+
+
 def manage_payload_services(action, services=None, charm_func=None):
     """Run an action against all services.
 
@@ -1696,6 +1772,43 @@ def resume_unit(assess_status_func, services=None, ports=None,
         raise Exception("Couldn't resume: {}".format("; ".join(messages)))
 
 
+def restart_services_action(services=None, when_all_stopped_func=None,
+                            deferred_only=None):
+    """Manage a service restart request via charm action.
+
+    :param services: Services to be restarted
+    :type model_name: List[str]
+    :param when_all_stopped_func: Function to call when all services are
+                                  stopped.
+    :type when_all_stopped_func: Callable[]
+    :param model_name: Only restart services which have a deferred restart
+                       event.
+    :type model_name: bool
+    """
+    if services and deferred_only:
+        raise ValueError(
+            "services and deferred_only are mutually exclusive")
+    if deferred_only:
+        services = list(set(
+            [a.service for a in deferred_events.get_deferred_restarts()]))
+    _, messages = manage_payload_services(
+        'stop',
+        services=services,
+        charm_func=when_all_stopped_func)
+    if messages:
+        raise ServiceActionError(
+            "Error processing service stop request: {}".format(
+                "; ".join(messages)))
+    _, messages = manage_payload_services(
+        'start',
+        services=services)
+    if messages:
+        raise ServiceActionError(
+            "Error processing service start request: {}".format(
+                "; ".join(messages)))
+    deferred_events.clear_deferred_restarts(services)
+
+
 def make_assess_status_func(*args, **kwargs):
     """Creates an assess_status_func() suitable for handing to pause_unit()
     and resume_unit().
@@ -1717,7 +1830,10 @@ def make_assess_status_func(*args, **kwargs):
 
 
 def pausable_restart_on_change(restart_map, stopstart=False,
-                               restart_functions=None):
+                               restart_functions=None,
+                               can_restart_now_f=None,
+                               post_svc_restart_f=None,
+                               pre_restarts_wait_f=None):
     """A restart_on_change decorator that checks to see if the unit is
     paused. If it is paused then the decorated function doesn't fire.
 
@@ -1743,11 +1859,28 @@ def pausable_restart_on_change(restart_map, stopstart=False,
     function won't be called if the decorated function is never called.  Note,
     retains backwards compatibility for passing a non-callable dictionary.
 
-    @param f: the function to decorate
-    @param restart_map: (optionally callable, which then returns the
-        restart_map) the restart map {conf_file: [services]}
-    @param stopstart: DEFAULT false; whether to stop, start or just restart
-    @returns decorator to use a restart_on_change with pausability
+    :param f: function to decorate.
+    :type f: Callable
+    :param restart_map: Optionally callable, which then returns the restart_map or
+                        the restart map {conf_file: [services]}
+    :type restart_map: Union[Callable[[],], Dict[str, List[str,]]
+    :param stopstart: whether to stop, start or restart a service
+    :type stopstart: booleean
+    :param restart_functions: nonstandard functions to use to restart services
+                              {svc: func, ...}
+    :type restart_functions: Dict[str, Callable[[str], None]]
+    :param can_restart_now_f: A function used to check if the restart is
+                              permitted.
+    :type can_restart_now_f: Callable[[str, List[str]], boolean]
+    :param post_svc_restart_f: A function run after a service has
+                               restarted.
+    :type post_svc_restart_f: Callable[[str], None]
+    :param pre_restarts_wait_f: A function callled before any restarts.
+    :type pre_restarts_wait_f: Callable[None, None]
+    :returns: decorator to use a restart_on_change with pausability
+    :rtype: decorator
+
+
     """
     def wrap(f):
         # py27 compatible nonlocal variable.  When py3 only, replace with
@@ -1763,8 +1896,13 @@ def pausable_restart_on_change(restart_map, stopstart=False,
                     if callable(restart_map) else restart_map
             # otherwise, normal restart_on_change functionality
             return restart_on_change_helper(
-                (lambda: f(*args, **kwargs)), __restart_map_cache['cache'],
-                stopstart, restart_functions)
+                (lambda: f(*args, **kwargs)),
+                __restart_map_cache['cache'],
+                stopstart,
+                restart_functions,
+                can_restart_now_f,
+                post_svc_restart_f,
+                pre_restarts_wait_f)
         return wrapped_f
     return wrap
 
@@ -2145,6 +2283,23 @@ def container_scoped_relations():
     return relations
 
 
+def container_scoped_relation_get(attribute=None):
+    """Get relation data from all container scoped relations.
+
+    :param attribute: Name of attribute to get
+    :type attribute: Optional[str]
+    :returns: Iterator with relation data
+    :rtype: Iterator[Optional[any]]
+    """
+    for endpoint_name in container_scoped_relations():
+        for rid in relation_ids(endpoint_name):
+            for unit in related_units(rid):
+                yield relation_get(
+                    attribute=attribute,
+                    unit=unit,
+                    rid=rid)
+
+
 def is_db_ready(use_current_context=False, rel_name=None):
     """Check remote database is ready to be used.
 
@@ -2418,3 +2573,107 @@ def get_api_application_status():
             msg = 'Some units are not ready'
     juju_log(msg, 'DEBUG')
     return app_state, msg
+
+
+def sequence_status_check_functions(*functions):
+    """Sequence the functions passed so that they all get a chance to run as
+    the charm status check functions.
+
+    :param *functions: a list of functions that return (state, message)
+    :type *functions: List[Callable[[OSConfigRender], (str, str)]]
+    :returns: the Callable that takes configs and returns (state, message)
+    :rtype: Callable[[OSConfigRender], (str, str)]
+    """
+    def _inner_sequenced_functions(configs):
+        state, message = 'unknown', ''
+        for f in functions:
+            new_state, new_message = f(configs)
+            state = workload_state_compare(state, new_state)
+            if message:
+                message = "{}, {}".format(message, new_message)
+            else:
+                message = new_message
+        return state, message
+
+    return _inner_sequenced_functions
+
+
+SubordinatePackages = namedtuple('SubordinatePackages', ['install', 'purge'])
+
+
+def get_subordinate_release_packages(os_release, package_type='deb'):
+    """Iterate over subordinate relations and get package information.
+
+    :param os_release: OpenStack release to look for
+    :type os_release: str
+    :param package_type: Package type (one of 'deb' or 'snap')
+    :type package_type: str
+    :returns: Packages to install and packages to purge or None
+    :rtype: SubordinatePackages[set,set]
+    """
+    install = set()
+    purge = set()
+
+    for rdata in container_scoped_relation_get('releases-packages-map'):
+        rp_map = json.loads(rdata or '{}')
+        # The map provided by subordinate has OpenStack release name as key.
+        # Find package information from subordinate matching requested release
+        # or the most recent release prior to requested release by sorting the
+        # keys in reverse order. This follows established patterns in our
+        # charms for templates and reactive charm implementations, i.e. as long
+        # as nothing has changed the definitions for the prior OpenStack
+        # release is still valid.
+        for release in sorted(rp_map.keys(), reverse=True):
+            if (CompareOpenStackReleases(release) <= os_release and
+                    package_type in rp_map[release]):
+                for name, container in (
+                        ('install', install),
+                        ('purge', purge)):
+                    for pkg in rp_map[release][package_type].get(name, []):
+                        container.add(pkg)
+                break
+    return SubordinatePackages(install, purge)
+
+
+os_restart_on_change = partial(
+    pausable_restart_on_change,
+    can_restart_now_f=deferred_events.check_and_record_restart_request,
+    post_svc_restart_f=deferred_events.process_svc_restart)
+
+
+def restart_services_action_helper(all_services):
+    """Helper to run the restart-services action.
+
+    NOTE: all_services is all services that could be restarted but
+          depending on the action arguments it may be a subset of
+          these that are actually restarted.
+
+    :param all_services: All services that could be restarted
+    :type all_services: List[str]
+    """
+    deferred_only = action_get("deferred-only")
+    services = action_get("services")
+    if services:
+        services = services.split()
+    else:
+        services = all_services
+    if deferred_only:
+        restart_services_action(deferred_only=True)
+    else:
+        restart_services_action(services=services)
+
+
+def show_deferred_events_action_helper():
+    """Helper to run the show-deferred-restarts action."""
+    restarts = []
+    for event in deferred_events.get_deferred_events():
+        restarts.append('{} {} {}'.format(
+            str(event.timestamp),
+            event.service.ljust(40),
+            event.reason))
+    restarts.sort()
+    output = {
+        'restarts': restarts,
+        'hooks': deferred_events.get_deferred_hooks()}
+    action_set({'output': "{}".format(
+        yaml.dump(output, default_flow_style=False))})
diff --git a/hooks/charmhelpers/core/hookenv.py b/hooks/charmhelpers/core/hookenv.py
index db7ce728..778aa4b6 100644
--- a/hooks/charmhelpers/core/hookenv.py
+++ b/hooks/charmhelpers/core/hookenv.py
@@ -226,6 +226,17 @@ def relation_id(relation_name=None, service_or_unit=None):
         raise ValueError('Must specify neither or both of relation_name and service_or_unit')
 
 
+def departing_unit():
+    """The departing unit for the current relation hook.
+
+    Available since juju 2.8.
+
+    :returns: the departing unit, or None if the information isn't available.
+    :rtype: Optional[str]
+    """
+    return os.environ.get('JUJU_DEPARTING_UNIT', None)
+
+
 def local_unit():
     """Local unit ID"""
     return os.environ['JUJU_UNIT_NAME']
@@ -1611,3 +1622,12 @@ def _contains_range(addresses):
         addresses.startswith(".") or
         ",." in addresses or
         " ." in addresses)
+
+
+def is_subordinate():
+    """Check whether charm is subordinate in unit metadata.
+
+    :returns: True if unit is subordniate, False otherwise.
+    :rtype: bool
+    """
+    return metadata().get('subordinate') is True
diff --git a/hooks/charmhelpers/core/host.py b/hooks/charmhelpers/core/host.py
index f826f6fe..d25e6c59 100644
--- a/hooks/charmhelpers/core/host.py
+++ b/hooks/charmhelpers/core/host.py
@@ -34,7 +34,7 @@ import itertools
 import six
 
 from contextlib import contextmanager
-from collections import OrderedDict
+from collections import OrderedDict, defaultdict
 from .hookenv import log, INFO, DEBUG, local_unit, charm_name
 from .fstab import Fstab
 from charmhelpers.osplatform import get_platform
@@ -694,74 +694,223 @@ class ChecksumError(ValueError):
     pass
 
 
-def restart_on_change(restart_map, stopstart=False, restart_functions=None):
-    """Restart services based on configuration files changing
+class restart_on_change(object):
+    """Decorator and context manager to handle restarts.
 
-    This function is used a decorator, for example::
+    Usage:
 
-        @restart_on_change({
-            '/etc/ceph/ceph.conf': [ 'cinder-api', 'cinder-volume' ]
-            '/etc/apache/sites-enabled/*': [ 'apache2' ]
-            })
-        def config_changed():
-            pass  # your code here
+       @restart_on_change(restart_map, ...)
+       def function_that_might_trigger_a_restart(...)
+           ...
 
-    In this example, the cinder-api and cinder-volume services
-    would be restarted if /etc/ceph/ceph.conf is changed by the
-    ceph_client_changed function. The apache2 service would be
-    restarted if any file matching the pattern got changed, created
-    or removed. Standard wildcards are supported, see documentation
-    for the 'glob' module for more information.
+    Or:
 
-    @param restart_map: {path_file_name: [service_name, ...]
-    @param stopstart: DEFAULT false; whether to stop, start OR restart
-    @param restart_functions: nonstandard functions to use to restart services
-                              {svc: func, ...}
-    @returns result from decorated function
+       with restart_on_change(restart_map, ...):
+           do_stuff_that_might_trigger_a_restart()
+           ...
     """
-    def wrap(f):
+
+    def __init__(self, restart_map, stopstart=False, restart_functions=None,
+                 can_restart_now_f=None, post_svc_restart_f=None,
+                 pre_restarts_wait_f=None):
+        """
+        :param restart_map: {file: [service, ...]}
+        :type restart_map: Dict[str, List[str,]]
+        :param stopstart: whether to stop, start or restart a service
+        :type stopstart: booleean
+        :param restart_functions: nonstandard functions to use to restart
+                                  services {svc: func, ...}
+        :type restart_functions: Dict[str, Callable[[str], None]]
+        :param can_restart_now_f: A function used to check if the restart is
+                                  permitted.
+        :type can_restart_now_f: Callable[[str, List[str]], boolean]
+        :param post_svc_restart_f: A function run after a service has
+                                   restarted.
+        :type post_svc_restart_f: Callable[[str], None]
+        :param pre_restarts_wait_f: A function callled before any restarts.
+        :type pre_restarts_wait_f: Callable[None, None]
+        """
+        self.restart_map = restart_map
+        self.stopstart = stopstart
+        self.restart_functions = restart_functions
+        self.can_restart_now_f = can_restart_now_f
+        self.post_svc_restart_f = post_svc_restart_f
+        self.pre_restarts_wait_f = pre_restarts_wait_f
+
+    def __call__(self, f):
+        """Work like a decorator.
+
+        Returns a wrapped function that performs the restart if triggered.
+
+        :param f: The function that is being wrapped.
+        :type f: Callable[[Any], Any]
+        :returns: the wrapped function
+        :rtype: Callable[[Any], Any]
+        """
         @functools.wraps(f)
         def wrapped_f(*args, **kwargs):
             return restart_on_change_helper(
-                (lambda: f(*args, **kwargs)), restart_map, stopstart,
-                restart_functions)
+                (lambda: f(*args, **kwargs)),
+                self.restart_map,
+                stopstart=self.stopstart,
+                restart_functions=self.restart_functions,
+                can_restart_now_f=self.can_restart_now_f,
+                post_svc_restart_f=self.post_svc_restart_f,
+                pre_restarts_wait_f=self.pre_restarts_wait_f)
         return wrapped_f
-    return wrap
+
+    def __enter__(self):
+        """Enter the runtime context related to this object. """
+        self.checksums = _pre_restart_on_change_helper(self.restart_map)
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """Exit the runtime context related to this object.
+
+        The parameters describe the exception that caused the context to be
+        exited. If the context was exited without an exception, all three
+        arguments will be None.
+        """
+        if exc_type is None:
+            _post_restart_on_change_helper(
+                self.checksums,
+                self.restart_map,
+                stopstart=self.stopstart,
+                restart_functions=self.restart_functions,
+                can_restart_now_f=self.can_restart_now_f,
+                post_svc_restart_f=self.post_svc_restart_f,
+                pre_restarts_wait_f=self.pre_restarts_wait_f)
+        # All is good, so return False; any exceptions will propagate.
+        return False
 
 
 def restart_on_change_helper(lambda_f, restart_map, stopstart=False,
-                             restart_functions=None):
+                             restart_functions=None,
+                             can_restart_now_f=None,
+                             post_svc_restart_f=None,
+                             pre_restarts_wait_f=None):
     """Helper function to perform the restart_on_change function.
 
     This is provided for decorators to restart services if files described
     in the restart_map have changed after an invocation of lambda_f().
 
-    @param lambda_f: function to call.
-    @param restart_map: {file: [service, ...]}
-    @param stopstart: whether to stop, start or restart a service
-    @param restart_functions: nonstandard functions to use to restart services
+    This functions allows for a number of helper functions to be passed.
+
+    `restart_functions` is a map with a service as the key and the
+    corresponding value being the function to call to restart the service. For
+    example if `restart_functions={'some-service': my_restart_func}` then
+    `my_restart_func` should a function which takes one argument which is the
+    service name to be retstarted.
+
+    `can_restart_now_f` is a function which checks that a restart is permitted.
+    It should return a bool which indicates if a restart is allowed and should
+    take a service name (str) and a list of changed files (List[str]) as
+    arguments.
+
+    `post_svc_restart_f` is a function which runs after a service has been
+    restarted. It takes the service name that was restarted as an argument.
+
+    `pre_restarts_wait_f` is a function which is called before any restarts
+    occur. The use case for this is an application which wants to try and
+    stagger restarts between units.
+
+    :param lambda_f: function to call.
+    :type lambda_f: Callable[[], ANY]
+    :param restart_map: {file: [service, ...]}
+    :type restart_map: Dict[str, List[str,]]
+    :param stopstart: whether to stop, start or restart a service
+    :type stopstart: booleean
+    :param restart_functions: nonstandard functions to use to restart services
                               {svc: func, ...}
-    @returns result of lambda_f()
+    :type restart_functions: Dict[str, Callable[[str], None]]
+    :param can_restart_now_f: A function used to check if the restart is
+                              permitted.
+    :type can_restart_now_f: Callable[[str, List[str]], boolean]
+    :param post_svc_restart_f: A function run after a service has
+                               restarted.
+    :type post_svc_restart_f: Callable[[str], None]
+    :param pre_restarts_wait_f: A function callled before any restarts.
+    :type pre_restarts_wait_f: Callable[None, None]
+    :returns: result of lambda_f()
+    :rtype: ANY
+    """
+    checksums = _pre_restart_on_change_helper(restart_map)
+    r = lambda_f()
+    _post_restart_on_change_helper(checksums,
+                                   restart_map,
+                                   stopstart,
+                                   restart_functions,
+                                   can_restart_now_f,
+                                   post_svc_restart_f,
+                                   pre_restarts_wait_f)
+    return r
+
+
+def _pre_restart_on_change_helper(restart_map):
+    """Take a snapshot of file hashes.
+
+    :param restart_map: {file: [service, ...]}
+    :type restart_map: Dict[str, List[str,]]
+    :returns: Dictionary of file paths and the files checksum.
+    :rtype: Dict[str, str]
+    """
+    return {path: path_hash(path) for path in restart_map}
+
+
+def _post_restart_on_change_helper(checksums,
+                                   restart_map,
+                                   stopstart=False,
+                                   restart_functions=None,
+                                   can_restart_now_f=None,
+                                   post_svc_restart_f=None,
+                                   pre_restarts_wait_f=None):
+    """Check whether files have changed.
+
+    :param checksums: Dictionary of file paths and the files checksum.
+    :type checksums: Dict[str, str]
+    :param restart_map: {file: [service, ...]}
+    :type restart_map: Dict[str, List[str,]]
+    :param stopstart: whether to stop, start or restart a service
+    :type stopstart: booleean
+    :param restart_functions: nonstandard functions to use to restart services
+                              {svc: func, ...}
+    :type restart_functions: Dict[str, Callable[[str], None]]
+    :param can_restart_now_f: A function used to check if the restart is
+                              permitted.
+    :type can_restart_now_f: Callable[[str, List[str]], boolean]
+    :param post_svc_restart_f: A function run after a service has
+                               restarted.
+    :type post_svc_restart_f: Callable[[str], None]
+    :param pre_restarts_wait_f: A function callled before any restarts.
+    :type pre_restarts_wait_f: Callable[None, None]
     """
     if restart_functions is None:
         restart_functions = {}
-    checksums = {path: path_hash(path) for path in restart_map}
-    r = lambda_f()
+    changed_files = defaultdict(list)
+    restarts = []
     # create a list of lists of the services to restart
-    restarts = [restart_map[path]
-                for path in restart_map
-                if path_hash(path) != checksums[path]]
+    for path, services in restart_map.items():
+        if path_hash(path) != checksums[path]:
+            restarts.append(services)
+            for svc in services:
+                changed_files[svc].append(path)
     # create a flat list of ordered services without duplicates from lists
     services_list = list(OrderedDict.fromkeys(itertools.chain(*restarts)))
     if services_list:
+        if pre_restarts_wait_f:
+            pre_restarts_wait_f()
         actions = ('stop', 'start') if stopstart else ('restart',)
         for service_name in services_list:
+            if can_restart_now_f:
+                if not can_restart_now_f(service_name,
+                                         changed_files[service_name]):
+                    continue
             if service_name in restart_functions:
                 restart_functions[service_name](service_name)
             else:
                 for action in actions:
                     service(action, service_name)
-    return r
+            if post_svc_restart_f:
+                post_svc_restart_f(service_name)
 
 
 def pwgen(length=None):
@@ -1068,6 +1217,17 @@ def modulo_distribution(modulo=3, wait=30, non_zero_wait=False):
         return calculated_wait_time
 
 
+def ca_cert_absolute_path(basename_without_extension):
+    """Returns absolute path to CA certificate.
+
+    :param basename_without_extension: Filename without extension
+    :type basename_without_extension: str
+    :returns: Absolute full path
+    :rtype: str
+    """
+    return '{}/{}.crt'.format(CA_CERT_DIR, basename_without_extension)
+
+
 def install_ca_cert(ca_cert, name=None):
     """
     Install the given cert as a trusted CA.
@@ -1083,7 +1243,7 @@ def install_ca_cert(ca_cert, name=None):
         ca_cert = ca_cert.encode('utf8')
     if not name:
         name = 'juju-{}'.format(charm_name())
-    cert_file = '{}/{}.crt'.format(CA_CERT_DIR, name)
+    cert_file = ca_cert_absolute_path(name)
     new_hash = hashlib.md5(ca_cert).hexdigest()
     if file_hash(cert_file) == new_hash:
         return
diff --git a/hooks/charmhelpers/core/host_factory/ubuntu.py b/hooks/charmhelpers/core/host_factory/ubuntu.py
index a3ec6947..7ee8a6ed 100644
--- a/hooks/charmhelpers/core/host_factory/ubuntu.py
+++ b/hooks/charmhelpers/core/host_factory/ubuntu.py
@@ -96,12 +96,14 @@ def cmp_pkgrevno(package, revno, pkgcache=None):
     the pkgcache argument is None. Be sure to add charmhelpers.fetch if
     you call this function, or pass an apt_pkg.Cache() instance.
     """
-    from charmhelpers.fetch import apt_pkg
+    from charmhelpers.fetch import apt_pkg, get_installed_version
     if not pkgcache:
-        from charmhelpers.fetch import apt_cache
-        pkgcache = apt_cache()
-    pkg = pkgcache[package]
-    return apt_pkg.version_compare(pkg.current_ver.ver_str, revno)
+        current_ver = get_installed_version(package)
+    else:
+        pkg = pkgcache[package]
+        current_ver = pkg.current_ver
+
+    return apt_pkg.version_compare(current_ver.ver_str, revno)
 
 
 @cached
diff --git a/hooks/charmhelpers/fetch/__init__.py b/hooks/charmhelpers/fetch/__init__.py
index 0cc7fc85..5b689f5b 100644
--- a/hooks/charmhelpers/fetch/__init__.py
+++ b/hooks/charmhelpers/fetch/__init__.py
@@ -105,6 +105,7 @@ if __platform__ == "ubuntu":
     get_upstream_version = fetch.get_upstream_version
     apt_pkg = fetch.ubuntu_apt_pkg
     get_apt_dpkg_env = fetch.get_apt_dpkg_env
+    get_installed_version = fetch.get_installed_version
 elif __platform__ == "centos":
     yum_search = fetch.yum_search
 
diff --git a/hooks/charmhelpers/fetch/ubuntu.py b/hooks/charmhelpers/fetch/ubuntu.py
index b5953019..b38edcc1 100644
--- a/hooks/charmhelpers/fetch/ubuntu.py
+++ b/hooks/charmhelpers/fetch/ubuntu.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 from collections import OrderedDict
+import os
 import platform
 import re
 import six
@@ -20,6 +21,7 @@ import subprocess
 import sys
 import time
 
+from charmhelpers import deprecate
 from charmhelpers.core.host import get_distrib_codename, get_system_env
 
 from charmhelpers.core.hookenv import (
@@ -198,6 +200,14 @@ CLOUD_ARCHIVE_POCKETS = {
     'victoria/proposed': 'focal-proposed/victoria',
     'focal-victoria/proposed': 'focal-proposed/victoria',
     'focal-proposed/victoria': 'focal-proposed/victoria',
+    # Wallaby
+    'wallaby': 'focal-updates/wallaby',
+    'focal-wallaby': 'focal-updates/wallaby',
+    'focal-wallaby/updates': 'focal-updates/wallaby',
+    'focal-updates/wallaby': 'focal-updates/wallaby',
+    'wallaby/proposed': 'focal-proposed/wallaby',
+    'focal-wallaby/proposed': 'focal-proposed/wallaby',
+    'focal-proposed/wallaby': 'focal-proposed/wallaby',
 }
 
 
@@ -251,13 +261,19 @@ def apt_cache(*_, **__):
         # Detect this situation, log a warning and make the call to
         # ``apt_pkg.init()`` to avoid the consumer Python interpreter from
         # crashing with a segmentation fault.
-        log('Support for use of upstream ``apt_pkg`` module in conjunction'
-            'with charm-helpers is deprecated since 2019-06-25', level=WARNING)
+        @deprecate(
+            'Support for use of upstream ``apt_pkg`` module in conjunction'
+            'with charm-helpers is deprecated since 2019-06-25',
+            date=None, log=lambda x: log(x, level=WARNING))
+        def one_shot_log():
+            pass
+
+        one_shot_log()
         sys.modules['apt_pkg'].init()
     return ubuntu_apt_pkg.Cache()
 
 
-def apt_install(packages, options=None, fatal=False):
+def apt_install(packages, options=None, fatal=False, quiet=False):
     """Install one or more packages.
 
     :param packages: Package(s) to install
@@ -267,6 +283,8 @@ def apt_install(packages, options=None, fatal=False):
     :param fatal: Whether the command's output should be checked and
                   retried.
     :type fatal: bool
+    :param quiet: if True (default), supress log message to stdout/stderr
+    :type quiet: bool
     :raises: subprocess.CalledProcessError
     """
     if options is None:
@@ -279,9 +297,10 @@ def apt_install(packages, options=None, fatal=False):
         cmd.append(packages)
     else:
         cmd.extend(packages)
-    log("Installing {} with options: {}".format(packages,
-                                                options))
-    _run_apt_command(cmd, fatal)
+    if not quiet:
+        log("Installing {} with options: {}"
+            .format(packages, options))
+    _run_apt_command(cmd, fatal, quiet=quiet)
 
 
 def apt_upgrade(options=None, fatal=False, dist=False):
@@ -639,14 +658,17 @@ def _add_apt_repository(spec):
     :param spec: the parameter to pass to add_apt_repository
     :type spec: str
     """
+    series = get_distrib_codename()
     if '{series}' in spec:
-        series = get_distrib_codename()
         spec = spec.replace('{series}', series)
     # software-properties package for bionic properly reacts to proxy settings
-    # passed as environment variables (See lp:1433761). This is not the case
-    # LTS and non-LTS releases below bionic.
-    _run_with_retries(['add-apt-repository', '--yes', spec],
-                      cmd_env=env_proxy_settings(['https', 'http']))
+    # set via apt.conf (see lp:1433761), however this is not the case for LTS
+    # and non-LTS releases before bionic.
+    if series in ('trusty', 'xenial'):
+        _run_with_retries(['add-apt-repository', '--yes', spec],
+                          cmd_env=env_proxy_settings(['https', 'http']))
+    else:
+        _run_with_retries(['add-apt-repository', '--yes', spec])
 
 
 def _add_cloud_pocket(pocket):
@@ -723,7 +745,7 @@ def _verify_is_ubuntu_rel(release, os_release):
 
 
 def _run_with_retries(cmd, max_retries=CMD_RETRY_COUNT, retry_exitcodes=(1,),
-                      retry_message="", cmd_env=None):
+                      retry_message="", cmd_env=None, quiet=False):
     """Run a command and retry until success or max_retries is reached.
 
     :param cmd: The apt command to run.
@@ -738,11 +760,20 @@ def _run_with_retries(cmd, max_retries=CMD_RETRY_COUNT, retry_exitcodes=(1,),
     :type retry_message: str
     :param: cmd_env: Environment variables to add to the command run.
     :type cmd_env: Option[None, Dict[str, str]]
+    :param quiet: if True, silence the output of the command from stdout and
+        stderr
+    :type quiet: bool
     """
     env = get_apt_dpkg_env()
     if cmd_env:
         env.update(cmd_env)
 
+    kwargs = {}
+    if quiet:
+        devnull = os.devnull if six.PY2 else subprocess.DEVNULL
+        kwargs['stdout'] = devnull
+        kwargs['stderr'] = devnull
+
     if not retry_message:
         retry_message = "Failed executing '{}'".format(" ".join(cmd))
     retry_message += ". Will retry in {} seconds".format(CMD_RETRY_DELAY)
@@ -753,7 +784,7 @@ def _run_with_retries(cmd, max_retries=CMD_RETRY_COUNT, retry_exitcodes=(1,),
     retry_results = (None,) + retry_exitcodes
     while result in retry_results:
         try:
-            result = subprocess.check_call(cmd, env=env)
+            result = subprocess.check_call(cmd, env=env, **kwargs)
         except subprocess.CalledProcessError as e:
             retry_count = retry_count + 1
             if retry_count > max_retries:
@@ -763,7 +794,7 @@ def _run_with_retries(cmd, max_retries=CMD_RETRY_COUNT, retry_exitcodes=(1,),
             time.sleep(CMD_RETRY_DELAY)
 
 
-def _run_apt_command(cmd, fatal=False):
+def _run_apt_command(cmd, fatal=False, quiet=False):
     """Run an apt command with optional retries.
 
     :param cmd: The apt command to run.
@@ -771,13 +802,22 @@ def _run_apt_command(cmd, fatal=False):
     :param fatal: Whether the command's output should be checked and
                   retried.
     :type fatal: bool
+    :param quiet: if True, silence the output of the command from stdout and
+        stderr
+    :type quiet: bool
     """
     if fatal:
         _run_with_retries(
             cmd, retry_exitcodes=(1, APT_NO_LOCK,),
-            retry_message="Couldn't acquire DPKG lock")
+            retry_message="Couldn't acquire DPKG lock",
+            quiet=quiet)
     else:
-        subprocess.call(cmd, env=get_apt_dpkg_env())
+        kwargs = {}
+        if quiet:
+            devnull = os.devnull if six.PY2 else subprocess.DEVNULL
+            kwargs['stdout'] = devnull
+            kwargs['stderr'] = devnull
+        subprocess.call(cmd, env=get_apt_dpkg_env(), **kwargs)
 
 
 def get_upstream_version(package):
@@ -799,6 +839,22 @@ def get_upstream_version(package):
     return ubuntu_apt_pkg.upstream_version(pkg.current_ver.ver_str)
 
 
+def get_installed_version(package):
+    """Determine installed version of a package
+
+    @returns None (if not installed) or the installed version as
+    Version object
+    """
+    cache = apt_cache()
+    dpkg_result = cache._dpkg_list([package]).get(package, {})
+    current_ver = None
+    installed_version = dpkg_result.get('version')
+
+    if installed_version:
+        current_ver = ubuntu_apt_pkg.Version({'ver_str': installed_version})
+    return current_ver
+
+
 def get_apt_dpkg_env():
     """Get environment suitable for execution of APT and DPKG tools.
 
diff --git a/lib/charms_ceph/utils.py b/lib/charms_ceph/utils.py
index 52d380b4..e5c38793 100644
--- a/lib/charms_ceph/utils.py
+++ b/lib/charms_ceph/utils.py
@@ -56,11 +56,11 @@ from charmhelpers.core.hookenv import (
 )
 from charmhelpers.fetch import (
     add_source,
-    apt_cache,
     apt_install,
     apt_purge,
     apt_update,
-    filter_missing_packages
+    filter_missing_packages,
+    get_installed_version
 )
 from charmhelpers.contrib.storage.linux.ceph import (
     get_mon_map,
@@ -497,10 +497,7 @@ def tune_dev(block_dev):
 
 
 def ceph_user():
-    if get_version() > 1:
-        return 'ceph'
-    else:
-        return "root"
+    return 'ceph'
 
 
 class CrushLocation(object):
@@ -715,22 +712,15 @@ def get_version():
     """Derive Ceph release from an installed package."""
     import apt_pkg as apt
 
-    cache = apt_cache()
     package = "ceph"
-    try:
-        pkg = cache[package]
-    except KeyError:
-        # the package is unknown to the current apt cache.
-        e = 'Could not determine version of package with no installation ' \
-            'candidate: %s' % package
-        error_out(e)
 
-    if not pkg.current_ver:
+    current_ver = get_installed_version(package)
+    if not current_ver:
         # package is known, but no version is currently installed.
         e = 'Could not determine version of uninstalled package: %s' % package
         error_out(e)
 
-    vers = apt.upstream_version(pkg.current_ver.ver_str)
+    vers = apt.upstream_version(current_ver.ver_str)
 
     # x.y match only for 20XX.X
     # and ignore patch level for other packages
diff --git a/test-requirements.txt b/test-requirements.txt
index 9aea716b..394e4d37 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -42,8 +42,8 @@ oslo.utils<=3.41.0;python_version<'3.6'
 
 coverage>=4.5.2
 pyudev              # for ceph-* charm unit tests (need to fix the ceph-* charm unit tests/mocking)
-git+https://github.com/openstack-charmers/zaza.git#egg=zaza;python_version>='3.0'
-git+https://github.com/openstack-charmers/zaza-openstack-tests.git#egg=zaza.openstack
+git+https://github.com/openstack-charmers/zaza.git@stable/21.04#egg=zaza;python_version>='3.0'
+git+https://github.com/openstack-charmers/zaza-openstack-tests.git@stable/21.04#egg=zaza.openstack
 
 # Needed for charm-glance:
 git+https://opendev.org/openstack/tempest.git#egg=tempest;python_version>='3.6'