#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import eventlet
eventlet.monkey_patch()

import argparse
import bisect
import collections
import functools
import itertools
import json
import logging
import os
import random
import six
import string
import sys
import threading
import time
import yaml

from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging import notify  # noqa
from oslo_messaging import rpc  # noqa
from oslo_utils import timeutils

# Root logger; its handler, level and filter are set up in _setup_logging().
LOG = logging.getLogger()
# Callable returning a random message length. Assigned in the __main__ guard
# from init_random_generator() and used by generate_messages().
RANDOM_GENERATOR = None
# PID of the current process; assigned in the __main__ guard.
CURRENT_PID = None
# Every client built by send_messages(); read when client stats are reported.
CLIENTS = []
# Pool of pre-generated message payloads, filled by generate_messages().
MESSAGES = []

# Text handed to argparse as the parser's usage string (see main()).
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
 {notify-server,notify-client,rpc-server,rpc-client} ...

Usage example:
 python tools/simulator.py\
 --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-server
 python tools/simulator.py\
 --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-client\
 --exit-wait 15000 -p 64 -m 64"""

# Upper bound on the number of payloads generated into MESSAGES.
MESSAGES_LIMIT = 1000
# Width of one message-length bucket used by init_random_generator().
DISTRIBUTION_BUCKET_SIZE = 500


def init_random_generator():
    """Build a random message-length generator from a sample distribution.

    Reads the ', '-separated list of sample lengths stored under
    ``test_data`` / ``string_lengths`` in ``./messages_length.yaml``,
    groups the samples into buckets of DISTRIBUTION_BUCKET_SIZE and returns
    a function that picks a bucket with a probability proportional to the
    number of samples in it, then a uniformly random length inside it.

    :returns: a no-argument callable returning an int message length
    """
    with open('./messages_length.yaml') as m_file:
        content = yaml.safe_load(m_file)
    data = [int(n) for n in
            content['test_data']['string_lengths'].split(', ')]

    ranges = collections.defaultdict(int)
    for msg_length in data:
        # Floor division is required here: with true division (Python 3)
        # the bucket start would be a float unique to every length and
        # randrange() below would reject it.
        range_start = ((msg_length // DISTRIBUTION_BUCKET_SIZE) *
                       DISTRIBUTION_BUCKET_SIZE + 1)
        ranges[range_start] += 1

    ranges_start = sorted(ranges.keys())
    total_count = len(data)

    # Cumulative share of samples up to and including every bucket.
    accumulated_distribution = []
    running_total = 0
    for range_start in ranges_start:
        norm = float(ranges[range_start]) / total_count
        running_total += norm
        accumulated_distribution.append(running_total)

    def weighted_random_choice():
        r = random.random() * running_total
        index = bisect.bisect_right(accumulated_distribution, r)
        # Guard against float rounding pushing r up to the last boundary.
        index = min(index, len(ranges_start) - 1)
        start = ranges_start[index]
        return random.randrange(start, start + DISTRIBUTION_BUCKET_SIZE)

    return weighted_random_choice


class LoggingNoParsingFilter(logging.Filter):
    """Logging filter that drops records containing known noisy markers."""

    def filter(self, record):
        """Return False when the formatted message holds a marker."""
        text = record.getMessage()
        return not any(marker in text
                       for marker in ('received {', 'MSG_ID is '))


# Record exchanged between simulator clients and servers: a sequence
# number, the payload and three timestamp slots (0 means "not set").
Message = collections.namedtuple(
    'Message', 'seq cargo client_ts server_ts return_ts')


def make_message(seq, cargo, client_ts=0, server_ts=0, return_ts=0):
    """Create a Message; timestamps that are not given default to 0."""
    return Message(seq=seq, cargo=cargo, client_ts=client_ts,
                   server_ts=server_ts, return_ts=return_ts)


def update_message(message, **kwargs):
    """Return a Message built from *message* with some fields replaced.

    *message* may be a Message or any iterable of its five field values;
    *kwargs* maps field names to their new values.
    """
    return Message._make(message)._replace(**kwargs)


class MessageStatsCollector(object):
    """Accumulate messages and turn them into per-interval statistics.

    Messages are added with push(). A timer fires monitor() every second;
    each run summarises the messages buffered since the previous run into
    one entry of the series (count, payload size and, when timestamps are
    present, latency figures) and logs it.
    """

    def __init__(self, label):
        """Create a collector and schedule the first report.

        :param label: name used as the prefix of the periodic log lines
        """
        self.label = label
        self.buffer = []  # buffer to store messages during report interval
        self.series = []  # stats for every report interval
        threading.Timer(1.0, self.monitor).start()  # schedule in a second

    def monitor(self):
        """Summarise the buffered messages and re-arm the timer."""
        threading.Timer(1.0, self.monitor).start()
        now = time.time()

        # Snapshot the current length: only these items are processed and
        # trimmed, anything pushed later is left for the next interval.
        count = len(self.buffer)

        size = 0
        # float('inf') rather than sys.maxint, which is missing on Python 3.
        # It never leaks into the stats: latencies are only reported when
        # at least one was seen, which also replaces this initial value.
        min_latency = float('inf')
        max_latency = 0
        sum_latencies = 0

        for p in self.buffer[:count]:
            size += len(p.cargo)

            latency = None
            if p.return_ts:
                latency = p.return_ts - p.client_ts  # round-trip
            elif p.server_ts:
                latency = p.server_ts - p.client_ts  # client -> server

            if latency:
                sum_latencies += latency
                min_latency = min(min_latency, latency)
                max_latency = max(max_latency, latency)

        del self.buffer[:count]  # trim processed items

        seq = len(self.series)
        stats = dict(seq=seq, timestamp=now, count=count, size=size)
        msg = ('%-14s: seq: %-4d count: %-6d bytes: %-10d' %
               (self.label, seq, count, size))

        if sum_latencies:
            latency = sum_latencies / count
            stats.update(dict(latency=latency,
                              min_latency=min_latency,
                              max_latency=max_latency))
            msg += (' latency: %-9.3f min: %-9.3f max: %-9.3f' %
                    (latency, min_latency, max_latency))

        self.series.append(stats)
        LOG.info(msg)

    def push(self, parsed_message):
        """Buffer a message for the next report interval."""
        self.buffer.append(parsed_message)

    def get_series(self):
        """Return the list of per-interval stats dicts collected so far."""
        return self.series

    @staticmethod
    def calc_stats(label, *collectors):
        """Aggregate the series of several collectors into one summary.

        :param label: prefix of the summary log line
        :param collectors: collectors whose series are combined
        :returns: dict with count, size, duration and per-second rates,
                  plus start/end when any message was counted and latency
                  figures when latencies were recorded
        """
        count = 0
        size = 0
        # float('inf') rather than sys.maxint (missing on Python 3).
        min_latency = float('inf')
        max_latency = 0
        sum_latencies = 0
        start = float('inf')
        end = 0

        for point in itertools.chain(*(c.get_series() for c in collectors)):
            count += point['count']
            size += point['size']
            start = min(start, point['timestamp'])
            end = max(end, point['timestamp'])

            if 'latency' in point:
                # per-interval latency is an average; weight it by count
                sum_latencies += point['latency'] * point['count']
                min_latency = min(min_latency, point['min_latency'])
                max_latency = max(max_latency, point['max_latency'])

        # start is the timestamp of the earliest block, which includes
        # samples for the prior second
        start -= 1
        duration = end - start if count else 0
        stats = dict(count=count, size=size, duration=duration, count_p_s=0,
                     size_p_s=0)
        if duration:
            stats.update(dict(start=start, end=end,
                              count_p_s=count / duration,
                              size_p_s=size / duration))

        msg = ('%s: duration: %.2f count: %d (%.1f msg/sec) '
               'bytes: %d (%.0f bps)' %
               (label, duration, count, stats['count_p_s'],
                size, stats['size_p_s']))

        if sum_latencies:
            latency = sum_latencies / count
            stats.update(dict(latency=latency,
                              min_latency=min_latency,
                              max_latency=max_latency))
            msg += (' latency: %.3f min: %.3f max: %.3f' %
                    (latency, min_latency, max_latency))

        LOG.info(msg)
        return stats


class NotifyEndpoint(object):
    """Notification endpoint that records every message it receives.

    With requeue enabled, the first delivery of each sequence number is
    answered with REQUEUE (after an optional delay); any later delivery,
    and every delivery when requeue is disabled, is answered with HANDLED.
    """

    def __init__(self, wait_before_answer, requeue):
        self.requeue = requeue
        self.wait_before_answer = wait_before_answer
        self.cache = set()  # sequence numbers that were already requeued
        self.received_messages = MessageStatsCollector('server')

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Handle one notification and tell the listener what to do next."""
        LOG.debug("%s %s %s %s", ctxt, publisher_id, event_type, payload)

        received = update_message(payload, server_ts=time.time())
        self.received_messages.push(received)

        if not self.requeue or received.seq in self.cache:
            return messaging.NotificationResult.HANDLED

        self.cache.add(received.seq)
        delay = self.wait_before_answer
        if delay > 0:
            time.sleep(delay)
        return messaging.NotificationResult.REQUEUE


