34fd61bdc2
Update all .py source files by $ pyupgrade --py3-only $(git ls-files | grep ".py$") to modernize the code according to Python 3 syntaxes. pep8 errors are fixed by $ autopep8 --select=E127,E128,E501 --max-line-length 79 -r \ --in-place oslo_messaging and a few manual adjustments. Also add the pyupgrade hook to pre-commit to avoid merging additional Python 2 syntaxes. Change-Id: I8115b7f8c5d27ce935e4422c351add4bb72e354f
835 lines
29 KiB
Python
Executable File
835 lines
29 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
try:
|
|
# Avoid https://github.com/PyCQA/pycodestyle/issues/472
|
|
import eventlet
|
|
eventlet.monkey_patch()
|
|
except ImportError:
|
|
raise
|
|
|
|
import argparse
|
|
import bisect
|
|
import collections
|
|
import functools
|
|
import itertools
|
|
import json
|
|
import logging
|
|
import os
|
|
import random
|
|
import signal
|
|
import socket
|
|
import string
|
|
import sys
|
|
import threading
|
|
import time
|
|
import yaml
|
|
|
|
from oslo_config import cfg
|
|
import oslo_messaging as messaging
|
|
from oslo_messaging import notify # noqa
|
|
from oslo_messaging import rpc # noqa
|
|
from oslo_utils import timeutils
|
|
|
|
# Root logger; its handler and level are configured in _setup_logging().
LOG = logging.getLogger()
# Process id and host name; both are filled in by the ``__main__`` guard.
CURRENT_PID = None
CURRENT_HOST = None
# Client objects registered by send_messages().
CLIENTS = []
# Pre-generated message bodies, filled by generate_messages().
MESSAGES = []
# Set to False to stop the send loops, the server wait loop and the
# re-arming of the statistics timers.
IS_RUNNING = True
# Servers started through run_server(); stopped by wrap_sigexit() when a
# SignalExit is caught.
SERVERS = []
# Transport created in main(); cleaned up by wrap_sigexit().
TRANSPORT = None
|
|
|
|
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
|
|
{notify-server,notify-client,rpc-server,rpc-client} ...
|
|
|
|
Usage example:
|
|
python tools/simulator.py\
|
|
--url rabbit://stackrabbit:secretrabbit@localhost/ rpc-server
|
|
python tools/simulator.py\
|
|
--url rabbit://stackrabbit:secretrabbit@localhost/ rpc-client\
|
|
--exit-wait 15000 -p 64 -m 64"""
|
|
|
|
# Upper bound on the number of message bodies generate_messages() builds.
MESSAGES_LIMIT = 1000
# Width of one message-length bucket used by init_random_generator().
DISTRIBUTION_BUCKET_SIZE = 500
|
|
|
|
|
|
def init_random_generator():
    """Build a sampler of message lengths from ``messages_length.yaml``.

    The YAML file is read from the directory that contains this script.
    Its ``test_data.string_lengths`` entry is a ``', '``-separated list of
    integers.  The lengths are grouped into buckets that are
    ``DISTRIBUTION_BUCKET_SIZE`` wide and a cumulative distribution over
    the buckets is built.

    :returns: a zero-argument callable that picks a bucket with a
              probability proportional to the number of sample lengths
              that fell into it and returns a random integer from that
              bucket
    """
    file_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(file_dir, 'messages_length.yaml')) as m_file:
        content = yaml.safe_load(m_file)
    data = [int(n) for n in
            content['test_data']['string_lengths'].split(', ')]

    # Count how many sample lengths fall into each bucket, keyed by the
    # first value of the bucket.
    ranges = collections.defaultdict(int)
    for msg_length in data:
        range_start = ((msg_length // DISTRIBUTION_BUCKET_SIZE) *
                       DISTRIBUTION_BUCKET_SIZE + 1)
        ranges[range_start] += 1

    ranges_start = sorted(ranges)
    total_count = len(data)

    # Cumulative share of the samples covered up to each bucket.
    accumulated_distribution = []
    running_total = 0
    for range_start in ranges_start:
        running_total += ranges[range_start] / total_count
        accumulated_distribution.append(running_total)

    last_bucket = len(ranges_start) - 1

    def weighted_random_choice():
        r = random.random() * running_total
        # bisect_right() returns len(accumulated_distribution) when r is
        # not below the final cumulative value, which rounding of the
        # floating-point product can cause; clamp to the last bucket
        # instead of indexing past the end.
        index = min(bisect.bisect_right(accumulated_distribution, r),
                    last_bucket)
        start = ranges_start[index]
        return random.randrange(start, start + DISTRIBUTION_BUCKET_SIZE)

    return weighted_random_choice
|
|
|
|
|
|
class LoggingNoParsingFilter(logging.Filter):
    """Reject log records whose rendered text contains a noisy marker."""

    # Substrings that cause a record to be dropped.
    _NOISY_MARKERS = ('received {', 'MSG_ID is ')

    def filter(self, record):
        """Return False if the message contains a marker, else True."""
        text = record.getMessage()
        return not any(marker in text for marker in self._NOISY_MARKERS)
|
|
|
|
|
|
# Record exchanged between clients and servers: a sequence number, the
# payload string and the timestamps taken along the way.
Message = collections.namedtuple(
    'Message', 'seq cargo client_ts server_ts return_ts')


def make_message(seq, cargo, client_ts=0, server_ts=0, return_ts=0):
    """Create a Message; timestamps that are not given default to 0."""
    return Message(seq=seq, cargo=cargo, client_ts=client_ts,
                   server_ts=server_ts, return_ts=return_ts)


def update_message(message, **kwargs):
    """Rebuild a Message from ``message`` with the given fields replaced.

    ``message`` may be any iterable holding the five Message fields in
    order (for example the list a Message turns into after being sent).
    """
    rebuilt = Message._make(message)
    return rebuilt._replace(**kwargs)
|
|
|
|
|
|
class MessageStatsCollector:
    """Buffer messages and turn them into per-interval statistics.

    Creating a collector schedules :meth:`monitor` on a
    ``threading.Timer``; while ``IS_RUNNING`` is true every run re-arms
    the timer for one second later.
    """

    def __init__(self, label):
        """:param label: prefix of the log line written for every report"""
        self.label = label
        self.buffer = []  # messages pushed since the previous report
        self.series = []  # one stats dict per report interval

        # Delay the first report until the next whole second.
        created_at = time.time()
        delay = int(created_at) - created_at + 1
        threading.Timer(delay, self.monitor).start()

    def monitor(self):
        """Summarise the buffered messages into one entry of ``series``."""
        global IS_RUNNING
        if IS_RUNNING:
            # NOTE(kbespalov): the reports are not spaced exactly; the
            # monitor starts within about 1 sec +-150 ms because of
            # threading contention between rpc clients
            threading.Timer(1.0, self.monitor).start()
        now = time.time()

        count = len(self.buffer)

        size = 0
        sum_latencies = 0
        max_latency = 0
        min_latency = sys.maxsize

        # Only the first ``count`` items are examined; anything pushed in
        # the meantime stays in the buffer for the next report.
        for item in self.buffer[:count]:
            size += len(item.cargo)

            if item.return_ts:
                latency = item.return_ts - item.client_ts  # round-trip
            elif item.server_ts:
                latency = item.server_ts - item.client_ts  # client -> server
            else:
                continue

            if not latency:
                continue
            sum_latencies += latency
            min_latency = min(min_latency, latency)
            max_latency = max(max_latency, latency)

        del self.buffer[:count]  # drop what has just been processed

        seq = len(self.series)
        stats = {'seq': seq, 'timestamp': now, 'count': count, 'size': size}
        msg = '%-14s: seq: %-4d count: %-6d bytes: %-10d' % (
            self.label, seq, count, size)

        if sum_latencies:
            latency = sum_latencies / count
            stats.update(latency=latency,
                         min_latency=min_latency,
                         max_latency=max_latency)
            msg += ' latency: %-9.3f min: %-9.3f max: %-9.3f' % (
                latency, min_latency, max_latency)

        self.series.append(stats)
        LOG.info(msg)

    def push(self, parsed_message):
        """Queue a message for the next report."""
        self.buffer.append(parsed_message)

    def get_series(self):
        """Return the list of per-interval stats dicts."""
        return self.series

    @staticmethod
    def calc_stats(label, *collectors):
        """Aggregate the series of several collectors into one summary.

        :param label: prefix of the logged summary line
        :param collectors: objects whose ``get_series()`` returns the
                           stats dicts produced by :meth:`monitor`
        :returns: dict with ``count``, ``size``, ``duration``,
                  ``count_p_s`` and ``size_p_s``; ``start`` and ``end``
                  are added when the duration is non-zero, and the
                  latency keys when latencies were recorded
        """
        count = size = sum_latencies = 0
        max_latency = end = 0
        min_latency = start = sys.maxsize

        all_series = [collector.get_series() for collector in collectors]
        for point in itertools.chain.from_iterable(all_series):
            count += point['count']
            size += point['size']
            if point['count']:
                # NOTE(kbespalov): the start and end times are taken from
                # the first and last intervals that processed messages;
                # idle time before the clients ran and after they
                # finished does not move the boundaries.
                start = min(start, point['timestamp'])
                end = max(end, point['timestamp'])

            if 'latency' in point:
                sum_latencies += point['latency'] * point['count']
                min_latency = min(min_latency, point['min_latency'])
                max_latency = max(max_latency, point['max_latency'])

        # The earliest block carries the samples of the second before its
        # timestamp, so the start is moved one second back.
        start -= 1
        duration = end - start if count else 0
        stats = {'count': count, 'size': size, 'duration': duration,
                 'count_p_s': 0, 'size_p_s': 0}
        if duration:
            stats.update(start=start, end=end,
                         count_p_s=count / duration,
                         size_p_s=size / duration)

        msg = ('%s: duration: %.2f count: %d (%.1f msg/sec) '
               'bytes: %d (%.0f bps)') % (
            label, duration, count, stats['count_p_s'],
            size, stats['size_p_s'])

        if sum_latencies:
            latency = sum_latencies / count
            stats.update(latency=latency,
                         min_latency=min_latency,
                         max_latency=max_latency)
            msg += ' latency: %.3f min: %.3f max: %.3f' % (
                latency, min_latency, max_latency)

        LOG.info(msg)
        return stats
|
|
|
|
|
|
class NotifyEndpoint:
    """Notification endpoint that records every message it receives."""

    def __init__(self, wait_before_answer, requeue):
        """:param wait_before_answer: seconds to sleep before a requeue
        :param requeue: when true, each sequence number is requeued once
        """
        self.requeue = requeue
        self.wait_before_answer = wait_before_answer
        self.cache = set()  # sequence numbers that were already requeued
        self.received_messages = MessageStatsCollector('server')

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Record an 'info' notification and report how it was handled."""
        LOG.debug("%s %s %s %s", ctxt, publisher_id, event_type, payload)

        received = update_message(payload, server_ts=time.time())
        self.received_messages.push(received)

        if not self.requeue or received.seq in self.cache:
            return messaging.NotificationResult.HANDLED

        # First sighting of this sequence number: remember it and ask for
        # the message to be delivered again.
        self.cache.add(received.seq)
        if self.wait_before_answer > 0:
            time.sleep(self.wait_before_answer)
        return messaging.NotificationResult.REQUEUE
