diff --git a/tools/simulator.py b/tools/simulator.py
index 9b8069c71..a764dfe79 100755
--- a/tools/simulator.py
+++ b/tools/simulator.py
@@ -16,7 +16,7 @@ eventlet.monkey_patch()
 import argparse
 import bisect
 import collections
-import datetime
+import functools
 import itertools
 import logging
 import os
@@ -37,7 +37,7 @@ from oslo_utils import timeutils
 LOG = logging.getLogger()
 RANDOM_GENERATOR = None
 CURRENT_PID = None
-RPC_CLIENTS = []
+CLIENTS = []
 MESSAGES = []
 
 USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
@@ -94,76 +94,183 @@ class LoggingNoParsingFilter(logging.Filter):
         return True
 
 
-class Monitor(object):
-    def __init__(self, show_stats=False, *args, **kwargs):
-        self._count = self._prev_count = 0
-        self.show_stats = show_stats
-        if self.show_stats:
-            self._monitor()
-
-    def _monitor(self):
-        threading.Timer(1.0, self._monitor).start()
-        LOG.debug("%d msg was received per second",
-                  (self._count - self._prev_count))
-        self._prev_count = self._count
-
-    def info(self, *args, **kwargs):
-        self._count += 1
+Message = collections.namedtuple(
+    'Message', ['seq', 'cargo', 'client_ts', 'server_ts', 'return_ts'])
 
 
-class NotifyEndpoint(Monitor):
-    def __init__(self, *args, **kwargs):
-        super(NotifyEndpoint, self).__init__(*args, **kwargs)
-        self.cache = []
+def make_message(seq, cargo, client_ts=0, server_ts=0, return_ts=0):
+    return Message(seq, cargo, client_ts, server_ts, return_ts)
+
+
+def update_message(message, **kwargs):
+    return Message(*message)._replace(**kwargs)
+
+
+class MessageStatsCollector(object):
+    def __init__(self, label):
+        self.label = label
+        self.buffer = []  # buffer to store messages during report interval
+        self.series = []  # stats for every report interval
+        threading.Timer(1.0, self.monitor).start()  # schedule in a second
+
+    def monitor(self):
+        threading.Timer(1.0, self.monitor).start()
+        now = time.time()
+
+        count = len(self.buffer)
+
+        size = 0
+        min_latency = sys.maxint
+        max_latency = 0
+        sum_latencies = 0
+
+        for i in range(count):
+            p = self.buffer[i]
+            size += len(p.cargo)
+
+            latency = None
+            if p.return_ts:
+                latency = p.return_ts - p.client_ts  # round-trip
+            elif p.server_ts:
+                latency = p.server_ts - p.client_ts  # client -> server
+
+            if latency:
+                sum_latencies += latency
+                min_latency = min(min_latency, latency)
+                max_latency = max(max_latency, latency)
+
+        del self.buffer[:count]  # trim processed items
+
+        seq = len(self.series)
+        stats = dict(seq=seq, timestamp=now, count=count, size=size)
+        msg = ('%-14s: seq: %-4d count: %-6d bytes: %-10d' %
+               (self.label, seq, count, size))
+
+        if sum_latencies:
+            latency = sum_latencies / count
+            stats.update(dict(latency=latency,
+                              min_latency=min_latency,
+                              max_latency=max_latency))
+            msg += (' latency: %-9.3f min: %-9.3f max: %-9.3f' %
+                    (latency, min_latency, max_latency))
+
+        self.series.append(stats)
+        LOG.info(msg)
+
+    def push(self, parsed_message):
+        self.buffer.append(parsed_message)
+
+    def get_series(self):
+        return self.series
+
+    @staticmethod
+    def calc_stats(label, *collectors):
+        count = 0
+        size = 0
+        min_latency = sys.maxint
+        max_latency = 0
+        sum_latencies = 0
+        start = sys.maxint
+        end = 0
+
+        for point in itertools.chain(*(c.get_series() for c in collectors)):
+            count += point['count']
+            size += point['size']
+            start = min(start, point['timestamp'])
+            end = max(end, point['timestamp'])
+
+            if 'latency' in point:
+                sum_latencies += point['latency'] * point['count']
+                min_latency = min(min_latency, point['min_latency'])
+                max_latency = max(max_latency, point['max_latency'])
+
+        # start is the timestamp of the earliest block, which inclides samples
+        # for the prior second
+        start -= 1
+        duration = end - start if count else 0
+        stats = dict(count=count, size=size, duration=duration, count_p_s=0,
+                     size_p_s=0)
+        if duration:
+            stats.update(dict(start=start, end=end,
+                              count_p_s=count / duration,
+                              size_p_s=size / duration))
+
+        msg = ('%s: duration: %.2f count: %d (%.1f msg/sec) '
+               'bytes: %d (%.0f bps)' %
+               (label, duration, count, stats['count_p_s'],
+                size, stats['size_p_s']))
+
+        if sum_latencies:
+            latency = sum_latencies / count
+            stats.update(dict(latency=latency,
+                              min_latency=min_latency,
+                              max_latency=max_latency))
+            msg += (' latency: %.3f min: %.3f max: %.3f' %
+                    (latency, min_latency, max_latency))
+
+        LOG.info(msg)
+        return stats
+
+
+class NotifyEndpoint(object):
+    def __init__(self, wait_before_answer, requeue):
+        self.wait_before_answer = wait_before_answer
+        self.requeue = requeue
+        self.received_messages = MessageStatsCollector('server')
+        self.cache = set()
 
     def info(self, ctxt, publisher_id, event_type, payload, metadata):
