add rpc function

Change-Id: I410bf1c8d7ca3d07ab9ed544223c5bec06f5f7a0
This commit is contained in:
Frank Jia (贾思瑞) 2023-07-19 10:43:07 +08:00
parent d9e4f38e22
commit bd3fb2a403
7 changed files with 337 additions and 13 deletions

View File

@ -21,12 +21,12 @@ os_region_name = RegionOne
osapi_venus_listen_port = 10010 osapi_venus_listen_port = 10010
osapi_venus_workers = 1 osapi_venus_workers = 1
log_dir = /var/log/nova/ log_dir = /var/log/nova/
logging_default_format_string = %(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s [- req-None - - - - -] %(instance)s%(message)s logging_default_format_string = %(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s [- - - - - - -] %(instance)s%(message)s
logging_context_format_string = %(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s [%(global_request_id)s %(request_id)s %(user_identity)s] %(instance)s%(message)s logging_context_format_string = %(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s [%(global_request_id)s %(request_id)s %(user_identity)s] %(instance)s%(message)s
[oslo_messaging_notifications]
transport_url = rabbit://stackrabbit:secret@localhost:5672/
driver = messagingv2
[elasticsearch] [elasticsearch]

View File

@ -23,6 +23,7 @@ from oslo_reports import guru_meditation_report as gmr
from venus.conf import CONF from venus.conf import CONF
from venus import i18n from venus import i18n
from venus import rpc
from venus import service from venus import service
from venus import utils from venus import utils
from venus import version from venus import version
@ -43,6 +44,8 @@ def main():
gmr.TextGuruMeditation.setup_autorun(version) gmr.TextGuruMeditation.setup_autorun(version)
rpc.init(CONF)
server = service.WSGIService('osapi_venus') server = service.WSGIService('osapi_venus')
launcher = service.get_launcher() launcher = service.get_launcher()
launcher.launch_service(server, workers=server.workers) launcher.launch_service(server, workers=server.workers)

51
venus/cmd/task.py Normal file
View File

@ -0,0 +1,51 @@
# Copyright 2020 Inspur
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Starter script for Venus API."""
import os
import sys
from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
from venus.conf import CONF
from venus import i18n
from venus import service
from venus import utils
from venus import version
i18n.enable_lazy()
def main():
CONF(sys.argv[1:], project='venus',
version=version.version_string())
logdir = CONF.log_dir
is_exists = os.path.exists(logdir)
if not is_exists:
os.makedirs(logdir)
logging.setup(CONF, "venus")
utils.monkey_patch()
gmr.TextGuruMeditation.setup_autorun(version)
server = service.Service.create(binary='venus-task',
topic="venus-task")
service.serve(server)
service.wait()
if __name__ == "__main__":
main()

View File

@ -174,6 +174,9 @@ global_opts = [
help='Base URL that will be presented to users in links ' help='Base URL that will be presented to users in links '
'to the OpenStack Venus API', 'to the OpenStack Venus API',
deprecated_name='osapi_compute_link_prefix'), deprecated_name='osapi_compute_link_prefix'),
cfg.StrOpt('task_manager',
default="venus.manager.Manager",
help='Btask_manager')
] ]

93
venus/manager.py Normal file
View File

@ -0,0 +1,93 @@
# Copyright 2020 Inspur
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base Manager class.
Managers will often provide methods for initial setup of a host or periodic
tasks to a wrapping service.
This module provides Manager, a base class for managers.
"""
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import periodic_task
from venus.db import base
from venus import version
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class PeriodicTasks(periodic_task.PeriodicTasks):
def __init__(self):
super(PeriodicTasks, self).__init__(CONF)
class Manager(base.Base, PeriodicTasks):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, host=None, db_driver=None):
if not host:
host = CONF.host
self.host = host
self.additional_endpoints = []
super(Manager, self).__init__(db_driver)
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def init_host(self):
"""Handle initialization if this is a standalone service.
A hook point for services to execute tasks before the services are made
available (i.e. showing up on RPC and starting to accept RPC calls) to
other components. Child classes should override this method.
"""
pass
def init_host_with_rpc(self):
"""A hook for service to do jobs after RPC is ready.
Like init_host(), this method is a hook where services get a chance
to execute tasks that *need* RPC. Child classes should override
this method.
"""
pass
def service_version(self):
return version.version_string()
def service_config(self):
config = {}
for key in CONF:
config[key] = CONF.get(key, None)
return config
def is_working(self):
"""Method indicating if service is working correctly.
This method is supposed to be overriden by subclasses and return if
manager is working correctly.
"""
return True

165
venus/rpc.py Normal file
View File

@ -0,0 +1,165 @@
# Copyright 2020 Inspur
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__all__ = [
'init',
'cleanup',
'set_defaults',
'add_extra_exmods',
'clear_extra_exmods',
'get_allowed_exmods',
'RequestContextSerializer',
'get_client',
'get_server',
'get_notifier',
'TRANSPORT_ALIASES',
]
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from osprofiler import profiler
from venus.conf import CONF
import venus.context
import venus.exception
TRANSPORT = None
NOTIFIER = None
ALLOWED_EXMODS = [
venus.exception.__name__,
]
EXTRA_EXMODS = []
# NOTE(flaper87): The venus.openstack.common.rpc entries are
# for backwards compat with Havana rpc_backend configuration
# values. The venus.rpc entries are for compat with Folsom values.
TRANSPORT_ALIASES = {
'venus.openstack.common.rpc.impl_kombu': 'rabbit',
'venus.openstack.common.rpc.impl_qpid': 'qpid',
'venus.openstack.common.rpc.impl_zmq': 'zmq',
'venus.rpc.impl_kombu': 'rabbit',
'venus.rpc.impl_qpid': 'qpid',
'venus.rpc.impl_zmq': 'zmq',
}
def init(conf):
global TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
TRANSPORT = messaging.get_transport(conf,
allowed_remote_exmods=exmods)
serializer = RequestContextSerializer(JsonPayloadSerializer())
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)
def initialized():
return None not in [TRANSPORT, NOTIFIER]
def cleanup():
global TRANSPORT, NOTIFIER
assert TRANSPORT is not None
assert NOTIFIER is not None
TRANSPORT.cleanup()
TRANSPORT = NOTIFIER = None
def set_defaults(control_exchange):
messaging.set_transport_defaults(control_exchange)
def add_extra_exmods(*args):
EXTRA_EXMODS.extend(args)
def clear_extra_exmods():
del EXTRA_EXMODS[:]
def get_allowed_exmods():
return ALLOWED_EXMODS + EXTRA_EXMODS
class JsonPayloadSerializer(messaging.NoOpSerializer):
@staticmethod
def serialize_entity(context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
class RequestContextSerializer(messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.serialize_entity(context, entity)
def deserialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.deserialize_entity(context, entity)
def serialize_context(self, context):
_context = context.to_dict()
prof = profiler.get()
if prof:
trace_info = {
"hmac_key": prof.hmac_key,
"base_id": prof.get_base_id(),
"parent_id": prof.get_id()
}
_context.update({"trace_info": trace_info})
return _context
def deserialize_context(self, context):
trace_info = context.pop("trace_info", None)
if trace_info:
profiler.init(**trace_info)
return venus.context.RequestContext.from_dict(context)
def get_transport_url(url_str=None):
return messaging.TransportURL.parse(CONF, url_str, TRANSPORT_ALIASES)
def get_client(target, version_cap=None, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.RPCClient(TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
def get_server(target, endpoints, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.get_rpc_server(TRANSPORT,
target,
endpoints,
executor='eventlet',
serializer=serializer)
def get_notifier(service=None, host=None, publisher_id=None):
assert NOTIFIER is not None
if not publisher_id:
publisher_id = "%s.%s" % (service, host or CONF.host)
return NOTIFIER.prepare(publisher_id=publisher_id)

View File

@ -17,21 +17,25 @@
import inspect import inspect
import os import os
import oslo_messaging as messaging
import osprofiler.notifier
import osprofiler.web
import random import random
from oslo_concurrency import processutils from oslo_concurrency import processutils
from oslo_service import loopingcall from oslo_service import loopingcall
from oslo_service import service from oslo_service import service
from oslo_utils import importutils from oslo_utils import importutils
import osprofiler.notifier
from osprofiler import profiler
import osprofiler.web
from venus.common.utils import LOG from venus.common.utils import LOG
from venus.conf import CONF from venus.conf import CONF
from venus import context from venus import context
from venus import exception from venus import exception
from venus.i18n import _, _LI, _LW from venus.i18n import _, _LI, _LW
from venus.objects import base as objects_base
from venus import rpc
from venus import version from venus import version
from venus.wsgi import common as wsgi_common from venus.wsgi import common as wsgi_common
from venus.wsgi import eventlet_server as wsgi from venus.wsgi import eventlet_server as wsgi
@ -71,11 +75,8 @@ class Service(service.Service):
self.topic = topic self.topic = topic
self.manager_class_name = manager self.manager_class_name = manager
manager_class = importutils.import_class(self.manager_class_name) manager_class = importutils.import_class(self.manager_class_name)
manager_class = profiler.trace_cls("rpc")(manager_class)
self.manager = manager_class(host=self.host, self.manager = manager_class(*args, **kwargs)
service_name=service_name,
*args, **kwargs)
self.periodic_interval = periodic_interval self.periodic_interval = periodic_interval
self.periodic_fuzzy_delay = periodic_fuzzy_delay self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.saved_args, self.saved_kwargs = args, kwargs self.saved_args, self.saved_kwargs = args, kwargs
@ -185,7 +186,7 @@ class Service(service.Service):
self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error) self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
class WSGIService(service.ServiceBase): class WSGIService(service.Service):
"""Provides ability to launch API from a 'paste' configuration.""" """Provides ability to launch API from a 'paste' configuration."""
def __init__(self, name, loader=None): def __init__(self, name, loader=None):
@ -212,6 +213,7 @@ class WSGIService(service.ServiceBase):
'workers': self.workers}) 'workers': self.workers})
raise exception.InvalidInput(msg) raise exception.InvalidInput(msg)
setup_profiler(name, self.host) setup_profiler(name, self.host)
self.rpcserver = None
self.server = wsgi.Server(name, self.server = wsgi.Server(name,
self.app, self.app,
@ -250,8 +252,15 @@ class WSGIService(service.ServiceBase):
""" """
if self.manager: if self.manager:
self.manager.init_host() self.manager.init_host()
self.topic = "xxxxxxx"
target = messaging.Target(topic=self.topic, server="10.49.38.57")
endpoints = [self.manager]
# endpoints.extend(self.manager.additional_endpoints)
serializer = objects_base.VenusObjectSerializer()
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
self.server.start() self.server.start()
self.port = self.server.port
def stop(self): def stop(self):
"""Stop serving this API. """Stop serving this API.