diff --git a/oslo/messaging/_drivers/protocols/amqp/controller.py b/oslo/messaging/_drivers/protocols/amqp/controller.py
index 7bb5493b0..7f013533e 100644
--- a/oslo/messaging/_drivers/protocols/amqp/controller.py
+++ b/oslo/messaging/_drivers/protocols/amqp/controller.py
@@ -57,8 +57,11 @@ class Replies(pyngus.ReceiverEventHandler):
         self._correlation = {}  # map of correlation-id to response queue
         self._ready = False
         self._on_ready = on_ready
+        rname = "Consumer-%s:src=[dynamic]:tgt=replies" % uuid.uuid4().hex
         self._receiver = connection.create_receiver("replies",
-                                                    event_handler=self)
+                                                    event_handler=self,
+                                                    name=rname)
+
         # capacity determines the maximum number of reply messages this link
         # can receive. As messages are received and credit is consumed, this
         # driver will 'top up' the credit back to max capacity.  This number
@@ -253,8 +256,6 @@ class Controller(pyngus.ConnectionEventHandler):
         self.group_request_prefix = \
             config.oslo_messaging_amqp.group_request_prefix
         self._container_name = config.oslo_messaging_amqp.container_name
-        if not self._container_name:
-            self._container_name = "container-%s" % uuid.uuid4().hex
         self.idle_timeout = config.oslo_messaging_amqp.idle_timeout
         self.trace_protocol = config.oslo_messaging_amqp.trace
         self.ssl_ca_file = config.oslo_messaging_amqp.ssl_ca_file
@@ -290,14 +291,13 @@ class Controller(pyngus.ConnectionEventHandler):
         self._tasks.put(task)
         self._schedule_task_processing()
 
-    def destroy(self):
+    def shutdown(self, wait=True, timeout=None):
         """Shutdown the messaging service."""
         if self.processor:
-            self.processor.wakeup(lambda: self._start_shutdown())
-            LOG.info("Waiting for eventloop to exit")
-            self.processor.join()
+            LOG.debug("Waiting for eventloop to exit")
+            self.processor.shutdown(wait, timeout)
             self.processor = None
-        LOG.info("Eventloop exited, driver shut down")
+        LOG.debug("Eventloop exited, driver shut down")
 
     # The remaining methods are reserved to run from the eventloop thread only!
     # They must not be invoked directly!
diff --git a/oslo/messaging/_drivers/protocols/amqp/driver.py b/oslo/messaging/_drivers/protocols/amqp/driver.py
index bf3f19ee8..e60be6593 100644
--- a/oslo/messaging/_drivers/protocols/amqp/driver.py
+++ b/oslo/messaging/_drivers/protocols/amqp/driver.py
@@ -21,6 +21,7 @@ messaging protocol.  The driver sends messages and creates subscriptions via
 """
 
 import logging
+import os
 import threading
 import time
 
@@ -190,25 +191,36 @@ class ProtonDriver(base.BaseDriver):
 
         super(ProtonDriver, self).__init__(conf, url, default_exchange,
                                            allowed_remote_exmods)
+        # TODO(grs): handle authentication etc
+        self._hosts = url.hosts
+        self._conf = conf
+        self._default_exchange = default_exchange
 
-        # Create a Controller that connects to the messaging service:
-        self._ctrl = controller.Controller(url.hosts, default_exchange, conf)
-
-        # lazy connection setup - don't cause the controller to connect until
+        # lazy connection setup - don't create the controller until
         # after the first messaging request:
-        self._connect_called = False
+        self._ctrl = None
+        self._pid = None
         self._lock = threading.Lock()
 
     def _ensure_connect_called(func):
-        """Causes the controller to connect to the messaging service when it is
-        first used. It is safe to push tasks to it whether connected or not,
-        but those tasks won't be processed until connection completes.
+        """Causes a new controller to be created when the messaging service is
+        first used by the current process. It is safe to push tasks to it
+        whether connected or not, but those tasks won't be processed until
+        connection completes.
         """
         def wrap(self, *args, **kws):
             with self._lock:
-                connect_called = self._connect_called
-                self._connect_called = True
-            if not connect_called:
+                old_pid = self._pid
+                self._pid = os.getpid()
+
+            if old_pid != self._pid:
+                if self._ctrl is not None:
+                    LOG.warning("Process forked after connection established!")
+                    self._ctrl.shutdown(wait=False)
+                # Create a Controller that connects to the messaging service:
+                self._ctrl = controller.Controller(self._hosts,
+                                                   self._default_exchange,
+                                                   self._conf)
                 self._ctrl.connect()
             return func(self, *args, **kws)
         return wrap
@@ -277,6 +289,7 @@ class ProtonDriver(base.BaseDriver):
 
     def cleanup(self):
         """Release all resources."""
-        LOG.debug("Cleaning up ProtonDriver")
-        self._ctrl.destroy()
-        self._ctrl = None
+        if self._ctrl:
+            self._ctrl.shutdown()
+            self._ctrl = None
+        LOG.info("AMQP 1.0 messaging driver shutdown")
diff --git a/oslo/messaging/_drivers/protocols/amqp/eventloop.py b/oslo/messaging/_drivers/protocols/amqp/eventloop.py
index f3d235a57..04ff868f6 100644
--- a/oslo/messaging/_drivers/protocols/amqp/eventloop.py
+++ b/oslo/messaging/_drivers/protocols/amqp/eventloop.py
@@ -235,7 +235,7 @@ class Thread(threading.Thread):
 
         # Configure a container
         if container_name is None:
-            container_name = uuid.uuid4().hex
+            container_name = "Container-" + uuid.uuid4().hex
         self._container = pyngus.Container(container_name)
 
         self.name = "Thread for Proton container: %s" % self._container.name
@@ -245,25 +245,27 @@ class Thread(threading.Thread):
 
     def wakeup(self, request=None):
         """Wake up the eventloop thread, Optionally providing a callable to run
-        when the eventloop wakes up.
+        when the eventloop wakes up.  Thread safe.
         """
         self._requests.wakeup(request)
 
+    def shutdown(self, wait=True, timeout=None):
+        """Shutdown the eventloop thread.  Thread safe.
+        """
+        LOG.debug("eventloop shutdown requested")
+        self._shutdown = True
+        self.wakeup()
+        if wait:
+            self.join(timeout=timeout)
+        LOG.debug("eventloop shutdown complete")
+
+    # the following methods are not thread safe - they must be run from the
+    # eventloop thread
+
     def schedule(self, request, delay):
         """Invoke request after delay seconds."""
         self._schedule.schedule(request, delay)
 
-    def destroy(self):
-        """Stop the processing thread, releasing all resources.
-        """
-        LOG.debug("Stopping Proton container %s", self._container.name)
-        self.wakeup(lambda: self.shutdown())
-        self.join()
-
-    def shutdown(self):
-        LOG.info("eventloop shutdown requested")
-        self._shutdown = True
-
     def connect(self, host, handler, properties=None, name=None):
         """Get a _SocketConnection to a peer represented by url."""
         key = name or "%s:%i" % (host.hostname, host.port)
@@ -311,6 +313,13 @@ class Thread(threading.Thread):
                                 str(serror))
                     continue
                 raise  # assuming fatal...
+
+            # don't process any I/O or timers if woken up by a shutdown:
+            # if we've been forked we don't want to do I/O on the parent's
+            # sockets
+            if self._shutdown:
+                break
+
             readable, writable, ignore = results
 
             for r in readable: