Merge "Remove logging from serialize_remote_exception"
This commit is contained in:
commit
6ee6d55780
oslo_messaging
_drivers
amqp1_driver
amqpdriver.pybase.pycommon.pyimpl_amqp1.pyimpl_fake.pyimpl_kafka.pypika_driver
zmq_driver
rpc
tests
@ -94,11 +94,10 @@ class ListenTask(controller.Task):
|
||||
class ReplyTask(controller.Task):
|
||||
"""A task that sends 'response' message to 'address'.
|
||||
"""
|
||||
def __init__(self, address, response, log_failure):
|
||||
def __init__(self, address, response):
|
||||
super(ReplyTask, self).__init__()
|
||||
self._address = address
|
||||
self._response = response
|
||||
self._log_failure = log_failure
|
||||
self._wakeup = threading.Event()
|
||||
|
||||
def wait(self):
|
||||
|
@ -50,14 +50,13 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
self.stopwatch = timeutils.StopWatch()
|
||||
self.stopwatch.start()
|
||||
|
||||
def _send_reply(self, conn, reply=None, failure=None, log_failure=True):
|
||||
def _send_reply(self, conn, reply=None, failure=None):
|
||||
if not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
|
||||
self.msg_id):
|
||||
return
|
||||
|
||||
if failure:
|
||||
failure = rpc_common.serialize_remote_exception(failure,
|
||||
log_failure)
|
||||
failure = rpc_common.serialize_remote_exception(failure)
|
||||
# NOTE(sileht): ending can be removed in N*, see Listener.wait()
|
||||
# for more detail.
|
||||
msg = {'result': reply, 'failure': failure, 'ending': True,
|
||||
@ -74,7 +73,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
'elapsed': self.stopwatch.elapsed()})
|
||||
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
if not self.msg_id:
|
||||
# NOTE(Alexei_987) not sending reply, if msg_id is empty
|
||||
# because reply should not be expected by caller side
|
||||
@ -96,8 +95,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
try:
|
||||
with self.listener.driver._get_connection(
|
||||
rpc_common.PURPOSE_SEND) as conn:
|
||||
self._send_reply(conn, reply, failure,
|
||||
log_failure=log_failure)
|
||||
self._send_reply(conn, reply, failure)
|
||||
return
|
||||
except rpc_amqp.AMQPDestinationNotFound:
|
||||
if timer.check_return() > 0:
|
||||
|
@ -92,7 +92,7 @@ class IncomingMessage(object):
|
||||
class RpcIncomingMessage(IncomingMessage):
|
||||
|
||||
@abc.abstractmethod
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Send a reply or failure back to the client."""
|
||||
|
||||
|
||||
|
@ -162,18 +162,15 @@ class Connection(object):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def serialize_remote_exception(failure_info, log_failure=True):
|
||||
def serialize_remote_exception(failure_info):
|
||||
"""Prepares exception data to be sent over rpc.
|
||||
|
||||
Failure_info should be a sys.exc_info() tuple.
|
||||
|
||||
"""
|
||||
tb = traceback.format_exception(*failure_info)
|
||||
|
||||
failure = failure_info[1]
|
||||
if log_failure:
|
||||
LOG.error(_LE("Returning exception %s to caller"),
|
||||
six.text_type(failure))
|
||||
LOG.error(tb)
|
||||
|
||||
kwargs = {}
|
||||
if hasattr(failure, 'kwargs'):
|
||||
|
@ -98,13 +98,13 @@ class ProtonIncomingMessage(base.RpcIncomingMessage):
|
||||
self._reply_to = message.reply_to
|
||||
self._correlation_id = message.id
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Schedule a ReplyTask to send the reply."""
|
||||
if self._reply_to:
|
||||
response = marshal_response(reply=reply, failure=failure)
|
||||
response.correlation_id = self._correlation_id
|
||||
LOG.debug("Replying to %s", self._correlation_id)
|
||||
task = drivertasks.ReplyTask(self._reply_to, response, log_failure)
|
||||
task = drivertasks.ReplyTask(self._reply_to, response)
|
||||
self.listener.driver._ctrl.add_task(task)
|
||||
else:
|
||||
LOG.debug("Ignoring reply as no reply address available")
|
||||
|
@ -30,7 +30,7 @@ class FakeIncomingMessage(base.RpcIncomingMessage):
|
||||
self.requeue_callback = requeue
|
||||
self._reply_q = reply_q
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
if self._reply_q:
|
||||
failure = failure[1] if failure else None
|
||||
self._reply_q.put((reply, failure))
|
||||
|
@ -237,7 +237,7 @@ class OsloKafkaMessage(base.RpcIncomingMessage):
|
||||
def requeue(self):
|
||||
LOG.warning(_LW("requeue is not supported"))
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
LOG.warning(_LW("reply is not supported"))
|
||||
|
||||
|
||||
|
@ -175,13 +175,11 @@ class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage):
|
||||
self.reply_q = properties.reply_to
|
||||
self.msg_id = properties.correlation_id
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Send back reply to the RPC client
|
||||
:param reply: Dictionary, reply. In case of exception should be None
|
||||
:param failure: Tuple, should be a sys.exc_info() tuple.
|
||||
Should be None if RPC request was successfully processed.
|
||||
:param log_failure: Boolean, not used in this implementation.
|
||||
It present here to be compatible with driver API
|
||||
|
||||
:return RpcReplyPikaIncomingMessage, message with reply
|
||||
"""
|
||||
|
@ -18,8 +18,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
class Response(object):
|
||||
|
||||
def __init__(self, id=None, type=None, message_id=None,
|
||||
reply_id=None, reply_body=None,
|
||||
failure=None, log_failure=None):
|
||||
reply_id=None, reply_body=None, failure=None):
|
||||
|
||||
self._id = id
|
||||
self._type = type
|
||||
@ -27,7 +26,6 @@ class Response(object):
|
||||
self._reply_id = reply_id
|
||||
self._reply_body = reply_body
|
||||
self._failure = failure
|
||||
self._log_failure = log_failure
|
||||
|
||||
@property
|
||||
def id_(self):
|
||||
@ -53,18 +51,13 @@ class Response(object):
|
||||
def failure(self):
|
||||
return self._failure
|
||||
|
||||
@property
|
||||
def log_failure(self):
|
||||
return self._log_failure
|
||||
|
||||
def to_dict(self):
|
||||
return {zmq_names.FIELD_ID: self._id,
|
||||
zmq_names.FIELD_TYPE: self._type,
|
||||
zmq_names.FIELD_MSG_ID: self._message_id,
|
||||
zmq_names.FIELD_REPLY_ID: self._reply_id,
|
||||
zmq_names.FIELD_REPLY: self._reply_body,
|
||||
zmq_names.FIELD_FAILURE: self._failure,
|
||||
zmq_names.FIELD_LOG_FAILURE: self._log_failure}
|
||||
zmq_names.FIELD_FAILURE: self._failure}
|
||||
|
||||
def __str__(self):
|
||||
return str(self.to_dict())
|
||||
|
@ -37,7 +37,7 @@ class DealerIncomingMessage(base.RpcIncomingMessage):
|
||||
def __init__(self, context, message):
|
||||
super(DealerIncomingMessage, self).__init__(context, message)
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Reply is not needed for non-call messages"""
|
||||
|
||||
def acknowledge(self):
|
||||
@ -55,16 +55,14 @@ class DealerIncomingRequest(base.RpcIncomingMessage):
|
||||
self.reply_id = reply_id
|
||||
self.message_id = message_id
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
if failure is not None:
|
||||
failure = rpc_common.serialize_remote_exception(failure,
|
||||
log_failure)
|
||||
failure = rpc_common.serialize_remote_exception(failure)
|
||||
response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
|
||||
message_id=self.message_id,
|
||||
reply_id=self.reply_id,
|
||||
reply_body=reply,
|
||||
failure=failure,
|
||||
log_failure=log_failure)
|
||||
failure=failure)
|
||||
|
||||
LOG.debug("Replying %s", self.message_id)
|
||||
|
||||
|
@ -31,7 +31,7 @@ class PullIncomingMessage(base.RpcIncomingMessage):
|
||||
def __init__(self, context, message):
|
||||
super(PullIncomingMessage, self).__init__(context, message)
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Reply is not needed for non-call messages."""
|
||||
|
||||
def acknowledge(self):
|
||||
|
@ -37,7 +37,7 @@ class RouterIncomingMessage(base.RpcIncomingMessage):
|
||||
self.msg_id = msg_id
|
||||
self.message = message
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Reply is not needed for non-call messages"""
|
||||
|
||||
def acknowledge(self):
|
||||
|
@ -34,7 +34,7 @@ class SubIncomingMessage(base.RpcIncomingMessage):
|
||||
def __init__(self, context, message):
|
||||
super(SubIncomingMessage, self).__init__(context, message)
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Reply is not needed for non-call messages."""
|
||||
|
||||
def acknowledge(self):
|
||||
|
@ -39,16 +39,14 @@ class ZmqIncomingRequest(base.RpcIncomingMessage):
|
||||
self.received = None
|
||||
self.poller = poller
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
if failure is not None:
|
||||
failure = rpc_common.serialize_remote_exception(failure,
|
||||
log_failure)
|
||||
failure = rpc_common.serialize_remote_exception(failure)
|
||||
response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
|
||||
message_id=self.request.message_id,
|
||||
reply_id=self.reply_id,
|
||||
reply_body=reply,
|
||||
failure=failure,
|
||||
log_failure=log_failure)
|
||||
failure=failure)
|
||||
|
||||
LOG.debug("Replying %s", (str(self.request.message_id)))
|
||||
|
||||
|
@ -20,7 +20,6 @@ zmq = zmq_async.import_zmq()
|
||||
FIELD_TYPE = 'type'
|
||||
FIELD_FAILURE = 'failure'
|
||||
FIELD_REPLY = 'reply'
|
||||
FIELD_LOG_FAILURE = 'log_failure'
|
||||
FIELD_ID = 'id'
|
||||
FIELD_MSG_ID = 'message_id'
|
||||
FIELD_MSG_TYPE = 'msg_type'
|
||||
|
@ -132,15 +132,14 @@ class RPCServer(msg_server.MessageHandlingServer):
|
||||
try:
|
||||
res = self.dispatcher.dispatch(message)
|
||||
except rpc_dispatcher.ExpectedException as e:
|
||||
LOG.debug(u'Expected exception during message handling (%s)',
|
||||
e.exc_info[1])
|
||||
failure = e.exc_info
|
||||
except Exception as e:
|
||||
LOG.debug(u'Expected exception during message handling (%s)', e)
|
||||
except Exception:
|
||||
# current sys.exc_info() content can be overriden
|
||||
# by another exception raise by a log handler during
|
||||
# by another exception raised by a log handler during
|
||||
# LOG.exception(). So keep a copy and delete it later.
|
||||
failure = sys.exc_info()
|
||||
LOG.exception(_LE('Exception during handling message'))
|
||||
LOG.exception(_LE('Exception during message handling'))
|
||||
|
||||
try:
|
||||
if failure is None:
|
||||
|
@ -496,14 +496,6 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
senders = []
|
||||
replies = []
|
||||
msgs = []
|
||||
errors = []
|
||||
|
||||
def stub_error(msg, *a, **kw):
|
||||
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
|
||||
a = a[0]
|
||||
errors.append(str(msg) % a)
|
||||
|
||||
self.stubs.Set(driver_common.LOG, 'error', stub_error)
|
||||
|
||||
def send_and_wait_for_reply(i):
|
||||
try:
|
||||
@ -545,8 +537,7 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
raise ZeroDivisionError
|
||||
except Exception:
|
||||
failure = sys.exc_info()
|
||||
msgs[i].reply(failure=failure,
|
||||
log_failure=not self.expected)
|
||||
msgs[i].reply(failure=failure)
|
||||
elif self.rx_id:
|
||||
msgs[i].reply({'rx_id': i})
|
||||
else:
|
||||
@ -564,11 +555,6 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
else:
|
||||
self.assertEqual(self.reply, reply)
|
||||
|
||||
if not self.timeout and self.failure and not self.expected:
|
||||
self.assertTrue(len(errors) > 0, errors)
|
||||
else:
|
||||
self.assertEqual(0, len(errors), errors)
|
||||
|
||||
|
||||
TestSendReceive.generate_scenarios()
|
||||
|
||||
|
@ -21,6 +21,7 @@ import testscenarios
|
||||
|
||||
import mock
|
||||
import oslo_messaging
|
||||
from oslo_messaging.rpc import server as rpc_server_module
|
||||
from oslo_messaging import server as server_module
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
@ -326,6 +327,22 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
def ping(self, ctxt, arg):
|
||||
raise ValueError(arg)
|
||||
|
||||
debugs = []
|
||||
errors = []
|
||||
|
||||
def stub_debug(msg, *a, **kw):
|
||||
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
|
||||
a = a[0]
|
||||
debugs.append(str(msg) % a)
|
||||
|
||||
def stub_error(msg, *a, **kw):
|
||||
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
|
||||
a = a[0]
|
||||
errors.append(str(msg) % a)
|
||||
|
||||
self.stubs.Set(rpc_server_module.LOG, 'debug', stub_debug)
|
||||
self.stubs.Set(rpc_server_module.LOG, 'error', stub_error)
|
||||
|
||||
server_thread = self._setup_server(transport, TestEndpoint())
|
||||
client = self._setup_client(transport)
|
||||
|
||||
@ -334,6 +351,8 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
except Exception as ex:
|
||||
self.assertIsInstance(ex, ValueError)
|
||||
self.assertEqual('dsfoo', str(ex))
|
||||
self.assertTrue(len(debugs) == 0)
|
||||
self.assertTrue(len(errors) > 0)
|
||||
else:
|
||||
self.assertTrue(False)
|
||||
|
||||
@ -342,6 +361,22 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
def test_expected_failure(self):
|
||||
transport = oslo_messaging.get_transport(self.conf, url='fake:')
|
||||
|
||||
debugs = []
|
||||
errors = []
|
||||
|
||||
def stub_debug(msg, *a, **kw):
|
||||
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
|
||||
a = a[0]
|
||||
debugs.append(str(msg) % a)
|
||||
|
||||
def stub_error(msg, *a, **kw):
|
||||
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
|
||||
a = a[0]
|
||||
errors.append(str(msg) % a)
|
||||
|
||||
self.stubs.Set(rpc_server_module.LOG, 'debug', stub_debug)
|
||||
self.stubs.Set(rpc_server_module.LOG, 'error', stub_error)
|
||||
|
||||
class TestEndpoint(object):
|
||||
@oslo_messaging.expected_exceptions(ValueError)
|
||||
def ping(self, ctxt, arg):
|
||||
@ -355,6 +390,8 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
except Exception as ex:
|
||||
self.assertIsInstance(ex, ValueError)
|
||||
self.assertEqual('dsfoo', str(ex))
|
||||
self.assertTrue(len(debugs) > 0)
|
||||
self.assertTrue(len(errors) == 0)
|
||||
else:
|
||||
self.assertTrue(False)
|
||||
|
||||
|
@ -61,11 +61,6 @@ def add_remote_postfix(ex):
|
||||
|
||||
class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase):
|
||||
|
||||
_log_failure = [
|
||||
('log_failure', dict(log_failure=True)),
|
||||
('do_not_log_failure', dict(log_failure=False)),
|
||||
]
|
||||
|
||||
_add_remote = [
|
||||
('add_remote', dict(add_remote=True)),
|
||||
('do_not_add_remote', dict(add_remote=False)),
|
||||
@ -100,27 +95,19 @@ class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase):
|
||||
|
||||
@classmethod
|
||||
def generate_scenarios(cls):
|
||||
cls.scenarios = testscenarios.multiply_scenarios(cls._log_failure,
|
||||
cls._add_remote,
|
||||
cls.scenarios = testscenarios.multiply_scenarios(cls._add_remote,
|
||||
cls._exception_types)
|
||||
|
||||
def setUp(self):
|
||||
super(SerializeRemoteExceptionTestCase, self).setUp()
|
||||
|
||||
def test_serialize_remote_exception(self):
|
||||
errors = []
|
||||
|
||||
def stub_error(msg, *a, **kw):
|
||||
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
|
||||
a = a[0]
|
||||
errors.append(str(msg) % a)
|
||||
|
||||
self.stubs.Set(exceptions.LOG, 'error', stub_error)
|
||||
|
||||
try:
|
||||
try:
|
||||
raise self.cls(*self.args, **self.kwargs)
|
||||
except Exception as ex:
|
||||
# Note: in Python 3 ex variable will be cleared at the end of
|
||||
# the except clause, so explicitly make an extra copy of it
|
||||
cls_error = ex
|
||||
if self.add_remote:
|
||||
ex = add_remote_postfix(ex)
|
||||
@ -128,8 +115,7 @@ class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase):
|
||||
except Exception:
|
||||
exc_info = sys.exc_info()
|
||||
|
||||
serialized = exceptions.serialize_remote_exception(
|
||||
exc_info, log_failure=self.log_failure)
|
||||
serialized = exceptions.serialize_remote_exception(exc_info)
|
||||
|
||||
failure = jsonutils.loads(serialized)
|
||||
|
||||
@ -143,11 +129,6 @@ class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase):
|
||||
tb = cls_error.__class__.__name__ + ': ' + self.msg
|
||||
self.assertIn(tb, ''.join(failure['tb']))
|
||||
|
||||
if self.log_failure:
|
||||
self.assertTrue(len(errors) > 0, errors)
|
||||
else:
|
||||
self.assertEqual(0, len(errors), errors)
|
||||
|
||||
|
||||
SerializeRemoteExceptionTestCase.generate_scenarios()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user