From 28de384e9c667e6a9aae9ba99c8254e93c6a296b Mon Sep 17 00:00:00 2001
From: Victor Sergeyev <vsergeyev@mirantis.com>
Date: Tue, 18 Aug 2015 17:50:02 +0300
Subject: [PATCH] Improve simulator.py

- added config options to set debug level
- added config options to show proceed messages per second on rpc-server
- added config options to select executor for rpc-server
- added config options to select call or cast mesages for rpc-client

Usage section updated

Change-Id: Ieadbc600f556ca5eb43b05abec69315b46023662
---
 tools/simulator.py | 138 +++++++++++++++++++++++++++++++++------------
 1 file changed, 102 insertions(+), 36 deletions(-)

diff --git a/tools/simulator.py b/tools/simulator.py
index 2f54e0b3d..843abc8ce 100755
--- a/tools/simulator.py
+++ b/tools/simulator.py
@@ -10,19 +10,14 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-# Usage example:
-#  python tools/simulator.py \
-#     --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-server
-#  python tools/simulator.py
-#     --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-client \
-#     --exit-wait 15000 -p 64 -m 64
-
 import eventlet
 eventlet.monkey_patch()
 
 import argparse
+import datetime
 import logging
 import sys
+import threading
 import time
 
 from oslo_config import cfg
@@ -30,8 +25,19 @@ import oslo_messaging as messaging
 from oslo_messaging import notify  # noqa
 from oslo_messaging import rpc  # noqa
 
+
 LOG = logging.getLogger()
 
+USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
+ {notify-server,notify-client,rpc-server,rpc-client} ...
+
+Usage example:
+ python tools/simulator.py\
+ --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-server
+ python tools/simulator.py\
+ --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-client\
+ --exit-wait 15000 -p 64 -m 64"""
+
 
 class LoggingNoParsingFilter(logging.Filter):
     def filter(self, record):
@@ -75,7 +81,7 @@ class RpcEndpoint(object):
         self.wait_before_answer = wait_before_answer
 
     def info(self, ctxt, message):
-        i = int(message.replace('test ', ''))
+        i = int(message.split(' ')[-1])
         if self.count is None:
             self.count = i
         elif i == 0:
@@ -89,11 +95,29 @@ class RpcEndpoint(object):
         return "OK: %s" % message
 
 
-def rpc_server(transport, wait_before_answer):
-    endpoints = [RpcEndpoint(wait_before_answer)]
-    target = messaging.Target(topic='t1', server='moi')
-    server = rpc.get_rpc_server(transport, target,
-                                endpoints, executor='eventlet')
+class RpcEndpointMonitor(RpcEndpoint):
+    def __init__(self, *args, **kwargs):
+        super(RpcEndpointMonitor, self).__init__(*args, **kwargs)
+
+        self._count = self._prev_count = 0
+        self._monitor()
+
+    def _monitor(self):
+        threading.Timer(1.0, self._monitor).start()
+        print ("%d msg was received per second"
+               % (self._count - self._prev_count))
+        self._prev_count = self._count
+
+    def info(self, *args, **kwargs):
+        self._count += 1
+        super(RpcEndpointMonitor, self).info(*args, **kwargs)
+
+
+def rpc_server(transport, target, wait_before_answer, executor, show_stats):
+    endpoint_cls = RpcEndpointMonitor if show_stats else RpcEndpoint
+    endpoints = [endpoint_cls(wait_before_answer)]
+    server = rpc.get_rpc_server(transport, target, endpoints,
+                                executor=executor)
     server.start()
     server.wait()
 
@@ -105,23 +129,38 @@ def threads_spawner(threads, method, *args, **kwargs):
     p.waitall()
 
 
-def rpc_call(_id, transport, messages, wait_after_msg, timeout):
-    target = messaging.Target(topic='t1', server='moi')
-    c = rpc.RPCClient(transport, target)
-    c = c.prepare(timeout=timeout)
+def send_msg(_id, transport, target, messages, wait_after_msg, timeout,
+             is_cast):
+    client = rpc.RPCClient(transport, target)
+    client = client.prepare(timeout=timeout)
+    rpc_method = _rpc_cast if is_cast else _rpc_call
+
     for i in range(0, messages):
-        payload = "test %d" % i
-        LOG.info("SEND: %s" % payload)
-        try:
-            res = c.call({}, 'info', message=payload)
-        except Exception:
-            LOG.exception('no RCV for %s' % i)
-        else:
-            LOG.info("RCV: %s" % res)
+        msg = "test message %d" % i
+        LOG.info("SEND: %s" % msg)
+        rpc_method(client, msg)
         if wait_after_msg > 0:
             time.sleep(wait_after_msg)
 
 
+def _rpc_call(client, msg):
+    try:
+        res = client.call({}, 'info', message=msg)
+    except Exception as e:
+        LOG.exception('Error %s on CALL for message %s' % (str(e), msg))
+    else:
+        LOG.info("SENT: %s, RCV: %s" % (msg, res))
+
+
+def _rpc_cast(client, msg):
+    try:
+        client.cast({}, 'info', message=msg)
+    except Exception as e:
+        LOG.exception('Error %s on CAST for message %s' % (str(e), msg))
+    else:
+        LOG.info("SENT: %s" % msg)
+
+
 def notifier(_id, transport, messages, wait_after_msg, timeout):
     n1 = notify.Notifier(transport, topic="n-t1").prepare(
         publisher_id='publisher-%d' % _id)
@@ -137,11 +176,26 @@ def notifier(_id, transport, messages, wait_after_msg, timeout):
             time.sleep(wait_after_msg)
 
 
+def _setup_logging(is_debug):
+    log_level = logging.DEBUG if is_debug else logging.WARN
+    logging.basicConfig(stream=sys.stdout, level=log_level)
+    logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter())
+    for i in ['kombu', 'amqp', 'stevedore', 'qpid.messaging'
+              'oslo.messaging._drivers.amqp', ]:
+        logging.getLogger(i).setLevel(logging.WARN)
+
+
 def main():
-    parser = argparse.ArgumentParser(description='RPC DEMO')
+    parser = argparse.ArgumentParser(
+        description='Tools to play with oslo.messaging\'s RPC',
+        usage=USAGE,
+    )
     parser.add_argument('--url', dest='url',
                         default='rabbit://guest:password@localhost/',
                         help="oslo.messaging transport url")
+    parser.add_argument('-d', '--debug', dest='debug', type=bool,
+                        default=False,
+                        help="Turn on DEBUG logging level instead of WARN")
     subparsers = parser.add_subparsers(dest='mode',
                                        help='notify/rpc server/client mode')
 
@@ -158,6 +212,11 @@ def main():
 
     server = subparsers.add_parser('rpc-server')
     server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
+    server.add_argument('--show-stats', dest='show_stats',
+                        type=bool, default=True)
+    server.add_argument('-e', '--executor', dest='executor',
+                        type=str, default='eventlet',
+                        help='name of a message executor')
 
     client = subparsers.add_parser('rpc-client')
     client.add_argument('-p', dest='threads', type=int, default=1,
@@ -171,34 +230,41 @@ def main():
     client.add_argument('--exit-wait', dest='exit_wait', type=int, default=0,
                         help='Keep connections open N seconds after calls '
                         'have been done')
+    client.add_argument('--is-cast', dest='is_cast', type=bool, default=False,
+                        help='Use `call` or `cast` RPC methods')
 
     args = parser.parse_args()
 
-    # Setup logging
-    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
-    logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter())
-    for i in ['kombu', 'amqp', 'stevedore', 'qpid.messaging'
-              'oslo.messaging._drivers.amqp', ]:
-        logging.getLogger(i).setLevel(logging.WARN)
+    _setup_logging(is_debug=args.debug)
 
     # oslo.config defaults
     cfg.CONF.heartbeat_interval = 5
     cfg.CONF.notification_topics = "notif"
     cfg.CONF.notification_driver = "messaging"
 
-    # the transport
     transport = messaging.get_transport(cfg.CONF, url=args.url)
+    target = messaging.Target(topic='profiler_topic', server='profiler_server')
 
     if args.mode == 'rpc-server':
-        rpc_server(transport, args.wait_before_answer)
+        if args.url.startswith('zmq'):
+            cfg.CONF.rpc_zmq_matchmaker = "redis"
+            transport._driver.matchmaker._redis.flushdb()
+        rpc_server(transport, target, args.wait_before_answer, args.executor,
+                   args.show_stats)
     elif args.mode == 'notify-server':
         notify_server(transport)
     elif args.mode == 'notify-client':
         threads_spawner(args.threads, notifier, transport, args.messages,
                         args.wait_after_msg, args.timeout)
     elif args.mode == 'rpc-client':
-        threads_spawner(args.threads, rpc_call, transport, args.messages,
-                        args.wait_after_msg, args.timeout)
+        start = datetime.datetime.now()
+        threads_spawner(args.threads, send_msg, transport, target,
+                        args.messages, args.wait_after_msg, args.timeout,
+                        args.is_cast)
+        time_ellapsed = (datetime.datetime.now() - start).total_seconds()
+        msg_count = args.messages * args.threads
+        print ('%d messages was sent for %s seconds. Bandwight is %s msg/sec'
+               % (msg_count, time_ellapsed, (msg_count / time_ellapsed)))
         LOG.info("calls finished, wait %d seconds" % args.exit_wait)
         time.sleep(args.exit_wait)