Migrate detailed implementation from __init__.py

... as we don't expect long codes are included in __init__.py .
This patch migrates these implementations to new independent
files, with keeping alias so that we can import things with
the same path.

Change-Id: Iff0b60cd53281c999c930a4e789aaaab360b1c7a
This commit is contained in:
Takashi Kajinami 2019-11-07 15:10:23 +09:00
parent c72226b654
commit 8e31cdc7ba
5 changed files with 304 additions and 265 deletions

View File

@ -1,4 +1,4 @@
# Copyright (c) 2015, 2016 OpenStack Foundation. # Copyright (c) 2016 OpenStack Foundation.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -12,153 +12,8 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from storlets.sbus.sbus import SBus
from ctypes import c_char_p, c_int, CDLL, POINTER __all__ = [
from storlets.sbus.datagram import build_datagram_from_raw_message 'SBus',
]
class SBus(object):
"""
Wrapper class for low level C-API for SBus functionality
"""
SBUS_SO_NAME = '/usr/local/lib/storlets/libsbus.so'
def __init__(self):
# load the C-library
self.sbus_back_ = CDLL(SBus.SBUS_SO_NAME)
# create SBus
self.sbus_back_.sbus_create.argtypes = [c_char_p]
self.sbus_back_.sbus_create.restype = c_int
# listen to SBus
self.sbus_back_.sbus_listen.argtypes = [c_int]
self.sbus_back_.sbus_listen.restype = c_int
# send message
self.sbus_back_.sbus_send_msg.argtypes = [c_char_p,
POINTER(c_int),
c_int,
c_char_p,
c_int,
c_char_p,
c_int]
self.sbus_back_.sbus_send_msg.restype = c_int
# receive message
self.sbus_back_.sbus_recv_msg.argtypes = [c_int,
POINTER(POINTER(c_int)),
POINTER(c_int),
POINTER(c_char_p),
POINTER(c_int),
POINTER(c_char_p),
POINTER(c_int)]
self.sbus_back_.sbus_recv_msg.restype = c_int
@staticmethod
def start_logger(str_log_level='DEBUG', container_id=None):
# load the C-library
sbus_back_ = CDLL(SBus.SBUS_SO_NAME)
sbus_back_.sbus_start_logger.argtypes = [c_char_p, c_char_p]
sbus_back_.sbus_start_logger(str_log_level, container_id)
@staticmethod
def stop_logger():
# load the C-library
sbus_back_ = CDLL(SBus.SBUS_SO_NAME)
sbus_back_.sbus_stop_logger()
def create(self, sbus_name):
return self.sbus_back_.sbus_create(sbus_name.encode("utf-8"))
def listen(self, sbus_handler):
return self.sbus_back_.sbus_listen(sbus_handler)
def receive(self, sbus_handler):
ph_files = POINTER(c_int)()
pp_metadata = (c_char_p)()
pp_params = (c_char_p)()
pn_files = (c_int)()
pn_metadata = (c_int)()
pn_params = (c_int)()
# Invoke C function
n_status = self.sbus_back_.sbus_recv_msg(sbus_handler,
ph_files,
pn_files,
pp_metadata,
pn_metadata,
pp_params,
pn_params)
if n_status < 0:
return None
# The invocation was successful.
# De-serialize the data
# Aggregate file descriptors
n_files = pn_files.value
h_files = []
for i in range(n_files):
h_files.append(ph_files[i])
# Extract Python strings
n_metadata = pn_metadata.value
# NOTE: because the storlets container may not have
# six module inside. Just check the type, not the python
# version. Anyway, the value should be bytes but python2
# can assume the bytes as str.
str_metadata = pp_metadata.value.decode("utf-8") \
if not isinstance(pp_metadata.value, str) else pp_metadata.value
n_params = pn_params.value
str_params = pp_params.value.decode("utf-8") \
if not isinstance(pp_params.value, str) else pp_params.value
# Trim the junk out
if 0 < n_metadata:
str_metadata = str_metadata[0:n_metadata]
str_params = str_params[0:n_params]
# Construct actual result datagram
return build_datagram_from_raw_message(
h_files, str_metadata, str_params)
@staticmethod
def send(sbus_name, datagram):
# Serialize the datagram into JSON strings and C integer array
str_json_params = datagram.serialized_cmd_params
p_params = c_char_p(str_json_params.encode("utf-8"))
n_params = c_int(len(str_json_params))
n_files = c_int(0)
h_files = None
n_metadata = c_int(0)
p_metadata = None
if datagram.num_fds > 0:
str_json_metadata = datagram.serialized_metadata
p_metadata = c_char_p(str_json_metadata.encode("utf-8"))
n_metadata = c_int(len(str_json_metadata))
n_fds = datagram.num_fds
n_files = c_int(n_fds)
file_fds = datagram.fds
h_files = (c_int * n_fds)()
for i in range(n_fds):
h_files[i] = file_fds[i]
# Invoke C function
sbus = SBus()
n_status = sbus.sbus_back_.sbus_send_msg(
sbus_name.encode("utf-8"),
h_files,
n_files,
p_metadata,
n_metadata,
p_params,
n_params)
return n_status

