e2c31de9b3
Packstack needs to get adapted to Python 3. This patch adds initial compatibility fixes and a tox-py36 job (non-voting) to test it. Change-Id: I653454b523224615ea5f0e9f5a7d799031f57649
157 lines
5.4 KiB
Python
157 lines
5.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2013, Red Hat, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import shutil
|
|
import tempfile
|
|
import subprocess
|
|
import logging
|
|
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class FakePopen(object):
|
|
'''The FakePopen class replaces subprocess.Popen. Instead of actually
|
|
executing commands, it permits the caller to register a list of
|
|
commands the output to produce using the FakePopen.register and
|
|
FakePopen.register_as_script method. By default, FakePopen will return
|
|
empty stdout and stderr and a successful (0) returncode.
|
|
'''
|
|
|
|
cmd_registry = {}
|
|
script_registry = {}
|
|
|
|
@classmethod
|
|
def register(cls, args, stdout='', stderr='', returncode=0):
|
|
'''Register a fake command.'''
|
|
if isinstance(args, list):
|
|
args = tuple(args)
|
|
cls.cmd_registry[args] = {'stdout': stdout,
|
|
'stderr': stderr,
|
|
'returncode': returncode}
|
|
|
|
@classmethod
|
|
def register_as_script(cls, args, stdout='', stderr='', returncode=0):
|
|
'''Register a fake script.'''
|
|
if isinstance(args, list):
|
|
args = '\n'.join(args)
|
|
|
|
prefix = "function t(){ exit $? ; } \n trap t ERR \n "
|
|
args = prefix + args
|
|
cls.script_registry[args] = {'stdout': stdout,
|
|
'stderr': stderr,
|
|
'returncode': returncode}
|
|
|
|
def __init__(self, args, **kwargs):
|
|
script = ["ssh", "-o", "StrictHostKeyChecking=no",
|
|
"-o", "UserKnownHostsFile=/dev/null"]
|
|
if args[-1] == "bash -x" and args[:5] == script:
|
|
self._init_as_script(args, **kwargs)
|
|
else:
|
|
self._init_as_cmd(args, **kwargs)
|
|
|
|
def _init_as_cmd(self, args, **kwargs):
|
|
self._is_script = False
|
|
if isinstance(args, list):
|
|
args = tuple(args)
|
|
cmd = ' '.join(args)
|
|
else:
|
|
cmd = args
|
|
|
|
if args in self.cmd_registry:
|
|
this = self.cmd_registry[args]
|
|
else:
|
|
LOG.warning('call to unregistered command: %s', cmd)
|
|
this = {'stdout': '', 'stderr': '', 'returncode': 0}
|
|
|
|
self.stdout = this['stdout']
|
|
self.stderr = this['stderr']
|
|
self.returncode = this['returncode']
|
|
|
|
def _init_as_script(self, args, **kwargs):
|
|
self._is_script = True
|
|
|
|
def communicate(self, input=None):
|
|
if self._is_script:
|
|
if input.decode('utf-8') in self.script_registry:
|
|
this = self.script_registry[input.decode('utf-8')]
|
|
else:
|
|
LOG.warning('call to unregistered script: %s', input)
|
|
this = {'stdout': '', 'stderr': '', 'returncode': 0}
|
|
self.stdout = this['stdout']
|
|
self.stderr = this['stderr']
|
|
self.returncode = this['returncode']
|
|
return self.stdout, self.stderr
|
|
|
|
|
|
class PackstackTestCaseMixin(object):
|
|
"""
|
|
Implementation of some assertion methods available by default
|
|
in Python2.7+ only
|
|
"""
|
|
def setUp(self):
|
|
# Creating a temp directory that can be used by tests
|
|
self.tempdir = tempfile.mkdtemp()
|
|
|
|
# some plugins call popen, we're replacing it for tests
|
|
self._Popen = subprocess.Popen
|
|
self.fake_popen = subprocess.Popen = FakePopen
|
|
|
|
def tearDown(self):
|
|
# remove the temp directory
|
|
shutil.rmtree(self.tempdir)
|
|
subprocess.Popen = self._Popen
|
|
|
|
def assertItemsEqual(self, list1, list2, msg=None):
|
|
f, s = len(list1), len(list2)
|
|
_msg = msg or ('Element counts were not equal. First has %s, '
|
|
'Second has %s' % (f, s))
|
|
self.assertEqual(f, s, msg=_msg)
|
|
|
|
_msg = msg or ('Given lists differ:\n%(list1)s'
|
|
'\n%(list2)s' % locals())
|
|
for i in list1:
|
|
if i not in list2:
|
|
raise AssertionError(_msg)
|
|
|
|
def assertListEqual(self, list1, list2, msg=None):
|
|
f, s = len(list(list1)), len(list(list2))
|
|
_msg = msg or ('Element counts were not equal. First has %s, '
|
|
'Second has %s' % (f, s))
|
|
self.assertEqual(f, s, msg=_msg)
|
|
|
|
_msg = msg or ('Given lists differ:\n%(list1)s'
|
|
'\n%(list2)s' % locals())
|
|
for index, item in enumerate(list1):
|
|
if item != list2[index]:
|
|
raise AssertionError(_msg)
|
|
|
|
def assertIsInstance(self, obj, cls, msg=None):
|
|
_msg = msg or ('%s is not an instance of %s' % (obj, cls))
|
|
if not isinstance(obj, cls):
|
|
raise AssertionError(_msg)
|
|
|
|
def assertIn(self, first, second, msg=None):
|
|
_msg = msg or ('%s is not a member of %s' % (first, second))
|
|
if first not in second:
|
|
raise AssertionError(_msg)
|
|
|
|
def assertIsNone(self, expr, msg=None):
|
|
_msg = msg or ('%s is not None' % expr)
|
|
if expr is not None:
|
|
raise AssertionError(_msg)
|