From 2c8f39312f9b103b12586323a5d88f188c787d41 Mon Sep 17 00:00:00 2001
From: Yulia Portnova <yportnova@mirantis.com>
Date: Mon, 11 Jan 2016 18:46:40 +0200
Subject: [PATCH] Add duration option to simulator.py

Change-Id: I992fdc1e22ee0debed34b4beb62cbd563351d12f
---
 tools/simulator.py | 39 ++++++++++++++++++++++++++++++---------
 1 file changed, 30 insertions(+), 9 deletions(-)

diff --git a/tools/simulator.py b/tools/simulator.py
index 6ab9e6dce..0bc0c3ba3 100755
--- a/tools/simulator.py
+++ b/tools/simulator.py
@@ -165,8 +165,10 @@ class RpcEndpoint(Monitor):
     def __init__(self, wait_before_answer, show_stats):
         self.count = None
         self.wait_before_answer = wait_before_answer
+        self.messages_received = 0
 
     def info(self, ctxt, message):
+        self.messages_received += 1
         i = int(message.split(' ')[-1])
         if self.count is None:
             self.count = i
@@ -184,7 +186,7 @@ class RpcEndpoint(Monitor):
 class RPCClient(object):
     def __init__(self, transport, target, timeout, method, wait_after_msg):
         self.client = rpc.RPCClient(transport, target)
-        self.client.prepare(timeout=timeout)
+        self.client = self.client.prepare(timeout=timeout)
         self.method = method
         self.bytes = 0
         self.msg_sent = 0
@@ -222,11 +224,21 @@ def init_msg(messages_count):
     LOG.info("Messages has been prepared")
 
 
-def rpc_server(transport, target, wait_before_answer, executor, show_stats):
+def rpc_server(transport, target, wait_before_answer, executor, show_stats,
+               duration):
     endpoints = [RpcEndpoint(wait_before_answer, show_stats)]
     server = rpc.get_rpc_server(transport, target, endpoints,
                                 executor=executor)
     server.start()
+    if duration:
+        start_t = time.time()
+        while time.time() - start_t < duration:
+            time.sleep(1)
+        server.stop()
+        server.wait()
+        LOG.info("Received total messages: %d",
+                 server.dispatcher.endpoints[0].messages_received)
+        return
     server.wait()
 
 
@@ -238,14 +250,20 @@ def threads_spawner(threads, method, *args, **kwargs):
 
 
 def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
-             messages_count):
-    LOG.debug("Sending %d messages using client %d", messages_count, c_id)
+             messages_count, duration):
     rpc_method = _rpc_cast if is_cast else _rpc_call
     client = RPCClient(transport, target, timeout, rpc_method, wait_after_msg)
     RPC_CLIENTS.append(client)
-    for _ in xrange(0, messages_count):
-        client.send_msg()
-    LOG.debug("Client %d has sent all messages", c_id)
+
+    if duration:
+        start_time = time.time()
+        while time.time() - start_time < duration:
+            client.send_msg()
+    else:
+        LOG.debug("Sending %d messages using client %d", messages_count, c_id)
+        for _ in xrange(0, messages_count):
+            client.send_msg()
+        LOG.debug("Client %d has sent %d messages", c_id, messages_count)
 
 
 def _rpc_call(client, msg):
@@ -304,6 +322,9 @@ def main():
     parser.add_argument('-tp', '--topic', dest='topic',
                         default="profiler_topic",
                         help="Topic to publish/receive messages to/from.")
+    parser.add_argument('-l', dest='duration', type=int,
+                        help='send messages for certain time')
+
     subparsers = parser.add_subparsers(dest='mode',
                                        help='notify/rpc server/client mode')
 
@@ -369,7 +390,7 @@ def main():
             cfg.CONF.rpc_zmq_matchmaker = "redis"
             transport._driver.matchmaker._redis.flushdb()
         rpc_server(transport, target, args.wait_before_answer, args.executor,
-                   args.show_stats)
+                   args.show_stats, args.duration)
     elif args.mode == 'notify-server':
         notify_server(transport, args.show_stats)
     elif args.mode == 'batch-notify-server':
@@ -383,7 +404,7 @@ def main():
         start = datetime.datetime.now()
         threads_spawner(args.threads, send_msg, transport, target,
                         args.wait_after_msg, args.timeout, args.is_cast,
-                        args.messages)
+                        args.messages, args.duration)
         time_elapsed = (datetime.datetime.now() - start).total_seconds()
 
         msg_count = 0