# Copyright 2013 Red Hat, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import fixtures
import six
from six.moves import mox
from stevedore import driver
import testscenarios

from oslo.config import cfg
from oslo import messaging
from oslo.messaging import transport
from oslo_messaging.tests import utils as test_utils
from oslo_messaging import transport as private_transport

load_tests = testscenarios.load_tests_apply_scenarios


class _FakeDriver(object):

    def __init__(self, conf):
        self.conf = conf

    def send(self, *args, **kwargs):
        pass

    def send_notification(self, *args, **kwargs):
        pass

    def listen(self, target):
        pass


class _FakeManager(object):

    def __init__(self, driver):
        self.driver = driver


class GetTransportTestCase(test_utils.BaseTestCase):

    scenarios = [
        ('rpc_backend',
         dict(url=None, transport_url=None, rpc_backend='testbackend',
              control_exchange=None, allowed=None, aliases=None,
              expect=dict(backend='testbackend',
                          exchange=None,
                          url='testbackend:',
                          allowed=[]))),
        ('transport_url',
         dict(url=None, transport_url='testtransport:', rpc_backend=None,
              control_exchange=None, allowed=None, aliases=None,
              expect=dict(backend='testtransport',
                          exchange=None,
                          url='testtransport:',
                          allowed=[]))),
        ('url_param',
         dict(url='testtransport:', transport_url=None, rpc_backend=None,
              control_exchange=None, allowed=None, aliases=None,
              expect=dict(backend='testtransport',
                          exchange=None,
                          url='testtransport:',
                          allowed=[]))),
        ('control_exchange',
         dict(url=None, transport_url=None, rpc_backend='testbackend',
              control_exchange='testexchange', allowed=None, aliases=None,
              expect=dict(backend='testbackend',
                          exchange='testexchange',
                          url='testbackend:',
                          allowed=[]))),
        ('allowed_remote_exmods',
         dict(url=None, transport_url=None, rpc_backend='testbackend',
              control_exchange=None, allowed=['foo', 'bar'], aliases=None,
              expect=dict(backend='testbackend',
                          exchange=None,
                          url='testbackend:',
                          allowed=['foo', 'bar']))),
        ('rpc_backend_aliased',
         dict(url=None, transport_url=None, rpc_backend='testfoo',
              control_exchange=None, allowed=None,
              aliases=dict(testfoo='testbackend'),
              expect=dict(backend='testbackend',
                          exchange=None,
                          url='testbackend:',
                          allowed=[]))),
        ('transport_url_aliased',
         dict(url=None, transport_url='testfoo:', rpc_backend=None,
              control_exchange=None, allowed=None,
              aliases=dict(testfoo='testtransport'),
              expect=dict(backend='testtransport',
                          exchange=None,
                          url='testtransport:',
                          allowed=[]))),
        ('url_param_aliased',
         dict(url='testfoo:', transport_url=None, rpc_backend=None,
              control_exchange=None, allowed=None,
              aliases=dict(testfoo='testtransport'),
              expect=dict(backend='testtransport',
                          exchange=None,
                          url='testtransport:',
                          allowed=[]))),
    ]

    def test_get_transport(self):
        self.config(rpc_backend=self.rpc_backend,
                    control_exchange=self.control_exchange,
                    transport_url=self.transport_url)

        self.mox.StubOutWithMock(driver, 'DriverManager')

        invoke_args = [self.conf,
                       messaging.TransportURL.parse(self.conf,
                                                    self.expect['url'])]
        invoke_kwds = dict(default_exchange=self.expect['exchange'],
                           allowed_remote_exmods=self.expect['allowed'])

        drvr = _FakeDriver(self.conf)
        driver.DriverManager('oslo.messaging.drivers',
                             self.expect['backend'],
                             invoke_on_load=True,
                             invoke_args=invoke_args,
                             invoke_kwds=invoke_kwds).\
            AndReturn(_FakeManager(drvr))

        self.mox.ReplayAll()

        kwargs = dict(url=self.url)
        if self.allowed is not None:
            kwargs['allowed_remote_exmods'] = self.allowed
        if self.aliases is not None:
            kwargs['aliases'] = self.aliases
        transport_ = messaging.get_transport(self.conf, **kwargs)

        self.assertIsNotNone(transport_)
        self.assertIs(transport_.conf, self.conf)
        self.assertIs(transport_._driver, drvr)


