promenade/promenade/encryption_method.py
Sergiy Markin 32ad8a96b0 [focal] Python modules sync with Airship project
- uplifted/downgraded some python modules
- fixed falcon.API deprecation - -> falcon.App
- uplifted deckhand reference for python deps
- fixed formatting style  using yapf linter
- added bindep role and bindep.txt file with required deps
- fixed quai docker image publishing
- re-enabled openstack-tox-py38 gate job

Change-Id: I0e248182efad75630721a1291bc86a5edc79c22a
2023-04-21 06:09:14 +00:00

186 lines
5.4 KiB
Python

from . import exceptions, logging
import abc
import os
# Ignore bandit false positive: B404:blacklist
# The purpose of this module is to safely encapsulate calls via fork.
import subprocess # nosec
import tempfile
__all__ = ['EncryptionMethod']
LOG = logging.getLogger(__name__)
class EncryptionMethod(metaclass=abc.ABCMeta):
@abc.abstractmethod
def encrypt(self, data):
pass
@abc.abstractmethod
def get_decrypt_setup_command(self):
pass
@abc.abstractmethod
def get_decrypt_command(self):
pass
@abc.abstractmethod
def get_decrypt_teardown_command(self):
pass
@staticmethod
def from_config(config):
LOG.debug('Building EncryptionMethod from: %s', config)
if config:
# NOTE(mark-burnett): Relying on the schema to ensure valid
# configuration.
name = list(config.keys())[0]
kwargs = config[name]
if name == 'gpg':
return GPGEncryptionMethod(**kwargs)
else:
raise NotImplementedError('Unknown Encryption method')
else:
return NullEncryptionMethod()
def notify_user(self, message):
print('=== BEGIN NOTICE ===')
print(message)
print('=== END NOTICE ===')
class NullEncryptionMethod(EncryptionMethod):
def encrypt(self, data):
LOG.debug('Performing NOOP encryption')
return data
def get_decrypt_setup_command(self):
return ''
def get_decrypt_command(self):
return 'cat'
def get_decrypt_teardown_command(self):
return ''
class GPGEncryptionMethod(EncryptionMethod):
ENCRYPTION_KEY_ENV_NAME = 'PROMENADE_ENCRYPTION_KEY'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._gpg_version = _detect_gpg_version()
def encrypt(self, data):
key = self._get_key()
return self._encrypt_data(key, data)
def get_decrypt_setup_command(self):
return '''
export DECRYPTION_KEY=${PROMENADE_ENCRYPTION_KEY:-"NONE"}
if [[ ${PROMENADE_ENCRYPTION_KEY} = "NONE" ]]; then
read -p "Script decryption key: " -s DECRYPTION_KEY
fi
'''
def get_decrypt_command(self):
return ('/usr/bin/gpg --verbose --decrypt --batch '
'--passphrase "${DECRYPTION_KEY}"')
def get_decrypt_teardown_command(self):
return 'unset DECRYPTION_KEY'
def _get_key(self):
key = os.environ.get(self.ENCRYPTION_KEY_ENV_NAME)
if key is None:
key = _generate_key()
self.notify_user('Copy this decryption key for use during script '
'execution:\n%s' % key)
else:
LOG.info('Using encryption key from %s',
self.ENCRYPTION_KEY_ENV_NAME)
return key
def _encrypt_data(self, key, data):
with tempfile.TemporaryDirectory() as tmp:
# Ignore bandit false positive:
# B603:subprocess_without_shell_equals_true
# Here user input is allowed to be arbitrary, as it's simply input
# to the specified encryption algorithm. Regardless, we only put a
# tarball here.
p = subprocess.Popen( # nosec
[
'/usr/bin/gpg',
'--verbose',
'--symmetric',
'--homedir',
tmp,
'--passphrase',
key,
] + self._gpg_encrypt_options(),
cwd=tmp,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
out, err = p.communicate(data, timeout=120)
except subprocess.TimeoutExpired:
p.kill()
out, err = p.communicate()
if p.returncode != 0:
LOG.error('Got errors from gpg encrypt: %s', err)
raise exceptions.EncryptionException(description=str(err))
return out
def _gpg_encrypt_options(self):
options = {
1: [],
2: ['--pinentry-mode', 'loopback'],
}
return options[self._gpg_version[0]]
DETECTION_PREFIX = 'gpg (GnuPG) '
def _detect_gpg_version():
with tempfile.TemporaryDirectory() as tmp:
# Ignore bandit false positive:
# B603:subprocess_without_shell_equals_true
# This method takes no input and simply queries the version of gpg.
output = subprocess.check_output( # nosec
[
'/usr/bin/gpg',
'--version',
], cwd=tmp)
lines = output.decode('utf-8').strip().splitlines()
if lines:
version = lines[0][len(DETECTION_PREFIX):]
LOG.debug('Found GPG version %s', version)
return tuple(map(int, version.split('.')[:2]))
else:
raise exceptions.GPGDetectionException()
def _generate_key():
with tempfile.TemporaryDirectory() as tmp:
# Ignore bandit false positive:
# B603:subprocess_without_shell_equals_true
# This method takes no input and generates random output.
result = subprocess.run( # nosec
['/usr/bin/openssl', 'rand', '-hex', '48'],
check=True,
env={
'RANDFILE': tmp,
},
stdout=subprocess.PIPE,
)
return result.stdout.decode().strip()