Merge "Simulator: calculate message latency statistics"
This commit is contained in:
commit
3f6daa8b77
@ -16,7 +16,7 @@ eventlet.monkey_patch()
|
||||
import argparse
|
||||
import bisect
|
||||
import collections
|
||||
import datetime
|
||||
import functools
|
||||
import itertools
|
||||
import logging
|
||||
import os
|
||||
@ -37,7 +37,7 @@ from oslo_utils import timeutils
|
||||
LOG = logging.getLogger()
|
||||
RANDOM_GENERATOR = None
|
||||
CURRENT_PID = None
|
||||
RPC_CLIENTS = []
|
||||
CLIENTS = []
|
||||
MESSAGES = []
|
||||
|
||||
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
|
||||
@ -94,76 +94,183 @@ class LoggingNoParsingFilter(logging.Filter):
|
||||
return True
|
||||
|
||||
|
||||
class Monitor(object):
|
||||
def __init__(self, show_stats=False, *args, **kwargs):
|
||||
self._count = self._prev_count = 0
|
||||
self.show_stats = show_stats
|
||||
if self.show_stats:
|
||||
self._monitor()
|
||||
|
||||
def _monitor(self):
|
||||
threading.Timer(1.0, self._monitor).start()
|
||||
LOG.debug("%d msg was received per second",
|
||||
(self._count - self._prev_count))
|
||||
self._prev_count = self._count
|
||||
|
||||
def info(self, *args, **kwargs):
|
||||
self._count += 1
|
||||
Message = collections.namedtuple(
|
||||
'Message', ['seq', 'cargo', 'client_ts', 'server_ts', 'return_ts'])
|
||||
|
||||
|
||||
class NotifyEndpoint(Monitor):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(NotifyEndpoint, self).__init__(*args, **kwargs)
|
||||
self.cache = []
|
||||
def make_message(seq, cargo, client_ts=0, server_ts=0, return_ts=0):
|
||||
return Message(seq, cargo, client_ts, server_ts, return_ts)
|
||||
|
||||
|
||||
def update_message(message, **kwargs):
|
||||
return Message(*message)._replace(**kwargs)
|
||||
|
||||
|
||||
class MessageStatsCollector(object):
|
||||
def __init__(self, label):
|
||||
self.label = label
|
||||
self.buffer = [] # buffer to store messages during report interval
|
||||
self.series = [] # stats for every report interval
|
||||
threading.Timer(1.0, self.monitor).start() # schedule in a second
|
||||
|
||||
def monitor(self):
|
||||
threading.Timer(1.0, self.monitor).start()
|
||||
now = time.time()
|
||||
|
||||
count = len(self.buffer)
|
||||
|
||||
size = 0
|
||||
min_latency = sys.maxint
|
||||
max_latency = 0
|
||||
sum_latencies = 0
|
||||
|
||||
for i in range(count):
|
||||
p = self.buffer[i]
|
||||
size += len(p.cargo)
|
||||
|
||||
latency = None
|
||||
if p.return_ts:
|
||||
latency = p.return_ts - p.client_ts # round-trip
|
||||
elif p.server_ts:
|
||||
latency = p.server_ts - p.client_ts # client -> server
|
||||
|
||||
if latency:
|
||||
sum_latencies += latency
|
||||
min_latency = min(min_latency, latency)
|
||||
max_latency = max(max_latency, latency)
|
||||
|
||||
del self.buffer[:count] # trim processed items
|
||||
|
||||
seq = len(self.series)
|
||||
stats = dict(seq=seq, timestamp=now, count=count, size=size)
|
||||
msg = ('%-14s: seq: %-4d count: %-6d bytes: %-10d' %
|
||||
(self.label, seq, count, size))
|
||||
|
||||
if sum_latencies:
|
||||
latency = sum_latencies / count
|
||||
stats.update(dict(latency=latency,
|
||||
min_latency=min_latency,
|
||||
max_latency=max_latency))
|
||||
msg += (' latency: %-9.3f min: %-9.3f max: %-9.3f' %
|
||||
(latency, min_latency, max_latency))
|
||||
|
||||
self.series.append(stats)
|
||||
LOG.info(msg)
|
||||
|
||||
def push(self, parsed_message):
|
||||
self.buffer.append(parsed_message)
|
||||
|
||||
def get_series(self):
|
||||
return self.series
|
||||
|
||||
@staticmethod
|
||||
def calc_stats(label, *collectors):
|
||||
count = 0
|
||||
size = 0
|
||||
min_latency = sys.maxint
|
||||
max_latency = 0
|
||||
sum_latencies = 0
|
||||
start = sys.maxint
|
||||
end = 0
|
||||
|
||||
for point in itertools.chain(*(c.get_series() for c in collectors)):
|
||||
count += point['count']
|
||||
size += point['size']
|
||||
start = min(start, point['timestamp'])
|
||||
end = max(end, point['timestamp'])
|
||||
|
||||
if 'latency' in point:
|
||||
sum_latencies += point['latency'] * point['count']
|
||||
min_latency = min(min_latency, point['min_latency'])
|
||||
max_latency = max(max_latency, point['max_latency'])
|
||||
|
||||
# start is the timestamp of the earliest block, which inclides samples
|
||||
# for the prior second
|
||||
start -= 1
|
||||
duration = end - start if count else 0
|
||||
stats = dict(count=count, size=size, duration=duration, count_p_s=0,
|
||||
size_p_s=0)
|
||||
if duration:
|
||||
stats.update(dict(start=start, end=end,
|
||||
count_p_s=count / duration,
|
||||
size_p_s=size / duration))
|
||||
|
||||
msg = ('%s: duration: %.2f count: %d (%.1f msg/sec) '
|
||||
'bytes: %d (%.0f bps)' %
|
||||
(label, duration, count, stats['count_p_s'],
|
||||
size, stats['size_p_s']))
|
||||
|
||||
if sum_latencies:
|
||||
latency = sum_latencies / count
|
||||
stats.update(dict(latency=latency,
|
||||
min_latency=min_latency,
|
||||
max_latency=max_latency))
|
||||
msg += (' latency: %.3f min: %.3f max: %.3f' %
|
||||
(latency, min_latency, max_latency))
|
||||
|
||||
LOG.info(msg)
|
||||
return stats
|
||||
|
||||
|
||||
class NotifyEndpoint(object):
|
||||
def __init__(self, wait_before_answer, requeue):
|
||||
self.wait_before_answer = wait_before_answer
|
||||
self.requeue = requeue
|
||||
self.received_messages = MessageStatsCollector('server')
|
||||
self.cache = set()
|
||||
|
||||
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||
super(NotifyEndpoint, self).info(ctxt, publisher_id, event_type,
|
||||
payload, metadata)
|
||||
LOG.debug('msg rcv')
|
||||
LOG.debug("%s %s %s %s", ctxt, publisher_id, event_type, payload)
|
||||
if not self.show_stats and payload not in self.cache:
|
||||
LOG.debug('requeue msg')
|
||||
self.cache.append(payload)
|
||||
for i in range(15):
|
||||
eventlet.sleep(1)
|
||||
|
||||
server_ts = time.time()
|
||||
|
||||
message = update_message(payload, server_ts=server_ts)
|
||||
self.received_messages.push(message)
|
||||
|
||||
if self.requeue and message.seq not in self.cache:
|
||||
self.cache.add(message.seq)
|
||||
|
||||
if self.wait_before_answer > 0:
|
||||
time.sleep(self.wait_before_answer)
|
||||
|
||||
return messaging.NotificationResult.REQUEUE
|
||||
else:
|
||||
LOG.debug('ack msg')
|
||||
|
||||
return messaging.NotificationResult.HANDLED
|
||||
|
||||
|
||||
def notify_server(transport, topic, show_stats, duration):
|
||||
endpoints = [NotifyEndpoint(show_stats)]
|
||||
def notify_server(transport, topic, wait_before_answer, duration, requeue):
|
||||
endpoints = [NotifyEndpoint(wait_before_answer, requeue)]
|
||||
target = messaging.Target(topic=topic)
|
||||
server = notify.get_notification_listener(transport, [target],
|
||||
endpoints, executor='eventlet')
|
||||
run_server(server, duration=duration)
|
||||
|
||||
return endpoints[0]
|
||||
|
||||
class BatchNotifyEndpoint(Monitor):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(BatchNotifyEndpoint, self).__init__(*args, **kwargs)
|
||||
self.cache = []
|
||||
|
||||
def info(self, messages):
|
||||
super(BatchNotifyEndpoint, self).info(messages)
|
||||
self._count += len(messages) - 1
|
||||
class BatchNotifyEndpoint(object):
|
||||
def __init__(self, wait_before_answer, requeue):
|
||||
self.wait_before_answer = wait_before_answer
|
||||
self.requeue = requeue
|
||||
self.received_messages = MessageStatsCollector('server')
|
||||
self.cache = set()
|
||||
|
||||
def info(self, batch):
|
||||
LOG.debug('msg rcv')
|
||||
LOG.debug("%s", messages)
|
||||
if not self.show_stats and messages not in self.cache:
|
||||
LOG.debug('requeue msg')
|
||||
self.cache.append(messages)
|
||||
for i in range(15):
|
||||
eventlet.sleep(1)
|
||||
return messaging.NotificationResult.REQUEUE
|
||||
else:
|
||||
LOG.debug('ack msg')
|
||||
LOG.debug("%s", batch)
|
||||
|
||||
server_ts = time.time()
|
||||
|
||||
for item in batch:
|
||||
message = update_message(item['payload'], server_ts=server_ts)
|
||||
self.received_messages.push(message)
|
||||
|
||||
return messaging.NotificationResult.HANDLED
|
||||
|
||||
|
||||
def batch_notify_server(transport, topic, show_stats, duration):
|
||||
endpoints = [BatchNotifyEndpoint(show_stats)]
|
||||
def batch_notify_server(transport, topic, wait_before_answer, duration,
|
||||
requeue):
|
||||
endpoints = [BatchNotifyEndpoint(wait_before_answer, requeue)]
|
||||
target = messaging.Target(topic=topic)
|
||||
server = notify.get_batch_notification_listener(
|
||||
transport, [target],
|
||||
@ -171,53 +278,83 @@ def batch_notify_server(transport, topic, show_stats, duration):
|
||||
batch_size=1000, batch_timeout=5)
|
||||
run_server(server, duration=duration)
|
||||
|
||||
return endpoints[0]
|
||||
|
||||
class RpcEndpoint(Monitor):
|
||||
def __init__(self, wait_before_answer, show_stats):
|
||||
self.count = None
|
||||
|
||||
class RpcEndpoint(object):
|
||||
def __init__(self, wait_before_answer):
|
||||
self.wait_before_answer = wait_before_answer
|
||||
self.messages_received = 0
|
||||
self.received_messages = MessageStatsCollector('server')
|
||||
|
||||
def info(self, ctxt, message):
|
||||
self.messages_received += 1
|
||||
i = int(message.split(' ')[-1])
|
||||
if self.count is None:
|
||||
self.count = i
|
||||
elif i == 0:
|
||||
self.count = 0
|
||||
else:
|
||||
self.count += 1
|
||||
server_ts = time.time()
|
||||
|
||||
LOG.debug("######## RCV: %s", message)
|
||||
|
||||
reply = update_message(message, server_ts=server_ts)
|
||||
self.received_messages.push(reply)
|
||||
|
||||
LOG.debug("######## RCV: %s/%s", self.count, message)
|
||||
if self.wait_before_answer > 0:
|
||||
time.sleep(self.wait_before_answer)
|
||||
return "OK: %s" % message
|
||||
|
||||
return reply
|
||||
|
||||
|
||||
class RPCClient(object):
|
||||
def __init__(self, transport, target, timeout, method, wait_after_msg):
|
||||
self.client = rpc.RPCClient(transport, target)
|
||||
self.client = self.client.prepare(timeout=timeout)
|
||||
class Client(object):
|
||||
def __init__(self, client_id, client, method, has_result,
|
||||
wait_after_msg):
|
||||
self.client_id = client_id
|
||||
self.client = client
|
||||
self.method = method
|
||||
self.bytes = 0
|
||||
self.msg_sent = 0
|
||||
self.wait_after_msg = wait_after_msg
|
||||
|
||||
self.seq = 0
|
||||
self.messages_count = len(MESSAGES)
|
||||
# Start sending the messages from a random position to avoid
|
||||
# memory re-usage and generate more realistic load on the library
|
||||
# and a message transport
|
||||
self.position = random.randint(0, self.messages_count - 1)
|
||||
self.wait_after_msg = wait_after_msg
|
||||
self.sent_messages = MessageStatsCollector('client-%s' % client_id)
|
||||
|
||||
if has_result:
|
||||
self.round_trip_messages = MessageStatsCollector(
|
||||
'round-trip-%s' % client_id)
|
||||
|
||||
def send_msg(self):
|
||||
msg = MESSAGES[self.position]
|
||||
self.method(self.client, msg)
|
||||
self.bytes += len(msg)
|
||||
self.msg_sent += 1
|
||||
msg = make_message(self.seq, MESSAGES[self.position], time.time())
|
||||
self.sent_messages.push(msg)
|
||||
|
||||
res = self.method(self.client, msg)
|
||||
if res:
|
||||
return_ts = time.time()
|
||||
res = update_message(res, return_ts=return_ts)
|
||||
self.round_trip_messages.push(res)
|
||||
|
||||
self.seq += 1
|
||||
self.position = (self.position + 1) % self.messages_count
|
||||
if self.wait_after_msg > 0:
|
||||
time.sleep(self.wait_after_msg)
|
||||
|
||||
|
||||
class RPCClient(Client):
|
||||
def __init__(self, client_id, transport, target, timeout, is_cast,
|
||||
wait_after_msg):
|
||||
client = rpc.RPCClient(transport, target).prepare(timeout=timeout)
|
||||
method = _rpc_cast if is_cast else _rpc_call
|
||||
|
||||
super(RPCClient, self).__init__(client_id, client, method,
|
||||
not is_cast, wait_after_msg)
|
||||
|
||||
|
||||
class NotifyClient(Client):
|
||||
def __init__(self, client_id, transport, topic, wait_after_msg):
|
||||
client = notify.Notifier(transport, driver='messaging', topic=topic)
|
||||
client = client.prepare(publisher_id='publisher-%d' % client_id)
|
||||
method = _notify
|
||||
super(NotifyClient, self).__init__(client_id, client, method,
|
||||
False, wait_after_msg)
|
||||
|
||||
|
||||
def generate_messages(messages_count):
|
||||
# Limit the messages amount. Clients will reiterate the array again
|
||||
# if an amount of messages to be sent is bigger than MESSAGES_LIMIT
|
||||
@ -227,57 +364,63 @@ def generate_messages(messages_count):
|
||||
|
||||
for i in range(messages_count):
|
||||
length = RANDOM_GENERATOR()
|
||||
msg = ''.join(random.choice(string.lowercase) for x in range(length)) \
|
||||
+ ' ' + str(i)
|
||||
msg = ''.join(random.choice(string.lowercase) for x in range(length))
|
||||
MESSAGES.append(msg)
|
||||
|
||||
LOG.info("Messages has been prepared")
|
||||
|
||||
|
||||
def run_server(server, duration=None):
|
||||
server.start()
|
||||
if duration:
|
||||
with timeutils.StopWatch(duration) as stop_watch:
|
||||
while not stop_watch.expired():
|
||||
time.sleep(1)
|
||||
server.stop()
|
||||
server.wait()
|
||||
try:
|
||||
server.start()
|
||||
if duration:
|
||||
with timeutils.StopWatch(duration) as stop_watch:
|
||||
while not stop_watch.expired():
|
||||
time.sleep(1)
|
||||
server.stop()
|
||||
server.wait()
|
||||
except KeyboardInterrupt: # caught SIGINT
|
||||
LOG.info('Caught SIGINT, terminating')
|
||||
time.sleep(1) # wait for stats collector to process the last second
|
||||
|
||||
|
||||
def rpc_server(transport, target, wait_before_answer, executor, show_stats,
|
||||
duration):
|
||||
endpoints = [RpcEndpoint(wait_before_answer, show_stats)]
|
||||
def rpc_server(transport, target, wait_before_answer, executor, duration):
|
||||
endpoints = [RpcEndpoint(wait_before_answer)]
|
||||
server = rpc.get_rpc_server(transport, target, endpoints,
|
||||
executor=executor)
|
||||
LOG.debug("starting RPC server for target %s", target)
|
||||
|
||||
run_server(server, duration=duration)
|
||||
LOG.info("Received total messages: %d",
|
||||
server.dispatcher.endpoints[0].messages_received)
|
||||
|
||||
return server.dispatcher.endpoints[0]
|
||||
|
||||
|
||||
def spawn_notify_clients(threads, *args, **kwargs):
|
||||
p = eventlet.GreenPool(size=threads)
|
||||
for i in range(0, threads):
|
||||
p.spawn_n(notifier, i, *args, **kwargs)
|
||||
p.waitall()
|
||||
|
||||
|
||||
def spawn_rpc_clients(threads, transport, targets,
|
||||
*args, **kwargs):
|
||||
def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout,
|
||||
is_cast, messages_count, duration):
|
||||
p = eventlet.GreenPool(size=threads)
|
||||
targets = itertools.cycle(targets)
|
||||
for i in range(0, threads):
|
||||
target = targets.next()
|
||||
LOG.debug("starting RPC client for target %s", target)
|
||||
p.spawn_n(send_msg, i, transport, target, *args, **kwargs)
|
||||
client_builder = functools.partial(RPCClient, i, transport, target,
|
||||
timeout, is_cast, wait_after_msg)
|
||||
p.spawn_n(send_messages, i, client_builder, messages_count, duration)
|
||||
p.waitall()
|
||||
|
||||
|
||||
def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
|
||||
messages_count, duration):
|
||||
rpc_method = _rpc_cast if is_cast else _rpc_call
|
||||
client = RPCClient(transport, target, timeout, rpc_method, wait_after_msg)
|
||||
RPC_CLIENTS.append(client)
|
||||
def spawn_notify_clients(threads, topic, transport, message_count,
|
||||
wait_after_msg, timeout, duration):
|
||||
p = eventlet.GreenPool(size=threads)
|
||||
for i in range(0, threads):
|
||||
client_builder = functools.partial(NotifyClient, i, transport, topic,
|
||||
wait_after_msg)
|
||||
p.spawn_n(send_messages, i, client_builder, message_count, duration)
|
||||
p.waitall()
|
||||
|
||||
|
||||
def send_messages(client_id, client_builder, messages_count, duration):
|
||||
client = client_builder()
|
||||
CLIENTS.append(client)
|
||||
|
||||
if duration:
|
||||
with timeutils.StopWatch(duration) as stop_watch:
|
||||
@ -285,11 +428,14 @@ def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
|
||||
client.send_msg()
|
||||
eventlet.sleep()
|
||||
else:
|
||||
LOG.debug("Sending %d messages using client %d", messages_count, c_id)
|
||||
LOG.debug("Sending %d messages using client %d",
|
||||
messages_count, client_id)
|
||||
for _ in six.moves.range(0, messages_count):
|
||||
client.send_msg()
|
||||
eventlet.sleep()
|
||||
LOG.debug("Client %d has sent %d messages", c_id, messages_count)
|
||||
LOG.debug("Client %d has sent %d messages", client_id, messages_count)
|
||||
|
||||
time.sleep(1) # wait for replies to be collected
|
||||
|
||||
|
||||
def _rpc_call(client, msg):
|
||||
@ -299,6 +445,7 @@ def _rpc_call(client, msg):
|
||||
LOG.exception('Error %s on CALL for message %s', str(e), msg)
|
||||
else:
|
||||
LOG.debug("SENT: %s, RCV: %s", msg, res)
|
||||
return res
|
||||
|
||||
|
||||
def _rpc_cast(client, msg):
|
||||
@ -310,29 +457,39 @@ def _rpc_cast(client, msg):
|
||||
LOG.debug("SENT: %s", msg)
|
||||
|
||||
|
||||
def notifier(_id, topic, transport, messages, wait_after_msg, timeout,
|
||||
duration):
|
||||
n1 = notify.Notifier(transport,
|
||||
driver='messaging',
|
||||
topic=topic).prepare(
|
||||
publisher_id='publisher-%d' % _id)
|
||||
payload = dict(msg=0, vm='test', otherdata='ahah')
|
||||
ctxt = {}
|
||||
def _notify(notification_client, msg):
|
||||
notification_client.info({}, 'compute.start', msg)
|
||||
|
||||
def send_notif():
|
||||
payload['msg'] += 1
|
||||
LOG.debug("sending notification %s", payload)
|
||||
n1.info(ctxt, 'compute.start1', payload)
|
||||
if wait_after_msg > 0:
|
||||
time.sleep(wait_after_msg)
|
||||
|
||||
if duration:
|
||||
with timeutils.StopWatch(duration) as stop_watch:
|
||||
while not stop_watch.expired():
|
||||
send_notif()
|
||||
else:
|
||||
for i in range(0, messages):
|
||||
send_notif()
|
||||
def show_server_stats(endpoint, args):
|
||||
LOG.info('=' * 35 + ' summary ' + '=' * 35)
|
||||
output = dict(series={}, summary={})
|
||||
output['series']['server'] = endpoint.received_messages.get_series()
|
||||
stats = MessageStatsCollector.calc_stats(
|
||||
'server', endpoint.received_messages)
|
||||
output['summary'] = stats
|
||||
|
||||
|
||||
def show_client_stats(clients, has_reply=False):
|
||||
LOG.info('=' * 35 + ' summary ' + '=' * 35)
|
||||
output = dict(series={}, summary={})
|
||||
|
||||
for cl in clients:
|
||||
cl_id = cl.client_id
|
||||
output['series']['client_%s' % cl_id] = cl.sent_messages.get_series()
|
||||
|
||||
if has_reply:
|
||||
output['series']['round_trip_%s' % cl_id] = (
|
||||
cl.round_trip_messages.get_series())
|
||||
|
||||
sent_stats = MessageStatsCollector.calc_stats(
|
||||
'client', *(cl.sent_messages for cl in clients))
|
||||
output['summary']['client'] = sent_stats
|
||||
|
||||
if has_reply:
|
||||
round_trip_stats = MessageStatsCollector.calc_stats(
|
||||
'round-trip', *(cl.round_trip_messages for cl in clients))
|
||||
output['summary']['round_trip'] = round_trip_stats
|
||||
|
||||
|
||||
def _setup_logging(is_debug):
|
||||
@ -375,11 +532,12 @@ def main():
|
||||
help='notify/rpc server/client mode')
|
||||
|
||||
server = subparsers.add_parser('notify-server')
|
||||
server.add_argument('--show-stats', dest='show_stats',
|
||||
type=bool, default=True)
|
||||
server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
|
||||
server.add_argument('--requeue', dest='requeue', action='store_true')
|
||||
|
||||
server = subparsers.add_parser('batch-notify-server')
|
||||
server.add_argument('--show-stats', dest='show_stats',
|
||||
type=bool, default=True)
|
||||
server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
|
||||
server.add_argument('--requeue', dest='requeue', action='store_true')
|
||||
|
||||
client = subparsers.add_parser('notify-client')
|
||||
client.add_argument('-p', dest='threads', type=int, default=1,
|
||||
@ -393,8 +551,6 @@ def main():
|
||||
|
||||
server = subparsers.add_parser('rpc-server')
|
||||
server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
|
||||
server.add_argument('--show-stats', dest='show_stats',
|
||||
type=bool, default=True)
|
||||
server.add_argument('-e', '--executor', dest='executor',
|
||||
type=str, default='eventlet',
|
||||
help='name of a message executor')
|
||||
@ -442,45 +598,43 @@ def main():
|
||||
target = messaging.Target(topic=args.topic, server=args.server)
|
||||
if args.url.startswith('zmq'):
|
||||
cfg.CONF.rpc_zmq_matchmaker = "redis"
|
||||
rpc_server(transport, target, args.wait_before_answer, args.executor,
|
||||
args.show_stats, args.duration)
|
||||
|
||||
endpoint = rpc_server(transport, target, args.wait_before_answer,
|
||||
args.executor, args.duration)
|
||||
show_server_stats(endpoint, args)
|
||||
|
||||
elif args.mode == 'notify-server':
|
||||
notify_server(transport, args.topic, args.show_stats, args.duration)
|
||||
endpoint = notify_server(transport, args.topic,
|
||||
args.wait_before_answer, args.duration,
|
||||
args.requeue)
|
||||
show_server_stats(endpoint, args)
|
||||
|
||||
elif args.mode == 'batch-notify-server':
|
||||
batch_notify_server(transport, args.topic, args.show_stats,
|
||||
args.duration)
|
||||
endpoint = batch_notify_server(transport, args.topic,
|
||||
args.wait_before_answer, args.duration,
|
||||
args.requeue)
|
||||
show_server_stats(endpoint, args)
|
||||
|
||||
elif args.mode == 'notify-client':
|
||||
spawn_notify_clients(args.threads, args.topic, transport,
|
||||
args.messages, args.wait_after_msg, args.timeout,
|
||||
args.duration)
|
||||
show_client_stats(CLIENTS)
|
||||
|
||||
elif args.mode == 'rpc-client':
|
||||
targets = [target.partition('.')[::2] for target in args.targets]
|
||||
start = datetime.datetime.now()
|
||||
targets = [messaging.Target(
|
||||
topic=topic, server=server_name, fanout=args.is_fanout) for
|
||||
topic, server_name in targets]
|
||||
spawn_rpc_clients(args.threads, transport, targets,
|
||||
args.wait_after_msg, args.timeout, args.is_cast,
|
||||
args.messages, args.duration)
|
||||
time_elapsed = (datetime.datetime.now() - start).total_seconds()
|
||||
|
||||
msg_count = 0
|
||||
total_bytes = 0
|
||||
for client in RPC_CLIENTS:
|
||||
msg_count += client.msg_sent
|
||||
total_bytes += client.bytes
|
||||
show_client_stats(CLIENTS, not args.is_cast)
|
||||
|
||||
LOG.info('%d messages were sent for %d seconds. '
|
||||
'Bandwidth was %d msg/sec', msg_count, time_elapsed,
|
||||
(msg_count / time_elapsed))
|
||||
log_msg = '%s bytes were sent for %d seconds. Bandwidth is %d b/s' % (
|
||||
total_bytes, time_elapsed, (total_bytes / time_elapsed))
|
||||
LOG.info(log_msg)
|
||||
with open('./oslo_res_%s.txt' % args.server, 'a+') as f:
|
||||
f.write(log_msg + '\n')
|
||||
|
||||
LOG.info("calls finished, wait %d seconds", args.exit_wait)
|
||||
time.sleep(args.exit_wait)
|
||||
if args.exit_wait:
|
||||
LOG.info("Finished. waiting for %d seconds", args.exit_wait)
|
||||
time.sleep(args.exit_wait)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
Loading…
x
Reference in New Issue
Block a user