def notify_server(transport, topic, wait_before_answer, duration, requeue):
    """Run a notification listener on *topic* and return its endpoint.

    The returned NotifyEndpoint holds the statistics of received messages.
    """
    endpoint = NotifyEndpoint(wait_before_answer, requeue)
    listener = notify.get_notification_listener(
        transport, [messaging.Target(topic=topic)], [endpoint],
        executor='eventlet')
    run_server(listener, duration=duration)
    return endpoint


class BatchNotifyEndpoint(object):
    """Batch notification endpoint recording every received message."""

    def __init__(self, wait_before_answer, requeue):
        self.requeue = requeue
        self.wait_before_answer = wait_before_answer
        self.cache = set()
        self.received_messages = MessageStatsCollector('server')

    def info(self, batch):
        """Record all notifications of *batch* with one shared timestamp."""
        LOG.debug('msg rcv')
        LOG.debug("%s", batch)

        arrived_at = time.time()
        collector = self.received_messages
        for notification in batch:
            collector.push(
                update_message(notification['payload'],
                               server_ts=arrived_at))

        return messaging.NotificationResult.HANDLED


def batch_notify_server(transport, topic, wait_before_answer, duration,
                        requeue):
    """Run a batch notification listener on *topic*; return its endpoint.

    Notifications are delivered to the endpoint in batches of up to 1000
    items, with a batch timeout of 5.
    """
    endpoint = BatchNotifyEndpoint(wait_before_answer, requeue)
    listener = notify.get_batch_notification_listener(
        transport, [messaging.Target(topic=topic)], [endpoint],
        executor='eventlet', batch_size=1000, batch_timeout=5)
    run_server(listener, duration=duration)
    return endpoint


