Merge "Modify simulator.py tool"
This commit is contained in:
commit
48ac4f5e9b
13
tools/messages_length.yaml
Normal file
13
tools/messages_length.yaml
Normal file
File diff suppressed because one or more lines are too long
@ -13,14 +13,19 @@
|
|||||||
import eventlet
|
import eventlet
|
||||||
eventlet.monkey_patch()
|
eventlet.monkey_patch()
|
||||||
|
|
||||||
import os
|
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import collections
|
||||||
import datetime
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
|
import random
|
||||||
|
import string
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
from scipy.stats import rv_discrete
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
import oslo_messaging as messaging
|
import oslo_messaging as messaging
|
||||||
@ -28,6 +33,8 @@ from oslo_messaging import notify # noqa
|
|||||||
from oslo_messaging import rpc # noqa
|
from oslo_messaging import rpc # noqa
|
||||||
|
|
||||||
LOG = logging.getLogger()
|
LOG = logging.getLogger()
|
||||||
|
RANDOM_VARIABLE = None
|
||||||
|
CURRENT_PID = None
|
||||||
|
|
||||||
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
|
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
|
||||||
{notify-server,notify-client,rpc-server,rpc-client} ...
|
{notify-server,notify-client,rpc-server,rpc-client} ...
|
||||||
@ -40,6 +47,29 @@ Usage example:
|
|||||||
--exit-wait 15000 -p 64 -m 64"""
|
--exit-wait 15000 -p 64 -m 64"""
|
||||||
|
|
||||||
|
|
||||||
|
def init_random_generator():
|
||||||
|
data = []
|
||||||
|
with open('./messages_length.yaml') as m_file:
|
||||||
|
content = yaml.load(m_file)
|
||||||
|
data += [int(n) for n in content[
|
||||||
|
'test_data']['string_lengths'].split(', ')]
|
||||||
|
|
||||||
|
ranges = collections.defaultdict(int)
|
||||||
|
for msg_length in data:
|
||||||
|
range_start = (msg_length / 500) * 500 + 1
|
||||||
|
ranges[range_start] += 1
|
||||||
|
|
||||||
|
ranges_start = sorted(ranges.keys())
|
||||||
|
total_count = len(data)
|
||||||
|
ranges_dist = []
|
||||||
|
for r in ranges_start:
|
||||||
|
r_dist = float(ranges[r]) / total_count
|
||||||
|
ranges_dist.append(r_dist)
|
||||||
|
|
||||||
|
random_var = rv_discrete(values=(ranges_start, ranges_dist))
|
||||||
|
return random_var
|
||||||
|
|
||||||
|
|
||||||
class LoggingNoParsingFilter(logging.Filter):
|
class LoggingNoParsingFilter(logging.Filter):
|
||||||
def filter(self, record):
|
def filter(self, record):
|
||||||
msg = record.getMessage()
|
msg = record.getMessage()
|
||||||
@ -136,9 +166,19 @@ def send_msg(_id, transport, target, messages, wait_after_msg, timeout,
|
|||||||
client = client.prepare(timeout=timeout)
|
client = client.prepare(timeout=timeout)
|
||||||
rpc_method = _rpc_cast if is_cast else _rpc_call
|
rpc_method = _rpc_cast if is_cast else _rpc_call
|
||||||
|
|
||||||
for i in range(0, messages):
|
ranges = RANDOM_VARIABLE.rvs(size=messages)
|
||||||
msg = "test message %d" % i
|
i = 0
|
||||||
LOG.info("SEND: %s" % msg)
|
for range_start in ranges:
|
||||||
|
length = random.randint(range_start, range_start + 497)
|
||||||
|
msg = ''.join(random.choice(string.lowercase) for x in range(length)) \
|
||||||
|
+ ' ' + str(i)
|
||||||
|
i += 1
|
||||||
|
# temporary file to log approximate bytes size of messages
|
||||||
|
with open('./oslo_%s_%s.log' % (target.topic, CURRENT_PID), 'a+') as f:
|
||||||
|
# 37 additional bytes for Python String object size canculation.
|
||||||
|
# In fact we may ignore these bytes, and estimate the data flow
|
||||||
|
# via number of symbols
|
||||||
|
f.write(str(length + 37) + '\n')
|
||||||
rpc_method(client, msg)
|
rpc_method(client, msg)
|
||||||
if wait_after_msg > 0:
|
if wait_after_msg > 0:
|
||||||
time.sleep(wait_after_msg)
|
time.sleep(wait_after_msg)
|
||||||
@ -197,6 +237,9 @@ def main():
|
|||||||
parser.add_argument('-d', '--debug', dest='debug', type=bool,
|
parser.add_argument('-d', '--debug', dest='debug', type=bool,
|
||||||
default=False,
|
default=False,
|
||||||
help="Turn on DEBUG logging level instead of WARN")
|
help="Turn on DEBUG logging level instead of WARN")
|
||||||
|
parser.add_argument('-tp', '--topic', dest='topic',
|
||||||
|
default="profiler_topic",
|
||||||
|
help="Topic to publish/receive messages to/from.")
|
||||||
subparsers = parser.add_subparsers(dest='mode',
|
subparsers = parser.add_subparsers(dest='mode',
|
||||||
help='notify/rpc server/client mode')
|
help='notify/rpc server/client mode')
|
||||||
|
|
||||||
@ -246,7 +289,7 @@ def main():
|
|||||||
cfg.CONF.project = 'oslo.messaging'
|
cfg.CONF.project = 'oslo.messaging'
|
||||||
|
|
||||||
transport = messaging.get_transport(cfg.CONF, url=args.url)
|
transport = messaging.get_transport(cfg.CONF, url=args.url)
|
||||||
target = messaging.Target(topic='profiler_topic', server='profiler_server')
|
target = messaging.Target(topic=args.topic, server='profiler_server')
|
||||||
|
|
||||||
if args.mode == 'rpc-server':
|
if args.mode == 'rpc-server':
|
||||||
if args.url.startswith('zmq'):
|
if args.url.startswith('zmq'):
|
||||||
@ -266,11 +309,29 @@ def main():
|
|||||||
args.is_cast)
|
args.is_cast)
|
||||||
time_ellapsed = (datetime.datetime.now() - start).total_seconds()
|
time_ellapsed = (datetime.datetime.now() - start).total_seconds()
|
||||||
msg_count = args.messages * args.threads
|
msg_count = args.messages * args.threads
|
||||||
print ('%d messages was sent for %s seconds. Bandwight is %s msg/sec'
|
log_msg = '%d messages was sent for %s seconds. ' \
|
||||||
% (msg_count, time_ellapsed, (msg_count / time_ellapsed)))
|
'Bandwidth is %s msg/sec' % (msg_count, time_ellapsed,
|
||||||
|
(msg_count / time_ellapsed))
|
||||||
|
print (log_msg)
|
||||||
|
with open('./oslo_res_%s.txt' % args.topic, 'a+') as f:
|
||||||
|
f.write(log_msg + '\n')
|
||||||
|
|
||||||
|
with open('./oslo_%s_%s.log' % (args.topic, CURRENT_PID), 'a+') as f:
|
||||||
|
data = f.read()
|
||||||
|
data = [int(i) for i in data.split()]
|
||||||
|
data_sum = sum(data)
|
||||||
|
log_msg = '%s bytes were sent for %s seconds. Bandwidth is %s b/s' % (
|
||||||
|
data_sum, time_ellapsed, (data_sum / time_ellapsed))
|
||||||
|
print(log_msg)
|
||||||
|
with open('./oslo_res_%s.txt' % args.topic, 'a+') as f:
|
||||||
|
f.write(log_msg + '\n')
|
||||||
|
os.remove('./oslo_%s_%s.log' % (args.topic, CURRENT_PID))
|
||||||
|
|
||||||
LOG.info("calls finished, wait %d seconds" % args.exit_wait)
|
LOG.info("calls finished, wait %d seconds" % args.exit_wait)
|
||||||
time.sleep(args.exit_wait)
|
time.sleep(args.exit_wait)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
RANDOM_VARIABLE = init_random_generator()
|
||||||
|
CURRENT_PID = os.getpid()
|
||||||
main()
|
main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user