-        super(NotifyEndpoint, self).info(ctxt, publisher_id, event_type,
-                                         payload, metadata)
-        LOG.debug('msg rcv')
         LOG.debug("%s %s %s %s", ctxt, publisher_id, event_type, payload)
-        if not self.show_stats and payload not in self.cache:
-            LOG.debug('requeue msg')
-            self.cache.append(payload)
-            for i in range(15):
-                eventlet.sleep(1)
+
+        server_ts = time.time()
+
+        message = update_message(payload, server_ts=server_ts)
+        self.received_messages.push(message)
+
+        if self.requeue and message.seq not in self.cache:
+            self.cache.add(message.seq)
+
+            if self.wait_before_answer > 0:
+                time.sleep(self.wait_before_answer)
+
             return messaging.NotificationResult.REQUEUE
-        else:
-            LOG.debug('ack msg')
+
         return messaging.NotificationResult.HANDLED
 
 
-def notify_server(transport, topic, show_stats, duration):
-    endpoints = [NotifyEndpoint(show_stats)]
+def notify_server(transport, topic, wait_before_answer, duration, requeue):
+    endpoints = [NotifyEndpoint(wait_before_answer, requeue)]
     target = messaging.Target(topic=topic)
     server = notify.get_notification_listener(transport, [target],
                                               endpoints, executor='eventlet')
     run_server(server, duration=duration)
 
+    return endpoints[0]
 
-class BatchNotifyEndpoint(Monitor):
-    def __init__(self, *args, **kwargs):
-        super(BatchNotifyEndpoint, self).__init__(*args, **kwargs)
-        self.cache = []
 
-    def info(self, messages):
-        super(BatchNotifyEndpoint, self).info(messages)
-        self._count += len(messages) - 1
+class BatchNotifyEndpoint(object):
+    def __init__(self, wait_before_answer, requeue):
+        self.wait_before_answer = wait_before_answer
+        self.requeue = requeue
+        self.received_messages = MessageStatsCollector('server')
+        self.cache = set()
 
+    def info(self, batch):
         LOG.debug('msg rcv')
-        LOG.debug("%s", messages)
-        if not self.show_stats and messages not in self.cache:
-            LOG.debug('requeue msg')
-            self.cache.append(messages)
-            for i in range(15):
-                eventlet.sleep(1)
-            return messaging.NotificationResult.REQUEUE
-        else:
-            LOG.debug('ack msg')
+        LOG.debug("%s", batch)
+
+        server_ts = time.time()
+
+        for item in batch:
+            message = update_message(item['payload'], server_ts=server_ts)
+            self.received_messages.push(message)
+
         return messaging.NotificationResult.HANDLED
 
 
-def batch_notify_server(transport, topic, show_stats, duration):
-    endpoints = [BatchNotifyEndpoint(show_stats)]
+def batch_notify_server(transport, topic, wait_before_answer, duration,
+                        requeue):
+    endpoints = [BatchNotifyEndpoint(wait_before_answer, requeue)]
     target = messaging.Target(topic=topic)
     server = notify.get_batch_notification_listener(
         transport, [target],
@@ -171,53 +278,83 @@ def batch_notify_server(transport, topic, show_stats, duration):
         batch_size=1000, batch_timeout=5)
     run_server(server, duration=duration)
 
+    return endpoints[0]
 
-class RpcEndpoint(Monitor):
-    def __init__(self, wait_before_answer, show_stats):
-        self.count = None
+
+class RpcEndpoint(object):
+    def __init__(self, wait_before_answer):
         self.wait_before_answer = wait_before_answer
-        self.messages_received = 0
+        self.received_messages = MessageStatsCollector('server')
 
     def info(self, ctxt, message):