class GetTransportSadPathTestCase(test_utils.BaseTestCase):

    scenarios = [
        ('invalid_transport_url',
         dict(url=None, transport_url='invalid', rpc_backend=None,
              ex=dict(cls=messaging.InvalidTransportURL,
                      msg_contains='No scheme specified',
                      url='invalid'))),
        ('invalid_url_param',
         dict(url='invalid', transport_url=None, rpc_backend=None,
              ex=dict(cls=messaging.InvalidTransportURL,
                      msg_contains='No scheme specified',
                      url='invalid'))),
        ('driver_load_failure',
         dict(url=None, transport_url=None, rpc_backend='testbackend',
              ex=dict(cls=messaging.DriverLoadFailure,
                      msg_contains='Failed to load',
                      driver='testbackend'))),
    ]

    def test_get_transport_sad(self):
        self.config(rpc_backend=self.rpc_backend,
                    transport_url=self.transport_url)

        if self.rpc_backend:
            self.mox.StubOutWithMock(driver, 'DriverManager')

            invoke_args = [self.conf,
                           messaging.TransportURL.parse(self.conf,
                                                        self.url)]
            invoke_kwds = dict(default_exchange='openstack',
                               allowed_remote_exmods=[])

            driver.DriverManager('oslo.messaging.drivers',
                                 self.rpc_backend,
                                 invoke_on_load=True,
                                 invoke_args=invoke_args,
                                 invoke_kwds=invoke_kwds).\
                AndRaise(RuntimeError())

            self.mox.ReplayAll()

        try:
            messaging.get_transport(self.conf, url=self.url)
            self.assertFalse(True)
        except Exception as ex:
            ex_cls = self.ex.pop('cls')
            ex_msg_contains = self.ex.pop('msg_contains')

            self.assertIsInstance(ex, messaging.MessagingException)
            self.assertIsInstance(ex, ex_cls)
            self.assertIn(ex_msg_contains, six.text_type(ex))

            for k, v in self.ex.items():
                self.assertTrue(hasattr(ex, k))
                self.assertEqual(v, str(getattr(ex, k)))


# FIXME(markmc): this could be used elsewhere
class _SetDefaultsFixture(fixtures.Fixture):

    def __init__(self, set_defaults, opts, *names):
        super(_SetDefaultsFixture, self).__init__()
        self.set_defaults = set_defaults
        self.opts = opts
        self.names = names

    def setUp(self):
        super(_SetDefaultsFixture, self).setUp()

        # FIXME(markmc): this comes from Id5c1f3ba
        def first(seq, default=None, key=None):
            if key is None:
                key = bool
            return next(six.moves.filter(key, seq), default)

        def default(opts, name):
            return first(opts, key=lambda o: o.name == name).default

        orig_defaults = {}
        for n in self.names:
            orig_defaults[n] = default(self.opts, n)

        def restore_defaults():
            self.set_defaults(**orig_defaults)

        self.addCleanup(restore_defaults)


class TestSetDefaults(test_utils.BaseTestCase):

    def setUp(self):
        super(TestSetDefaults, self).setUp(conf=cfg.ConfigOpts())
        self.useFixture(_SetDefaultsFixture(messaging.set_transport_defaults,
                                            private_transport._transport_opts,
                                            'control_exchange'))

    def test_set_default_control_exchange(self):
        messaging.set_transport_defaults(control_exchange='foo')

        self.mox.StubOutWithMock(driver, 'DriverManager')
        invoke_kwds = mox.ContainsKeyValue('default_exchange', 'foo')
        driver.DriverManager(mox.IgnoreArg(),
                             mox.IgnoreArg(),
                             invoke_on_load=mox.IgnoreArg(),
                             invoke_args=mox.IgnoreArg(),
                             invoke_kwds=invoke_kwds).\
            AndReturn(_FakeManager(_FakeDriver(self.conf)))
        self.mox.ReplayAll()

        messaging.get_transport(self.conf)