class RpcEndpoint(object):
    """RPC endpoint that timestamps, records and echoes every message."""

    def __init__(self, wait_before_answer):
        self.received_messages = MessageStatsCollector('server')
        self.wait_before_answer = wait_before_answer

    def info(self, ctxt, message):
        """Stamp *message* with the server time and send it back."""
        received_at = time.time()

        LOG.debug("######## RCV: %s", message)

        stamped = update_message(message, server_ts=received_at)
        self.received_messages.push(stamped)

        delay = self.wait_before_answer
        if delay > 0:
            time.sleep(delay)  # simulate processing time before replying

        return stamped


class Client(object):
    """Base sender: pushes pre-generated payloads through *method*.

    *method* is called as ``method(client, message)``; a truthy return
    value is treated as the reply and recorded as a round trip.
    """

    def __init__(self, client_id, client, method, has_result,
                 wait_after_msg):
        self.client_id = client_id
        self.client = client
        self.method = method
        self.wait_after_msg = wait_after_msg

        self.seq = 0
        total = len(MESSAGES)
        self.messages_count = total
        # A random starting offset keeps clients from all sending the same
        # payload sequence, which avoids memory re-usage and puts a more
        # realistic load on the library and the message transport
        self.position = random.randint(0, total - 1)
        self.sent_messages = MessageStatsCollector('client-%s' % client_id)

        # The round-trip collector only exists for clients expecting replies
        if has_result:
            self.round_trip_messages = MessageStatsCollector(
                'round-trip-%s' % client_id)

    def send_msg(self):
        """Send the next payload and record it (and its reply, if any)."""
        outgoing = make_message(self.seq, MESSAGES[self.position],
                                time.time())
        self.sent_messages.push(outgoing)

        reply = self.method(self.client, outgoing)
        if reply:
            returned_at = time.time()
            self.round_trip_messages.push(
                update_message(reply, return_ts=returned_at))

        self.seq += 1
        # wrap around once the end of the payload pool is reached
        self.position = (self.position + 1) % self.messages_count
        pause = self.wait_after_msg
        if pause > 0:
            time.sleep(pause)


