From 8e31cdc7ba95c3f9214c44b9ea875689a7d70784 Mon Sep 17 00:00:00 2001 From: Takashi Kajinami Date: Thu, 7 Nov 2019 15:10:23 +0900 Subject: [PATCH] Migrate detailed implementation from __init__.py ... as we don't expect long codes are included in __init__.py . This patch migrates these implementations to new independent files, with keeping alias so that we can import things with the same path. Change-Id: Iff0b60cd53281c999c930a4e789aaaab360b1c7a --- storlets/sbus/__init__.py | 155 +----------------------- storlets/sbus/client/__init__.py | 117 +----------------- storlets/sbus/client/client.py | 125 ++++++++++++++++++++ storlets/sbus/sbus.py | 164 ++++++++++++++++++++++++++ tests/unit/sbus/client/test_client.py | 8 +- 5 files changed, 304 insertions(+), 265 deletions(-) create mode 100644 storlets/sbus/client/client.py create mode 100644 storlets/sbus/sbus.py diff --git a/storlets/sbus/__init__.py b/storlets/sbus/__init__.py index 4e71a0a0..8bfc8b08 100644 --- a/storlets/sbus/__init__.py +++ b/storlets/sbus/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2015, 2016 OpenStack Foundation. +# Copyright (c) 2016 OpenStack Foundation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,153 +12,8 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +from storlets.sbus.sbus import SBus -from ctypes import c_char_p, c_int, CDLL, POINTER -from storlets.sbus.datagram import build_datagram_from_raw_message - - -class SBus(object): - """ - Wrapper class for low level C-API for SBus functionality - - """ - SBUS_SO_NAME = '/usr/local/lib/storlets/libsbus.so' - - def __init__(self): - # load the C-library - self.sbus_back_ = CDLL(SBus.SBUS_SO_NAME) - - # create SBus - self.sbus_back_.sbus_create.argtypes = [c_char_p] - self.sbus_back_.sbus_create.restype = c_int - - # listen to SBus - self.sbus_back_.sbus_listen.argtypes = [c_int] - self.sbus_back_.sbus_listen.restype = c_int - - # send message - self.sbus_back_.sbus_send_msg.argtypes = [c_char_p, - POINTER(c_int), - c_int, - c_char_p, - c_int, - c_char_p, - c_int] - self.sbus_back_.sbus_send_msg.restype = c_int - - # receive message - self.sbus_back_.sbus_recv_msg.argtypes = [c_int, - POINTER(POINTER(c_int)), - POINTER(c_int), - POINTER(c_char_p), - POINTER(c_int), - POINTER(c_char_p), - POINTER(c_int)] - self.sbus_back_.sbus_recv_msg.restype = c_int - - @staticmethod - def start_logger(str_log_level='DEBUG', container_id=None): - # load the C-library - sbus_back_ = CDLL(SBus.SBUS_SO_NAME) - - sbus_back_.sbus_start_logger.argtypes = [c_char_p, c_char_p] - sbus_back_.sbus_start_logger(str_log_level, container_id) - - @staticmethod - def stop_logger(): - # load the C-library - sbus_back_ = CDLL(SBus.SBUS_SO_NAME) - sbus_back_.sbus_stop_logger() - - def create(self, sbus_name): - return self.sbus_back_.sbus_create(sbus_name.encode("utf-8")) - - def listen(self, sbus_handler): - return self.sbus_back_.sbus_listen(sbus_handler) - - def receive(self, sbus_handler): - ph_files = POINTER(c_int)() - pp_metadata = (c_char_p)() - pp_params = (c_char_p)() - pn_files = (c_int)() - pn_metadata = (c_int)() - pn_params = (c_int)() - - # Invoke C function - n_status = self.sbus_back_.sbus_recv_msg(sbus_handler, - ph_files, - pn_files, - pp_metadata, - pn_metadata, - pp_params, - pn_params) - if n_status < 0: - return None - - # The invocation was successful. - # De-serialize the data - - # Aggregate file descriptors - n_files = pn_files.value - h_files = [] - for i in range(n_files): - h_files.append(ph_files[i]) - - # Extract Python strings - n_metadata = pn_metadata.value - # NOTE: because the storlets container may not have - # six module inside. Just check the type, not the python - # version. Anyway, the value should be bytes but python2 - # can assume the bytes as str. - str_metadata = pp_metadata.value.decode("utf-8") \ - if not isinstance(pp_metadata.value, str) else pp_metadata.value - n_params = pn_params.value - str_params = pp_params.value.decode("utf-8") \ - if not isinstance(pp_params.value, str) else pp_params.value - - # Trim the junk out - if 0 < n_metadata: - str_metadata = str_metadata[0:n_metadata] - str_params = str_params[0:n_params] - - # Construct actual result datagram - return build_datagram_from_raw_message( - h_files, str_metadata, str_params) - - @staticmethod - def send(sbus_name, datagram): - # Serialize the datagram into JSON strings and C integer array - str_json_params = datagram.serialized_cmd_params - p_params = c_char_p(str_json_params.encode("utf-8")) - n_params = c_int(len(str_json_params)) - - n_files = c_int(0) - h_files = None - n_metadata = c_int(0) - p_metadata = None - - if datagram.num_fds > 0: - str_json_metadata = datagram.serialized_metadata - p_metadata = c_char_p(str_json_metadata.encode("utf-8")) - n_metadata = c_int(len(str_json_metadata)) - - n_fds = datagram.num_fds - n_files = c_int(n_fds) - - file_fds = datagram.fds - h_files = (c_int * n_fds)() - - for i in range(n_fds): - h_files[i] = file_fds[i] - - # Invoke C function - sbus = SBus() - n_status = sbus.sbus_back_.sbus_send_msg( - sbus_name.encode("utf-8"), - h_files, - n_files, - p_metadata, - n_metadata, - p_params, - n_params) - return n_status +__all__ = [ + 'SBus', +] diff --git a/storlets/sbus/client/__init__.py b/storlets/sbus/client/__init__.py index 56b19f30..adf9e742 100644 --- a/storlets/sbus/client/__init__.py +++ b/storlets/sbus/client/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 OpenStack Foundation. +# Copyright (c) 2019 OpenStack Foundation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,114 +12,9 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -import json -import os -from storlets.sbus import SBus -from storlets.sbus.command import SBUS_CMD_CANCEL, SBUS_CMD_DAEMON_STATUS, \ - SBUS_CMD_HALT, SBUS_CMD_PING, SBUS_CMD_START_DAEMON, \ - SBUS_CMD_STOP_DAEMON, SBUS_CMD_STOP_DAEMONS -from storlets.sbus.datagram import SBusFileDescriptor, SBusServiceDatagram -from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT -from storlets.sbus.client.exceptions import SBusClientIOError, \ - SBusClientMalformedResponse, SBusClientSendError +from storlets.sbus.client.client import SBusClient, SBusResponse - -class SBusResponse(object): - def __init__(self, status, message): - """ - Construct SBusResponse class - - :param status: Whether the server succeed to process the given request - :param message: Messages to describe the process result - """ - self.status = status - self.message = message - - -class SBusClient(object): - def __init__(self, socket_path, chunk_size=16): - self.socket_path = socket_path - self.chunk_size = chunk_size - - def _parse_response(self, str_response): - """ - Parse response string recieved from container side - - :param str_response: response string - :returns: SBusResponse instance - """ - try: - resp = json.loads(str_response) - status = resp['status'] - message = resp['message'] - except (ValueError, KeyError): - raise SBusClientMalformedResponse('Got malformed response') - return SBusResponse(status, message) - - def _request(self, command, params=None, task_id=None): - read_fd, write_fd = os.pipe() - try: - try: - datagram = SBusServiceDatagram( - command, - [SBusFileDescriptor(SBUS_FD_SERVICE_OUT, write_fd)], - params, task_id) - rc = SBus.send(self.socket_path, datagram) - if rc < 0: - raise SBusClientSendError( - 'Faild to send command(%s) to socket %s' % - (datagram.command, self.socket_path)) - finally: - # We already sent the write fd to remote, so should close it - # in local side before reading response - os.close(write_fd) - - reply = '' - while True: - try: - buf = os.read(read_fd, self.chunk_size) - except IOError: - raise SBusClientIOError('Failed to read data from read ' - 'pipe') - if not buf: - break - reply = reply + buf - finally: - os.close(read_fd) - - return self._parse_response(reply) - - def execute(self, *args, **kwargs): - # TODO(takashi): implement this - raise NotImplementedError('Execute command is not supported yet') - - def ping(self): - return self._request(SBUS_CMD_PING) - - def start_daemon(self, language, storlet_path, storlet_id, - uds_path, log_level, pool_size, - language_version): - params = {'daemon_language': language, 'storlet_path': storlet_path, - 'storlet_name': storlet_id, 'uds_path': uds_path, - 'log_level': log_level, 'pool_size': pool_size} - if language_version: - params['daemon_language_version'] = language_version - - return self._request(SBUS_CMD_START_DAEMON, params) - - def stop_daemon(self, storlet_name): - return self._request(SBUS_CMD_STOP_DAEMON, - {'storlet_name': storlet_name}) - - def stop_daemons(self): - return self._request(SBUS_CMD_STOP_DAEMONS) - - def halt(self): - return self._request(SBUS_CMD_HALT) - - def daemon_status(self, storlet_name): - return self._request(SBUS_CMD_DAEMON_STATUS, - {'storlet_name': storlet_name}) - - def cancel(self, task_id): - return self._request(SBUS_CMD_CANCEL, task_id=task_id) +__all__ = [ + 'SBusClient', + 'SBusResponse' +] diff --git a/storlets/sbus/client/client.py b/storlets/sbus/client/client.py new file mode 100644 index 00000000..56b19f30 --- /dev/null +++ b/storlets/sbus/client/client.py @@ -0,0 +1,125 @@ +# Copyright (c) 2016 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +import os +from storlets.sbus import SBus +from storlets.sbus.command import SBUS_CMD_CANCEL, SBUS_CMD_DAEMON_STATUS, \ + SBUS_CMD_HALT, SBUS_CMD_PING, SBUS_CMD_START_DAEMON, \ + SBUS_CMD_STOP_DAEMON, SBUS_CMD_STOP_DAEMONS +from storlets.sbus.datagram import SBusFileDescriptor, SBusServiceDatagram +from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT +from storlets.sbus.client.exceptions import SBusClientIOError, \ + SBusClientMalformedResponse, SBusClientSendError + + +class SBusResponse(object): + def __init__(self, status, message): + """ + Construct SBusResponse class + + :param status: Whether the server succeed to process the given request + :param message: Messages to describe the process result + """ + self.status = status + self.message = message + + +class SBusClient(object): + def __init__(self, socket_path, chunk_size=16): + self.socket_path = socket_path + self.chunk_size = chunk_size + + def _parse_response(self, str_response): + """ + Parse response string recieved from container side + + :param str_response: response string + :returns: SBusResponse instance + """ + try: + resp = json.loads(str_response) + status = resp['status'] + message = resp['message'] + except (ValueError, KeyError): + raise SBusClientMalformedResponse('Got malformed response') + return SBusResponse(status, message) + + def _request(self, command, params=None, task_id=None): + read_fd, write_fd = os.pipe() + try: + try: + datagram = SBusServiceDatagram( + command, + [SBusFileDescriptor(SBUS_FD_SERVICE_OUT, write_fd)], + params, task_id) + rc = SBus.send(self.socket_path, datagram) + if rc < 0: + raise SBusClientSendError( + 'Faild to send command(%s) to socket %s' % + (datagram.command, self.socket_path)) + finally: + # We already sent the write fd to remote, so should close it + # in local side before reading response + os.close(write_fd) + + reply = '' + while True: + try: + buf = os.read(read_fd, self.chunk_size) + except IOError: + raise SBusClientIOError('Failed to read data from read ' + 'pipe') + if not buf: + break + reply = reply + buf + finally: + os.close(read_fd) + + return self._parse_response(reply) + + def execute(self, *args, **kwargs): + # TODO(takashi): implement this + raise NotImplementedError('Execute command is not supported yet') + + def ping(self): + return self._request(SBUS_CMD_PING) + + def start_daemon(self, language, storlet_path, storlet_id, + uds_path, log_level, pool_size, + language_version): + params = {'daemon_language': language, 'storlet_path': storlet_path, + 'storlet_name': storlet_id, 'uds_path': uds_path, + 'log_level': log_level, 'pool_size': pool_size} + if language_version: + params['daemon_language_version'] = language_version + + return self._request(SBUS_CMD_START_DAEMON, params) + + def stop_daemon(self, storlet_name): + return self._request(SBUS_CMD_STOP_DAEMON, + {'storlet_name': storlet_name}) + + def stop_daemons(self): + return self._request(SBUS_CMD_STOP_DAEMONS) + + def halt(self): + return self._request(SBUS_CMD_HALT) + + def daemon_status(self, storlet_name): + return self._request(SBUS_CMD_DAEMON_STATUS, + {'storlet_name': storlet_name}) + + def cancel(self, task_id): + return self._request(SBUS_CMD_CANCEL, task_id=task_id) diff --git a/storlets/sbus/sbus.py b/storlets/sbus/sbus.py new file mode 100644 index 00000000..4e71a0a0 --- /dev/null +++ b/storlets/sbus/sbus.py @@ -0,0 +1,164 @@ +# Copyright (c) 2015, 2016 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ctypes import c_char_p, c_int, CDLL, POINTER +from storlets.sbus.datagram import build_datagram_from_raw_message + + +class SBus(object): + """ + Wrapper class for low level C-API for SBus functionality + + """ + SBUS_SO_NAME = '/usr/local/lib/storlets/libsbus.so' + + def __init__(self): + # load the C-library + self.sbus_back_ = CDLL(SBus.SBUS_SO_NAME) + + # create SBus + self.sbus_back_.sbus_create.argtypes = [c_char_p] + self.sbus_back_.sbus_create.restype = c_int + + # listen to SBus + self.sbus_back_.sbus_listen.argtypes = [c_int] + self.sbus_back_.sbus_listen.restype = c_int + + # send message + self.sbus_back_.sbus_send_msg.argtypes = [c_char_p, + POINTER(c_int), + c_int, + c_char_p, + c_int, + c_char_p, + c_int] + self.sbus_back_.sbus_send_msg.restype = c_int + + # receive message + self.sbus_back_.sbus_recv_msg.argtypes = [c_int, + POINTER(POINTER(c_int)), + POINTER(c_int), + POINTER(c_char_p), + POINTER(c_int), + POINTER(c_char_p), + POINTER(c_int)] + self.sbus_back_.sbus_recv_msg.restype = c_int + + @staticmethod + def start_logger(str_log_level='DEBUG', container_id=None): + # load the C-library + sbus_back_ = CDLL(SBus.SBUS_SO_NAME) + + sbus_back_.sbus_start_logger.argtypes = [c_char_p, c_char_p] + sbus_back_.sbus_start_logger(str_log_level, container_id) + + @staticmethod + def stop_logger(): + # load the C-library + sbus_back_ = CDLL(SBus.SBUS_SO_NAME) + sbus_back_.sbus_stop_logger() + + def create(self, sbus_name): + return self.sbus_back_.sbus_create(sbus_name.encode("utf-8")) + + def listen(self, sbus_handler): + return self.sbus_back_.sbus_listen(sbus_handler) + + def receive(self, sbus_handler): + ph_files = POINTER(c_int)() + pp_metadata = (c_char_p)() + pp_params = (c_char_p)() + pn_files = (c_int)() + pn_metadata = (c_int)() + pn_params = (c_int)() + + # Invoke C function + n_status = self.sbus_back_.sbus_recv_msg(sbus_handler, + ph_files, + pn_files, + pp_metadata, + pn_metadata, + pp_params, + pn_params) + if n_status < 0: + return None + + # The invocation was successful. + # De-serialize the data + + # Aggregate file descriptors + n_files = pn_files.value + h_files = [] + for i in range(n_files): + h_files.append(ph_files[i]) + + # Extract Python strings + n_metadata = pn_metadata.value + # NOTE: because the storlets container may not have + # six module inside. Just check the type, not the python + # version. Anyway, the value should be bytes but python2 + # can assume the bytes as str. + str_metadata = pp_metadata.value.decode("utf-8") \ + if not isinstance(pp_metadata.value, str) else pp_metadata.value + n_params = pn_params.value + str_params = pp_params.value.decode("utf-8") \ + if not isinstance(pp_params.value, str) else pp_params.value + + # Trim the junk out + if 0 < n_metadata: + str_metadata = str_metadata[0:n_metadata] + str_params = str_params[0:n_params] + + # Construct actual result datagram + return build_datagram_from_raw_message( + h_files, str_metadata, str_params) + + @staticmethod + def send(sbus_name, datagram): + # Serialize the datagram into JSON strings and C integer array + str_json_params = datagram.serialized_cmd_params + p_params = c_char_p(str_json_params.encode("utf-8")) + n_params = c_int(len(str_json_params)) + + n_files = c_int(0) + h_files = None + n_metadata = c_int(0) + p_metadata = None + + if datagram.num_fds > 0: + str_json_metadata = datagram.serialized_metadata + p_metadata = c_char_p(str_json_metadata.encode("utf-8")) + n_metadata = c_int(len(str_json_metadata)) + + n_fds = datagram.num_fds + n_files = c_int(n_fds) + + file_fds = datagram.fds + h_files = (c_int * n_fds)() + + for i in range(n_fds): + h_files[i] = file_fds[i] + + # Invoke C function + sbus = SBus() + n_status = sbus.sbus_back_.sbus_send_msg( + sbus_name.encode("utf-8"), + h_files, + n_files, + p_metadata, + n_metadata, + p_params, + n_params) + return n_status diff --git a/tests/unit/sbus/client/test_client.py b/tests/unit/sbus/client/test_client.py index ce163ac8..56f7d84b 100644 --- a/tests/unit/sbus/client/test_client.py +++ b/tests/unit/sbus/client/test_client.py @@ -27,7 +27,7 @@ from storlets.sbus.client import SBusClient @contextmanager def _mock_sbus(send_status=0): - with mock.patch('storlets.sbus.client.SBus.send') as fake_send: + with mock.patch('storlets.sbus.client.client.SBus.send') as fake_send: fake_send.return_value = send_status yield @@ -65,9 +65,9 @@ def _mock_os_pipe(bufs): except StopIteration: raise AssertionError('pipe called more than expected') - with mock.patch('storlets.sbus.client.os.pipe', mock_os_pipe), \ - mock.patch('storlets.sbus.client.os.read', fake_os_read), \ - mock.patch('storlets.sbus.client.os.close', fake_os_close): + with mock.patch('storlets.sbus.client.client.os.pipe', mock_os_pipe), \ + mock.patch('storlets.sbus.client.client.os.read', fake_os_read), \ + mock.patch('storlets.sbus.client.client.os.close', fake_os_close): yield pipes