|
|
|
|
|
|
def notify_server(transport, topic, wait_before_answer, duration, requeue):
    """Run a notification listener on ``topic`` and return its endpoint."""
    endpoint = NotifyEndpoint(wait_before_answer, requeue)
    listener = notify.get_notification_listener(
        transport, [messaging.Target(topic=topic)], [endpoint],
        executor='eventlet')
    run_server(listener, duration=duration)
    return endpoint
|
|
|
|
|
|
class BatchNotifyEndpoint:
    """Batch notification endpoint that records every received message."""

    def __init__(self, wait_before_answer, requeue):
        """Store the settings and create the stats collector.

        ``wait_before_answer`` and ``requeue`` are only stored here;
        :meth:`info` does not read them.
        """
        self.requeue = requeue
        self.wait_before_answer = wait_before_answer
        self.cache = set()
        self.received_messages = MessageStatsCollector('server')

    def info(self, batch):
        """Record the payload of every item of ``batch`` as handled."""
        LOG.debug('msg rcv')
        LOG.debug("%s", batch)

        # One timestamp is shared by the whole batch.
        arrived_at = time.time()
        for entry in batch:
            self.received_messages.push(
                update_message(entry['payload'], server_ts=arrived_at))

        return messaging.NotificationResult.HANDLED
