Create a new connection when a process fork has been detected
This patch attempts to deal with applications that have forked the process after connecting to the broker. First, the creation of the connection is delayed until the application attempts to perform a messaging operation. Second, each time the application performs a messaging operation the current process id is checked against the id of the process that created the connection. If the process ids do not match, the application has called os.fork(). The new child process discards the existing connection and creates a new one. Change-Id: I5455cb0f8d380d6b65f1268b34a91355cbb14aa2 Closes-Bug: #1392868
This commit is contained in:
parent
0650bde775
commit
80e62aed7d
oslo/messaging/_drivers/protocols/amqp
@ -57,8 +57,11 @@ class Replies(pyngus.ReceiverEventHandler):
|
||||
self._correlation = {} # map of correlation-id to response queue
|
||||
self._ready = False
|
||||
self._on_ready = on_ready
|
||||
rname = "Consumer-%s:src=[dynamic]:tgt=replies" % uuid.uuid4().hex
|
||||
self._receiver = connection.create_receiver("replies",
|
||||
event_handler=self)
|
||||
event_handler=self,
|
||||
name=rname)
|
||||
|
||||
# capacity determines the maximum number of reply messages this link
|
||||
# can receive. As messages are received and credit is consumed, this
|
||||
# driver will 'top up' the credit back to max capacity. This number
|
||||
@ -253,8 +256,6 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
self.group_request_prefix = \
|
||||
config.oslo_messaging_amqp.group_request_prefix
|
||||
self._container_name = config.oslo_messaging_amqp.container_name
|
||||
if not self._container_name:
|
||||
self._container_name = "container-%s" % uuid.uuid4().hex
|
||||
self.idle_timeout = config.oslo_messaging_amqp.idle_timeout
|
||||
self.trace_protocol = config.oslo_messaging_amqp.trace
|
||||
self.ssl_ca_file = config.oslo_messaging_amqp.ssl_ca_file
|
||||
@ -290,14 +291,13 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
self._tasks.put(task)
|
||||
self._schedule_task_processing()
|
||||
|
||||
def destroy(self):
|
||||
def shutdown(self, wait=True, timeout=None):
|
||||
"""Shutdown the messaging service."""
|
||||
if self.processor:
|
||||
self.processor.wakeup(lambda: self._start_shutdown())
|
||||
LOG.info("Waiting for eventloop to exit")
|
||||
self.processor.join()
|
||||
LOG.debug("Waiting for eventloop to exit")
|
||||
self.processor.shutdown(wait, timeout)
|
||||
self.processor = None
|
||||
LOG.info("Eventloop exited, driver shut down")
|
||||
LOG.debug("Eventloop exited, driver shut down")
|
||||
|
||||
# The remaining methods are reserved to run from the eventloop thread only!
|
||||
# They must not be invoked directly!
|
||||
|
@ -21,6 +21,7 @@ messaging protocol. The driver sends messages and creates subscriptions via
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
@ -190,25 +191,36 @@ class ProtonDriver(base.BaseDriver):
|
||||
|
||||
super(ProtonDriver, self).__init__(conf, url, default_exchange,
|
||||
allowed_remote_exmods)
|
||||
# TODO(grs): handle authentication etc
|
||||
self._hosts = url.hosts
|
||||
self._conf = conf
|
||||
self._default_exchange = default_exchange
|
||||
|
||||
# Create a Controller that connects to the messaging service:
|
||||
self._ctrl = controller.Controller(url.hosts, default_exchange, conf)
|
||||
|
||||
# lazy connection setup - don't cause the controller to connect until
|
||||
# lazy connection setup - don't create the controller until
|
||||
# after the first messaging request:
|
||||
self._connect_called = False
|
||||
self._ctrl = None
|
||||
self._pid = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _ensure_connect_called(func):
|
||||
"""Causes the controller to connect to the messaging service when it is
|
||||
first used. It is safe to push tasks to it whether connected or not,
|
||||
but those tasks won't be processed until connection completes.
|
||||
"""Causes a new controller to be created when the messaging service is
|
||||
first used by the current process. It is safe to push tasks to it
|
||||
whether connected or not, but those tasks won't be processed until
|
||||
connection completes.
|
||||
"""
|
||||
def wrap(self, *args, **kws):
|
||||
with self._lock:
|
||||
connect_called = self._connect_called
|
||||
self._connect_called = True
|
||||
if not connect_called:
|
||||
old_pid = self._pid
|
||||
self._pid = os.getpid()
|
||||
|
||||
if old_pid != self._pid:
|
||||
if self._ctrl is not None:
|
||||
LOG.warning("Process forked after connection established!")
|
||||
self._ctrl.shutdown(wait=False)
|
||||
# Create a Controller that connects to the messaging service:
|
||||
self._ctrl = controller.Controller(self._hosts,
|
||||
self._default_exchange,
|
||||
self._conf)
|
||||
self._ctrl.connect()
|
||||
return func(self, *args, **kws)
|
||||
return wrap
|
||||
@ -274,6 +286,7 @@ class ProtonDriver(base.BaseDriver):
|
||||
|
||||
def cleanup(self):
|
||||
"""Release all resources."""
|
||||
LOG.debug("Cleaning up ProtonDriver")
|
||||
self._ctrl.destroy()
|
||||
self._ctrl = None
|
||||
if self._ctrl:
|
||||
self._ctrl.shutdown()
|
||||
self._ctrl = None
|
||||
LOG.info("AMQP 1.0 messaging driver shutdown")
|
||||
|
@ -235,7 +235,7 @@ class Thread(threading.Thread):
|
||||
|
||||
# Configure a container
|
||||
if container_name is None:
|
||||
container_name = uuid.uuid4().hex
|
||||
container_name = "Container-" + uuid.uuid4().hex
|
||||
self._container = pyngus.Container(container_name)
|
||||
|
||||
self.name = "Thread for Proton container: %s" % self._container.name
|
||||
@ -245,25 +245,27 @@ class Thread(threading.Thread):
|
||||
|
||||
def wakeup(self, request=None):
|
||||
"""Wake up the eventloop thread, Optionally providing a callable to run
|
||||
when the eventloop wakes up.
|
||||
when the eventloop wakes up. Thread safe.
|
||||
"""
|
||||
self._requests.wakeup(request)
|
||||
|
||||
def shutdown(self, wait=True, timeout=None):
|
||||
"""Shutdown the eventloop thread. Thread safe.
|
||||
"""
|
||||
LOG.debug("eventloop shutdown requested")
|
||||
self._shutdown = True
|
||||
self.wakeup()
|
||||
if wait:
|
||||
self.join(timeout=timeout)
|
||||
LOG.debug("eventloop shutdown complete")
|
||||
|
||||
# the following methods are not thread safe - they must be run from the
|
||||
# eventloop thread
|
||||
|
||||
def schedule(self, request, delay):
|
||||
"""Invoke request after delay seconds."""
|
||||
self._schedule.schedule(request, delay)
|
||||
|
||||
def destroy(self):
|
||||
"""Stop the processing thread, releasing all resources.
|
||||
"""
|
||||
LOG.debug("Stopping Proton container %s", self._container.name)
|
||||
self.wakeup(lambda: self.shutdown())
|
||||
self.join()
|
||||
|
||||
def shutdown(self):
|
||||
LOG.info("eventloop shutdown requested")
|
||||
self._shutdown = True
|
||||
|
||||
def connect(self, host, handler, properties=None, name=None):
|
||||
"""Get a _SocketConnection to a peer represented by url."""
|
||||
key = name or "%s:%i" % (host.hostname, host.port)
|
||||
@ -311,6 +313,13 @@ class Thread(threading.Thread):
|
||||
str(serror))
|
||||
continue
|
||||
raise # assuming fatal...
|
||||
|
||||
# don't process any I/O or timers if woken up by a shutdown:
|
||||
# if we've been forked we don't want to do I/O on the parent's
|
||||
# sockets
|
||||
if self._shutdown:
|
||||
break
|
||||
|
||||
readable, writable, ignore = results
|
||||
|
||||
for r in readable:
|
||||
|
Loading…
x
Reference in New Issue
Block a user