-        self.messages_received += 1
-        i = int(message.split(' ')[-1])
-        if self.count is None:
-            self.count = i
-        elif i == 0:
-            self.count = 0
-        else:
-            self.count += 1
+        server_ts = time.time()
+
+        LOG.debug("######## RCV: %s", message)
+
+        reply = update_message(message, server_ts=server_ts)
+        self.received_messages.push(reply)
 
-        LOG.debug("######## RCV: %s/%s", self.count, message)
         if self.wait_before_answer > 0:
             time.sleep(self.wait_before_answer)
-        return "OK: %s" % message
+
+        return reply
 
 
-class RPCClient(object):
-    def __init__(self, transport, target, timeout, method, wait_after_msg):
-        self.client = rpc.RPCClient(transport, target)
-        self.client = self.client.prepare(timeout=timeout)
+class Client(object):
+    def __init__(self, client_id, client, method, has_result,
+                 wait_after_msg):
+        self.client_id = client_id
+        self.client = client
         self.method = method
-        self.bytes = 0
-        self.msg_sent = 0
+        self.wait_after_msg = wait_after_msg
+
+        self.seq = 0
         self.messages_count = len(MESSAGES)
         # Start sending the messages from a random position to avoid
         # memory re-usage and generate more realistic load on the library
         # and a message transport
         self.position = random.randint(0, self.messages_count - 1)
-        self.wait_after_msg = wait_after_msg
+        self.sent_messages = MessageStatsCollector('client-%s' % client_id)
+
+        if has_result:
+            self.round_trip_messages = MessageStatsCollector(
+                'round-trip-%s' % client_id)
 
     def send_msg(self):
-        msg = MESSAGES[self.position]
-        self.method(self.client, msg)
-        self.bytes += len(msg)
-        self.msg_sent += 1
+        msg = make_message(self.seq, MESSAGES[self.position], time.time())
+        self.sent_messages.push(msg)
+
+        res = self.method(self.client, msg)
+        if res:
+            return_ts = time.time()
+            res = update_message(res, return_ts=return_ts)
+            self.round_trip_messages.push(res)
+
+        self.seq += 1
         self.position = (self.position + 1) % self.messages_count
         if self.wait_after_msg > 0:
             time.sleep(self.wait_after_msg)
 
 
+class RPCClient(Client):
+    def __init__(self, client_id, transport, target, timeout, is_cast,
+                 wait_after_msg):
+        client = rpc.RPCClient(transport, target).prepare(timeout=timeout)
+        method = _rpc_cast if is_cast else _rpc_call
+
+        super(RPCClient, self).__init__(client_id, client, method,
+                                        not is_cast, wait_after_msg)
+
+
+class NotifyClient(Client):
+    def __init__(self, client_id, transport, topic, wait_after_msg):
+        client = notify.Notifier(transport, driver='messaging', topic=topic)
+        client = client.prepare(publisher_id='publisher-%d' % client_id)
+        method = _notify
+        super(NotifyClient, self).__init__(client_id, client, method,
+                                           False, wait_after_msg)
+
+
 def generate_messages(messages_count):
     # Limit the messages amount. Clients will reiterate the array again
     # if an amount of messages to be sent is bigger than MESSAGES_LIMIT
@@ -227,57 +364,63 @@ def generate_messages(messages_count):
 
     for i in range(messages_count):
         length = RANDOM_GENERATOR()
-        msg = ''.join(random.choice(string.lowercase) for x in range(length)) \
-              + ' ' + str(i)
+        msg = ''.join(random.choice(string.lowercase) for x in range(length))
         MESSAGES.append(msg)
 
     LOG.info("Messages has been prepared")
 
 
 def run_server(server, duration=None):
-    server.start()
-    if duration:
-        with timeutils.StopWatch(duration) as stop_watch:
-            while not stop_watch.expired():
-                time.sleep(1)
-        server.stop()
-    server.wait()
+    try:
+        server.start()
+        if duration:
+            with timeutils.StopWatch(duration) as stop_watch:
+                while not stop_watch.expired():
+                    time.sleep(1)
+            server.stop()
+        server.wait()
+    except KeyboardInterrupt:  # caught SIGINT
+        LOG.info('Caught SIGINT, terminating')
+        time.sleep(1)  # wait for stats collector to process the last second
 
 
-def rpc_server(transport, target, wait_before_answer, executor, show_stats,
-               duration):
-    endpoints = [RpcEndpoint(wait_before_answer, show_stats)]
+def rpc_server(transport, target, wait_before_answer, executor, duration):
+    endpoints = [RpcEndpoint(wait_before_answer)]
     server = rpc.get_rpc_server(transport, target, endpoints,
                                 executor=executor)
     LOG.debug("starting RPC server for target %s", target)