|
|
|
|
|
|
def batch_notify_server(transport, topic, wait_before_answer, duration,
                        requeue):
    """Run a batch notification listener and return its endpoint."""
    endpoint = BatchNotifyEndpoint(wait_before_answer, requeue)
    listener = notify.get_batch_notification_listener(
        transport,
        [messaging.Target(topic=topic)],
        [endpoint],
        executor='eventlet',
        batch_size=1000,
        batch_timeout=5)
    run_server(listener, duration=duration)
    return endpoint
|
|
|
|
|
|
class RpcEndpoint:
    """RPC endpoint that timestamps, records and echoes every message."""

    def __init__(self, wait_before_answer):
        """:param wait_before_answer: seconds to sleep before replying"""
        self.received_messages = MessageStatsCollector('server')
        self.wait_before_answer = wait_before_answer

    def info(self, ctxt, message):
        """Return ``message`` with ``server_ts`` set to the arrival time."""
        arrived_at = time.time()

        LOG.debug("######## RCV: %s", message)

        stamped = update_message(message, server_ts=arrived_at)
        self.received_messages.push(stamped)

        delay = self.wait_before_answer
        if delay > 0:
            time.sleep(delay)

        return stamped
|
|
|
|
|
|
class ServerControlEndpoint:
    """RPC endpoint through which clients report their start and finish.

    Once the set of started clients becomes empty the controlled server
    is stopped from a timer thread.
    """

    def __init__(self, controlled_server):
        """:param controlled_server: server to stop when clients are done"""
        self.controlled_server = controlled_server
        self.connected_clients = set()

    def sync_start(self, ctx, message):
        """Handle start reports from clients"""
        started = message['id']
        LOG.info('The client %s started to send messages' % started)
        self.connected_clients.add(started)

    def sync_done(self, ctx, message):
        """Handle done reports from clients"""
        finished = message['id']
        LOG.info('The client %s finished msg sending.' % finished)

        self.connected_clients.discard(finished)

        if self.connected_clients:
            return

        LOG.info(
            'The clients sent all messages. Shutting down the server..')
        # Stop from a timer thread one second later rather than from
        # inside this handler.
        threading.Timer(1, self._stop_server_with_delay).start()

    def _stop_server_with_delay(self):
        """Stop the controlled server and wait for it to finish."""
        server = self.controlled_server
        server.stop()
        server.wait()
