Create ZeroMQ Context per socket
ZeroMQ Context is a singleton and thus is created only once. This leads to problems when there is more than one process working with it. For example, while Neutron server starts, it firstly loads core plugin and service plugins, which start message handling server, and only then forks to create api-workers. As a result, all child processes get the same copy of the context. Creating new Context for each socket will prevent such situations from happening and will guarantee that each process works with its own Context. Change-Id: I56912e39b119c20f6f23311fc2c7c4b9e9e480d0 Closes-Bug: #1364814
This commit is contained in:
parent
6eb0d2ff80
commit
0d49793e34
@ -88,7 +88,6 @@ zmq_opts = [
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
ZMQ_CTX = None # ZeroMQ Context, must be global.
|
||||
matchmaker = None # memoized matchmaker object
|
||||
|
||||
|
||||
@ -119,7 +118,8 @@ class ZmqSocket(object):
|
||||
"""
|
||||
|
||||
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
|
||||
self.sock = _get_ctxt().socket(zmq_type)
|
||||
self.ctxt = zmq.Context(CONF.rpc_zmq_contexts)
|
||||
self.sock = self.ctxt.socket(zmq_type)
|
||||
self.addr = addr
|
||||
self.type = zmq_type
|
||||
self.subscriptions = []
|
||||
@ -196,6 +196,7 @@ class ZmqSocket(object):
|
||||
try:
|
||||
# Default is to linger
|
||||
self.sock.close()
|
||||
self.ctxt.term()
|
||||
except Exception:
|
||||
# While this is a bad thing to happen,
|
||||
# it would be much worse if some of the code calling this
|
||||
@ -759,27 +760,6 @@ def _multi_send(method, context, topic, msg, timeout=None,
|
||||
return return_val
|
||||
|
||||
|
||||
def cleanup():
|
||||
"""Clean up resources in use by implementation."""
|
||||
global ZMQ_CTX
|
||||
if ZMQ_CTX:
|
||||
ZMQ_CTX.term()
|
||||
ZMQ_CTX = None
|
||||
|
||||
global matchmaker
|
||||
matchmaker = None
|
||||
|
||||
|
||||
def _get_ctxt():
|
||||
if not zmq:
|
||||
raise ImportError("Failed to import eventlet.green.zmq")
|
||||
|
||||
global ZMQ_CTX
|
||||
if not ZMQ_CTX:
|
||||
ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
|
||||
return ZMQ_CTX
|
||||
|
||||
|
||||
def _get_matchmaker(*args, **kwargs):
|
||||
global matchmaker
|
||||
if not matchmaker:
|
||||
@ -858,6 +838,8 @@ class ZmqDriver(base.BaseDriver):
|
||||
|
||||
def __init__(self, conf, url, default_exchange=None,
|
||||
allowed_remote_exmods=None):
|
||||
if not zmq:
|
||||
raise ImportError("Failed to import eventlet.green.zmq")
|
||||
conf.register_opts(zmq_opts)
|
||||
conf.register_opts(impl_eventlet._eventlet_opts)
|
||||
|
||||
@ -954,4 +936,4 @@ class ZmqDriver(base.BaseDriver):
|
||||
return listener
|
||||
|
||||
def cleanup(self):
|
||||
cleanup()
|
||||
pass
|
||||
|
Loading…
x
Reference in New Issue
Block a user