diff --git a/oslo_messaging/_utils.py b/oslo_messaging/_utils.py
index cec94bb48..1bb20b089 100644
--- a/oslo_messaging/_utils.py
+++ b/oslo_messaging/_utils.py
@@ -116,6 +116,29 @@ def fetch_current_thread_functor():
         return lambda: threading.current_thread()
 
 
+class DummyCondition(object):
+    def acquire(self):
+        pass
+
+    def notify(self):
+        pass
+
+    def notify_all(self):
+        pass
+
+    def wait(self, timeout=None):
+        pass
+
+    def release(self):
+        pass
+
+    def __enter__(self):
+        self.acquire()
+
+    def __exit__(self, type, value, traceback):
+        self.release()
+
+
 class DummyLock(object):
     def acquire(self):
         pass
diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py
index 7af4c3d8a..f8083bd02 100644
--- a/oslo_messaging/server.py
+++ b/oslo_messaging/server.py
@@ -27,6 +27,7 @@ import logging
 import threading
 
 from oslo_service import service
+from oslo_utils import timeutils
 from stevedore import driver
 
 from oslo_messaging._drivers import base as driver_base
@@ -98,9 +99,11 @@ class MessageHandlingServer(service.ServiceBase):
         # is fully started. Except for the blocking executor that have
         # start() that doesn't return
         if self.executor != "blocking":
-            self._state_lock = threading.Lock()
+            self._state_cond = threading.Condition()
+            self._dummy_cond = False
         else:
-            self._state_lock = _utils.DummyLock()
+            self._state_cond = _utils.DummyCondition()
+            self._dummy_cond = True
 
         try:
             mgr = driver.DriverManager('oslo.messaging.executors',
@@ -130,16 +133,18 @@ class MessageHandlingServer(service.ServiceBase):
         """
         if self._executor is not None:
             return
-        try:
-            listener = self.dispatcher._listen(self.transport)
-        except driver_base.TransportDriverError as ex:
-            raise ServerListenError(self.target, ex)
-
-        with self._state_lock:
+        with self._state_cond:
+            if self._executor is not None:
+                return
+            try:
+                listener = self.dispatcher._listen(self.transport)
+            except driver_base.TransportDriverError as ex:
+                raise ServerListenError(self.target, ex)
             self._running = True
             self._executor = self._executor_cls(self.conf, listener,
                                                 self.dispatcher)
             self._executor.start()
+            self._state_cond.notify_all()
 
     def stop(self):
         """Stop handling incoming messages.
@@ -149,10 +154,11 @@ class MessageHandlingServer(service.ServiceBase):
         some messages, and underlying driver resources associated to this
         server are still in use. See 'wait' for more details.
         """
-        with self._state_lock:
+        with self._state_cond:
             if self._executor is not None:
                 self._running = False
                 self._executor.stop()
+            self._state_cond.notify_all()
 
     def wait(self):
         """Wait for message processing to complete.
@@ -164,21 +170,37 @@ class MessageHandlingServer(service.ServiceBase):
         Once it's finished, the underlying driver resources associated to this
         server are released (like closing useless network connections).
         """
-        with self._state_lock:
+        with self._state_cond:
             if self._running:
-                # NOTE(dims): Need to change this to raise RuntimeError after
-                # verifying/fixing other openstack projects (like Neutron)
-                # work ok with this change
                 LOG.warn(_LW("wait() should be called after stop() as it "
                              "waits for existing messages to finish "
                              "processing"))
-
-            if self._executor is not None:
-                self._executor.wait()
-                # Close listener connection after processing all messages
-                self._executor.listener.cleanup()
-
+                w = timeutils.StopWatch()
+                w.start()
+                while self._running:
+                    # NOTE(harlowja): 1.0 seconds was mostly chosen at
+                    # random, but it seems like a reasonable value to
+                    # use to avoid spamming the logs with to much
+                    # information.
+                    self._state_cond.wait(1.0)
+                    if self._running and not self._dummy_cond:
+                        LOG.warn(
+                            _LW("wait() should be have been called"
+                                " after stop() as wait() waits for existing"
+                                " messages to finish processing, it has"
+                                " been %0.2f seconds and stop() still has"
+                                " not been called"), w.elapsed())
+            executor = self._executor
             self._executor = None
+        if executor is not None:
+            # We are the lucky calling thread to wait on the executor to
+            # actually finish.
+            try:
+                executor.wait()
+            finally:
+                # Close listener connection after processing all messages
+                executor.listener.cleanup()
+                executor = None
 
     def reset(self):
         """Reset service.
diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py
index 0c222ae47..b1f8961c5 100644
--- a/oslo_messaging/tests/rpc/test_server.py
+++ b/oslo_messaging/tests/rpc/test_server.py
@@ -130,26 +130,6 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
         self.assertIsNone(server._executor)
         self.assertEqual(1, listener.cleanup.call_count)
 
-    @mock.patch('oslo_messaging._executors.impl_pooledexecutor.'
-                'PooledExecutor.wait')
-    def test_server_invalid_wait_running_server(self, mock_wait):
-        transport = oslo_messaging.get_transport(self.conf, url='fake:')
-        target = oslo_messaging.Target(topic='foo', server='bar')
-        endpoints = [object()]
-        serializer = object()
-
-        server = oslo_messaging.get_rpc_server(transport, target, endpoints,
-                                               serializer=serializer,
-                                               executor='eventlet')
-        self.addCleanup(server.wait)
-        self.addCleanup(server.stop)
-        server.start()
-        with mock.patch('logging.Logger.warn') as warn:
-            server.wait()
-            warn.assert_called_with('wait() should be called after '
-                                    'stop() as it waits for existing '
-                                    'messages to finish processing')
-
     def test_no_target_server(self):
         transport = oslo_messaging.get_transport(self.conf, url='fake:')