|
|
|
|
|
|
class Client:
    """Base message sender that keeps statistics about what it sends."""

    def __init__(self, client_id, client, method, has_result,
                 wait_after_msg):
        """Set up the sender and its stats collectors.

        :param client_id: identifier used in stats labels
        :param client: object handed to ``method`` on every send
        :param method: callable invoked as ``method(client, msg)``
        :param has_result: when true a round-trip collector is created
        :param wait_after_msg: seconds to sleep after each message
        """
        self.client_id = client_id
        self.client = client
        self.method = method
        self.wait_after_msg = wait_after_msg

        self.seq = 0
        self.messages_count = len(MESSAGES)
        # Start sending the messages from a random position to avoid
        # memory re-usage and generate more realistic load on the library
        # and a message transport
        self.position = random.randint(0, self.messages_count - 1)

        self.sent_messages = MessageStatsCollector('client-%s' % client_id)
        self.errors = MessageStatsCollector('error-%s' % client_id)
        if has_result:
            self.round_trip_messages = MessageStatsCollector(
                'round-trip-%s' % client_id)

    def host_based_id(self):
        """Return an id built from the client id, object id and host."""
        parts = {'client_id': self.client_id,
                 'salt': format(id(self), 'x'),
                 'hostname': CURRENT_HOST}
        return "%(client_id)s %(salt)s@%(hostname)s" % parts

    def send_msg(self):
        """Send the next prepared message and record the outcome."""
        msg = make_message(self.seq, MESSAGES[self.position], time.time())
        self.sent_messages.push(msg)

        try:
            reply = self.method(self.client, msg)
        except Exception:
            reply = None
            self.errors.push(msg)
        else:
            LOG.debug("SENT: %s", msg)

        if reply:
            # Stamp the reply with the time it came back.
            reply = update_message(reply, return_ts=time.time())
            self.round_trip_messages.push(reply)

        self.seq += 1
        # Wrap around to the first message after the last one.
        self.position = (self.position + 1) % self.messages_count
        if self.wait_after_msg > 0:
            time.sleep(self.wait_after_msg)
|
|
|
|
|
|
class RPCClient(Client):
    """Client that sends messages with RPC call or cast.

    With ``sync_mode`` set, the client reports ``sync_start`` to the
    server before its first message; ``sync_done`` is available for
    reporting the end.
    """

    def __init__(self, client_id, transport, target, timeout, is_cast,
                 wait_after_msg, sync_mode=False):
        """Create the RPC client and, if requested, the sync client.

        :param sync_mode: ``'call'`` reuses the prepared client for the
                          sync reports; any other true value prepares a
                          fanout client for them
        """
        rpc_client = rpc.get_rpc_client(transport, target)
        sender = _rpc_cast if is_cast else _rpc_call

        super().__init__(client_id,
                         rpc_client.prepare(timeout=timeout),
                         sender, not is_cast, wait_after_msg)
        self.sync_mode = sync_mode
        self.is_sync = False

        # prepare the sync client
        if sync_mode == 'call':
            self.sync_client = self.client
        elif sync_mode:
            self.sync_client = rpc_client.prepare(fanout=True,
                                                  timeout=timeout)

    def send_msg(self):
        """Send a message, reporting the start to the server first."""
        if self.sync_mode and not self.is_sync:
            self.is_sync = self.sync_start()
        super().send_msg()

    def sync_start(self):
        """Report the start of sending; return True on success."""
        try:
            payload = {'id': self.host_based_id()}
            report = _rpc_call if self.sync_mode == 'call' else _rpc_cast
            report(self.sync_client, payload, 'sync_start')
        except Exception:
            LOG.error('The client: %s failed to sync with %s.' %
                      (self.client_id, self.client.target))
            return False
        else:
            LOG.info('The client: {} successfully sync with {}'.format(
                self.client_id, self.client.target))
            return True

    def sync_done(self):
        """Report the end of sending; return True on success."""
        try:
            payload = {'id': self.host_based_id()}
            report = _rpc_call if self.sync_mode == 'call' else _rpc_cast
            report(self.sync_client, payload, 'sync_done')
        except Exception:
            LOG.error('The client: %s failed finish the sync with %s.'
                      % (self.client_id, self.client.target))
            return False
        else:
            LOG.info('The client: %s successfully finished sync with %s'
                     % (self.client_id, self.client.target))
            return True