+
     run_server(server, duration=duration)
-    LOG.info("Received total messages: %d",
-             server.dispatcher.endpoints[0].messages_received)
+
+    return server.dispatcher.endpoints[0]
 
 
-def spawn_notify_clients(threads, *args, **kwargs):
-    p = eventlet.GreenPool(size=threads)
-    for i in range(0, threads):
-        p.spawn_n(notifier, i, *args, **kwargs)
-    p.waitall()
-
-
-def spawn_rpc_clients(threads, transport, targets,
-                      *args, **kwargs):
+def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout,
+                      is_cast, messages_count, duration):
     p = eventlet.GreenPool(size=threads)
     targets = itertools.cycle(targets)
     for i in range(0, threads):
         target = targets.next()
         LOG.debug("starting RPC client for target %s", target)
-        p.spawn_n(send_msg, i, transport, target, *args, **kwargs)
+        client_builder = functools.partial(RPCClient, i, transport, target,
+                                           timeout, is_cast, wait_after_msg)
+        p.spawn_n(send_messages, i, client_builder, messages_count, duration)
     p.waitall()
 
 
-def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
-             messages_count, duration):
-    rpc_method = _rpc_cast if is_cast else _rpc_call
-    client = RPCClient(transport, target, timeout, rpc_method, wait_after_msg)
-    RPC_CLIENTS.append(client)
+def spawn_notify_clients(threads, topic, transport, message_count,
+                         wait_after_msg, timeout, duration):
+    p = eventlet.GreenPool(size=threads)
+    for i in range(0, threads):
+        client_builder = functools.partial(NotifyClient, i, transport, topic,
+                                           wait_after_msg)
+        p.spawn_n(send_messages, i, client_builder, message_count, duration)
+    p.waitall()
+
+
+def send_messages(client_id, client_builder, messages_count, duration):
+    client = client_builder()
+    CLIENTS.append(client)
 
     if duration:
         with timeutils.StopWatch(duration) as stop_watch:
@@ -285,11 +428,14 @@ def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
                 client.send_msg()
                 eventlet.sleep()
     else:
-        LOG.debug("Sending %d messages using client %d", messages_count, c_id)
+        LOG.debug("Sending %d messages using client %d",
+                  messages_count, client_id)
         for _ in six.moves.range(0, messages_count):
             client.send_msg()
             eventlet.sleep()
-        LOG.debug("Client %d has sent %d messages", c_id, messages_count)
+        LOG.debug("Client %d has sent %d messages", client_id, messages_count)
+
+    time.sleep(1)  # wait for replies to be collected
 
 
 def _rpc_call(client, msg):
@@ -299,6 +445,7 @@ def _rpc_call(client, msg):
         LOG.exception('Error %s on CALL for message %s', str(e), msg)
     else:
         LOG.debug("SENT: %s, RCV: %s", msg, res)
+        return res
 
 
 def _rpc_cast(client, msg):
@@ -310,29 +457,39 @@ def _rpc_cast(client, msg):
         LOG.debug("SENT: %s", msg)
 
 
-def notifier(_id, topic, transport, messages, wait_after_msg, timeout,
-             duration):
-    n1 = notify.Notifier(transport,
-                         driver='messaging',
-                         topic=topic).prepare(
-        publisher_id='publisher-%d' % _id)
-    payload = dict(msg=0, vm='test', otherdata='ahah')
-    ctxt = {}
+def _notify(notification_client, msg):
+    notification_client.info({}, 'compute.start', msg)
 
-    def send_notif():
-        payload['msg'] += 1
-        LOG.debug("sending notification %s", payload)
-        n1.info(ctxt, 'compute.start1', payload)
-        if wait_after_msg > 0:
-            time.sleep(wait_after_msg)
 
-    if duration:
-        with timeutils.StopWatch(duration) as stop_watch:
-            while not stop_watch.expired():
-                send_notif()
-    else:
-        for i in range(0, messages):
-            send_notif()
+def show_server_stats(endpoint, args):
+    LOG.info('=' * 35 + ' summary ' + '=' * 35)
+    output = dict(series={}, summary={})
+    output['series']['server'] = endpoint.received_messages.get_series()
+    stats = MessageStatsCollector.calc_stats(
+        'server', endpoint.received_messages)
+    output['summary'] = stats
+
+
+def show_client_stats(clients, has_reply=False):
+    LOG.info('=' * 35 + ' summary ' + '=' * 35)
+    output = dict(series={}, summary={})
+
+    for cl in clients:
+        cl_id = cl.client_id
+        output['series']['client_%s' % cl_id] = cl.sent_messages.get_series()
+
+        if has_reply:
+            output['series']['round_trip_%s' % cl_id] = (
+                cl.round_trip_messages.get_series())
+
+    sent_stats = MessageStatsCollector.calc_stats(
+        'client', *(cl.sent_messages for cl in clients))
+    output['summary']['client'] = sent_stats
+
+    if has_reply:
+        round_trip_stats = MessageStatsCollector.calc_stats(
+            'round-trip', *(cl.round_trip_messages for cl in clients))
+        output['summary']['round_trip'] = round_trip_stats
 
 
 def _setup_logging(is_debug):