class RPCClient(Client):
    """Client sending messages as RPC calls or casts to *target*."""

    def __init__(self, client_id, transport, target, timeout, is_cast,
                 wait_after_msg):
        prepared = rpc.RPCClient(transport, target).prepare(timeout=timeout)
        if is_cast:
            sender = _rpc_cast
        else:
            sender = _rpc_call

        # only calls produce a reply, so only they track round trips
        super(RPCClient, self).__init__(client_id, prepared, sender,
                                        not is_cast, wait_after_msg)


class NotifyClient(Client):
    """Client emitting messages as notifications on *topic*."""

    def __init__(self, client_id, transport, topic, wait_after_msg):
        notifier = notify.Notifier(transport, driver='messaging',
                                   topic=topic)
        prepared = notifier.prepare(publisher_id='publisher-%d' % client_id)
        # notifications have no reply, hence has_result is False
        super(NotifyClient, self).__init__(client_id, prepared, _notify,
                                           False, wait_after_msg)


def generate_messages(messages_count):
    """Append random lowercase ASCII payloads to MESSAGES.

    The length of each payload is drawn from RANDOM_GENERATOR.

    :param messages_count: number of payloads requested; capped at
                           MESSAGES_LIMIT
    """
    # Limit the messages amount. Clients will reiterate the array again
    # if an amount of messages to be sent is bigger than MESSAGES_LIMIT
    messages_count = min(messages_count, MESSAGES_LIMIT)
    LOG.info("Generating %d random messages", messages_count)

    # string.ascii_lowercase exists on both Python 2 and 3 and, unlike the
    # Python 2-only string.lowercase, does not depend on the locale.
    alphabet = string.ascii_lowercase
    for _ in range(messages_count):
        length = RANDOM_GENERATOR()
        MESSAGES.append(
            ''.join(random.choice(alphabet) for _ in range(length)))

    LOG.info("Messages has been prepared")


def run_server(server, duration=None):
    """Start *server* and block until it is done.

    When *duration* is set, the server is stopped once that many seconds
    have elapsed; otherwise this waits on the server indefinitely. A
    KeyboardInterrupt ends the wait and is not propagated.
    """
    try:
        server.start()
        if duration:
            with timeutils.StopWatch(duration) as watch:
                while True:
                    if watch.expired():
                        break
                    time.sleep(1)
            server.stop()
        server.wait()
    except KeyboardInterrupt:  # caught SIGINT
        LOG.info('Caught SIGINT, terminating')
        time.sleep(1)  # wait for stats collector to process the last second


def rpc_server(transport, target, wait_before_answer, executor, duration):
    """Run an RPC server for *target* and return its endpoint.

    The endpoint is read back from the server's dispatcher once the server
    has finished running.
    """
    server = rpc.get_rpc_server(
        transport, target, [RpcEndpoint(wait_before_answer)],
        executor=executor)
    LOG.debug("starting RPC server for target %s", target)

    run_server(server, duration=duration)

    registered = server.dispatcher.endpoints
    return registered[0]