|
|
|
|
|
|
class NotifyClient(Client):
    """Client that emits every message as a notification."""

    def __init__(self, client_id, transport, topic, wait_after_msg):
        """Prepare a notifier publishing as ``publisher-<client_id>``."""
        notifier = notify.Notifier(transport, driver='messaging',
                                   topics=topic)
        prepared = notifier.prepare(
            publisher_id='publisher-%d' % client_id)
        # Notifications produce no reply, hence has_result=False.
        super().__init__(client_id, prepared, _notify, False,
                         wait_after_msg)
|
|
|
|
|
|
def generate_messages(messages_count):
    """Append random lowercase message bodies to ``MESSAGES``.

    :param messages_count: number of bodies wanted; capped at
                           ``MESSAGES_LIMIT``
    """
    # Limit the messages amount. Clients will reiterate the array again
    # if an amount of messages to be sent is bigger than MESSAGES_LIMIT
    messages_count = min(messages_count, MESSAGES_LIMIT)
    LOG.info("Generating %d random messages", messages_count)

    next_length = init_random_generator()
    alphabet = string.ascii_lowercase
    for _ in range(messages_count):
        body = ''.join(
            random.choice(alphabet) for _ in range(next_length()))
        MESSAGES.append(body)

    LOG.info("Messages has been prepared")
|
|
|
|
|
|
def wrap_sigexit(f):
    """Decorate ``f`` so that a SignalExit shuts everything down cleanly.

    The wrapper returns whatever ``f`` returns.  If ``f`` raises
    SignalExit, the signal is logged, every server in ``SERVERS`` is
    stopped and waited for, and the wrapper returns None.  In all cases
    ``TRANSPORT.cleanup()`` is called afterwards when a transport is set.

    :param f: callable to wrap
    :returns: the wrapping callable, carrying the metadata of ``f``
    """
    # functools.wraps keeps the name and docstring of the decorated
    # function visible on the wrapper.
    @functools.wraps(f)
    def inner(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except SignalExit as e:
            LOG.info('Signal %s is caught. Interrupting the execution',
                     e.signo)
            for server in SERVERS:
                server.stop()
                server.wait()
        finally:
            if TRANSPORT:
                TRANSPORT.cleanup()
    return inner
|
|
|
|
|
|
@wrap_sigexit
def run_server(server, duration=None):
    """Start ``server`` and block until it has finished.

    :param server: server object to register in ``SERVERS`` and start
    :param duration: when set, the server is stopped after this many
                     seconds, or earlier once ``IS_RUNNING`` is cleared
    """
    global IS_RUNNING
    SERVERS.append(server)
    server.start()

    if duration:
        with timeutils.StopWatch(duration) as watch:
            # Poll once a second until time is up or a stop is requested.
            while IS_RUNNING and not watch.expired():
                time.sleep(1)
            server.stop()
            IS_RUNNING = False

    server.wait()
    LOG.info('The server is terminating')
    # Give the stats collector time to process the last second.
    time.sleep(1)
|
|
|
|
|
|
def rpc_server(transport, target, wait_before_answer, executor, duration):
    """Run an RPC server and return the first endpoint of its dispatcher."""
    served = [RpcEndpoint(wait_before_answer)]
    server = rpc.get_rpc_server(transport, target, served, executor)

    # The control endpoint needs the server object, so it is added to the
    # endpoint list after the server has been created; this is what makes
    # the rpc server controllable by rpc clients.
    served.append(ServerControlEndpoint(server))

    LOG.debug("starting RPC server for target %s", target)

    run_server(server, duration=duration)

    return server.dispatcher.endpoints[0]
|
|
|
|
|
|
@wrap_sigexit
def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout,
                      is_cast, messages_count, duration, sync_mode):
    """Run ``threads`` RPC clients in a green pool and wait for them all.

    The targets are handed out to the clients in round-robin order.
    """
    pool = eventlet.GreenPool(size=threads)
    target_cycle = itertools.cycle(targets)

    for index in range(threads):
        target = next(target_cycle)
        LOG.debug("starting RPC client for target %s", target)
        # The client is built inside the green thread, not here.
        build_client = functools.partial(
            RPCClient, index, transport, target, timeout, is_cast,
            wait_after_msg, sync_mode)
        pool.spawn_n(send_messages, index, build_client,
                     messages_count, duration)

    pool.waitall()
