Merge "Fix bandit error: Ascend driver:[B602:subprocess_popen_with_shell_equals_true]"

This commit is contained in:
Zuul 2020-04-24 02:35:08 +00:00 committed by Gerrit Code Review
commit 906c68ea08
2 changed files with 26 additions and 22 deletions

View File

@ -10,19 +10,19 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_concurrency import processutils
from oslo_serialization import jsonutils
import re
from cyborg.accelerator.common import utils
from cyborg.accelerator.drivers.driver import GenericDriver
from cyborg.common import constants
from cyborg.objects.driver_objects import driver_attach_handle
from cyborg.objects.driver_objects import driver_controlpath_id
from cyborg.objects.driver_objects import driver_deployable
from cyborg.objects.driver_objects import driver_device
import re
import subprocess
from cyborg.accelerator.common import utils
from cyborg.common import constants
from oslo_serialization import jsonutils
import cyborg.privsep
PCI_INFO_PATTERN = re.compile(r"(?P<slot>[0-9a-f]{4}:[0-9a-f]{2}:"
r"[0-9a-f]{2}\.[0-9a-f]) "
@ -32,6 +32,12 @@ PCI_INFO_PATTERN = re.compile(r"(?P<slot>[0-9a-f]{4}:[0-9a-f]{2}:"
r"[(rev ](?P<revision>[0-9a-f]{2})")
@cyborg.privsep.sys_admin_pctxt.entrypoint
def lspci_privileged():
cmd = ['lspci', '-nnn', '-D']
return processutils.execute(*cmd)
class AscendDriver(GenericDriver):
"""The class for Ascend AI Chip drivers.
@ -66,14 +72,13 @@ class AscendDriver(GenericDriver):
# TODO(yikun): can be extracted into PCIDeviceDriver
def _get_pci_lines(self, keywords=()):
cmd = "sudo lspci -nnn -D"
pci_lines = []
if keywords:
cmd += "| grep -E %s" % '|'.join(keywords)
# FIXME(wangzhh): Use oslo.privsep instead of subprocess here to
# prevent shell injection attacks.
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
p.wait()
pci_lines = p.stdout.readlines()
lspci_out = lspci_privileged()[0].split('\n')
for i in range(len(lspci_out)):
# filter out pci devices info that contains all keywords
if all([k in (lspci_out[i]) for k in keywords]):
pci_lines.append(lspci_out[i])
return pci_lines
def discover(self):

View File

@ -16,18 +16,17 @@ import mock
from cyborg.accelerator.drivers.aichip.huawei.ascend import AscendDriver
from cyborg.tests import base
d100_pci_res = [
"0000:00:0c.0 Processing accelerators [1200]:"
" Device [19e5:d100] (rev 20)\n",
"0000:00:0d.0 Processing accelerators [1200]:"
" Device [19e5:d100] (rev 20)"
]
d100_pci_res = (
'0000:00:0c.0 Processing accelerators [1200]:'
' Device [19e5:d100] (rev 20)\n'
'0000:00:0d.0 Processing accelerators [1200]:'
' Device [19e5:d100] (rev 20)\n',)
class TestAscendDriver(base.TestCase):
@mock.patch('cyborg.accelerator.drivers.aichip.'
'huawei.ascend.AscendDriver._get_pci_lines',
'huawei.ascend.lspci_privileged',
return_value=d100_pci_res)
def test_discover(self, mock_pci):
ascend_driver = AscendDriver()