Merge "[zmq] Driver optimizations for CALL"
This commit is contained in:
commit
ec5640a646
oslo_messaging/_drivers
impl_zmq.py
zmq_driver
broker
client
publishers
zmq_client.pyzmq_client_base.pyzmq_client_light.pypoller
server
zmq_async.pyzmq_socket.py@ -13,6 +13,7 @@
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import pprint
|
||||
import socket
|
||||
import threading
|
||||
@ -23,6 +24,7 @@ from stevedore import driver
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_client
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_client_light
|
||||
from oslo_messaging._drivers.zmq_driver.server import zmq_server
|
||||
from oslo_messaging._executors import impl_pooledexecutor # FIXME(markmc)
|
||||
|
||||
@ -78,8 +80,8 @@ zmq_opts = [
|
||||
'Poll raises timeout exception when timeout expired.'),
|
||||
|
||||
cfg.BoolOpt('zmq_use_broker',
|
||||
default=True,
|
||||
help='Shows whether zmq-messaging uses broker or not.'),
|
||||
default=False,
|
||||
help='Configures zmq-messaging to use broker or not.'),
|
||||
|
||||
cfg.PortOpt('rpc_zmq_min_port',
|
||||
default=49152,
|
||||
@ -106,6 +108,7 @@ class LazyDriverItem(object):
|
||||
self.item_class = item_cls
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
self.process_id = os.getpid()
|
||||
|
||||
def get(self):
|
||||
# NOTE(ozamiatin): Lazy initialization.
|
||||
@ -114,11 +117,12 @@ class LazyDriverItem(object):
|
||||
# __init__, but 'fork' extensively used by services
|
||||
# breaks all things.
|
||||
|
||||
if self.item is not None:
|
||||
if self.item is not None and os.getpid() == self.process_id:
|
||||
return self.item
|
||||
|
||||
self._lock.acquire()
|
||||
if self.item is None:
|
||||
if self.item is None or os.getpid() != self.process_id:
|
||||
self.process_id = os.getpid()
|
||||
self.item = self.item_class(*self.args, **self.kwargs)
|
||||
self._lock.release()
|
||||
return self.item
|
||||
@ -175,12 +179,15 @@ class ZmqDriver(base.BaseDriver):
|
||||
self.notify_server = LazyDriverItem(
|
||||
zmq_server.ZmqServer, self, self.conf, self.matchmaker)
|
||||
|
||||
client_cls = zmq_client_light.ZmqClientLight \
|
||||
if conf.zmq_use_broker else zmq_client.ZmqClient
|
||||
|
||||
self.client = LazyDriverItem(
|
||||
zmq_client.ZmqClient, self.conf, self.matchmaker,
|
||||
client_cls, self.conf, self.matchmaker,
|
||||
self.allowed_remote_exmods)
|
||||
|
||||
self.notifier = LazyDriverItem(
|
||||
zmq_client.ZmqClient, self.conf, self.matchmaker,
|
||||
client_cls, self.conf, self.matchmaker,
|
||||
self.allowed_remote_exmods)
|
||||
|
||||
super(ZmqDriver, self).__init__(conf, url, default_exchange,
|
||||
|
@ -15,8 +15,8 @@
|
||||
import logging
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||
import zmq_dealer_publisher
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher_proxy
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
@ -39,8 +39,8 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
|
||||
LOG.info(_LI("Polling at universal proxy"))
|
||||
|
||||
self.matchmaker = matchmaker
|
||||
reply_receiver = zmq_dealer_publisher.ReplyReceiver(self.poller)
|
||||
self.publisher = zmq_dealer_publisher.DealerPublisherProxy(
|
||||
reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller)
|
||||
self.publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy(
|
||||
conf, matchmaker, reply_receiver)
|
||||
|
||||
def run(self):
|
||||
@ -54,18 +54,18 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
|
||||
self._redirect_reply(message)
|
||||
|
||||
def _redirect_in_request(self, request):
|
||||
LOG.info(_LI("-> Redirecting request %s to TCP publisher")
|
||||
% request)
|
||||
LOG.debug("-> Redirecting request %s to TCP publisher"
|
||||
% request)
|
||||
self.publisher.send_request(request)
|
||||
|
||||
def _redirect_reply(self, reply):
|
||||
LOG.info(_LI("Reply proxy %s") % reply)
|
||||
LOG.debug("Reply proxy %s" % reply)
|
||||
if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE:
|
||||
LOG.info(_LI("Acknowledge dropped %s") % reply)
|
||||
LOG.debug("Acknowledge dropped %s" % reply)
|
||||
return
|
||||
|
||||
LOG.info(_LI("<- Redirecting reply to ROUTER: reply: %s")
|
||||
% reply[zmq_names.IDX_REPLY_BODY:])
|
||||
LOG.debug("<- Redirecting reply to ROUTER: reply: %s"
|
||||
% reply[zmq_names.IDX_REPLY_BODY:])
|
||||
|
||||
self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:])
|
||||
|
||||
|
194
oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py
Normal file
194
oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py
Normal file
@ -0,0 +1,194 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
|
||||
from concurrent import futures
|
||||
import futurist
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||
import zmq_publisher_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class DealerCallPublisher(zmq_publisher_base.PublisherBase):
|
||||
"""Thread-safe CALL publisher
|
||||
|
||||
Used as faster and thread-safe publisher for CALL
|
||||
instead of ReqPublisher.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
super(DealerCallPublisher, self).__init__(conf)
|
||||
self.matchmaker = matchmaker
|
||||
self.reply_waiter = ReplyWaiter(conf)
|
||||
self.sender = RequestSender(conf, matchmaker, self.reply_waiter) \
|
||||
if not conf.zmq_use_broker else \
|
||||
RequestSenderLight(conf, matchmaker, self.reply_waiter)
|
||||
|
||||
def send_request(self, request):
|
||||
reply_future = self.sender.send_request(request)
|
||||
try:
|
||||
reply = reply_future.result(timeout=request.timeout)
|
||||
except futures.TimeoutError:
|
||||
raise oslo_messaging.MessagingTimeout(
|
||||
"Timeout %s seconds was reached" % request.timeout)
|
||||
finally:
|
||||
self.reply_waiter.untrack_id(request.message_id)
|
||||
|
||||
LOG.debug("Received reply %s" % reply)
|
||||
if reply[zmq_names.FIELD_FAILURE]:
|
||||
raise rpc_common.deserialize_remote_exception(
|
||||
reply[zmq_names.FIELD_FAILURE],
|
||||
request.allowed_remote_exmods)
|
||||
else:
|
||||
return reply[zmq_names.FIELD_REPLY]
|
||||
|
||||
|
||||
class RequestSender(zmq_publisher_base.PublisherMultisend):
|
||||
|
||||
def __init__(self, conf, matchmaker, reply_waiter):
|
||||
super(RequestSender, self).__init__(conf, matchmaker, zmq.DEALER)
|
||||
self.reply_waiter = reply_waiter
|
||||
self.queue, self.empty_except = zmq_async.get_queue()
|
||||
self.executor = zmq_async.get_executor(self.run_loop)
|
||||
self.executor.execute()
|
||||
|
||||
def send_request(self, request):
|
||||
reply_future = futurist.Future()
|
||||
self.reply_waiter.track_reply(reply_future, request.message_id)
|
||||
self.queue.put(request)
|
||||
return reply_future
|
||||
|
||||
def _do_send_request(self, socket, request):
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(request)
|
||||
|
||||
LOG.debug("Sending message_id %(message)s to a target %(target)s"
|
||||
% {"message": request.message_id,
|
||||
"target": request.target})
|
||||
|
||||
def _check_hosts_connections(self, target, listener_type):
|
||||
if str(target) in self.outbound_sockets:
|
||||
socket = self.outbound_sockets[str(target)]
|
||||
else:
|
||||
hosts = self.matchmaker.get_hosts(
|
||||
target, listener_type)
|
||||
socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type)
|
||||
self.outbound_sockets[str(target)] = socket
|
||||
|
||||
for host in hosts:
|
||||
self._connect_to_host(socket, host, target)
|
||||
|
||||
return socket
|
||||
|
||||
def run_loop(self):
|
||||
try:
|
||||
request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
|
||||
except self.empty_except:
|
||||
return
|
||||
|
||||
socket = self._check_hosts_connections(
|
||||
request.target, zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
||||
self._do_send_request(socket, request)
|
||||
self.reply_waiter.poll_socket(socket)
|
||||
|
||||
|
||||
class RequestSenderLight(RequestSender):
|
||||
"""This class used with proxy.
|
||||
|
||||
Simplified address matching because there is only
|
||||
one proxy IPC address.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, matchmaker, reply_waiter):
|
||||
if not conf.zmq_use_broker:
|
||||
raise rpc_common.RPCException("RequestSenderLight needs a proxy!")
|
||||
|
||||
super(RequestSenderLight, self).__init__(
|
||||
conf, matchmaker, reply_waiter)
|
||||
|
||||
self.socket = None
|
||||
|
||||
def _check_hosts_connections(self, target, listener_type):
|
||||
if self.socket is None:
|
||||
self.socket = zmq_socket.ZmqSocket(self.zmq_context,
|
||||
self.socket_type)
|
||||
self.outbound_sockets[str(target)] = self.socket
|
||||
address = zmq_address.get_broker_address(self.conf)
|
||||
self._connect_to_address(self.socket, address, target)
|
||||
return self.socket
|
||||
|
||||
def _do_send_request(self, socket, request):
|
||||
LOG.debug("Sending %(type)s message_id %(message)s"
|
||||
" to a target %(target)s"
|
||||
% {"type": request.msg_type,
|
||||
"message": request.message_id,
|
||||
"target": request.target})
|
||||
|
||||
envelope = request.create_envelope()
|
||||
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(envelope, zmq.SNDMORE)
|
||||
socket.send_pyobj(request)
|
||||
|
||||
|
||||
class ReplyWaiter(object):
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
self.replies = {}
|
||||
self.poller = zmq_async.get_poller()
|
||||
self.executor = zmq_async.get_executor(self.run_loop)
|
||||
self.executor.execute()
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def track_reply(self, reply_future, message_id):
|
||||
self._lock.acquire()
|
||||
self.replies[message_id] = reply_future
|
||||
self._lock.release()
|
||||
|
||||
def untrack_id(self, message_id):
|
||||
self._lock.acquire()
|
||||
self.replies.pop(message_id)
|
||||
self._lock.release()
|
||||
|
||||
def poll_socket(self, socket):
|
||||
|
||||
def _receive_method(socket):
|
||||
empty = socket.recv()
|
||||
assert empty == b'', "Empty expected!"
|
||||
reply = socket.recv_pyobj()
|
||||
LOG.debug("Received reply %s" % reply)
|
||||
return reply
|
||||
|
||||
self.poller.register(socket, recv_method=_receive_method)
|
||||
|
||||
def run_loop(self):
|
||||
reply, socket = self.poller.poll(
|
||||
timeout=self.conf.rpc_poll_timeout)
|
||||
if reply is not None:
|
||||
call_future = self.replies[reply[zmq_names.FIELD_MSG_ID]]
|
||||
call_future.set_result(reply)
|
@ -18,7 +18,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||
import zmq_publisher_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LI, _LW
|
||||
from oslo_messaging._i18n import _LW
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -34,7 +34,7 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
|
||||
|
||||
self._check_request_pattern(request)
|
||||
|
||||
dealer_socket, hosts = self._check_hosts_connections(
|
||||
dealer_socket = self._check_hosts_connections(
|
||||
request.target, zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
||||
if not dealer_socket.connections:
|
||||
@ -61,15 +61,16 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(request)
|
||||
|
||||
LOG.info(_LI("Sending message_id %(message)s to a target %(target)s")
|
||||
% {"message": request.message_id,
|
||||
"target": request.target})
|
||||
LOG.debug("Sending message_id %(message)s to a target %(target)s"
|
||||
% {"message": request.message_id,
|
||||
"target": request.target})
|
||||
|
||||
def cleanup(self):
|
||||
super(DealerPublisher, self).cleanup()
|
||||
|
||||
|
||||
class DealerPublisherLight(zmq_publisher_base.PublisherBase):
|
||||
"""Used when publishing to proxy. """
|
||||
|
||||
def __init__(self, conf, address):
|
||||
super(DealerPublisherLight, self).__init__(conf)
|
||||
@ -92,68 +93,6 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase):
|
||||
self.socket.close()
|
||||
|
||||
|
||||
class DealerPublisherProxy(DealerPublisher):
|
||||
|
||||
def __init__(self, conf, matchmaker, reply_receiver):
|
||||
super(DealerPublisherProxy, self).__init__(conf, matchmaker)
|
||||
self.reply_receiver = reply_receiver
|
||||
|
||||
def send_request(self, multipart_message):
|
||||
|
||||
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
||||
|
||||
LOG.info(_LI("Envelope: %s") % envelope)
|
||||
|
||||
target = envelope[zmq_names.FIELD_TARGET]
|
||||
dealer_socket, hosts = self._check_hosts_connections(
|
||||
target, zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
||||
if not dealer_socket.connections:
|
||||
# NOTE(ozamiatin): Here we can provide
|
||||
# a queue for keeping messages to send them later
|
||||
# when some listener appears. However such approach
|
||||
# being more reliable will consume additional memory.
|
||||
LOG.warning(_LW("Request %s was dropped because no connection")
|
||||
% envelope[zmq_names.FIELD_MSG_TYPE])
|
||||
return
|
||||
|
||||
self.reply_receiver.track_socket(dealer_socket.handle)
|
||||
|
||||
LOG.info(_LI("Sending message %(message)s to a target %(target)s")
|
||||
% {"message": envelope[zmq_names.FIELD_MSG_ID],
|
||||
"target": envelope[zmq_names.FIELD_TARGET]})
|
||||
|
||||
if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES:
|
||||
for _ in range(dealer_socket.connections_count()):
|
||||
self._send_request(dealer_socket, multipart_message)
|
||||
else:
|
||||
self._send_request(dealer_socket, multipart_message)
|
||||
|
||||
def _send_request(self, socket, multipart_message):
|
||||
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(
|
||||
multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE],
|
||||
zmq.SNDMORE)
|
||||
socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
|
||||
|
||||
|
||||
class ReplyReceiver(object):
|
||||
|
||||
def __init__(self, poller):
|
||||
self.poller = poller
|
||||
LOG.info(_LI("Reply waiter created in broker"))
|
||||
|
||||
def _receive_reply(self, socket):
|
||||
return socket.recv_multipart()
|
||||
|
||||
def track_socket(self, socket):
|
||||
self.poller.register(socket, self._receive_reply)
|
||||
|
||||
def cleanup(self):
|
||||
self.poller.close()
|
||||
|
||||
|
||||
class AcknowledgementReceiver(object):
|
||||
|
||||
def __init__(self):
|
||||
@ -172,8 +111,7 @@ class AcknowledgementReceiver(object):
|
||||
|
||||
def poll_for_acknowledgements(self):
|
||||
ack_message, socket = self.poller.poll()
|
||||
LOG.info(_LI("Message %s acknowledged")
|
||||
% ack_message[zmq_names.FIELD_ID])
|
||||
LOG.debug("Message %s acknowledged" % ack_message[zmq_names.FIELD_ID])
|
||||
|
||||
def cleanup(self):
|
||||
self.thread.stop()
|
87
oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
Normal file
87
oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
Normal file
@ -0,0 +1,87 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LI, _LW
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DealerPublisherProxy(zmq_dealer_publisher.DealerPublisher):
|
||||
|
||||
def __init__(self, conf, matchmaker, reply_receiver):
|
||||
super(DealerPublisherProxy, self).__init__(conf, matchmaker)
|
||||
self.reply_receiver = reply_receiver
|
||||
|
||||
def send_request(self, multipart_message):
|
||||
|
||||
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
||||
|
||||
LOG.debug("Envelope: %s" % envelope)
|
||||
|
||||
target = envelope[zmq_names.FIELD_TARGET]
|
||||
dealer_socket = self._check_hosts_connections(
|
||||
target, zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
||||
if not dealer_socket.connections:
|
||||
# NOTE(ozamiatin): Here we can provide
|
||||
# a queue for keeping messages to send them later
|
||||
# when some listener appears. However such approach
|
||||
# being more reliable will consume additional memory.
|
||||
LOG.warning(_LW("Request %s was dropped because no connection")
|
||||
% envelope[zmq_names.FIELD_MSG_TYPE])
|
||||
return
|
||||
|
||||
self.reply_receiver.track_socket(dealer_socket.handle)
|
||||
|
||||
LOG.debug("Sending message %(message)s to a target %(target)s"
|
||||
% {"message": envelope[zmq_names.FIELD_MSG_ID],
|
||||
"target": envelope[zmq_names.FIELD_TARGET]})
|
||||
|
||||
if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES:
|
||||
for _ in range(dealer_socket.connections_count()):
|
||||
self._send_request(dealer_socket, multipart_message)
|
||||
else:
|
||||
self._send_request(dealer_socket, multipart_message)
|
||||
|
||||
def _send_request(self, socket, multipart_message):
|
||||
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(
|
||||
multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE],
|
||||
zmq.SNDMORE)
|
||||
socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
|
||||
|
||||
|
||||
class ReplyReceiver(object):
|
||||
|
||||
def __init__(self, poller):
|
||||
self.poller = poller
|
||||
LOG.info(_LI("Reply waiter created in broker"))
|
||||
|
||||
def _receive_reply(self, socket):
|
||||
return socket.recv_multipart()
|
||||
|
||||
def track_socket(self, socket):
|
||||
self.poller.register(socket, self._receive_reply)
|
||||
|
||||
def cleanup(self):
|
||||
self.poller.close()
|
@ -18,7 +18,6 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||
import zmq_publisher_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LI
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -35,7 +34,7 @@ class PubPublisher(zmq_publisher_base.PublisherMultisend):
|
||||
if request.msg_type not in zmq_names.NOTIFY_TYPES:
|
||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||
|
||||
pub_socket, hosts = self._check_hosts_connections(
|
||||
pub_socket = self._check_hosts_connections(
|
||||
request.target, zmq_names.socket_type_str(zmq.SUB))
|
||||
self._send_request(pub_socket, request)
|
||||
|
||||
@ -43,6 +42,6 @@ class PubPublisher(zmq_publisher_base.PublisherMultisend):
|
||||
|
||||
super(PubPublisher, self)._send_request(socket, request)
|
||||
|
||||
LOG.info(_LI("Publishing message %(message)s to a target %(target)s")
|
||||
% {"message": request.message,
|
||||
"target": request.target})
|
||||
LOG.debug("Publishing message %(message)s to a target %(target)s"
|
||||
% {"message": request.message,
|
||||
"target": request.target})
|
||||
|
@ -14,6 +14,7 @@
|
||||
|
||||
import abc
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import six
|
||||
|
||||
@ -89,12 +90,11 @@ class PublisherBase(object):
|
||||
:param request: Message data and destination container object
|
||||
:type request: zmq_request.Request
|
||||
"""
|
||||
LOG.info(_LI("Sending %(type)s message_id %(message)s to a target"
|
||||
"%(target)s host:%(host)s")
|
||||
% {"type": request.msg_type,
|
||||
"message": request.message_id,
|
||||
"target": request.target,
|
||||
"host": request.host})
|
||||
LOG.debug("Sending %(type)s message_id %(message)s to a target"
|
||||
"%(target)s"
|
||||
% {"type": request.msg_type,
|
||||
"message": request.message_id,
|
||||
"target": request.target})
|
||||
socket.send_pyobj(request)
|
||||
|
||||
def cleanup(self):
|
||||
@ -124,28 +124,30 @@ class PublisherMultisend(PublisherBase):
|
||||
def _check_hosts_connections(self, target, listener_type):
|
||||
# TODO(ozamiatin): Place for significant optimization
|
||||
# Matchmaker cache should be implemented
|
||||
hosts = self.matchmaker.get_hosts(
|
||||
target, listener_type)
|
||||
if str(target) in self.outbound_sockets:
|
||||
socket = self.outbound_sockets[str(target)]
|
||||
else:
|
||||
hosts = self.matchmaker.get_hosts(target, listener_type)
|
||||
socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type)
|
||||
self.outbound_sockets[str(target)] = socket
|
||||
for host in hosts:
|
||||
self._connect_to_host(socket, host, target)
|
||||
|
||||
for host in hosts:
|
||||
self._connect_to_host(socket, host, target)
|
||||
return socket
|
||||
|
||||
return socket, hosts
|
||||
|
||||
def _connect_to_host(self, socket, host, target):
|
||||
address = zmq_address.get_tcp_direct_address(host)
|
||||
LOG.info(address)
|
||||
def _connect_to_address(self, socket, address, target):
|
||||
stype = zmq_names.socket_type_str(self.socket_type)
|
||||
try:
|
||||
LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s")
|
||||
% {"stype": stype,
|
||||
"address": address,
|
||||
"target": target})
|
||||
|
||||
if six.PY3:
|
||||
socket.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1()))
|
||||
else:
|
||||
socket.handle.identity = str(uuid.uuid1())
|
||||
|
||||
socket.connect(address)
|
||||
except zmq.ZMQError as e:
|
||||
errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\
|
||||
@ -153,3 +155,7 @@ class PublisherMultisend(PublisherBase):
|
||||
LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s")
|
||||
% (stype, address, e))
|
||||
raise rpc_common.RPCException(errmsg)
|
||||
|
||||
def _connect_to_host(self, socket, host, target):
|
||||
address = zmq_address.get_tcp_direct_address(host)
|
||||
self._connect_to_address(socket, address, target)
|
||||
|
@ -18,7 +18,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||
import zmq_publisher_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LI, _LW
|
||||
from oslo_messaging._i18n import _LW
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -35,7 +35,7 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend):
|
||||
if request.msg_type == zmq_names.CALL_TYPE:
|
||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||
|
||||
push_socket, hosts = self._check_hosts_connections(
|
||||
push_socket = self._check_hosts_connections(
|
||||
request.target, zmq_names.socket_type_str(zmq.PULL))
|
||||
|
||||
if not push_socket.connections:
|
||||
@ -53,6 +53,6 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend):
|
||||
|
||||
super(PushPublisher, self)._send_request(socket, request)
|
||||
|
||||
LOG.info(_LI("Publishing message %(message)s to a target %(target)s")
|
||||
% {"message": request.message,
|
||||
"target": request.target})
|
||||
LOG.debug("Publishing message %(message)s to a target %(target)s"
|
||||
% {"message": request.message,
|
||||
"target": request.target})
|
||||
|
@ -82,8 +82,11 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
|
||||
def _receive_reply(socket, request):
|
||||
|
||||
def _receive_method(socket):
|
||||
return socket.recv_pyobj()
|
||||
reply = socket.recv_pyobj()
|
||||
LOG.debug("Received reply %s" % reply)
|
||||
return reply
|
||||
|
||||
LOG.debug("Start waiting reply")
|
||||
# NOTE(ozamiatin): Check for retry here (no retries now)
|
||||
with contextlib.closing(zmq_async.get_reply_poller()) as poller:
|
||||
poller.register(socket, recv_method=_receive_method)
|
||||
@ -91,7 +94,7 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
|
||||
if reply is None:
|
||||
raise oslo_messaging.MessagingTimeout(
|
||||
"Timeout %s seconds was reached" % request.timeout)
|
||||
LOG.info(_LI("Received reply %s") % reply)
|
||||
LOG.debug("Received reply %s" % reply)
|
||||
if reply[zmq_names.FIELD_FAILURE]:
|
||||
raise rpc_common.deserialize_remote_exception(
|
||||
reply[zmq_names.FIELD_FAILURE],
|
||||
@ -114,12 +117,12 @@ class ReqPublisherLight(ReqPublisher):
|
||||
|
||||
def _send_request(self, socket, request):
|
||||
|
||||
LOG.info(_LI("Sending %(type)s message_id %(message)s"
|
||||
" to a target %(target)s, host:%(host)s")
|
||||
% {"type": request.msg_type,
|
||||
"message": request.message_id,
|
||||
"target": request.target,
|
||||
"host": request.host})
|
||||
LOG.debug("Sending %(type)s message_id %(message)s"
|
||||
" to a target %(target)s, host:%(host)s"
|
||||
% {"type": request.msg_type,
|
||||
"message": request.message_id,
|
||||
"target": request.target,
|
||||
"host": request.host})
|
||||
|
||||
envelope = request.create_envelope()
|
||||
|
||||
|
@ -12,70 +12,33 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_call_publisher
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||
import zmq_req_publisher
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_request
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class ZmqClient(object):
|
||||
class ZmqClient(zmq_client_base.ZmqClientBase):
|
||||
|
||||
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
|
||||
self.conf = conf
|
||||
self.context = zmq.Context()
|
||||
self.matchmaker = matchmaker
|
||||
self.allowed_remote_exmods = allowed_remote_exmods or []
|
||||
if conf.zmq_use_broker:
|
||||
raise rpc_common.RPCException("This client doesn't need proxy!")
|
||||
|
||||
self.dealer_publisher = None
|
||||
if self.conf.zmq_use_broker:
|
||||
self.dealer_publisher = zmq_dealer_publisher.DealerPublisherLight(
|
||||
conf, zmq_address.get_broker_address(self.conf))
|
||||
self.req_publisher_cls = zmq_req_publisher.ReqPublisherLight
|
||||
else:
|
||||
self.dealer_publisher = zmq_dealer_publisher.DealerPublisher(
|
||||
conf, matchmaker)
|
||||
self.req_publisher_cls = zmq_req_publisher.ReqPublisher
|
||||
super(ZmqClient, self).__init__(
|
||||
conf, matchmaker, allowed_remote_exmods,
|
||||
publishers={
|
||||
zmq_names.CALL_TYPE:
|
||||
zmq_dealer_call_publisher.DealerCallPublisher(
|
||||
conf, matchmaker),
|
||||
|
||||
def send_call(self, target, context, message, timeout=None, retry=None):
|
||||
with contextlib.closing(zmq_request.CallRequest(
|
||||
target, context=context, message=message,
|
||||
timeout=timeout, retry=retry,
|
||||
allowed_remote_exmods=self.allowed_remote_exmods)) as request:
|
||||
with contextlib.closing(self.req_publisher_cls(
|
||||
self.conf, self.matchmaker)) as req_publisher:
|
||||
return req_publisher.send_request(request)
|
||||
|
||||
def send_cast(self, target, context, message, timeout=None, retry=None):
|
||||
with contextlib.closing(zmq_request.CastRequest(
|
||||
target, context=context, message=message,
|
||||
timeout=timeout, retry=retry)) as request:
|
||||
self.dealer_publisher.send_request(request)
|
||||
|
||||
def send_fanout(self, target, context, message, timeout=None, retry=None):
|
||||
with contextlib.closing(zmq_request.FanoutRequest(
|
||||
target, context=context, message=message,
|
||||
timeout=timeout, retry=retry)) as request:
|
||||
self.dealer_publisher.send_request(request)
|
||||
|
||||
def send_notify(self, target, context, message, version, retry=None):
|
||||
with contextlib.closing(zmq_request.NotificationRequest(
|
||||
target, context, message, version=version,
|
||||
retry=retry)) as request:
|
||||
self.dealer_publisher.send_request(request)
|
||||
|
||||
def send_notify_fanout(self, target, context, message, version,
|
||||
retry=None):
|
||||
with contextlib.closing(zmq_request.NotificationFanoutRequest(
|
||||
target, context, message, version=version,
|
||||
retry=retry)) as request:
|
||||
self.dealer_publisher.send_request(request)
|
||||
|
||||
def cleanup(self):
|
||||
self.dealer_publisher.cleanup()
|
||||
"default": zmq_dealer_publisher.DealerPublisher(
|
||||
conf, matchmaker)
|
||||
}
|
||||
)
|
||||
|
77
oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
Normal file
77
oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
Normal file
@ -0,0 +1,77 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_request
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class ZmqClientBase(object):
|
||||
|
||||
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None,
|
||||
publishers=None):
|
||||
self.conf = conf
|
||||
self.context = zmq.Context()
|
||||
self.matchmaker = matchmaker
|
||||
self.allowed_remote_exmods = allowed_remote_exmods or []
|
||||
|
||||
self.publishers = publishers
|
||||
self.call_publisher = publishers.get(zmq_names.CALL_TYPE) \
|
||||
or publishers["default"]
|
||||
self.cast_publisher = publishers.get(zmq_names.CAST_TYPE) \
|
||||
or publishers["default"]
|
||||
self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE) \
|
||||
or publishers["default"]
|
||||
self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE) \
|
||||
or publishers["default"]
|
||||
|
||||
def send_call(self, target, context, message, timeout=None, retry=None):
|
||||
with contextlib.closing(zmq_request.CallRequest(
|
||||
target, context=context, message=message,
|
||||
timeout=timeout, retry=retry,
|
||||
allowed_remote_exmods=self.allowed_remote_exmods)) as request:
|
||||
return self.call_publisher.send_request(request)
|
||||
|
||||
def send_cast(self, target, context, message, timeout=None, retry=None):
|
||||
with contextlib.closing(zmq_request.CastRequest(
|
||||
target, context=context, message=message,
|
||||
timeout=timeout, retry=retry)) as request:
|
||||
self.cast_publisher.send_request(request)
|
||||
|
||||
def send_fanout(self, target, context, message, timeout=None, retry=None):
|
||||
with contextlib.closing(zmq_request.FanoutRequest(
|
||||
target, context=context, message=message,
|
||||
timeout=timeout, retry=retry)) as request:
|
||||
self.fanout_publisher.send_request(request)
|
||||
|
||||
def send_notify(self, target, context, message, version, retry=None):
|
||||
with contextlib.closing(zmq_request.NotificationRequest(
|
||||
target, context, message, version=version,
|
||||
retry=retry)) as request:
|
||||
self.notify_publisher.send_request(request)
|
||||
|
||||
def send_notify_fanout(self, target, context, message, version,
|
||||
retry=None):
|
||||
with contextlib.closing(zmq_request.NotificationFanoutRequest(
|
||||
target, context, message, version=version,
|
||||
retry=retry)) as request:
|
||||
self.notify_publisher.send_request(request)
|
||||
|
||||
def cleanup(self):
|
||||
for publisher in self.publishers.values():
|
||||
publisher.cleanup()
|
@ -0,0 +1,46 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_call_publisher
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class ZmqClientLight(zmq_client_base.ZmqClientBase):
|
||||
|
||||
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
|
||||
if not conf.zmq_use_broker:
|
||||
raise rpc_common.RPCException(
|
||||
"This client needs proxy to be configured!")
|
||||
|
||||
super(ZmqClientLight, self).__init__(
|
||||
conf, matchmaker, allowed_remote_exmods,
|
||||
publishers={
|
||||
zmq_names.CALL_TYPE:
|
||||
zmq_dealer_call_publisher.DealerCallPublisher(
|
||||
conf, matchmaker),
|
||||
|
||||
"default": zmq_dealer_publisher.DealerPublisherLight(
|
||||
conf, zmq_address.get_broker_address(self.conf))
|
||||
}
|
||||
)
|
@ -38,6 +38,7 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
|
||||
self.recv_methods = {}
|
||||
|
||||
def register(self, socket, recv_method=None):
|
||||
LOG.debug("Registering socket")
|
||||
if socket in self.recv_methods:
|
||||
return
|
||||
if recv_method is not None:
|
||||
@ -46,6 +47,8 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
|
||||
|
||||
def poll(self, timeout=None):
|
||||
|
||||
LOG.debug("Entering poll method")
|
||||
|
||||
if timeout:
|
||||
timeout *= 1000 # zmq poller waits milliseconds
|
||||
|
||||
|
@ -21,7 +21,7 @@ from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
from oslo_messaging._i18n import _LE, _LI
|
||||
from oslo_messaging._i18n import _LE
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -44,10 +44,10 @@ class ConsumerBase(object):
|
||||
self.conf, self.context, socket_type)
|
||||
self.sockets.append(socket)
|
||||
self.poller.register(socket, self.receive_message)
|
||||
LOG.info(_LI("Run %(stype)s consumer on %(addr)s:%(port)d"),
|
||||
{"stype": zmq_names.socket_type_str(socket_type),
|
||||
"addr": socket.bind_address,
|
||||
"port": socket.port})
|
||||
LOG.debug("Run %(stype)s consumer on %(addr)s:%(port)d",
|
||||
{"stype": zmq_names.socket_type_str(socket_type),
|
||||
"addr": socket.bind_address,
|
||||
"port": socket.port})
|
||||
return socket
|
||||
except zmq.ZMQError as e:
|
||||
errmsg = _LE("Failed binding to port %(port)d: %(e)s")\
|
||||
|
@ -56,9 +56,9 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
assert msg_type is not None, 'Bad format: msg type expected'
|
||||
context = socket.recv_pyobj()
|
||||
message = socket.recv_pyobj()
|
||||
LOG.info(_LI("Received %(msg_type)s message %(msg)s")
|
||||
% {"msg_type": msg_type,
|
||||
"msg": str(message)})
|
||||
LOG.debug("Received %(msg_type)s message %(msg)s"
|
||||
% {"msg_type": msg_type,
|
||||
"msg": str(message)})
|
||||
|
||||
if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
|
||||
return PullIncomingMessage(self.server, context, message)
|
||||
|
@ -21,7 +21,7 @@ from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LE, _LI
|
||||
from oslo_messaging._i18n import _LE
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -43,7 +43,7 @@ class RouterIncomingMessage(base.IncomingMessage):
|
||||
"""Reply is not needed for non-call messages"""
|
||||
|
||||
def acknowledge(self):
|
||||
LOG.info("Not sending acknowledge for %s", self.msg_id)
|
||||
LOG.debug("Not sending acknowledge for %s", self.msg_id)
|
||||
|
||||
def requeue(self):
|
||||
"""Requeue is not supported"""
|
||||
@ -83,11 +83,11 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
def receive_message(self, socket):
|
||||
try:
|
||||
request, reply_id = self._receive_request(socket)
|
||||
LOG.info(_LI("[%(host)s] Received %(type)s, %(id)s, %(target)s")
|
||||
% {"host": self.host,
|
||||
"type": request.msg_type,
|
||||
"id": request.message_id,
|
||||
"target": request.target})
|
||||
LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s"
|
||||
% {"host": self.host,
|
||||
"type": request.msg_type,
|
||||
"id": request.message_id,
|
||||
"target": request.target})
|
||||
|
||||
if request.msg_type == zmq_names.CALL_TYPE:
|
||||
return zmq_incoming_message.ZmqIncomingRequest(
|
||||
|
@ -45,9 +45,10 @@ class ZmqIncomingRequest(base.IncomingMessage):
|
||||
zmq_names.FIELD_REPLY: reply,
|
||||
zmq_names.FIELD_FAILURE: failure,
|
||||
zmq_names.FIELD_LOG_FAILURE: log_failure,
|
||||
zmq_names.FIELD_ID: self.request.proxy_reply_id}
|
||||
zmq_names.FIELD_ID: self.request.proxy_reply_id,
|
||||
zmq_names.FIELD_MSG_ID: self.request.message_id}
|
||||
|
||||
LOG.info("Replying %s REP", (str(self.request.message_id)))
|
||||
LOG.debug("Replying %s", (str(self.request.message_id)))
|
||||
|
||||
self.received = True
|
||||
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
|
||||
|
@ -80,3 +80,13 @@ def _raise_error_if_invalid_config_value(zmq_concurrency):
|
||||
if zmq_concurrency not in ZMQ_MODULES:
|
||||
errmsg = _('Invalid zmq_concurrency value: %s')
|
||||
raise ValueError(errmsg % zmq_concurrency)
|
||||
|
||||
|
||||
def get_queue(zmq_concurrency='eventlet'):
|
||||
_raise_error_if_invalid_config_value(zmq_concurrency)
|
||||
if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
|
||||
import eventlet
|
||||
return eventlet.queue.Queue(), eventlet.queue.Empty
|
||||
else:
|
||||
import six
|
||||
return six.moves.queue.Queue(), six.moves.queue.Empty
|
||||
|
@ -47,6 +47,9 @@ class ZmqSocket(object):
|
||||
def setsockopt(self, *args, **kwargs):
|
||||
self.handle.setsockopt(*args, **kwargs)
|
||||
|
||||
def setsockopt_string(self, *args, **kwargs):
|
||||
self.handle.setsockopt_string(*args, **kwargs)
|
||||
|
||||
def send(self, *args, **kwargs):
|
||||
self.handle.send(*args, **kwargs)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user