|
|
|
|
|
|
@wrap_sigexit
def spawn_notify_clients(threads, topic, transport, message_count,
                         wait_after_msg, timeout, duration):
    """Run ``threads`` notification clients in a green pool.

    ``timeout`` is accepted but not used by this function.
    """
    pool = eventlet.GreenPool(size=threads)

    for index in range(threads):
        # The client is built inside the green thread, not here.
        build_client = functools.partial(
            NotifyClient, index, transport, [topic], wait_after_msg)
        pool.spawn_n(send_messages, index, build_client, message_count,
                     duration)

    pool.waitall()
|
|
|
|
|
|
def send_messages(client_id, client_builder, messages_count, duration):
    """Build a client and send messages for a duration or up to a count.

    :param client_id: identifier used in the debug log lines
    :param client_builder: zero-argument callable returning the client
    :param messages_count: messages to send when ``duration`` is not set
    :param duration: when set, keep sending for this many seconds
    """
    global IS_RUNNING
    client = client_builder()
    CLIENTS.append(client)

    # align message sending closer to whole seconds
    started = time.time()
    time.sleep(int(started) - started + 1)

    if not duration:
        LOG.debug("Sending %d messages using client %d",
                  messages_count, client_id)
        for _ in range(messages_count):
            client.send_msg()
            eventlet.sleep()
            if not IS_RUNNING:
                break
        LOG.debug("Client %d has sent %d messages", client_id,
                  messages_count)
    else:
        with timeutils.StopWatch(duration) as watch:
            while not watch.expired() and IS_RUNNING:
                client.send_msg()
                eventlet.sleep()
        # The time is up (or a stop was requested): stop the others too.
        IS_RUNNING = False

    # wait for replies to be collected
    time.sleep(1)

    # send stop request to the rpc server
    synced = isinstance(client, RPCClient) and client.is_sync
    if synced:
        client.sync_done()
|
|
|
|
|
|
def _rpc_call(client, msg, remote_method='info'):
    """Invoke ``remote_method`` with ``msg`` via call and return the reply.

    Any exception is logged with its traceback and re-raised.
    """
    try:
        reply = client.call({}, remote_method, message=msg)
    except Exception as exc:
        LOG.exception('Error %s on CALL for message %s', str(exc), msg)
        raise

    LOG.debug("SENT: %s, RCV: %s", msg, reply)
    return reply
|
|
|
|
|
|
def _rpc_cast(client, msg, remote_method='info'):
    """Invoke ``remote_method`` with ``msg`` via cast; nothing is returned.

    Any exception is logged with its traceback and re-raised.
    """
    try:
        client.cast({}, remote_method, message=msg)
    except Exception as exc:
        LOG.exception('Error %s on CAST for message %s', str(exc), msg)
        raise

    LOG.debug("SENT: %s", msg)
|
|
|
|
|
|
def _notify(notification_client, msg):
|
|
notification_client.info({}, 'compute.start', msg)
|
|
|
|
|
|
def show_server_stats(endpoint, json_filename):
    """Log the server summary and optionally write it out as JSON.

    :param endpoint: object exposing a ``received_messages`` collector
    :param json_filename: file to write the stats to; skipped when empty
    """
    LOG.info('=' * 35 + ' summary ' + '=' * 35)

    collector = endpoint.received_messages
    output = {
        'series': {'server': collector.get_series()},
        'summary': MessageStatsCollector.calc_stats('server', collector),
    }

    if json_filename:
        write_json_file(json_filename, output)