View File

@ -1,4 +1,4 @@
# Copyright (c) 2016 OpenStack Foundation. # Copyright (c) 2019 OpenStack Foundation.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -12,114 +12,9 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json from storlets.sbus.client.client import SBusClient, SBusResponse
import os
from storlets.sbus import SBus
from storlets.sbus.command import SBUS_CMD_CANCEL, SBUS_CMD_DAEMON_STATUS, \
SBUS_CMD_HALT, SBUS_CMD_PING, SBUS_CMD_START_DAEMON, \
SBUS_CMD_STOP_DAEMON, SBUS_CMD_STOP_DAEMONS
from storlets.sbus.datagram import SBusFileDescriptor, SBusServiceDatagram
from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT
from storlets.sbus.client.exceptions import SBusClientIOError, \
SBusClientMalformedResponse, SBusClientSendError
__all__ = [
class SBusResponse(object): 'SBusClient',
def __init__(self, status, message): 'SBusResponse'
""" ]
Construct SBusResponse class
:param status: Whether the server succeed to process the given request
:param message: Messages to describe the process result
"""
self.status = status
self.message = message
class SBusClient(object):
def __init__(self, socket_path, chunk_size=16):
self.socket_path = socket_path
self.chunk_size = chunk_size
def _parse_response(self, str_response):
"""
Parse response string recieved from container side
:param str_response: response string
:returns: SBusResponse instance
"""
try:
resp = json.loads(str_response)
status = resp['status']
message = resp['message']
except (ValueError, KeyError):
raise SBusClientMalformedResponse('Got malformed response')
return SBusResponse(status, message)
def _request(self, command, params=None, task_id=None):
read_fd, write_fd = os.pipe()
try:
try:
datagram = SBusServiceDatagram(
command,
[SBusFileDescriptor(SBUS_FD_SERVICE_OUT, write_fd)],
params, task_id)
rc = SBus.send(self.socket_path, datagram)
if rc < 0:
raise SBusClientSendError(
'Faild to send command(%s) to socket %s' %
(datagram.command, self.socket_path))
finally:
# We already sent the write fd to remote, so should close it
# in local side before reading response
os.close(write_fd)
reply = ''
while True:
try:
buf = os.read(read_fd, self.chunk_size)
except IOError:
raise SBusClientIOError('Failed to read data from read '
'pipe')
if not buf:
break
reply = reply + buf
finally:
os.close(read_fd)
return self._parse_response(reply)
def execute(self, *args, **kwargs):
# TODO(takashi): implement this
raise NotImplementedError('Execute command is not supported yet')
def ping(self):
return self._request(SBUS_CMD_PING)
def start_daemon(self, language, storlet_path, storlet_id,
uds_path, log_level, pool_size,
language_version):
params = {'daemon_language': language, 'storlet_path': storlet_path,
'storlet_name': storlet_id, 'uds_path': uds_path,
'log_level': log_level, 'pool_size': pool_size}
if language_version:
params['daemon_language_version'] = language_version
return self._request(SBUS_CMD_START_DAEMON, params)
def stop_daemon(self, storlet_name):
return self._request(SBUS_CMD_STOP_DAEMON,
{'storlet_name': storlet_name})
def stop_daemons(self):
return self._request(SBUS_CMD_STOP_DAEMONS)
def halt(self):
return self._request(SBUS_CMD_HALT)
def daemon_status(self, storlet_name):
return self._request(SBUS_CMD_DAEMON_STATUS,
{'storlet_name': storlet_name})
def cancel(self, task_id):
return self._request(SBUS_CMD_CANCEL, task_id=task_id)

View File

@ -0,0 +1,125 @@
# Copyright (c) 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from storlets.sbus import SBus
from storlets.sbus.command import SBUS_CMD_CANCEL, SBUS_CMD_DAEMON_STATUS, \
SBUS_CMD_HALT, SBUS_CMD_PING, SBUS_CMD_START_DAEMON, \
SBUS_CMD_STOP_DAEMON, SBUS_CMD_STOP_DAEMONS
from storlets.sbus.datagram import SBusFileDescriptor, SBusServiceDatagram
from storlets.sbus.file_description import SBUS_FD_SERVICE_OUT
from storlets.sbus.client.exceptions import SBusClientIOError, \
SBusClientMalformedResponse, SBusClientSendError
class SBusResponse(object):
def __init__(self, status, message):
"""
Construct SBusResponse class
:param status: Whether the server succeed to process the given request
:param message: Messages to describe the process result
"""
self.status = status
self.message = message
class SBusClient(object):
def __init__(self, socket_path, chunk_size=16):
self.socket_path = socket_path
self.chunk_size = chunk_size
def _parse_response(self, str_response):
"""
Parse response string recieved from container side
:param str_response: response string
:returns: SBusResponse instance
"""
try:
resp = json.loads(str_response)
status = resp['status']
message = resp['message']
except (ValueError, KeyError):
raise SBusClientMalformedResponse('Got malformed response')
return SBusResponse(status, message)
def _request(self, command, params=None, task_id=None):
read_fd, write_fd = os.pipe()
try:
try:
datagram = SBusServiceDatagram(
command,
[SBusFileDescriptor(SBUS_FD_SERVICE_OUT, write_fd)],
params, task_id)
rc = SBus.send(self.socket_path, datagram)
if rc < 0:
raise SBusClientSendError(
'Faild to send command(%s) to socket %s' %
(datagram.command, self.socket_path))
finally:
# We already sent the write fd to remote, so should close it
# in local side before reading response
os.close(write_fd)
reply = ''
while True:
try:
buf = os.read(read_fd, self.chunk_size)
except IOError:
raise SBusClientIOError('Failed to read data from read '
'pipe')
if not buf:
break
reply = reply + buf
finally:
os.close(read_fd)
return self._parse_response(reply)
def execute(self, *args, **kwargs):
# TODO(takashi): implement this
raise NotImplementedError('Execute command is not supported yet')
def ping(self):
return self._request(SBUS_CMD_PING)
def start_daemon(self, language, storlet_path, storlet_id,
uds_path, log_level, pool_size,
language_version):
params = {'daemon_language': language, 'storlet_path': storlet_path,
'storlet_name': storlet_id, 'uds_path': uds_path,
'log_level': log_level, 'pool_size': pool_size}
if language_version:
params['daemon_language_version'] = language_version
return self._request(SBUS_CMD_START_DAEMON, params)
def stop_daemon(self, storlet_name):
return self._request(SBUS_CMD_STOP_DAEMON,
{'storlet_name': storlet_name})
def stop_daemons(self):
return self._request(SBUS_CMD_STOP_DAEMONS)
def halt(self):
return self._request(SBUS_CMD_HALT)
def daemon_status(self, storlet_name):
return self._request(SBUS_CMD_DAEMON_STATUS,
{'storlet_name': storlet_name})
def cancel(self, task_id):
return self._request(SBUS_CMD_CANCEL, task_id=task_id)

164
storlets/sbus/sbus.py Normal file
View File

@ -0,0 +1,164 @@
# Copyright (c) 2015, 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ctypes import c_char_p, c_int, CDLL, POINTER
from storlets.sbus.datagram import build_datagram_from_raw_message
class SBus(object):
"""
Wrapper class for low level C-API for SBus functionality
"""
SBUS_SO_NAME = '/usr/local/lib/storlets/libsbus.so'
def __init__(self):
# load the C-library
self.sbus_back_ = CDLL(SBus.SBUS_SO_NAME)
# create SBus
self.sbus_back_.sbus_create.argtypes = [c_char_p]
self.sbus_back_.sbus_create.restype = c_int
# listen to SBus
self.sbus_back_.sbus_listen.argtypes = [c_int]
self.sbus_back_.sbus_listen.restype = c_int
# send message
self.sbus_back_.sbus_send_msg.argtypes = [c_char_p,
POINTER(c_int),
c_int,
c_char_p,
c_int,
c_char_p,
c_int]
self.sbus_back_.sbus_send_msg.restype = c_int
# receive message
self.sbus_back_.sbus_recv_msg.argtypes = [c_int,
POINTER(POINTER(c_int)),
POINTER(c_int),
POINTER(c_char_p),
POINTER(c_int),
POINTER(c_char_p),
POINTER(c_int)]
self.sbus_back_.sbus_recv_msg.restype = c_int
@staticmethod
def start_logger(str_log_level='DEBUG', container_id=None):
# load the C-library
sbus_back_ = CDLL(SBus.SBUS_SO_NAME)
sbus_back_.sbus_start_logger.argtypes = [c_char_p, c_char_p]
sbus_back_.sbus_start_logger(str_log_level, container_id)
@staticmethod
def stop_logger():
# load the C-library
sbus_back_ = CDLL(SBus.SBUS_SO_NAME)
sbus_back_.sbus_stop_logger()
def create(self, sbus_name):
return self.sbus_back_.sbus_create(sbus_name.encode("utf-8"))
def listen(self, sbus_handler):
return self.sbus_back_.sbus_listen(sbus_handler)
def receive(self, sbus_handler):
ph_files = POINTER(c_int)()
pp_metadata = (c_char_p)()
pp_params = (c_char_p)()
pn_files = (c_int)()
pn_metadata = (c_int)()
pn_params = (c_int)()
# Invoke C function
n_status = self.sbus_back_.sbus_recv_msg(sbus_handler,
ph_files,
pn_files,
pp_metadata,
pn_metadata,
pp_params,
pn_params)
if n_status < 0:
return None
# The invocation was successful.
# De-serialize the data
# Aggregate file descriptors
n_files = pn_files.value
h_files = []
for i in range(n_files):
h_files.append(ph_files[i])
# Extract Python strings
n_metadata = pn_metadata.value
# NOTE: because the storlets container may not have
# six module inside. Just check the type, not the python
# version. Anyway, the value should be bytes but python2
# can assume the bytes as str.
str_metadata = pp_metadata.value.decode("utf-8") \
if not isinstance(pp_metadata.value, str) else pp_metadata.value
n_params = pn_params.value
str_params = pp_params.value.decode("utf-8") \
if not isinstance(pp_params.value, str) else pp_params.value
# Trim the junk out
if 0 < n_metadata:
str_metadata = str_metadata[0:n_metadata]
str_params = str_params[0:n_params]
# Construct actual result datagram
return build_datagram_from_raw_message(
h_files, str_metadata, str_params)
@staticmethod
def send(sbus_name, datagram):
# Serialize the datagram into JSON strings and C integer array
str_json_params = datagram.serialized_cmd_params
p_params = c_char_p(str_json_params.encode("utf-8"))
n_params = c_int(len(str_json_params))
n_files = c_int(0)
h_files = None
n_metadata = c_int(0)
p_metadata = None
if datagram.num_fds > 0:
str_json_metadata = datagram.serialized_metadata
p_metadata = c_char_p(str_json_metadata.encode("utf-8"))
n_metadata = c_int(len(str_json_metadata))
n_fds = datagram.num_fds
n_files = c_int(n_fds)
file_fds = datagram.fds
h_files = (c_int * n_fds)()
for i in range(n_fds):
h_files[i] = file_fds[i]
# Invoke C function
sbus = SBus()
n_status = sbus.sbus_back_.sbus_send_msg(
sbus_name.encode("utf-8"),
h_files,
n_files,
p_metadata,
n_metadata,
p_params,
n_params)
return n_status

View File

@ -27,7 +27,7 @@ from storlets.sbus.client import SBusClient
@contextmanager @contextmanager
def _mock_sbus(send_status=0): def _mock_sbus(send_status=0):
with mock.patch('storlets.sbus.client.SBus.send') as fake_send: with mock.patch('storlets.sbus.client.client.SBus.send') as fake_send:
fake_send.return_value = send_status fake_send.return_value = send_status
yield yield
@ -65,9 +65,9 @@ def _mock_os_pipe(bufs):
except StopIteration: except StopIteration:
raise AssertionError('pipe called more than expected') raise AssertionError('pipe called more than expected')
with mock.patch('storlets.sbus.client.os.pipe', mock_os_pipe), \ with mock.patch('storlets.sbus.client.client.os.pipe', mock_os_pipe), \
mock.patch('storlets.sbus.client.os.read', fake_os_read), \ mock.patch('storlets.sbus.client.client.os.read', fake_os_read), \
mock.patch('storlets.sbus.client.os.close', fake_os_close): mock.patch('storlets.sbus.client.client.os.close', fake_os_close):
yield pipes yield pipes