def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout,
                      is_cast, messages_count, duration):
    """Run *threads* RPC clients in a green pool and wait for them all.

    Targets are assigned to the clients round-robin, so there may be more
    clients than targets.

    :param threads: number of concurrent clients (green threads)
    :param targets: non-empty iterable of targets to send to
    :param messages_count: messages per client when *duration* is not set
    :param duration: if set, every client sends for that many seconds
    """
    p = eventlet.GreenPool(size=threads)
    targets = itertools.cycle(targets)
    for i in range(threads):
        # builtin next() works on Python 2 and 3; iterators have no
        # .next() method on Python 3
        target = next(targets)
        LOG.debug("starting RPC client for target %s", target)
        client_builder = functools.partial(RPCClient, i, transport, target,
                                           timeout, is_cast, wait_after_msg)
        p.spawn_n(send_messages, i, client_builder, messages_count, duration)
    p.waitall()


def spawn_notify_clients(threads, topic, transport, message_count,
                         wait_after_msg, timeout, duration):
    """Run *threads* notification clients in a green pool and wait for them.

    *timeout* is accepted but is not used by this function.
    """
    pool = eventlet.GreenPool(size=threads)
    for client_id in range(threads):
        builder = functools.partial(NotifyClient, client_id, transport,
                                    topic, wait_after_msg)
        pool.spawn_n(send_messages, client_id, builder, message_count,
                     duration)
    pool.waitall()


def send_messages(client_id, client_builder, messages_count, duration):
    """Build a client, register it in CLIENTS and make it send messages.

    With *duration* set the client sends until that many seconds have
    elapsed; otherwise it sends exactly *messages_count* messages. Control
    is yielded to other green threads after every message.
    """
    client = client_builder()
    CLIENTS.append(client)

    if not duration:
        LOG.debug("Sending %d messages using client %d",
                  messages_count, client_id)
        for _ in six.moves.range(0, messages_count):
            client.send_msg()
            eventlet.sleep()
        LOG.debug("Client %d has sent %d messages", client_id, messages_count)
    else:
        with timeutils.StopWatch(duration) as watch:
            while not watch.expired():
                client.send_msg()
                eventlet.sleep()

    time.sleep(1)  # wait for replies to be collected


def _rpc_call(client, msg):
    """Send *msg* via a blocking RPC call to the remote 'info' method.

    Returns the reply, or None when the call raised (the error is logged).
    """
    try:
        reply = client.call({}, 'info', message=msg)
    except Exception as exc:
        LOG.exception('Error %s on CALL for message %s', str(exc), msg)
        return None

    LOG.debug("SENT: %s, RCV: %s", msg, reply)
    return reply


def _rpc_cast(client, msg):
    """Send *msg* via an RPC cast to the remote 'info' method.

    Always returns None; a failure is logged rather than raised.
    """
    try:
        client.cast({}, 'info', message=msg)
    except Exception as exc:
        LOG.exception('Error %s on CAST for message %s', str(exc), msg)
        return

    LOG.debug("SENT: %s", msg)


def _notify(notification_client, msg):
    """Emit *msg* as an info-level 'compute.start' notification."""
    context = {}
    event_type = 'compute.start'
    notification_client.info(context, event_type, msg)


def show_server_stats(endpoint, json_filename):
    """Log the summary of a server endpoint; optionally dump it as JSON.

    :param endpoint: endpoint whose received_messages collector is reported
    :param json_filename: file to write the series and summary to, if set
    """
    LOG.info('=' * 35 + ' summary ' + '=' * 35)

    collector = endpoint.received_messages
    series = {'server': collector.get_series()}
    summary = MessageStatsCollector.calc_stats('server', collector)
    output = {'series': series, 'summary': summary}

    if json_filename:
        write_json_file(json_filename, output)


def show_client_stats(clients, json_filename, has_reply=False):
    """Log the summary of all clients; optionally dump it as JSON.

    :param clients: clients whose collectors are reported
    :param json_filename: file to write the series and summaries to, if set
    :param has_reply: also report the round-trip collectors of the clients
    """
    LOG.info('=' * 35 + ' summary ' + '=' * 35)

    series = {}
    for cl in clients:
        series['client_%s' % cl.client_id] = cl.sent_messages.get_series()
        if has_reply:
            series['round_trip_%s' % cl.client_id] = (
                cl.round_trip_messages.get_series())

    summary = {
        'client': MessageStatsCollector.calc_stats(
            'client', *[cl.sent_messages for cl in clients]),
    }
    if has_reply:
        summary['round_trip'] = MessageStatsCollector.calc_stats(
            'round-trip', *[cl.round_trip_messages for cl in clients])

    output = {'series': series, 'summary': summary}
    if json_filename:
        write_json_file(json_filename, output)