|
|
|
|
|
|
def show_client_stats(clients, json_filename, has_reply=False):
    """Log the client summaries and optionally write them out as JSON.

    :param clients: clients whose collectors are reported
    :param json_filename: file to write the stats to; skipped when empty
    :param has_reply: also report the round-trip collectors
    """
    LOG.info('=' * 35 + ' summary ' + '=' * 35)

    series = {}
    for client in clients:
        ident = client.client_id
        series['client_%s' % ident] = client.sent_messages.get_series()
        series['error_%s' % ident] = client.errors.get_series()
        if has_reply:
            series['round_trip_%s' % ident] = (
                client.round_trip_messages.get_series())

    # The summaries are computed (and logged) in this order: sent
    # messages, errors, then round trips.
    summary = {}
    summary['client'] = MessageStatsCollector.calc_stats(
        'client', *(client.sent_messages for client in clients))
    summary['error'] = MessageStatsCollector.calc_stats(
        'error', *(client.errors for client in clients))
    if has_reply:
        summary['round_trip'] = MessageStatsCollector.calc_stats(
            'round-trip',
            *(client.round_trip_messages for client in clients))

    output = {'series': series, 'summary': summary}
    if json_filename:
        write_json_file(json_filename, output)
|
|
|
|
|
|
def write_json_file(filename, output):
    """Serialize ``output`` as JSON into ``filename`` and log the path."""
    with open(filename, 'w') as stats_file:
        serialized = json.dumps(output)
        stats_file.write(serialized)
        LOG.info('Stats are written into %s', filename)
|
|
|
|
|
|
class SignalExit(SystemExit):
    """SystemExit that remembers the number of the signal behind it."""

    def __init__(self, signo, exccode=1):
        """:param signo: number of the received signal
        :param exccode: exit code passed on to SystemExit
        """
        SystemExit.__init__(self, exccode)
        self.signo = signo
|
|
|
|
|
|
def signal_handler(signum, frame):
    """Clear ``IS_RUNNING`` and raise SignalExit for the given signal."""
    global IS_RUNNING
    # Cleared first, before the exception unwinds the interrupted code.
    IS_RUNNING = False
    raise SignalExit(signum)
|
|
|
|
|
|
def _setup_logging(is_debug):
    """Configure logging to stdout and quieten chatty libraries.

    :param is_debug: use the DEBUG level when true, INFO otherwise
    """
    log_level = logging.DEBUG if is_debug else logging.INFO
    logging.basicConfig(
        stream=sys.stdout, level=log_level,
        format="%(asctime)-15s %(levelname)s %(name)s %(message)s")
    logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter())
    # Each name needs its own comma: adjacent string literals are joined
    # into a single string, which would name a logger that does not exist
    # and leave the intended ones untouched.
    noisy_loggers = (
        'kombu',
        'amqp',
        'stevedore',
        'qpid.messaging',
        'oslo.messaging._drivers.amqp',
    )
    for name in noisy_loggers:
        logging.getLogger(name).setLevel(logging.WARNING)