class TestTransportMethodArgs(test_utils.BaseTestCase):

    _target = messaging.Target(topic='topic', server='server')

    def test_send_defaults(self):
        t = transport.Transport(_FakeDriver(cfg.CONF))

        self.mox.StubOutWithMock(t._driver, 'send')
        t._driver.send(self._target, 'ctxt', 'message',
                       wait_for_reply=None,
                       timeout=None, retry=None)
        self.mox.ReplayAll()

        t._send(self._target, 'ctxt', 'message')

    def test_send_all_args(self):
        t = transport.Transport(_FakeDriver(cfg.CONF))

        self.mox.StubOutWithMock(t._driver, 'send')
        t._driver.send(self._target, 'ctxt', 'message',
                       wait_for_reply='wait_for_reply',
                       timeout='timeout', retry='retry')
        self.mox.ReplayAll()

        t._send(self._target, 'ctxt', 'message',
                wait_for_reply='wait_for_reply',
                timeout='timeout', retry='retry')

    def test_send_notification(self):
        t = transport.Transport(_FakeDriver(cfg.CONF))

        self.mox.StubOutWithMock(t._driver, 'send_notification')
        t._driver.send_notification(self._target, 'ctxt', 'message', 1.0,
                                    retry=None)
        self.mox.ReplayAll()

        t._send_notification(self._target, 'ctxt', 'message', version=1.0)

    def test_send_notification_all_args(self):
        t = transport.Transport(_FakeDriver(cfg.CONF))

        self.mox.StubOutWithMock(t._driver, 'send_notification')
        t._driver.send_notification(self._target, 'ctxt', 'message', 1.0,
                                    retry=5)
        self.mox.ReplayAll()

        t._send_notification(self._target, 'ctxt', 'message', version=1.0,
                             retry=5)

    def test_listen(self):
        t = transport.Transport(_FakeDriver(cfg.CONF))

        self.mox.StubOutWithMock(t._driver, 'listen')
        t._driver.listen(self._target)
        self.mox.ReplayAll()

        t._listen(self._target)


class TestTransportUrlCustomisation(test_utils.BaseTestCase):
    def setUp(self):
        super(TestTransportUrlCustomisation, self).setUp()
        self.url1 = transport.TransportURL.parse(self.conf, "fake://vhost1")
        self.url2 = transport.TransportURL.parse(self.conf, "fake://vhost2")
        self.url3 = transport.TransportURL.parse(self.conf, "fake://vhost1")

    def test_hash(self):
        urls = {}
        urls[self.url1] = self.url1
        urls[self.url2] = self.url2
        urls[self.url3] = self.url3
        self.assertEqual(2, len(urls))

    def test_eq(self):
        self.assertEqual(self.url1, self.url3)
        self.assertNotEqual(self.url1, self.url2)


class TestTransportHostCustomisation(test_utils.BaseTestCase):
    def setUp(self):
        super(TestTransportHostCustomisation, self).setUp()
        self.host1 = transport.TransportHost("host1", 5662, "user", "pass")
        self.host2 = transport.TransportHost("host1", 5662, "user", "pass")
        self.host3 = transport.TransportHost("host1", 5663, "user", "pass")
        self.host4 = transport.TransportHost("host1", 5662, "user2", "pass")
        self.host5 = transport.TransportHost("host1", 5662, "user", "pass2")
        self.host6 = transport.TransportHost("host2", 5662, "user", "pass")

    def test_hash(self):
        hosts = {}
        hosts[self.host1] = self.host1
        hosts[self.host2] = self.host2
        hosts[self.host3] = self.host3
        hosts[self.host4] = self.host4
        hosts[self.host5] = self.host5
        hosts[self.host6] = self.host6
        self.assertEqual(5, len(hosts))

    def test_eq(self):
        self.assertEqual(self.host1, self.host2)
        self.assertNotEqual(self.host1, self.host3)
        self.assertNotEqual(self.host1, self.host4)
        self.assertNotEqual(self.host1, self.host5)
        self.assertNotEqual(self.host1, self.host6)