From 0d49793e340728416c0c7b1bf964b54efd7e5acb Mon Sep 17 00:00:00 2001
From: Elena Ezhova <eezhova@mirantis.com>
Date: Wed, 8 Oct 2014 18:18:20 +0400
Subject: [PATCH] Create ZeroMQ Context per socket

ZeroMQ Context is a singleton and thus is created only once. This leads
to problems when there is more than one process working with it.
For example, while Neutron server starts, it firstly loads core
plugin and service plugins, which start message handling server,
and only then forks to create api-workers. As a result, all child
processes get the same copy of the context.

Creating new Context for each socket will prevent such situations
from happening and will guarantee that each process works with its
own Context.

Change-Id: I56912e39b119c20f6f23311fc2c7c4b9e9e480d0
Closes-Bug: #1364814
---
 oslo/messaging/_drivers/impl_zmq.py | 30 ++++++-----------------------
 1 file changed, 6 insertions(+), 24 deletions(-)

diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py
index a5b9bba8e..13c0f1942 100644
--- a/oslo/messaging/_drivers/impl_zmq.py
+++ b/oslo/messaging/_drivers/impl_zmq.py
@@ -88,7 +88,6 @@ zmq_opts = [
 
 CONF = cfg.CONF
 
-ZMQ_CTX = None  # ZeroMQ Context, must be global.
 matchmaker = None  # memoized matchmaker object
 
 
@@ -119,7 +118,8 @@ class ZmqSocket(object):
     """
 
     def __init__(self, addr, zmq_type, bind=True, subscribe=None):
-        self.sock = _get_ctxt().socket(zmq_type)
+        self.ctxt = zmq.Context(CONF.rpc_zmq_contexts)
+        self.sock = self.ctxt.socket(zmq_type)
         self.addr = addr
         self.type = zmq_type
         self.subscriptions = []
@@ -196,6 +196,7 @@ class ZmqSocket(object):
         try:
             # Default is to linger
             self.sock.close()
+            self.ctxt.term()
         except Exception:
             # While this is a bad thing to happen,
             # it would be much worse if some of the code calling this
@@ -759,27 +760,6 @@ def _multi_send(method, context, topic, msg, timeout=None,
     return return_val
 
 
-def cleanup():
-    """Clean up resources in use by implementation."""
-    global ZMQ_CTX
-    if ZMQ_CTX:
-        ZMQ_CTX.term()
-    ZMQ_CTX = None
-
-    global matchmaker
-    matchmaker = None
-
-
-def _get_ctxt():
-    if not zmq:
-        raise ImportError("Failed to import eventlet.green.zmq")
-
-    global ZMQ_CTX
-    if not ZMQ_CTX:
-        ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
-    return ZMQ_CTX
-
-
 def _get_matchmaker(*args, **kwargs):
     global matchmaker
     if not matchmaker:
@@ -858,6 +838,8 @@ class ZmqDriver(base.BaseDriver):
 
     def __init__(self, conf, url, default_exchange=None,
                  allowed_remote_exmods=None):
+        if not zmq:
+            raise ImportError("Failed to import eventlet.green.zmq")
         conf.register_opts(zmq_opts)
         conf.register_opts(impl_eventlet._eventlet_opts)
 
@@ -954,4 +936,4 @@ class ZmqDriver(base.BaseDriver):
         return listener
 
     def cleanup(self):
-        cleanup()
+        pass