|
|
|
|
|
|
def main():
    """Parse the command line and run the selected simulator mode.

    The mode is one of ``notify-server``, ``batch-notify-server``,
    ``notify-client``, ``rpc-server`` and ``rpc-client``.  Server modes
    run a server and then report its statistics; client modes spawn the
    clients and then report theirs.
    """
    parser = argparse.ArgumentParser(
        description='Tools to play with oslo.messaging\'s RPC',
        usage=USAGE,
    )
    parser.add_argument('--url', dest='url',
                        help="oslo.messaging transport url")
    # NOTE(review): the help text mentions WARN, but _setup_logging() uses
    # INFO when this flag is absent.
    parser.add_argument('-d', '--debug', dest='debug', action='store_true',
                        help="Turn on DEBUG logging level instead of WARN")
    parser.add_argument('-tp', '--topic', dest='topic',
                        default="profiler_topic",
                        help="Topics to publish/receive messages to/from.")
    parser.add_argument('-s', '--server', dest='server',
                        default="profiler_server",
                        help="Servers to publish/receive messages to/from.")
    parser.add_argument('-tg', '--targets', dest='targets', nargs="+",
                        default=["profiler_topic.profiler_server"],
                        help="Targets to publish/receive messages to/from.")
    parser.add_argument('-l', dest='duration', type=int,
                        help='send messages for certain time')
    parser.add_argument('-j', '--json', dest='json_filename',
                        help='File name to store results in JSON format')
    parser.add_argument('--config-file', dest='config_file', type=str,
                        help="Oslo messaging config file")

    # One sub-command per mode; the chosen name ends up in args.mode.
    subparsers = parser.add_subparsers(dest='mode',
                                       help='notify/rpc server/client mode')

    server = subparsers.add_parser('notify-server')
    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
    server.add_argument('--requeue', dest='requeue', action='store_true')

    server = subparsers.add_parser('batch-notify-server')
    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
    server.add_argument('--requeue', dest='requeue', action='store_true')

    client = subparsers.add_parser('notify-client')
    client.add_argument('-p', dest='threads', type=int, default=1,
                        help='number of client threads')
    client.add_argument('-m', dest='messages', type=int, default=1,
                        help='number of call per threads')
    client.add_argument('-w', dest='wait_after_msg', type=float, default=-1,
                        help='sleep time between two messages')
    client.add_argument('--timeout', dest='timeout', type=int, default=3,
                        help='client timeout')

    server = subparsers.add_parser('rpc-server')
    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
    server.add_argument('-e', '--executor', dest='executor',
                        type=str, default='eventlet',
                        help='name of a message executor')

    client = subparsers.add_parser('rpc-client')
    client.add_argument('-p', dest='threads', type=int, default=1,
                        help='number of client threads')
    client.add_argument('-m', dest='messages', type=int, default=1,
                        help='number of call per threads')
    client.add_argument('-w', dest='wait_after_msg', type=float, default=-1,
                        help='sleep time between two messages')
    client.add_argument('--timeout', dest='timeout', type=int, default=3,
                        help='client timeout')
    client.add_argument('--exit-wait', dest='exit_wait', type=int, default=0,
                        help='Keep connections open N seconds after calls '
                        'have been done')
    client.add_argument('--is-cast', dest='is_cast', action='store_true',
                        help='Use `call` or `cast` RPC methods')
    client.add_argument('--is-fanout', dest='is_fanout', action='store_true',
                        help='fanout=True for CAST messages')

    client.add_argument('--sync', dest='sync', choices=('call', 'fanout'),
                        help="stop server when all msg was sent by clients")

    args = parser.parse_args()

    _setup_logging(is_debug=args.debug)

    if args.config_file:
        cfg.CONF(["--config-file", args.config_file])

    # RPC modes use the RPC transport; every other mode (including no
    # mode at all) falls through to the notification transport.
    global TRANSPORT
    if args.mode in ['rpc-server', 'rpc-client']:
        TRANSPORT = messaging.get_transport(cfg.CONF, url=args.url)
    else:
        TRANSPORT = messaging.get_notification_transport(cfg.CONF,
                                                         url=args.url)

    if args.mode in ['rpc-client', 'notify-client']:
        # always generate maximum number of messages for duration-limited tests
        generate_messages(MESSAGES_LIMIT if args.duration else args.messages)

    # oslo.config defaults
    cfg.CONF.heartbeat_interval = 5
    cfg.CONF.prog = os.path.basename(__file__)
    cfg.CONF.project = 'oslo.messaging'

    # Both signals raise SignalExit, which wrap_sigexit-decorated
    # functions catch.
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    if args.mode == 'rpc-server':
        target = messaging.Target(topic=args.topic, server=args.server)
        endpoint = rpc_server(TRANSPORT, target, args.wait_before_answer,
                              args.executor, args.duration)
        show_server_stats(endpoint, args.json_filename)

    elif args.mode == 'notify-server':
        endpoint = notify_server(TRANSPORT, args.topic,
                                 args.wait_before_answer, args.duration,
                                 args.requeue)
        show_server_stats(endpoint, args.json_filename)

    elif args.mode == 'batch-notify-server':
        endpoint = batch_notify_server(TRANSPORT, args.topic,
                                       args.wait_before_answer,
                                       args.duration, args.requeue)
        show_server_stats(endpoint, args.json_filename)

    elif args.mode == 'notify-client':
        spawn_notify_clients(args.threads, args.topic, TRANSPORT,
                             args.messages, args.wait_after_msg,
                             args.timeout, args.duration)
        show_client_stats(CLIENTS, args.json_filename)

    elif args.mode == 'rpc-client':

        # Each target is written as "<topic>.<server>"; split on the
        # first dot only.
        targets = []
        for target in args.targets:
            tp, srv = target.partition('.')[::2]
            t = messaging.Target(topic=tp, server=srv, fanout=args.is_fanout)
            targets.append(t)

        spawn_rpc_clients(args.threads, TRANSPORT, targets,
                          args.wait_after_msg, args.timeout, args.is_cast,
                          args.messages, args.duration, args.sync)
        # Round-trip stats exist only when calls (not casts) were used.
        show_client_stats(CLIENTS, args.json_filename, not args.is_cast)

        # --exit-wait is defined only for the rpc-client sub-command.
        if args.exit_wait:
            LOG.info("Finished. waiting for %d seconds", args.exit_wait)
            time.sleep(args.exit_wait)
|
|
|
|
|
|
if __name__ == '__main__':
    # Record the process id and host name before starting; the host name
    # is read by Client.host_based_id().
    CURRENT_PID = os.getpid()
    CURRENT_HOST = socket.gethostname()
    main()
|