def write_json_file(filename, output):
    """Serialize *output* as JSON into *filename*, replacing its content."""
    with open(filename, 'w') as stats_file:
        serialized = json.dumps(output)
        stats_file.write(serialized)
        LOG.info('Stats are written into %s', filename)


def _setup_logging(is_debug):
    """Configure root logging to stdout and quieten chatty libraries.

    :param is_debug: log at DEBUG level when true, at INFO otherwise
    """
    log_level = logging.DEBUG if is_debug else logging.INFO
    logging.basicConfig(
        stream=sys.stdout, level=log_level,
        format="%(asctime)-15s %(levelname)s %(name)s %(message)s")
    logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter())
    # Every name needs its own comma: adjacent string literals are silently
    # concatenated into a single (non-existent) logger name.
    for name in ('kombu', 'amqp', 'stevedore', 'qpid.messaging',
                 'oslo.messaging._drivers.amqp'):
        logging.getLogger(name).setLevel(logging.WARN)


def _str_to_bool(value):
    """Convert a command line string such as 'true', '0' or 'no' to a bool.

    argparse's ``type=bool`` turns every non-empty string, 'False'
    included, into True; the boolean options use this converter instead.

    :raises argparse.ArgumentTypeError: if the value is not recognised
    """
    normalized = value.strip().lower()
    if normalized in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if normalized in ('0', 'false', 'f', 'no', 'n', 'off', ''):
        return False
    raise argparse.ArgumentTypeError(
        'expected a boolean value, got %r' % value)


def main():
    """Parse the command line and run the selected simulator mode.

    The mode is one of notify-server, batch-notify-server, notify-client,
    rpc-server or rpc-client. Server modes run until interrupted or until
    the duration given with -l elapses; client modes send messages. Every
    mode finishes by logging a statistics summary, which is also written
    to the file given with -j/--json when that option is set.
    """
    parser = argparse.ArgumentParser(
        description='Tools to play with oslo.messaging\'s RPC',
        usage=USAGE,
    )
    parser.add_argument('--url', dest='url',
                        default='rabbit://guest:password@localhost/',
                        help="oslo.messaging transport url")
    parser.add_argument('-d', '--debug', dest='debug', type=_str_to_bool,
                        default=False,
                        help="Turn on DEBUG logging level instead of INFO")
    parser.add_argument('-tp', '--topic', dest='topic',
                        default="profiler_topic",
                        help="Topics to publish/receive messages to/from.")
    parser.add_argument('-s', '--server', dest='server',
                        default="profiler_server",
                        help="Servers to publish/receive messages to/from.")
    parser.add_argument('-tg', '--targets', dest='targets', nargs="+",
                        default=["profiler_topic.profiler_server"],
                        help="Targets to publish/receive messages to/from.")
    parser.add_argument('-l', dest='duration', type=int,
                        help='send messages for certain time')
    parser.add_argument('-j', '--json', dest='json_filename',
                        help='File name to store results in JSON format')
    parser.add_argument('--config-file', dest='config_file', type=str,
                        help="Oslo messaging config file")

    subparsers = parser.add_subparsers(dest='mode',
                                       help='notify/rpc server/client mode')

    server = subparsers.add_parser('notify-server')
    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
    server.add_argument('--requeue', dest='requeue', action='store_true')

    server = subparsers.add_parser('batch-notify-server')
    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
    server.add_argument('--requeue', dest='requeue', action='store_true')

    client = subparsers.add_parser('notify-client')
    client.add_argument('-p', dest='threads', type=int, default=1,
                        help='number of client threads')
    client.add_argument('-m', dest='messages', type=int, default=1,
                        help='number of call per threads')
    client.add_argument('-w', dest='wait_after_msg', type=int, default=-1,
                        help='sleep time between two messages')
    client.add_argument('--timeout', dest='timeout', type=int, default=3,
                        help='client timeout')

    server = subparsers.add_parser('rpc-server')
    server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
    server.add_argument('-e', '--executor', dest='executor',
                        type=str, default='eventlet',
                        help='name of a message executor')

    client = subparsers.add_parser('rpc-client')
    client.add_argument('-p', dest='threads', type=int, default=1,
                        help='number of client threads')
    client.add_argument('-m', dest='messages', type=int, default=1,
                        help='number of call per threads')
    client.add_argument('-w', dest='wait_after_msg', type=int, default=-1,
                        help='sleep time between two messages')
    client.add_argument('--timeout', dest='timeout', type=int, default=3,
                        help='client timeout')
    client.add_argument('--exit-wait', dest='exit_wait', type=int, default=0,
                        help='Keep connections open N seconds after calls '
                        'have been done')
    client.add_argument('--is-cast', dest='is_cast', type=_str_to_bool,
                        default=False,
                        help='Use `call` or `cast` RPC methods')
    client.add_argument('--is-fanout', dest='is_fanout', type=_str_to_bool,
                        default=False, help='fanout=True for CAST messages')

    args = parser.parse_args()

    _setup_logging(is_debug=args.debug)

    if args.config_file:
        cfg.CONF(["--config-file", args.config_file])

    # RPC modes and notification modes use different transport factories
    if args.mode in ['rpc-server', 'rpc-client']:
        transport = messaging.get_transport(cfg.CONF, url=args.url)
    else:
        transport = messaging.get_notification_transport(cfg.CONF,
                                                         url=args.url)

    if args.mode in ['rpc-client', 'notify-client']:
        # always generate maximum number of messages for duration-limited tests
        generate_messages(MESSAGES_LIMIT if args.duration else args.messages)

    # oslo.config defaults
    cfg.CONF.heartbeat_interval = 5
    cfg.CONF.prog = os.path.basename(__file__)
    cfg.CONF.project = 'oslo.messaging'

    if args.mode == 'rpc-server':
        target = messaging.Target(topic=args.topic, server=args.server)
        if args.url.startswith('zmq'):
            cfg.CONF.rpc_zmq_matchmaker = "redis"

        endpoint = rpc_server(transport, target, args.wait_before_answer,
                              args.executor, args.duration)
        show_server_stats(endpoint, args.json_filename)

    elif args.mode == 'notify-server':
        endpoint = notify_server(transport, args.topic,
                                 args.wait_before_answer, args.duration,
                                 args.requeue)
        show_server_stats(endpoint, args.json_filename)

    elif args.mode == 'batch-notify-server':
        endpoint = batch_notify_server(transport, args.topic,
                                       args.wait_before_answer, args.duration,
                                       args.requeue)
        show_server_stats(endpoint, args.json_filename)

    elif args.mode == 'notify-client':
        spawn_notify_clients(args.threads, args.topic, transport,
                             args.messages, args.wait_after_msg, args.timeout,
                             args.duration)
        show_client_stats(CLIENTS, args.json_filename)

    elif args.mode == 'rpc-client':
        # every target is given as "<topic>.<server>"; split on the first dot
        targets = [target.partition('.')[::2] for target in args.targets]
        targets = [messaging.Target(
            topic=topic, server=server_name, fanout=args.is_fanout) for
            topic, server_name in targets]
        spawn_rpc_clients(args.threads, transport, targets,
                          args.wait_after_msg, args.timeout, args.is_cast,
                          args.messages, args.duration)

        # casts have no replies, so round-trip stats exist only for calls
        show_client_stats(CLIENTS, args.json_filename, not args.is_cast)

        if args.exit_wait:
            LOG.info("Finished. waiting for %d seconds", args.exit_wait)
            time.sleep(args.exit_wait)


if __name__ == '__main__':
    # The length generator must exist before main() runs: the client modes
    # call generate_messages(), which draws payload lengths from it.
    RANDOM_GENERATOR = init_random_generator()
    CURRENT_PID = os.getpid()
    main()