@@ -375,11 +532,12 @@ def main():
                                        help='notify/rpc server/client mode')
 
     server = subparsers.add_parser('notify-server')
-    server.add_argument('--show-stats', dest='show_stats',
-                        type=bool, default=True)
+    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
+    server.add_argument('--requeue', dest='requeue', action='store_true')
+
     server = subparsers.add_parser('batch-notify-server')
-    server.add_argument('--show-stats', dest='show_stats',
-                        type=bool, default=True)
+    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
+    server.add_argument('--requeue', dest='requeue', action='store_true')
 
     client = subparsers.add_parser('notify-client')
     client.add_argument('-p', dest='threads', type=int, default=1,
@@ -393,8 +551,6 @@ def main():
 
     server = subparsers.add_parser('rpc-server')
     server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
-    server.add_argument('--show-stats', dest='show_stats',
-                        type=bool, default=True)
     server.add_argument('-e', '--executor', dest='executor',
                         type=str, default='eventlet',
                         help='name of a message executor')
@@ -442,45 +598,43 @@ def main():
         target = messaging.Target(topic=args.topic, server=args.server)
         if args.url.startswith('zmq'):
             cfg.CONF.rpc_zmq_matchmaker = "redis"
-        rpc_server(transport, target, args.wait_before_answer, args.executor,
-                   args.show_stats, args.duration)
+
+        endpoint = rpc_server(transport, target, args.wait_before_answer,
+                              args.executor, args.duration)
+        show_server_stats(endpoint, args)
+
     elif args.mode == 'notify-server':
-        notify_server(transport, args.topic, args.show_stats, args.duration)
+        endpoint = notify_server(transport, args.topic,
+                                 args.wait_before_answer, args.duration,
+                                 args.requeue)
+        show_server_stats(endpoint, args)
+
     elif args.mode == 'batch-notify-server':
-        batch_notify_server(transport, args.topic, args.show_stats,
-                            args.duration)
+        endpoint = batch_notify_server(transport, args.topic,
+                                       args.wait_before_answer, args.duration,
+                                       args.requeue)
+        show_server_stats(endpoint, args)
+
     elif args.mode == 'notify-client':
         spawn_notify_clients(args.threads, args.topic, transport,
                              args.messages, args.wait_after_msg, args.timeout,
                              args.duration)
+        show_client_stats(CLIENTS)
+
     elif args.mode == 'rpc-client':
         targets = [target.partition('.')[::2] for target in args.targets]
-        start = datetime.datetime.now()
         targets = [messaging.Target(
             topic=topic, server=server_name, fanout=args.is_fanout) for
             topic, server_name in targets]
         spawn_rpc_clients(args.threads, transport, targets,
                           args.wait_after_msg, args.timeout, args.is_cast,
                           args.messages, args.duration)
-        time_elapsed = (datetime.datetime.now() - start).total_seconds()
 
-        msg_count = 0
-        total_bytes = 0
-        for client in RPC_CLIENTS:
-            msg_count += client.msg_sent
-            total_bytes += client.bytes
+        show_client_stats(CLIENTS, not args.is_cast)
 
-        LOG.info('%d messages were sent for %d seconds. '
-                 'Bandwidth was %d msg/sec', msg_count, time_elapsed,
-                 (msg_count / time_elapsed))
-        log_msg = '%s bytes were sent for %d seconds. Bandwidth is %d b/s' % (
-            total_bytes, time_elapsed, (total_bytes / time_elapsed))
-        LOG.info(log_msg)
-        with open('./oslo_res_%s.txt' % args.server, 'a+') as f:
-            f.write(log_msg + '\n')
-
-        LOG.info("calls finished, wait %d seconds", args.exit_wait)
-        time.sleep(args.exit_wait)
+        if args.exit_wait:
+            LOG.info("Finished. waiting for %d seconds", args.exit_wait)
+            time.sleep(args.exit_wait)
 
 
 if __name__ == '__main__':