838ae8f47c
The zerorpc package is a light-weight, reliable and language-agnostic library for distributed communication between server-side processes. It builds on top of ZeroMQ and MessagePack. This package is required for sysinv ZeroMQ-based RPC backend [1]. TEST PLAN: PASS: Verify STX Debian builds properly PASS: Verify STX Debian deploys properly PASS: Verify zerorpc-python package was properly installed PASS: Verify syinv processes runs properly [1] https://review.opendev.org/c/starlingx/config/+/859571 Story: 2010087 Task: 46794 Signed-off-by: Alyson Deives Pereira <alyson.deivespereira@windriver.com> Change-Id: I62565e2ce39c0bed63506bfcabf909d5cf186ec1
2737 lines
92 KiB
Diff
2737 lines
92 KiB
Diff
From 68d1ba0a7157850ecdee09d64063f4c004faeed5 Mon Sep 17 00:00:00 2001
|
|
From: Alyson Deives Pereira <alyson.deivespereira@windriver.com>
|
|
Date: Wed, 20 Jul 2022 12:26:15 -0300
|
|
Subject: [PATCH] Use eventlet instead of gevent
|
|
|
|
However, the following tests are still failing:
|
|
|
|
FAILED tests/test_middleware.py::test_server_inspect_exception_middleware - zerorpc.exceptions.LostRemote: Lost remote after 10s heartbeat
|
|
FAILED tests/test_middleware_before_after_exec.py::test_hook_server_before_exec_puller - AssertionError: assert 'echo: test' == 'echo: test with a middleware'
|
|
FAILED tests/test_middleware_before_after_exec.py::test_hook_server_after_exec_puller - AssertionError: assert 'echo: test' == 'echo: test with a middleware'
|
|
FAILED tests/test_pubpush.py::test_pubsub_inheritance - RuntimeError: The subscriber didn't receive any published message
|
|
FAILED tests/test_pubpush.py::test_pubsub_composite - RuntimeError: The subscriber didn't receive any published message
|
|
|
|
Signed-off-by: Alyson Deives Pereira <alyson.deivespereira@windriver.com>
|
|
---
|
|
setup.py | 13 +-
|
|
tests/test_buffered_channel.py | 48 ++--
|
|
tests/test_client.py | 6 +-
|
|
tests/test_client_async.py | 16 +-
|
|
tests/test_client_heartbeat.py | 40 +--
|
|
tests/test_heartbeat.py | 46 ++--
|
|
tests/test_middleware.py | 88 +++---
|
|
tests/test_middleware_before_after_exec.py | 81 ++++--
|
|
tests/test_middleware_client.py | 76 ++++--
|
|
tests/test_pubpush.py | 35 ++-
|
|
tests/test_reqstream.py | 6 +-
|
|
tests/test_server.py | 18 +-
|
|
tests/test_zmq.py | 8 +-
|
|
tests/zmqbug.py | 270 ++++++++++++++----
|
|
tox.ini | 2 +-
|
|
zerorpc/channel.py | 30 +-
|
|
zerorpc/context.py | 2 +-
|
|
zerorpc/core.py | 32 +--
|
|
zerorpc/events.py | 42 ++-
|
|
zerorpc/gevent_zmq.py | 301 +++++++++++++--------
|
|
zerorpc/heartbeat.py | 20 +-
|
|
21 files changed, 748 insertions(+), 432 deletions(-)
|
|
|
|
diff --git a/setup.py b/setup.py
|
|
index b07ebcb..d57ddcb 100644
|
|
--- a/setup.py
|
|
+++ b/setup.py
|
|
@@ -44,12 +44,13 @@ requirements = [
|
|
if sys.version_info < (2, 7):
|
|
requirements.append('argparse')
|
|
|
|
-if sys.version_info < (2, 7):
|
|
- requirements.append('gevent>=1.1.0,<1.2.0')
|
|
-elif sys.version_info < (3, 0):
|
|
- requirements.append('gevent>=1.0')
|
|
-else:
|
|
- requirements.append('gevent>=1.1')
|
|
+# if sys.version_info < (2, 7):
|
|
+# requirements.append('gevent>=1.1.0,<1.2.0')
|
|
+# elif sys.version_info < (3, 0):
|
|
+# requirements.append('gevent>=1.0')
|
|
+# else:
|
|
+# requirements.append('gevent>=1.1')
|
|
+requirements.append('eventlet>=0.24.1')
|
|
|
|
with open("README.rst", "r") as fh:
|
|
long_description = fh.read()
|
|
diff --git a/tests/test_buffered_channel.py b/tests/test_buffered_channel.py
|
|
index 20b8173..f94152c 100644
|
|
--- a/tests/test_buffered_channel.py
|
|
+++ b/tests/test_buffered_channel.py
|
|
@@ -28,7 +28,7 @@ from __future__ import absolute_import
|
|
from builtins import range
|
|
|
|
import pytest
|
|
-import gevent
|
|
+import eventlet
|
|
import sys
|
|
|
|
from zerorpc import zmq
|
|
@@ -57,7 +57,7 @@ def test_close_server_bufchan():
|
|
server_bufchan = zerorpc.BufferedChannel(server_hbchan)
|
|
server_bufchan.recv()
|
|
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
print('CLOSE SERVER SOCKET!!!')
|
|
server_bufchan.close()
|
|
if sys.version_info < (2, 7):
|
|
@@ -92,7 +92,7 @@ def test_close_client_bufchan():
|
|
server_bufchan = zerorpc.BufferedChannel(server_hbchan)
|
|
server_bufchan.recv()
|
|
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
print('CLOSE CLIENT SOCKET!!!')
|
|
client_bufchan.close()
|
|
if sys.version_info < (2, 7):
|
|
@@ -125,7 +125,7 @@ def test_heartbeat_can_open_channel_server_close():
|
|
server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2)
|
|
server_bufchan = zerorpc.BufferedChannel(server_hbchan)
|
|
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
print('CLOSE SERVER SOCKET!!!')
|
|
server_bufchan.close()
|
|
if sys.version_info < (2, 7):
|
|
@@ -160,12 +160,12 @@ def test_heartbeat_can_open_channel_client_close():
|
|
server_bufchan = zerorpc.BufferedChannel(server_hbchan)
|
|
try:
|
|
while True:
|
|
- gevent.sleep(1)
|
|
+ eventlet.sleep(1)
|
|
finally:
|
|
server_bufchan.close()
|
|
- server_coro = gevent.spawn(server_fn)
|
|
+ server_coro = eventlet.spawn(server_fn)
|
|
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
print('CLOSE CLIENT SOCKET!!!')
|
|
client_bufchan.close()
|
|
client.close()
|
|
@@ -173,7 +173,7 @@ def test_heartbeat_can_open_channel_client_close():
|
|
pytest.raises(zerorpc.LostRemote, server_coro.get)
|
|
else:
|
|
with pytest.raises(zerorpc.LostRemote):
|
|
- server_coro.get()
|
|
+ server_coro.wait()
|
|
print('SERVER LOST CLIENT :)')
|
|
server.close()
|
|
|
|
@@ -200,7 +200,7 @@ def test_do_some_req_rep():
|
|
assert list(event.args) == [x + x * x]
|
|
client_bufchan.close()
|
|
|
|
- coro_pool = gevent.pool.Pool()
|
|
+ coro_pool = eventlet.greenpool.GreenPool()
|
|
coro_pool.spawn(client_do)
|
|
|
|
def server_do():
|
|
@@ -217,7 +217,7 @@ def test_do_some_req_rep():
|
|
|
|
coro_pool.spawn(server_do)
|
|
|
|
- coro_pool.join()
|
|
+ coro_pool.waitall()
|
|
client.close()
|
|
server.close()
|
|
|
|
@@ -250,7 +250,7 @@ def test_do_some_req_rep_lost_server():
|
|
client_bufchan.recv()
|
|
client_bufchan.close()
|
|
|
|
- coro_pool = gevent.pool.Pool()
|
|
+ coro_pool = eventlet.greenpool.GreenPool()
|
|
coro_pool.spawn(client_do)
|
|
|
|
def server_do():
|
|
@@ -266,7 +266,7 @@ def test_do_some_req_rep_lost_server():
|
|
|
|
coro_pool.spawn(server_do)
|
|
|
|
- coro_pool.join()
|
|
+ coro_pool.waitall()
|
|
client.close()
|
|
server.close()
|
|
|
|
@@ -293,7 +293,7 @@ def test_do_some_req_rep_lost_client():
|
|
assert list(event.args) == [x + x * x]
|
|
client_bufchan.close()
|
|
|
|
- coro_pool = gevent.pool.Pool()
|
|
+ coro_pool = eventlet.greenpool.GreenPool()
|
|
coro_pool.spawn(client_do)
|
|
|
|
def server_do():
|
|
@@ -316,7 +316,7 @@ def test_do_some_req_rep_lost_client():
|
|
|
|
coro_pool.spawn(server_do)
|
|
|
|
- coro_pool.join()
|
|
+ coro_pool.waitall()
|
|
client.close()
|
|
server.close()
|
|
|
|
@@ -353,7 +353,7 @@ def test_do_some_req_rep_client_timeout():
|
|
assert list(event.args) == [x]
|
|
client_bufchan.close()
|
|
|
|
- coro_pool = gevent.pool.Pool()
|
|
+ coro_pool = eventlet.greenpool.GreenPool()
|
|
coro_pool.spawn(client_do)
|
|
|
|
def server_do():
|
|
@@ -367,7 +367,7 @@ def test_do_some_req_rep_client_timeout():
|
|
for x in range(20):
|
|
event = server_bufchan.recv()
|
|
assert event.name == 'sleep'
|
|
- gevent.sleep(TIME_FACTOR * event.args[0])
|
|
+ eventlet.sleep(TIME_FACTOR * event.args[0])
|
|
server_bufchan.emit('OK', event.args)
|
|
pytest.raises(zerorpc.LostRemote, _do_with_assert_raises)
|
|
else:
|
|
@@ -375,14 +375,14 @@ def test_do_some_req_rep_client_timeout():
|
|
for x in range(20):
|
|
event = server_bufchan.recv()
|
|
assert event.name == 'sleep'
|
|
- gevent.sleep(TIME_FACTOR * event.args[0])
|
|
+ eventlet.sleep(TIME_FACTOR * event.args[0])
|
|
server_bufchan.emit('OK', event.args)
|
|
server_bufchan.close()
|
|
|
|
|
|
coro_pool.spawn(server_do)
|
|
|
|
- coro_pool.join()
|
|
+ coro_pool.waitall()
|
|
client.close()
|
|
server.close()
|
|
|
|
@@ -410,7 +410,7 @@ def test_congestion_control_server_pushing():
|
|
read_cnt.value += 1
|
|
client_bufchan.close()
|
|
|
|
- coro_pool = gevent.pool.Pool()
|
|
+ coro_pool = eventlet.greenpool.GreenPool()
|
|
coro_pool.spawn(client_do)
|
|
|
|
def server_do():
|
|
@@ -443,7 +443,7 @@ def test_congestion_control_server_pushing():
|
|
|
|
coro_pool.spawn(server_do)
|
|
try:
|
|
- coro_pool.join()
|
|
+ coro_pool.waitall()
|
|
except zerorpc.LostRemote:
|
|
pass
|
|
finally:
|
|
@@ -485,7 +485,7 @@ def test_on_close_if():
|
|
if event.name == 'done':
|
|
return
|
|
seen.append(event.args)
|
|
- gevent.sleep(0.1)
|
|
+ eventlet.sleep(0.1)
|
|
|
|
def server_do():
|
|
for i in range(0, 10):
|
|
@@ -494,12 +494,12 @@ def test_on_close_if():
|
|
|
|
client_bufchan.on_close_if = is_stream_done
|
|
|
|
- coro_pool = gevent.pool.Pool()
|
|
+ coro_pool = eventlet.greenpool.GreenPool()
|
|
g1 = coro_pool.spawn(client_do)
|
|
g2 = coro_pool.spawn(server_do)
|
|
|
|
- g1.get() # Re-raise any exceptions...
|
|
- g2.get()
|
|
+ g1.wait() # Re-raise any exceptions...
|
|
+ g2.wait()
|
|
|
|
assert seen == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
|
|
|
|
diff --git a/tests/test_client.py b/tests/test_client.py
|
|
index 6a692b3..b9e3be3 100644
|
|
--- a/tests/test_client.py
|
|
+++ b/tests/test_client.py
|
|
@@ -24,7 +24,7 @@
|
|
|
|
|
|
from __future__ import absolute_import
|
|
-import gevent
|
|
+import eventlet
|
|
|
|
import zerorpc
|
|
from .testutils import teardown, random_ipc_endpoint
|
|
@@ -39,7 +39,7 @@ def test_client_connect():
|
|
|
|
srv = MySrv()
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client = zerorpc.Client()
|
|
client.connect(endpoint)
|
|
@@ -56,7 +56,7 @@ def test_client_quick_connect():
|
|
|
|
srv = MySrv()
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client = zerorpc.Client(endpoint)
|
|
|
|
diff --git a/tests/test_client_async.py b/tests/test_client_async.py
|
|
index ced4b1f..a2272ff 100644
|
|
--- a/tests/test_client_async.py
|
|
+++ b/tests/test_client_async.py
|
|
@@ -26,7 +26,7 @@
|
|
from __future__ import print_function
|
|
from __future__ import absolute_import
|
|
import pytest
|
|
-import gevent
|
|
+import eventlet
|
|
import sys
|
|
|
|
from zerorpc import zmq
|
|
@@ -43,12 +43,12 @@ def test_client_server_client_timeout_with_async():
|
|
return 42
|
|
|
|
def add(self, a, b):
|
|
- gevent.sleep(TIME_FACTOR * 10)
|
|
+ eventlet.sleep(TIME_FACTOR * 10)
|
|
return a + b
|
|
|
|
srv = MySrv()
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client = zerorpc.Client(timeout=TIME_FACTOR * 2)
|
|
client.connect(endpoint)
|
|
@@ -57,11 +57,11 @@ def test_client_server_client_timeout_with_async():
|
|
|
|
if sys.version_info < (2, 7):
|
|
def _do_with_assert_raises():
|
|
- print(async_result.get())
|
|
+ print(async_result.wait())
|
|
pytest.raises(zerorpc.TimeoutExpired, _do_with_assert_raises)
|
|
else:
|
|
with pytest.raises(zerorpc.TimeoutExpired):
|
|
- print(async_result.get())
|
|
+ print(async_result.wait())
|
|
client.close()
|
|
srv.close()
|
|
|
|
@@ -79,13 +79,13 @@ def test_client_server_with_async():
|
|
|
|
srv = MySrv()
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client = zerorpc.Client()
|
|
client.connect(endpoint)
|
|
|
|
async_result = client.lolita(async_=True)
|
|
- assert async_result.get() == 42
|
|
+ assert async_result.wait() == 42
|
|
|
|
async_result = client.add(1, 4, async_=True)
|
|
- assert async_result.get() == 5
|
|
+ assert async_result.wait() == 5
|
|
diff --git a/tests/test_client_heartbeat.py b/tests/test_client_heartbeat.py
|
|
index 6b552a4..908c866 100644
|
|
--- a/tests/test_client_heartbeat.py
|
|
+++ b/tests/test_client_heartbeat.py
|
|
@@ -28,7 +28,7 @@ from __future__ import absolute_import
|
|
from builtins import next
|
|
from builtins import range
|
|
|
|
-import gevent
|
|
+import eventlet
|
|
|
|
import zerorpc
|
|
from .testutils import teardown, random_ipc_endpoint, TIME_FACTOR
|
|
@@ -43,11 +43,11 @@ def test_client_server_hearbeat():
|
|
return 42
|
|
|
|
def slow(self):
|
|
- gevent.sleep(TIME_FACTOR * 10)
|
|
+ eventlet.sleep(TIME_FACTOR * 10)
|
|
|
|
srv = MySrv(heartbeat=TIME_FACTOR * 1)
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client = zerorpc.Client(heartbeat=TIME_FACTOR * 1)
|
|
client.connect(endpoint)
|
|
@@ -62,13 +62,13 @@ def test_client_server_activate_heartbeat():
|
|
class MySrv(zerorpc.Server):
|
|
|
|
def lolita(self):
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
return 42
|
|
|
|
srv = MySrv(heartbeat=TIME_FACTOR * 4)
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
- gevent.sleep(0)
|
|
+ eventlet.spawn(srv.run)
|
|
+ eventlet.sleep(0)
|
|
|
|
client = zerorpc.Client(heartbeat=TIME_FACTOR * 4)
|
|
client.connect(endpoint)
|
|
@@ -86,13 +86,13 @@ def test_client_server_passive_hearbeat():
|
|
return 42
|
|
|
|
def slow(self):
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
return 2
|
|
|
|
srv = MySrv(heartbeat=TIME_FACTOR * 4)
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
- gevent.sleep(0)
|
|
+ eventlet.spawn(srv.run)
|
|
+ eventlet.sleep(0)
|
|
|
|
client = zerorpc.Client(heartbeat=TIME_FACTOR * 4, passive_heartbeat=True)
|
|
client.connect(endpoint)
|
|
@@ -112,16 +112,16 @@ def test_client_hb_doesnt_linger_on_streaming():
|
|
|
|
srv = MySrv(heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client1 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
|
|
|
|
def test_client():
|
|
assert list(client1.iter()) == list(range(42))
|
|
print('sleep 3s')
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
|
|
- gevent.spawn(test_client).join()
|
|
+ eventlet.spawn(test_client).wait()
|
|
|
|
|
|
def est_client_drop_few():
|
|
@@ -134,7 +134,7 @@ def est_client_drop_few():
|
|
|
|
srv = MySrv(heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client1 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
|
|
client2 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
|
|
@@ -143,7 +143,7 @@ def est_client_drop_few():
|
|
assert client1.lolita() == 42
|
|
assert client2.lolita() == 42
|
|
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
assert client3.lolita() == 42
|
|
|
|
|
|
@@ -158,7 +158,7 @@ def test_client_drop_empty_stream():
|
|
|
|
srv = MySrv(heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client1 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
|
|
|
|
@@ -167,9 +167,9 @@ def test_client_drop_empty_stream():
|
|
i = client1.iter()
|
|
|
|
print('sleep 3s')
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
|
|
- gevent.spawn(test_client).join()
|
|
+ eventlet.spawn(test_client).wait()
|
|
|
|
|
|
def test_client_drop_stream():
|
|
@@ -183,7 +183,7 @@ def test_client_drop_stream():
|
|
|
|
srv = MySrv(heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client1 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
|
|
|
|
@@ -195,6 +195,6 @@ def test_client_drop_stream():
|
|
assert list(next(i) for x in range(142)) == list(range(142))
|
|
|
|
print('sleep 3s')
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
|
|
- gevent.spawn(test_client).join()
|
|
+ eventlet.spawn(test_client).wait()
|
|
diff --git a/tests/test_heartbeat.py b/tests/test_heartbeat.py
|
|
index 14c66fd..c34d204 100644
|
|
--- a/tests/test_heartbeat.py
|
|
+++ b/tests/test_heartbeat.py
|
|
@@ -28,7 +28,7 @@ from __future__ import absolute_import
|
|
from builtins import range
|
|
|
|
import pytest
|
|
-import gevent
|
|
+import eventlet
|
|
import sys
|
|
|
|
from zerorpc import zmq
|
|
@@ -55,7 +55,7 @@ def test_close_server_hbchan():
|
|
server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2)
|
|
server_hbchan.recv()
|
|
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
print('CLOSE SERVER SOCKET!!!')
|
|
server_hbchan.close()
|
|
if sys.version_info < (2, 7):
|
|
@@ -88,7 +88,7 @@ def test_close_client_hbchan():
|
|
server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2)
|
|
server_hbchan.recv()
|
|
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
print('CLOSE CLIENT SOCKET!!!')
|
|
client_hbchan.close()
|
|
if sys.version_info < (2, 7):
|
|
@@ -119,7 +119,7 @@ def test_heartbeat_can_open_channel_server_close():
|
|
server_channel = server.channel(event)
|
|
server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2)
|
|
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
print('CLOSE SERVER SOCKET!!!')
|
|
server_hbchan.close()
|
|
if sys.version_info < (2, 7):
|
|
@@ -150,7 +150,7 @@ def test_heartbeat_can_open_channel_client_close():
|
|
server_channel = server.channel(event)
|
|
server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2)
|
|
|
|
- gevent.sleep(TIME_FACTOR * 3)
|
|
+ eventlet.sleep(TIME_FACTOR * 3)
|
|
print('CLOSE CLIENT SOCKET!!!')
|
|
client_hbchan.close()
|
|
client.close()
|
|
@@ -189,7 +189,7 @@ def test_do_some_req_rep():
|
|
assert list(event.args) == [x + x * x]
|
|
client_hbchan.close()
|
|
|
|
- client_task = gevent.spawn(client_do)
|
|
+ client_task = eventlet.spawn(client_do)
|
|
|
|
def server_do():
|
|
for x in range(20):
|
|
@@ -198,10 +198,10 @@ def test_do_some_req_rep():
|
|
server_hbchan.emit('OK', (sum(event.args),))
|
|
server_hbchan.close()
|
|
|
|
- server_task = gevent.spawn(server_do)
|
|
+ server_task = eventlet.spawn(server_do)
|
|
|
|
- server_task.get()
|
|
- client_task.get()
|
|
+ server_task.wait()
|
|
+ client_task.wait()
|
|
client.close()
|
|
server.close()
|
|
|
|
@@ -233,7 +233,7 @@ def test_do_some_req_rep_lost_server():
|
|
client_hbchan.recv()
|
|
client_hbchan.close()
|
|
|
|
- client_task = gevent.spawn(client_do)
|
|
+ client_task = eventlet.spawn(client_do)
|
|
|
|
def server_do():
|
|
event = server.recv()
|
|
@@ -245,10 +245,10 @@ def test_do_some_req_rep_lost_server():
|
|
server_hbchan.emit('OK', (sum(event.args),))
|
|
server_hbchan.close()
|
|
|
|
- server_task = gevent.spawn(server_do)
|
|
+ server_task = eventlet.spawn(server_do)
|
|
|
|
- server_task.get()
|
|
- client_task.get()
|
|
+ server_task.wait()
|
|
+ client_task.wait()
|
|
client.close()
|
|
server.close()
|
|
|
|
@@ -274,7 +274,7 @@ def test_do_some_req_rep_lost_client():
|
|
assert list(event.args) == [x + x * x]
|
|
client_hbchan.close()
|
|
|
|
- client_task = gevent.spawn(client_do)
|
|
+ client_task = eventlet.spawn(client_do)
|
|
|
|
def server_do():
|
|
event = server.recv()
|
|
@@ -293,10 +293,10 @@ def test_do_some_req_rep_lost_client():
|
|
server_hbchan.recv()
|
|
server_hbchan.close()
|
|
|
|
- server_task = gevent.spawn(server_do)
|
|
+ server_task = eventlet.spawn(server_do)
|
|
|
|
- server_task.get()
|
|
- client_task.get()
|
|
+ server_task.wait()
|
|
+ client_task.wait()
|
|
client.close()
|
|
server.close()
|
|
|
|
@@ -332,7 +332,7 @@ def test_do_some_req_rep_client_timeout():
|
|
assert list(event.args) == [x]
|
|
client_hbchan.close()
|
|
|
|
- client_task = gevent.spawn(client_do)
|
|
+ client_task = eventlet.spawn(client_do)
|
|
|
|
def server_do():
|
|
event = server.recv()
|
|
@@ -344,7 +344,7 @@ def test_do_some_req_rep_client_timeout():
|
|
for x in range(20):
|
|
event = server_hbchan.recv()
|
|
assert event.name == 'sleep'
|
|
- gevent.sleep(TIME_FACTOR * event.args[0])
|
|
+ eventlet.sleep(TIME_FACTOR * event.args[0])
|
|
server_hbchan.emit('OK', event.args)
|
|
pytest.raises(zerorpc.LostRemote, _do_with_assert_raises)
|
|
else:
|
|
@@ -352,13 +352,13 @@ def test_do_some_req_rep_client_timeout():
|
|
for x in range(20):
|
|
event = server_hbchan.recv()
|
|
assert event.name == 'sleep'
|
|
- gevent.sleep(TIME_FACTOR * event.args[0])
|
|
+ eventlet.sleep(TIME_FACTOR * event.args[0])
|
|
server_hbchan.emit('OK', event.args)
|
|
server_hbchan.close()
|
|
|
|
- server_task = gevent.spawn(server_do)
|
|
+ server_task = eventlet.spawn(server_do)
|
|
|
|
- server_task.get()
|
|
- client_task.get()
|
|
+ server_task.wait()
|
|
+ client_task.wait()
|
|
client.close()
|
|
server.close()
|
|
diff --git a/tests/test_middleware.py b/tests/test_middleware.py
|
|
index 3163a3a..12ba899 100644
|
|
--- a/tests/test_middleware.py
|
|
+++ b/tests/test_middleware.py
|
|
@@ -26,11 +26,13 @@
|
|
from __future__ import print_function
|
|
from __future__ import absolute_import
|
|
from builtins import str
|
|
+
|
|
+import greenlet
|
|
from future.utils import tobytes
|
|
|
|
import pytest
|
|
-import gevent
|
|
-import gevent.local
|
|
+import eventlet
|
|
+import eventlet.corolocal
|
|
import random
|
|
import hashlib
|
|
import sys
|
|
@@ -109,7 +111,7 @@ def test_resolve_endpoint_events():
|
|
cnt = c.register_middleware(Resolver())
|
|
assert cnt == 1
|
|
srv.bind('some_service')
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client = zerorpc.Client(heartbeat=TIME_FACTOR * 1, context=c)
|
|
client.connect('some_service')
|
|
@@ -123,7 +125,7 @@ class Tracer(object):
|
|
'''Used by test_task_context_* tests'''
|
|
def __init__(self, identity):
|
|
self._identity = identity
|
|
- self._locals = gevent.local.local()
|
|
+ self._locals = eventlet.corolocal.local()
|
|
self._log = []
|
|
|
|
@property
|
|
@@ -169,7 +171,7 @@ def test_task_context():
|
|
|
|
srv = zerorpc.Server(Srv(), context=srv_ctx)
|
|
srv.bind(endpoint)
|
|
- srv_task = gevent.spawn(srv.run)
|
|
+ srv_task = eventlet.spawn(srv.run)
|
|
|
|
c = zerorpc.Client(context=cli_ctx)
|
|
c.connect(endpoint)
|
|
@@ -179,7 +181,10 @@ def test_task_context():
|
|
assert x == 42
|
|
|
|
srv.stop()
|
|
- srv_task.join()
|
|
+ try:
|
|
+ srv_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
assert cli_tracer._log == [
|
|
('new', cli_tracer.trace_id),
|
|
@@ -212,7 +217,7 @@ def test_task_context_relay():
|
|
|
|
srv = zerorpc.Server(Srv(), context=srv_ctx)
|
|
srv.bind(endpoint1)
|
|
- srv_task = gevent.spawn(srv.run)
|
|
+ srv_task = eventlet.spawn(srv.run)
|
|
|
|
c_relay = zerorpc.Client(context=srv_relay_ctx)
|
|
c_relay.connect(endpoint1)
|
|
@@ -223,7 +228,7 @@ def test_task_context_relay():
|
|
|
|
srv_relay = zerorpc.Server(SrvRelay(), context=srv_relay_ctx)
|
|
srv_relay.bind(endpoint2)
|
|
- srv_relay_task = gevent.spawn(srv_relay.run)
|
|
+ srv_relay_task = eventlet.spawn(srv_relay.run)
|
|
|
|
c = zerorpc.Client(context=cli_ctx)
|
|
c.connect(endpoint2)
|
|
@@ -232,8 +237,14 @@ def test_task_context_relay():
|
|
|
|
srv_relay.stop()
|
|
srv.stop()
|
|
- srv_relay_task.join()
|
|
- srv_task.join()
|
|
+ try:
|
|
+ srv_relay_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
+ try:
|
|
+ srv_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
assert cli_tracer._log == [
|
|
('new', cli_tracer.trace_id),
|
|
@@ -268,7 +279,7 @@ def test_task_context_relay_fork():
|
|
|
|
srv = zerorpc.Server(Srv(), context=srv_ctx)
|
|
srv.bind(endpoint1)
|
|
- srv_task = gevent.spawn(srv.run)
|
|
+ srv_task = eventlet.spawn(srv.run)
|
|
|
|
c_relay = zerorpc.Client(context=srv_relay_ctx)
|
|
c_relay.connect(endpoint1)
|
|
@@ -277,16 +288,16 @@ def test_task_context_relay_fork():
|
|
def echo(self, msg):
|
|
def dothework(msg):
|
|
return c_relay.echo(msg) + 'relayed'
|
|
- g = gevent.spawn(zerorpc.fork_task_context(dothework,
|
|
+ g = eventlet.spawn(zerorpc.fork_task_context(dothework,
|
|
srv_relay_ctx), 'relay' + msg)
|
|
print('relaying in separate task:', g)
|
|
- r = g.get()
|
|
+ r = g.wait()
|
|
print('back to main task')
|
|
return r
|
|
|
|
srv_relay = zerorpc.Server(SrvRelay(), context=srv_relay_ctx)
|
|
srv_relay.bind(endpoint2)
|
|
- srv_relay_task = gevent.spawn(srv_relay.run)
|
|
+ srv_relay_task = eventlet.spawn(srv_relay.run)
|
|
|
|
c = zerorpc.Client(context=cli_ctx)
|
|
c.connect(endpoint2)
|
|
@@ -295,8 +306,15 @@ def test_task_context_relay_fork():
|
|
|
|
srv_relay.stop()
|
|
srv.stop()
|
|
- srv_relay_task.join()
|
|
- srv_task.join()
|
|
+ try:
|
|
+ srv_relay_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
+
|
|
+ try:
|
|
+ srv_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
assert cli_tracer._log == [
|
|
('new', cli_tracer.trace_id),
|
|
@@ -324,25 +342,28 @@ def test_task_context_pushpull():
|
|
pusher_tracer = Tracer('[pusher]')
|
|
pusher_ctx.register_middleware(pusher_tracer)
|
|
|
|
- trigger = gevent.event.Event()
|
|
+ trigger = eventlet.event.Event()
|
|
|
|
class Puller(object):
|
|
def echo(self, msg):
|
|
- trigger.set()
|
|
+ trigger.send()
|
|
|
|
puller = zerorpc.Puller(Puller(), context=puller_ctx)
|
|
puller.bind(endpoint)
|
|
- puller_task = gevent.spawn(puller.run)
|
|
+ puller_task = eventlet.spawn(puller.run)
|
|
|
|
c = zerorpc.Pusher(context=pusher_ctx)
|
|
c.connect(endpoint)
|
|
|
|
- trigger.clear()
|
|
+ # trigger.reset()
|
|
c.echo('hello')
|
|
trigger.wait()
|
|
|
|
puller.stop()
|
|
- puller_task.join()
|
|
+ try:
|
|
+ puller_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
assert pusher_tracer._log == [
|
|
('new', pusher_tracer.trace_id),
|
|
@@ -362,29 +383,32 @@ def test_task_context_pubsub():
|
|
publisher_tracer = Tracer('[publisher]')
|
|
publisher_ctx.register_middleware(publisher_tracer)
|
|
|
|
- trigger = gevent.event.Event()
|
|
+ trigger = eventlet.event.Event()
|
|
|
|
class Subscriber(object):
|
|
def echo(self, msg):
|
|
- trigger.set()
|
|
+ trigger.send()
|
|
|
|
subscriber = zerorpc.Subscriber(Subscriber(), context=subscriber_ctx)
|
|
subscriber.bind(endpoint)
|
|
- subscriber_task = gevent.spawn(subscriber.run)
|
|
+ subscriber_task = eventlet.spawn(subscriber.run)
|
|
|
|
c = zerorpc.Publisher(context=publisher_ctx)
|
|
c.connect(endpoint)
|
|
|
|
- trigger.clear()
|
|
+ # trigger.reset()
|
|
# We need this retry logic to wait that the subscriber.run coroutine starts
|
|
# reading (the published messages will go to /dev/null until then).
|
|
- while not trigger.is_set():
|
|
+ while not trigger.ready():
|
|
c.echo('pub...')
|
|
if trigger.wait(TIME_FACTOR * 1):
|
|
break
|
|
|
|
subscriber.stop()
|
|
- subscriber_task.join()
|
|
+ try:
|
|
+ subscriber_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
print(publisher_tracer._log)
|
|
assert ('new', publisher_tracer.trace_id) in publisher_tracer._log
|
|
@@ -429,7 +453,7 @@ def test_server_inspect_exception_middleware():
|
|
module = Srv()
|
|
server = zerorpc.Server(module, context=ctx)
|
|
server.bind(endpoint)
|
|
- gevent.spawn(server.run)
|
|
+ eventlet.spawn(server.run)
|
|
|
|
client = zerorpc.Client()
|
|
client.connect(endpoint)
|
|
@@ -447,7 +471,7 @@ def test_server_inspect_exception_middleware():
|
|
def test_server_inspect_exception_middleware_puller():
|
|
endpoint = random_ipc_endpoint()
|
|
|
|
- barrier = gevent.event.Event()
|
|
+ barrier = eventlet.event.Event()
|
|
middleware = InspectExceptionMiddleware(barrier)
|
|
ctx = zerorpc.Context()
|
|
ctx.register_middleware(middleware)
|
|
@@ -455,12 +479,12 @@ def test_server_inspect_exception_middleware_puller():
|
|
module = Srv()
|
|
server = zerorpc.Puller(module, context=ctx)
|
|
server.bind(endpoint)
|
|
- gevent.spawn(server.run)
|
|
+ eventlet.spawn(server.run)
|
|
|
|
client = zerorpc.Pusher()
|
|
client.connect(endpoint)
|
|
|
|
- barrier.clear()
|
|
+ # barrier.reset()
|
|
client.echo('This is a test which should call the InspectExceptionMiddleware')
|
|
barrier.wait(timeout=TIME_FACTOR * 2)
|
|
|
|
@@ -479,7 +503,7 @@ def test_server_inspect_exception_middleware_stream():
|
|
module = Srv()
|
|
server = zerorpc.Server(module, context=ctx)
|
|
server.bind(endpoint)
|
|
- gevent.spawn(server.run)
|
|
+ eventlet.spawn(server.run)
|
|
|
|
client = zerorpc.Client()
|
|
client.connect(endpoint)
|
|
diff --git a/tests/test_middleware_before_after_exec.py b/tests/test_middleware_before_after_exec.py
|
|
index 5dafeb0..32bfc4c 100644
|
|
--- a/tests/test_middleware_before_after_exec.py
|
|
+++ b/tests/test_middleware_before_after_exec.py
|
|
@@ -25,7 +25,9 @@
|
|
from __future__ import absolute_import
|
|
from builtins import range
|
|
|
|
-import gevent
|
|
+import eventlet
|
|
+import greenlet
|
|
+
|
|
import zerorpc
|
|
|
|
from .testutils import teardown, random_ipc_endpoint, TIME_FACTOR
|
|
@@ -39,7 +41,7 @@ class EchoModule(object):
|
|
def echo(self, msg):
|
|
self.last_msg = 'echo: ' + msg
|
|
if self._trigger:
|
|
- self._trigger.set()
|
|
+ self._trigger.send()
|
|
return self.last_msg
|
|
|
|
@zerorpc.stream
|
|
@@ -63,7 +65,7 @@ def test_hook_server_before_exec():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
test_client = zerorpc.Client()
|
|
test_client.connect(endpoint)
|
|
|
|
@@ -78,17 +80,20 @@ def test_hook_server_before_exec():
|
|
assert test_middleware.called == True
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
def test_hook_server_before_exec_puller():
|
|
zero_ctx = zerorpc.Context()
|
|
- trigger = gevent.event.Event()
|
|
+ trigger = eventlet.event.Event()
|
|
endpoint = random_ipc_endpoint()
|
|
|
|
echo_module = EchoModule(trigger)
|
|
test_server = zerorpc.Puller(echo_module, context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
test_client = zerorpc.Pusher()
|
|
test_client.connect(endpoint)
|
|
|
|
@@ -96,7 +101,7 @@ def test_hook_server_before_exec_puller():
|
|
test_client.echo("test")
|
|
trigger.wait(timeout=TIME_FACTOR * 2)
|
|
assert echo_module.last_msg == "echo: test"
|
|
- trigger.clear()
|
|
+ # trigger.reset()
|
|
|
|
# Test with a middleware
|
|
test_middleware = ServerBeforeExecMiddleware()
|
|
@@ -108,7 +113,10 @@ def test_hook_server_before_exec_puller():
|
|
assert test_middleware.called == True
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
def test_hook_server_before_exec_stream():
|
|
zero_ctx = zerorpc.Context()
|
|
@@ -116,7 +124,7 @@ def test_hook_server_before_exec_stream():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
test_client = zerorpc.Client()
|
|
test_client.connect(endpoint)
|
|
|
|
@@ -135,7 +143,10 @@ def test_hook_server_before_exec_stream():
|
|
assert echo == "echo: test"
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
class ServerAfterExecMiddleware(object):
|
|
|
|
@@ -153,7 +164,7 @@ def test_hook_server_after_exec():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
test_client = zerorpc.Client()
|
|
test_client.connect(endpoint)
|
|
|
|
@@ -170,17 +181,20 @@ def test_hook_server_after_exec():
|
|
assert test_middleware.reply_event_name == 'OK'
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
def test_hook_server_after_exec_puller():
|
|
zero_ctx = zerorpc.Context()
|
|
- trigger = gevent.event.Event()
|
|
+ trigger = eventlet.event.Event()
|
|
endpoint = random_ipc_endpoint()
|
|
|
|
echo_module = EchoModule(trigger)
|
|
test_server = zerorpc.Puller(echo_module, context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
test_client = zerorpc.Pusher()
|
|
test_client.connect(endpoint)
|
|
|
|
@@ -188,7 +202,7 @@ def test_hook_server_after_exec_puller():
|
|
test_client.echo("test")
|
|
trigger.wait(timeout=TIME_FACTOR * 2)
|
|
assert echo_module.last_msg == "echo: test"
|
|
- trigger.clear()
|
|
+ # trigger.reset()
|
|
|
|
# Test with a middleware
|
|
test_middleware = ServerAfterExecMiddleware()
|
|
@@ -202,7 +216,10 @@ def test_hook_server_after_exec_puller():
|
|
assert test_middleware.reply_event_name is None
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
def test_hook_server_after_exec_stream():
|
|
zero_ctx = zerorpc.Context()
|
|
@@ -210,7 +227,7 @@ def test_hook_server_after_exec_stream():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
test_client = zerorpc.Client()
|
|
test_client.connect(endpoint)
|
|
|
|
@@ -232,7 +249,10 @@ def test_hook_server_after_exec_stream():
|
|
assert test_middleware.reply_event_name == 'STREAM_DONE'
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
class BrokenEchoModule(object):
|
|
|
|
@@ -246,7 +266,7 @@ class BrokenEchoModule(object):
|
|
raise RuntimeError("BrokenEchoModule")
|
|
finally:
|
|
if self._trigger:
|
|
- self._trigger.set()
|
|
+ self._trigger.send()
|
|
|
|
@zerorpc.stream
|
|
def echoes(self, msg):
|
|
@@ -258,7 +278,7 @@ def test_hook_server_after_exec_on_error():
|
|
|
|
test_server = zerorpc.Server(BrokenEchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
test_client = zerorpc.Client()
|
|
test_client.connect(endpoint)
|
|
|
|
@@ -272,17 +292,20 @@ def test_hook_server_after_exec_on_error():
|
|
assert test_middleware.called == False
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
def test_hook_server_after_exec_on_error_puller():
|
|
zero_ctx = zerorpc.Context()
|
|
- trigger = gevent.event.Event()
|
|
+ trigger = eventlet.event.Event()
|
|
endpoint = random_ipc_endpoint()
|
|
|
|
echo_module = BrokenEchoModule(trigger)
|
|
test_server = zerorpc.Puller(echo_module, context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
test_client = zerorpc.Pusher()
|
|
test_client.connect(endpoint)
|
|
|
|
@@ -298,7 +321,10 @@ def test_hook_server_after_exec_on_error_puller():
|
|
assert test_middleware.called == False
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
def test_hook_server_after_exec_on_error_stream():
|
|
zero_ctx = zerorpc.Context()
|
|
@@ -306,7 +332,7 @@ def test_hook_server_after_exec_on_error_stream():
|
|
|
|
test_server = zerorpc.Server(BrokenEchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
test_client = zerorpc.Client()
|
|
test_client.connect(endpoint)
|
|
|
|
@@ -320,4 +346,7 @@ def test_hook_server_after_exec_on_error_stream():
|
|
assert test_middleware.called == False
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
diff --git a/tests/test_middleware_client.py b/tests/test_middleware_client.py
|
|
index 943985e..64f7b5a 100644
|
|
--- a/tests/test_middleware_client.py
|
|
+++ b/tests/test_middleware_client.py
|
|
@@ -25,7 +25,9 @@
|
|
from __future__ import absolute_import
|
|
from builtins import range
|
|
|
|
-import gevent
|
|
+import eventlet
|
|
+import greenlet
|
|
+
|
|
import zerorpc
|
|
|
|
from .testutils import teardown, random_ipc_endpoint, TIME_FACTOR
|
|
@@ -62,7 +64,7 @@ class EchoModule(object):
|
|
|
|
def timeout(self, msg):
|
|
self.last_msg = "timeout: " + msg
|
|
- gevent.sleep(TIME_FACTOR * 2)
|
|
+ eventlet.sleep(TIME_FACTOR * 2)
|
|
|
|
def test_hook_client_before_request():
|
|
|
|
@@ -78,7 +80,7 @@ def test_hook_client_before_request():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
|
|
test_client = zerorpc.Client(context=zero_ctx)
|
|
test_client.connect(endpoint)
|
|
@@ -93,7 +95,10 @@ def test_hook_client_before_request():
|
|
assert test_middleware.method == 'echo'
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
class ClientAfterRequestMiddleware(object):
|
|
def __init__(self):
|
|
@@ -111,7 +116,7 @@ def test_hook_client_after_request():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
|
|
test_client = zerorpc.Client(context=zero_ctx)
|
|
test_client.connect(endpoint)
|
|
@@ -126,7 +131,10 @@ def test_hook_client_after_request():
|
|
assert test_middleware.retcode == 'OK'
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
def test_hook_client_after_request_stream():
|
|
zero_ctx = zerorpc.Context()
|
|
@@ -134,7 +142,7 @@ def test_hook_client_after_request_stream():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
|
|
test_client = zerorpc.Client(context=zero_ctx)
|
|
test_client.connect(endpoint)
|
|
@@ -156,7 +164,10 @@ def test_hook_client_after_request_stream():
|
|
assert test_middleware.retcode == 'STREAM_DONE'
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
def test_hook_client_after_request_timeout():
|
|
|
|
@@ -176,7 +187,7 @@ def test_hook_client_after_request_timeout():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
|
|
test_client = zerorpc.Client(timeout=TIME_FACTOR * 1, context=zero_ctx)
|
|
test_client.connect(endpoint)
|
|
@@ -189,7 +200,10 @@ def test_hook_client_after_request_timeout():
|
|
assert "timeout" in ex.args[0]
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
class ClientAfterFailedRequestMiddleware(object):
|
|
def __init__(self):
|
|
@@ -212,7 +226,7 @@ def test_hook_client_after_request_remote_error():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
|
|
test_client = zerorpc.Client(timeout=TIME_FACTOR * 1, context=zero_ctx)
|
|
test_client.connect(endpoint)
|
|
@@ -224,7 +238,10 @@ def test_hook_client_after_request_remote_error():
|
|
assert test_middleware.called == True
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
def test_hook_client_after_request_remote_error_stream():
|
|
|
|
@@ -235,7 +252,7 @@ def test_hook_client_after_request_remote_error_stream():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
|
|
test_client = zerorpc.Client(timeout=TIME_FACTOR * 1, context=zero_ctx)
|
|
test_client.connect(endpoint)
|
|
@@ -247,7 +264,10 @@ def test_hook_client_after_request_remote_error_stream():
|
|
assert test_middleware.called == True
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
def test_hook_client_handle_remote_error_inspect():
|
|
|
|
@@ -264,7 +284,7 @@ def test_hook_client_handle_remote_error_inspect():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
|
|
test_client = zerorpc.Client(context=zero_ctx)
|
|
test_client.connect(endpoint)
|
|
@@ -277,7 +297,10 @@ def test_hook_client_handle_remote_error_inspect():
|
|
assert ex.name == "RuntimeError"
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
# This is a seriously broken idea, but possible nonetheless
|
|
class ClientEvalRemoteErrorMiddleware(object):
|
|
@@ -298,7 +321,7 @@ def test_hook_client_handle_remote_error_eval():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
|
|
test_client = zerorpc.Client(context=zero_ctx)
|
|
test_client.connect(endpoint)
|
|
@@ -311,7 +334,10 @@ def test_hook_client_handle_remote_error_eval():
|
|
assert "BrokenEchoModule" in ex.args[0]
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
def test_hook_client_handle_remote_error_eval_stream():
|
|
test_middleware = ClientEvalRemoteErrorMiddleware()
|
|
@@ -321,7 +347,7 @@ def test_hook_client_handle_remote_error_eval_stream():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
|
|
test_client = zerorpc.Client(context=zero_ctx)
|
|
test_client.connect(endpoint)
|
|
@@ -334,7 +360,10 @@ def test_hook_client_handle_remote_error_eval_stream():
|
|
assert "BrokenEchoModule" in ex.args[0]
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
|
|
def test_hook_client_after_request_custom_error():
|
|
|
|
@@ -360,7 +389,7 @@ def test_hook_client_after_request_custom_error():
|
|
|
|
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
|
|
test_server.bind(endpoint)
|
|
- test_server_task = gevent.spawn(test_server.run)
|
|
+ test_server_task = eventlet.spawn(test_server.run)
|
|
|
|
test_client = zerorpc.Client(context=zero_ctx)
|
|
test_client.connect(endpoint)
|
|
@@ -373,4 +402,7 @@ def test_hook_client_after_request_custom_error():
|
|
assert "BrokenEchoModule" in ex.args[0]
|
|
|
|
test_server.stop()
|
|
- test_server_task.join()
|
|
+ try:
|
|
+ test_server_task.wait()
|
|
+ except greenlet.GreenletExit:
|
|
+ pass
|
|
diff --git a/tests/test_pubpush.py b/tests/test_pubpush.py
|
|
index a99f9b4..512a1a0 100644
|
|
--- a/tests/test_pubpush.py
|
|
+++ b/tests/test_pubpush.py
|
|
@@ -27,8 +27,7 @@ from __future__ import print_function
|
|
from __future__ import absolute_import
|
|
from builtins import range
|
|
|
|
-import gevent
|
|
-import gevent.event
|
|
+import eventlet
|
|
import zerorpc
|
|
|
|
from .testutils import teardown, random_ipc_endpoint
|
|
@@ -39,19 +38,19 @@ def test_pushpull_inheritance():
|
|
|
|
pusher = zerorpc.Pusher()
|
|
pusher.bind(endpoint)
|
|
- trigger = gevent.event.Event()
|
|
+ trigger = eventlet.event.Event()
|
|
|
|
class Puller(zerorpc.Puller):
|
|
def lolita(self, a, b):
|
|
print('lolita', a, b)
|
|
assert a + b == 3
|
|
- trigger.set()
|
|
+ trigger.send()
|
|
|
|
puller = Puller()
|
|
puller.connect(endpoint)
|
|
- gevent.spawn(puller.run)
|
|
+ eventlet.spawn(puller.run)
|
|
|
|
- trigger.clear()
|
|
+ # trigger.reset()
|
|
pusher.lolita(1, 2)
|
|
trigger.wait()
|
|
print('done')
|
|
@@ -62,19 +61,19 @@ def test_pubsub_inheritance():
|
|
|
|
publisher = zerorpc.Publisher()
|
|
publisher.bind(endpoint)
|
|
- trigger = gevent.event.Event()
|
|
+ trigger = eventlet.event.Event()
|
|
|
|
class Subscriber(zerorpc.Subscriber):
|
|
def lolita(self, a, b):
|
|
print('lolita', a, b)
|
|
assert a + b == 3
|
|
- trigger.set()
|
|
+ trigger.send()
|
|
|
|
subscriber = Subscriber()
|
|
subscriber.connect(endpoint)
|
|
- gevent.spawn(subscriber.run)
|
|
+ eventlet.spawn(subscriber.run)
|
|
|
|
- trigger.clear()
|
|
+ # trigger.reset()
|
|
# We need this retry logic to wait that the subscriber.run coroutine starts
|
|
# reading (the published messages will go to /dev/null until then).
|
|
for attempt in range(0, 10):
|
|
@@ -87,13 +86,13 @@ def test_pubsub_inheritance():
|
|
|
|
def test_pushpull_composite():
|
|
endpoint = random_ipc_endpoint()
|
|
- trigger = gevent.event.Event()
|
|
+ trigger = eventlet.event.Event()
|
|
|
|
class Puller(object):
|
|
def lolita(self, a, b):
|
|
print('lolita', a, b)
|
|
assert a + b == 3
|
|
- trigger.set()
|
|
+ trigger.send()
|
|
|
|
pusher = zerorpc.Pusher()
|
|
pusher.bind(endpoint)
|
|
@@ -101,9 +100,9 @@ def test_pushpull_composite():
|
|
service = Puller()
|
|
puller = zerorpc.Puller(service)
|
|
puller.connect(endpoint)
|
|
- gevent.spawn(puller.run)
|
|
+ eventlet.spawn(puller.run)
|
|
|
|
- trigger.clear()
|
|
+ # trigger.reset()
|
|
pusher.lolita(1, 2)
|
|
trigger.wait()
|
|
print('done')
|
|
@@ -111,13 +110,13 @@ def test_pushpull_composite():
|
|
|
|
def test_pubsub_composite():
|
|
endpoint = random_ipc_endpoint()
|
|
- trigger = gevent.event.Event()
|
|
+ trigger = eventlet.event.Event()
|
|
|
|
class Subscriber(object):
|
|
def lolita(self, a, b):
|
|
print('lolita', a, b)
|
|
assert a + b == 3
|
|
- trigger.set()
|
|
+ trigger.send()
|
|
|
|
publisher = zerorpc.Publisher()
|
|
publisher.bind(endpoint)
|
|
@@ -125,9 +124,9 @@ def test_pubsub_composite():
|
|
service = Subscriber()
|
|
subscriber = zerorpc.Subscriber(service)
|
|
subscriber.connect(endpoint)
|
|
- gevent.spawn(subscriber.run)
|
|
+ eventlet.spawn(subscriber.run)
|
|
|
|
- trigger.clear()
|
|
+ # trigger.reset()
|
|
# We need this retry logic to wait that the subscriber.run coroutine starts
|
|
# reading (the published messages will go to /dev/null until then).
|
|
for attempt in range(0, 10):
|
|
diff --git a/tests/test_reqstream.py b/tests/test_reqstream.py
|
|
index 71e1511..2cb9266 100644
|
|
--- a/tests/test_reqstream.py
|
|
+++ b/tests/test_reqstream.py
|
|
@@ -27,7 +27,7 @@ from __future__ import print_function
|
|
from __future__ import absolute_import
|
|
from builtins import range
|
|
|
|
-import gevent
|
|
+import eventlet
|
|
|
|
import zerorpc
|
|
from .testutils import teardown, random_ipc_endpoint, TIME_FACTOR
|
|
@@ -55,7 +55,7 @@ def test_rcp_streaming():
|
|
|
|
srv = MySrv(heartbeat=TIME_FACTOR * 4)
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client = zerorpc.Client(heartbeat=TIME_FACTOR * 4)
|
|
client.connect(endpoint)
|
|
@@ -67,7 +67,7 @@ def test_rcp_streaming():
|
|
assert isinstance(r, Iterator)
|
|
l = []
|
|
print('wait 4s for fun')
|
|
- gevent.sleep(TIME_FACTOR * 4)
|
|
+ eventlet.sleep(TIME_FACTOR * 4)
|
|
for x in r:
|
|
l.append(x)
|
|
assert l == list(range(10))
|
|
diff --git a/tests/test_server.py b/tests/test_server.py
|
|
index 86997a9..a58f5eb 100644
|
|
--- a/tests/test_server.py
|
|
+++ b/tests/test_server.py
|
|
@@ -28,7 +28,7 @@ from __future__ import absolute_import
|
|
from builtins import range
|
|
|
|
import pytest
|
|
-import gevent
|
|
+import eventlet
|
|
import sys
|
|
|
|
from zerorpc import zmq
|
|
@@ -49,7 +49,7 @@ def test_server_manual():
|
|
|
|
srv = MySrv()
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client_events = zerorpc.Events(zmq.DEALER)
|
|
client_events.connect(endpoint)
|
|
@@ -82,7 +82,7 @@ def test_client_server():
|
|
|
|
srv = MySrv()
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client = zerorpc.Client()
|
|
client.connect(endpoint)
|
|
@@ -103,12 +103,12 @@ def test_client_server_client_timeout():
|
|
return 42
|
|
|
|
def add(self, a, b):
|
|
- gevent.sleep(TIME_FACTOR * 10)
|
|
+ eventlet.sleep(TIME_FACTOR * 10)
|
|
return a + b
|
|
|
|
srv = MySrv()
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client = zerorpc.Client(timeout=TIME_FACTOR * 2)
|
|
client.connect(endpoint)
|
|
@@ -132,7 +132,7 @@ def test_client_server_exception():
|
|
|
|
srv = MySrv()
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client = zerorpc.Client(timeout=TIME_FACTOR * 2)
|
|
client.connect(endpoint)
|
|
@@ -159,7 +159,7 @@ def test_client_server_detailed_exception():
|
|
|
|
srv = MySrv()
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client = zerorpc.Client(timeout=TIME_FACTOR * 2)
|
|
client.connect(endpoint)
|
|
@@ -192,7 +192,7 @@ def test_exception_compat_v1():
|
|
|
|
srv = MySrv()
|
|
srv.bind(endpoint)
|
|
- gevent.spawn(srv.run)
|
|
+ eventlet.spawn(srv.run)
|
|
|
|
client_events = zerorpc.Events(zmq.DEALER)
|
|
client_events.connect(endpoint)
|
|
@@ -215,7 +215,7 @@ def test_exception_compat_v1():
|
|
assert event.name == 'ERR'
|
|
(msg,) = event.args
|
|
print('msg only', msg)
|
|
- assert msg == "NameError('donotexist',)"
|
|
+ assert msg == "NameError('donotexist')"
|
|
|
|
client_events.close()
|
|
srv.close()
|
|
diff --git a/tests/test_zmq.py b/tests/test_zmq.py
|
|
index 1e7b4dd..18ee39f 100644
|
|
--- a/tests/test_zmq.py
|
|
+++ b/tests/test_zmq.py
|
|
@@ -25,7 +25,7 @@
|
|
|
|
from __future__ import print_function
|
|
from __future__ import absolute_import
|
|
-import gevent
|
|
+import eventlet
|
|
|
|
from zerorpc import zmq
|
|
from .testutils import teardown, random_ipc_endpoint
|
|
@@ -61,6 +61,6 @@ def test1():
|
|
s.close()
|
|
c.term()
|
|
|
|
- s = gevent.spawn(server)
|
|
- c = gevent.spawn(client)
|
|
- c.join()
|
|
+ s = eventlet.spawn(server)
|
|
+ c = eventlet.spawn(client)
|
|
+ c.wait()
|
|
diff --git a/tests/zmqbug.py b/tests/zmqbug.py
|
|
index 1d102a2..da83fd2 100644
|
|
--- a/tests/zmqbug.py
|
|
+++ b/tests/zmqbug.py
|
|
@@ -28,29 +28,147 @@
|
|
from __future__ import print_function
|
|
|
|
import zmq
|
|
+from zmq.constants import *
|
|
|
|
-import gevent.event
|
|
-import gevent.core
|
|
+import eventlet
|
|
+import eventlet.hubs
|
|
+import greenlet
|
|
+from collections import deque
|
|
|
|
STOP_EVERYTHING = False
|
|
|
|
+class LockReleaseError(Exception):
|
|
+ pass
|
|
+
|
|
+class _QueueLock(object):
|
|
+ """A Lock that can be acquired by at most one thread. Any other
|
|
+ thread calling acquire will be blocked in a queue. When release
|
|
+ is called, the threads are awoken in the order they blocked,
|
|
+ one at a time. This lock can be required recursively by the same
|
|
+ thread."""
|
|
+
|
|
+ def __init__(self):
|
|
+ self._waiters = deque()
|
|
+ self._count = 0
|
|
+ self._holder = None
|
|
+ self._hub = eventlet.hubs.get_hub()
|
|
+
|
|
+ def __nonzero__(self):
|
|
+ return bool(self._count)
|
|
+
|
|
+ __bool__ = __nonzero__
|
|
+
|
|
+ def __enter__(self):
|
|
+ self.acquire()
|
|
+
|
|
+ def __exit__(self, type, value, traceback):
|
|
+ self.release()
|
|
+
|
|
+ def acquire(self):
|
|
+ current = greenlet.getcurrent()
|
|
+ if (self._waiters or self._count > 0) and self._holder is not current:
|
|
+ # block until lock is free
|
|
+ self._waiters.append(current)
|
|
+ self._hub.switch()
|
|
+ w = self._waiters.popleft()
|
|
+
|
|
+ assert w is current, 'Waiting threads woken out of order'
|
|
+ assert self._count == 0, 'After waking a thread, the lock must be unacquired'
|
|
+
|
|
+ self._holder = current
|
|
+ self._count += 1
|
|
+
|
|
+ def release(self):
|
|
+ if self._count <= 0:
|
|
+ raise LockReleaseError("Cannot release unacquired lock")
|
|
+
|
|
+ self._count -= 1
|
|
+ if self._count == 0:
|
|
+ self._holder = None
|
|
+ if self._waiters:
|
|
+ # wake next
|
|
+ self._hub.schedule_call_global(0, self._waiters[0].switch)
|
|
+
|
|
+
|
|
+class _BlockedThread(object):
|
|
+ """Is either empty, or represents a single blocked thread that
|
|
+ blocked itself by calling the block() method. The thread can be
|
|
+ awoken by calling wake(). Wake() can be called multiple times and
|
|
+ all but the first call will have no effect."""
|
|
+
|
|
+ def __init__(self):
|
|
+ self._blocked_thread = None
|
|
+ self._wakeupper = None
|
|
+ self._hub = eventlet.hubs.get_hub()
|
|
+
|
|
+ def __nonzero__(self):
|
|
+ return self._blocked_thread is not None
|
|
+
|
|
+ __bool__ = __nonzero__
|
|
+
|
|
+ def block(self, deadline=None):
|
|
+ if self._blocked_thread is not None:
|
|
+ raise Exception("Cannot block more than one thread on one BlockedThread")
|
|
+ self._blocked_thread = greenlet.getcurrent()
|
|
+
|
|
+ if deadline is not None:
|
|
+ self._hub.schedule_call_local(deadline - self._hub.clock(), self.wake)
|
|
+
|
|
+ try:
|
|
+ self._hub.switch()
|
|
+ finally:
|
|
+ self._blocked_thread = None
|
|
+ # cleanup the wakeup task
|
|
+ if self._wakeupper is not None:
|
|
+ # Important to cancel the wakeup task so it doesn't
|
|
+ # spuriously wake this greenthread later on.
|
|
+ self._wakeupper.cancel()
|
|
+ self._wakeupper = None
|
|
+
|
|
+ def wake(self):
|
|
+ """Schedules the blocked thread to be awoken and return
|
|
+ True. If wake has already been called or if there is no
|
|
+ blocked thread, then this call has no effect and returns
|
|
+ False."""
|
|
+ if self._blocked_thread is not None and self._wakeupper is None:
|
|
+ self._wakeupper = self._hub.schedule_call_global(0, self._blocked_thread.switch)
|
|
+ return True
|
|
+ return False
|
|
+
|
|
|
|
class ZMQSocket(zmq.Socket):
|
|
|
|
def __init__(self, context, socket_type):
|
|
super(ZMQSocket, self).__init__(context, socket_type)
|
|
on_state_changed_fd = self.getsockopt(zmq.FD)
|
|
- self._readable = gevent.event.Event()
|
|
- self._writable = gevent.event.Event()
|
|
- try:
|
|
- # gevent>=1.0
|
|
- self._state_event = gevent.hub.get_hub().loop.io(
|
|
- on_state_changed_fd, gevent.core.READ)
|
|
- self._state_event.start(self._on_state_changed)
|
|
- except AttributeError:
|
|
- # gevent<1.0
|
|
- self._state_event = gevent.core.read_event(on_state_changed_fd,
|
|
- self._on_state_changed, persist=True)
|
|
+ self.__dict__['_eventlet_send_event'] = _BlockedThread()
|
|
+ self.__dict__['_eventlet_recv_event'] = _BlockedThread()
|
|
+ self.__dict__['_eventlet_send_lock'] = _QueueLock()
|
|
+ self.__dict__['_eventlet_recv_lock'] = _QueueLock()
|
|
+
|
|
+ def event(fd):
|
|
+ # Some events arrived at the zmq socket. This may mean
|
|
+ # there's a message that can be read or there's space for
|
|
+ # a message to be written.
|
|
+ send_wake = self._eventlet_send_event.wake()
|
|
+ recv_wake = self._eventlet_recv_event.wake()
|
|
+ if not send_wake and not recv_wake:
|
|
+ # if no waiting send or recv thread was woken up, then
|
|
+ # force the zmq socket's events to be processed to
|
|
+ # avoid repeated wakeups
|
|
+ events = self.getsockopt(zmq.EVENTS)
|
|
+ if events & zmq.POLLOUT:
|
|
+ self._eventlet_send_event.wake()
|
|
+ if events & zmq.POLLIN:
|
|
+ self._eventlet_recv_event.wake()
|
|
+
|
|
+ hub = eventlet.hubs.get_hub()
|
|
+ self.__dict__['_eventlet_listener'] = hub.add(hub.READ,
|
|
+ self.getsockopt(FD),
|
|
+ event,
|
|
+ lambda _: None,
|
|
+ lambda: None)
|
|
+ self.__dict__['_eventlet_clock'] = hub.clock
|
|
|
|
def _on_state_changed(self, event=None, _evtype=None):
|
|
if self.closed:
|
|
@@ -64,47 +182,89 @@ class ZMQSocket(zmq.Socket):
|
|
if events & zmq.POLLIN:
|
|
self._readable.set()
|
|
|
|
- def close(self):
|
|
- if not self.closed and getattr(self, '_state_event', None):
|
|
- try:
|
|
- # gevent>=1.0
|
|
- self._state_event.stop()
|
|
- except AttributeError:
|
|
- # gevent<1.0
|
|
- self._state_event.cancel()
|
|
- super(ZMQSocket, self).close()
|
|
+ def close(self, linger=None):
|
|
+ super(ZMQSocket, self).close(linger)
|
|
+ if self._eventlet_listener is not None:
|
|
+ eventlet.hubs.get_hub().remove(self._state_event)
|
|
+ self.__dict__['_eventlet_listener'] = None
|
|
+ self._eventlet_send_event.wake()
|
|
+ self._eventlet_recv_event.wake()
|
|
|
|
def send(self, data, flags=0, copy=True, track=False):
|
|
if flags & zmq.NOBLOCK:
|
|
- return super(ZMQSocket, self).send(data, flags, copy, track)
|
|
+ result = super(ZMQSocket, self).send(data, flags, copy, track)
|
|
+ # Instead of calling both wake methods, could call
|
|
+ # self.getsockopt(EVENTS) which would trigger wakeups if
|
|
+ # needed.
|
|
+ self._eventlet_send_event.wake()
|
|
+ self._eventlet_recv_event.wake()
|
|
+ return result
|
|
+
|
|
+ # TODO: pyzmq will copy the message buffer and create Message
|
|
+ # objects under some circumstances. We could do that work here
|
|
+ # once to avoid doing it every time the send is retried.
|
|
flags |= zmq.NOBLOCK
|
|
- while True:
|
|
- try:
|
|
- return super(ZMQSocket, self).send(data, flags, copy, track)
|
|
- except zmq.ZMQError as e:
|
|
- if e.errno != zmq.EAGAIN:
|
|
- raise
|
|
- self._writable.clear()
|
|
- self._writable.wait()
|
|
+ with self._eventlet_send_lock:
|
|
+ while True:
|
|
+ try:
|
|
+ return super(ZMQSocket, self).send(data, flags, copy, track)
|
|
+ except zmq.ZMQError as e:
|
|
+ if e.errno == zmq.EAGAIN:
|
|
+ self._eventlet_send_event.block()
|
|
+ else:
|
|
+ raise
|
|
+ finally:
|
|
+ # The call to send processes 0mq events and may
|
|
+ # make the socket ready to recv. Wake the next
|
|
+ # receiver. (Could check EVENTS for POLLIN here)
|
|
+ self._eventlet_recv_event.wake()
|
|
|
|
def recv(self, flags=0, copy=True, track=False):
|
|
if flags & zmq.NOBLOCK:
|
|
- return super(ZMQSocket, self).recv(flags, copy, track)
|
|
+ msg = super(ZMQSocket, self).recv(flags, copy, track)
|
|
+ # Instead of calling both wake methods, could call
|
|
+ # self.getsockopt(EVENTS) which would trigger wakeups if
|
|
+ # needed.
|
|
+ self._eventlet_send_event.wake()
|
|
+ self._eventlet_recv_event.wake()
|
|
+ return msg
|
|
+
|
|
+ deadline = None
|
|
+ if hasattr(zmq, 'RCVTIMEO'):
|
|
+ sock_timeout = self.getsockopt(zmq.RCVTIMEO)
|
|
+ if sock_timeout == -1:
|
|
+ pass
|
|
+ elif sock_timeout > 0:
|
|
+ deadline = self._eventlet_clock() + sock_timeout / 1000.0
|
|
+ else:
|
|
+ raise ValueError(sock_timeout)
|
|
+
|
|
flags |= zmq.NOBLOCK
|
|
- while True:
|
|
- try:
|
|
- return super(ZMQSocket, self).recv(flags, copy, track)
|
|
- except zmq.ZMQError as e:
|
|
- if e.errno != zmq.EAGAIN:
|
|
- raise
|
|
- self._readable.clear()
|
|
- while not self._readable.wait(timeout=10):
|
|
- events = self.getsockopt(zmq.EVENTS)
|
|
- if bool(events & zmq.POLLIN):
|
|
- print("here we go, nobody told me about new messages!")
|
|
- global STOP_EVERYTHING
|
|
- STOP_EVERYTHING = True
|
|
- raise gevent.GreenletExit()
|
|
+ with self._eventlet_recv_lock:
|
|
+ while True:
|
|
+ try:
|
|
+ return super(ZMQSocket, self).recv(flags, copy, track)
|
|
+ except zmq.ZMQError as e:
|
|
+ if e.errno == zmq.EAGAIN:
|
|
+ # zmq in its wisdom decided to reuse EAGAIN for timeouts
|
|
+ if deadline is not None and self._eventlet_clock() > deadline:
|
|
+ e.is_timeout = True
|
|
+ raise
|
|
+
|
|
+ self._eventlet_recv_event.block(deadline=deadline)
|
|
+ else:
|
|
+ raise
|
|
+ finally:
|
|
+ # The call to recv processes 0mq events and may
|
|
+ # make the socket ready to send. Wake the next
|
|
+ # receiver. (Could check EVENTS for POLLOUT here)
|
|
+ while self._eventlet_send_event.wake():
|
|
+ events = self.getsockopt(zmq.EVENTS)
|
|
+ if bool(events & zmq.POLLIN):
|
|
+ print("here we go, nobody told me about new messages!")
|
|
+ global STOP_EVERYTHING
|
|
+ STOP_EVERYTHING = True
|
|
+ raise greenlet.GreenletExit()
|
|
|
|
zmq_context = zmq.Context()
|
|
|
|
@@ -124,11 +284,11 @@ def server():
|
|
socket.send(msg)
|
|
cnt.responded += 1
|
|
|
|
- gevent.spawn(responder)
|
|
+ eventlet.spawn(responder)
|
|
|
|
while not STOP_EVERYTHING:
|
|
print("cnt.responded=", cnt.responded)
|
|
- gevent.sleep(0.5)
|
|
+ eventlet.sleep(0.5)
|
|
|
|
|
|
def client():
|
|
@@ -149,17 +309,17 @@ def client():
|
|
|
|
def sendmsg():
|
|
while not STOP_EVERYTHING:
|
|
- socket.send('', flags=zmq.SNDMORE)
|
|
- socket.send('hello')
|
|
+ socket.send(b'', flags=zmq.SNDMORE)
|
|
+ socket.send(b'hello')
|
|
cnt.send += 1
|
|
- gevent.sleep(0)
|
|
+ eventlet.sleep(0)
|
|
|
|
- gevent.spawn(recvmsg)
|
|
- gevent.spawn(sendmsg)
|
|
+ eventlet.spawn(recvmsg)
|
|
+ eventlet.spawn(sendmsg)
|
|
|
|
while not STOP_EVERYTHING:
|
|
print("cnt.recv=", cnt.recv, "cnt.send=", cnt.send)
|
|
- gevent.sleep(0.5)
|
|
+ eventlet.sleep(0.5)
|
|
|
|
-gevent.spawn(server)
|
|
+eventlet.spawn(server)
|
|
client()
|
|
diff --git a/tox.ini b/tox.ini
|
|
index 96bace8..a12cbc6 100644
|
|
--- a/tox.ini
|
|
+++ b/tox.ini
|
|
@@ -11,6 +11,6 @@ commands =
|
|
passenv = ZPC_TEST_TIME_FACTOR
|
|
|
|
[flake8]
|
|
-ignore = E501,E128
|
|
+ignore = E501,E128,E129,F841,W504
|
|
filename = *.py,zerorpc
|
|
exclude = tests,.git,dist,doc,*.egg-info,__pycache__,setup.py
|
|
diff --git a/zerorpc/channel.py b/zerorpc/channel.py
|
|
index ad21c27..bd376ec 100644
|
|
--- a/zerorpc/channel.py
|
|
+++ b/zerorpc/channel.py
|
|
@@ -22,11 +22,7 @@
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
# SOFTWARE.
|
|
|
|
-import gevent.pool
|
|
-import gevent.queue
|
|
-import gevent.event
|
|
-import gevent.local
|
|
-import gevent.lock
|
|
+import eventlet
|
|
import logging
|
|
|
|
from .exceptions import TimeoutExpired
|
|
@@ -43,8 +39,8 @@ class ChannelMultiplexer(ChannelBase):
|
|
self._channel_dispatcher_task = None
|
|
self._broadcast_queue = None
|
|
if events.recv_is_supported and not ignore_broadcast:
|
|
- self._broadcast_queue = gevent.queue.Queue(maxsize=1)
|
|
- self._channel_dispatcher_task = gevent.spawn(
|
|
+ self._broadcast_queue = eventlet.queue.Queue(maxsize=1)
|
|
+ self._channel_dispatcher_task = eventlet.spawn(
|
|
self._channel_dispatcher)
|
|
|
|
@property
|
|
@@ -98,7 +94,7 @@ class ChannelMultiplexer(ChannelBase):
|
|
|
|
def channel(self, from_event=None):
|
|
if self._channel_dispatcher_task is None:
|
|
- self._channel_dispatcher_task = gevent.spawn(
|
|
+ self._channel_dispatcher_task = eventlet.spawn(
|
|
self._channel_dispatcher)
|
|
return Channel(self, from_event)
|
|
|
|
@@ -117,7 +113,7 @@ class Channel(ChannelBase):
|
|
self._multiplexer = multiplexer
|
|
self._channel_id = None
|
|
self._zmqid = None
|
|
- self._queue = gevent.queue.Queue(maxsize=1)
|
|
+ self._queue = eventlet.queue.Queue(maxsize=1)
|
|
if from_event is not None:
|
|
self._channel_id = from_event.header[u'message_id']
|
|
self._zmqid = from_event.identity
|
|
@@ -156,7 +152,7 @@ class Channel(ChannelBase):
|
|
def recv(self, timeout=None):
|
|
try:
|
|
event = self._queue.get(timeout=timeout)
|
|
- except gevent.queue.Empty:
|
|
+ except eventlet.queue.Empty:
|
|
raise TimeoutExpired(timeout)
|
|
return event
|
|
|
|
@@ -172,11 +168,11 @@ class BufferedChannel(ChannelBase):
|
|
self._input_queue_size = inqueue_size
|
|
self._remote_queue_open_slots = 1
|
|
self._input_queue_reserved = 1
|
|
- self._remote_can_recv = gevent.event.Event()
|
|
- self._input_queue = gevent.queue.Queue()
|
|
+ self._remote_can_recv = eventlet.event.Event()
|
|
+ self._input_queue = eventlet.queue.Queue()
|
|
self._verbose = False
|
|
self._on_close_if = None
|
|
- self._recv_task = gevent.spawn(self._recver)
|
|
+ self._recv_task = eventlet.spawn(self._recver)
|
|
|
|
@property
|
|
def recv_is_supported(self):
|
|
@@ -211,7 +207,7 @@ class BufferedChannel(ChannelBase):
|
|
except Exception:
|
|
logger.exception('gevent_zerorpc.BufferedChannel._recver')
|
|
if self._remote_queue_open_slots > 0:
|
|
- self._remote_can_recv.set()
|
|
+ self._remote_can_recv.send()
|
|
elif self._input_queue.qsize() == self._input_queue_size:
|
|
raise RuntimeError(
|
|
'BufferedChannel, queue overflow on event:', event)
|
|
@@ -227,12 +223,12 @@ class BufferedChannel(ChannelBase):
|
|
|
|
def emit_event(self, event, timeout=None):
|
|
if self._remote_queue_open_slots == 0:
|
|
- self._remote_can_recv.clear()
|
|
+ # self._remote_can_recv.reset() # TODO Check if the result is equivalent to gevent.clear()
|
|
self._remote_can_recv.wait(timeout=timeout)
|
|
self._remote_queue_open_slots -= 1
|
|
try:
|
|
self._channel.emit_event(event)
|
|
- except:
|
|
+ except Exception:
|
|
self._remote_queue_open_slots += 1
|
|
raise
|
|
|
|
@@ -253,7 +249,7 @@ class BufferedChannel(ChannelBase):
|
|
|
|
try:
|
|
event = self._input_queue.get(timeout=timeout)
|
|
- except gevent.queue.Empty:
|
|
+ except eventlet.queue.Empty:
|
|
raise TimeoutExpired(timeout)
|
|
|
|
self._input_queue_reserved -= 1
|
|
diff --git a/zerorpc/context.py b/zerorpc/context.py
|
|
index debce26..6e20720 100644
|
|
--- a/zerorpc/context.py
|
|
+++ b/zerorpc/context.py
|
|
@@ -29,7 +29,7 @@ from future.utils import tobytes
|
|
import uuid
|
|
import random
|
|
|
|
-from . import gevent_zmq as zmq
|
|
+from eventlet.green import zmq
|
|
|
|
|
|
class Context(zmq.Context):
|
|
diff --git a/zerorpc/core.py b/zerorpc/core.py
|
|
index 9dbf5cc..ea89f36 100644
|
|
--- a/zerorpc/core.py
|
|
+++ b/zerorpc/core.py
|
|
@@ -30,13 +30,9 @@ from future.utils import iteritems
|
|
|
|
import sys
|
|
import traceback
|
|
-import gevent.pool
|
|
-import gevent.queue
|
|
-import gevent.event
|
|
-import gevent.local
|
|
-import gevent.lock
|
|
+import eventlet
|
|
|
|
-from . import gevent_zmq as zmq
|
|
+from eventlet.green import zmq
|
|
from .exceptions import TimeoutExpired, RemoteError, LostRemote
|
|
from .channel import ChannelMultiplexer, BufferedChannel
|
|
from .socket import SocketBase
|
|
@@ -52,7 +48,7 @@ logger = getLogger(__name__)
|
|
class ServerBase(object):
|
|
|
|
def __init__(self, channel, methods=None, name=None, context=None,
|
|
- pool_size=None, heartbeat=5):
|
|
+ pool_size=1000, heartbeat=5):
|
|
self._multiplexer = ChannelMultiplexer(channel)
|
|
|
|
if methods is None:
|
|
@@ -60,7 +56,7 @@ class ServerBase(object):
|
|
|
|
self._context = context or Context.get_instance()
|
|
self._name = name or self._extract_name()
|
|
- self._task_pool = gevent.pool.Pool(size=pool_size)
|
|
+ self._task_pool = eventlet.greenpool.GreenPool(size=pool_size)
|
|
self._acceptor_task = None
|
|
self._methods = self._filter_methods(ServerBase, self, methods)
|
|
|
|
@@ -171,12 +167,12 @@ class ServerBase(object):
|
|
self._task_pool.spawn(self._async_task, initial_event)
|
|
|
|
def run(self):
|
|
- self._acceptor_task = gevent.spawn(self._acceptor)
|
|
+ self._acceptor_task = eventlet.spawn(self._acceptor)
|
|
try:
|
|
- self._acceptor_task.get()
|
|
+ self._acceptor_task.wait()
|
|
finally:
|
|
self.stop()
|
|
- self._task_pool.join(raise_error=True)
|
|
+ self._task_pool.waitall()
|
|
|
|
def stop(self):
|
|
if self._acceptor_task is not None:
|
|
@@ -272,10 +268,8 @@ class ClientBase(object):
|
|
kargs.get('async_', False) is False):
|
|
return self._process_response(request_event, bufchan, timeout)
|
|
|
|
- async_result = gevent.event.AsyncResult()
|
|
- gevent.spawn(self._process_response, request_event, bufchan,
|
|
- timeout).link(async_result)
|
|
- return async_result
|
|
+ return eventlet.spawn(self._process_response, request_event, bufchan,
|
|
+ timeout)
|
|
|
|
def __getattr__(self, method):
|
|
return lambda *args, **kargs: self(method, *args, **kargs)
|
|
@@ -283,7 +277,7 @@ class ClientBase(object):
|
|
|
|
class Server(SocketBase, ServerBase):
|
|
|
|
- def __init__(self, methods=None, name=None, context=None, pool_size=None,
|
|
+ def __init__(self, methods=None, name=None, context=None, pool_size=1000,
|
|
heartbeat=5):
|
|
SocketBase.__init__(self, zmq.ROUTER, context)
|
|
if methods is None:
|
|
@@ -368,15 +362,15 @@ class Puller(SocketBase):
|
|
del exc_infos
|
|
|
|
def run(self):
|
|
- self._receiver_task = gevent.spawn(self._receiver)
|
|
+ self._receiver_task = eventlet.spawn(self._receiver)
|
|
try:
|
|
- self._receiver_task.get()
|
|
+ self._receiver_task.wait()
|
|
finally:
|
|
self._receiver_task = None
|
|
|
|
def stop(self):
|
|
if self._receiver_task is not None:
|
|
- self._receiver_task.kill(block=False)
|
|
+ self._receiver_task.kill()
|
|
|
|
|
|
class Publisher(Pusher):
|
|
diff --git a/zerorpc/events.py b/zerorpc/events.py
|
|
index f87d0b5..ce97ad6 100644
|
|
--- a/zerorpc/events.py
|
|
+++ b/zerorpc/events.py
|
|
@@ -28,15 +28,12 @@ from builtins import str
|
|
from builtins import range
|
|
|
|
import msgpack
|
|
-import gevent.pool
|
|
-import gevent.queue
|
|
-import gevent.event
|
|
-import gevent.local
|
|
-import gevent.lock
|
|
+import eventlet
|
|
+import greenlet
|
|
import logging
|
|
import sys
|
|
|
|
-from . import gevent_zmq as zmq
|
|
+from eventlet.green import zmq
|
|
from .exceptions import TimeoutExpired
|
|
from .context import Context
|
|
from .channel_base import ChannelBase
|
|
@@ -50,8 +47,8 @@ else:
|
|
return frame.buffer
|
|
|
|
# gevent <= 1.1.0.rc5 is missing the Python3 __next__ method.
|
|
-if sys.version_info >= (3, 0) and gevent.version_info <= (1, 1, 0, 'rc', '5'):
|
|
- setattr(gevent.queue.Channel, '__next__', gevent.queue.Channel.next)
|
|
+# if sys.version_info >= (3, 0) and gevent.version_info <= (1, 1, 0, 'rc', '5'):
|
|
+# setattr(gevent.queue.Channel, '__next__', gevent.queue.Channel.next)
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
@@ -67,20 +64,20 @@ class SequentialSender(object):
|
|
for i in range(len(parts) - 1):
|
|
try:
|
|
self._socket.send(parts[i], copy=False, flags=zmq.SNDMORE)
|
|
- except (gevent.GreenletExit, gevent.Timeout) as e:
|
|
+ except (greenlet.GreenletExit, eventlet.Timeout) as e:
|
|
if i == 0:
|
|
raise
|
|
self._socket.send(parts[i], copy=False, flags=zmq.SNDMORE)
|
|
try:
|
|
self._socket.send(parts[-1], copy=False)
|
|
- except (gevent.GreenletExit, gevent.Timeout) as e:
|
|
+ except (greenlet.GreenletExit, eventlet.Timeout) as e:
|
|
self._socket.send(parts[-1], copy=False)
|
|
if e:
|
|
raise e
|
|
|
|
def __call__(self, parts, timeout=None):
|
|
if timeout:
|
|
- with gevent.Timeout(timeout):
|
|
+ with eventlet.Timeout(timeout):
|
|
self._send(parts)
|
|
else:
|
|
self._send(parts)
|
|
@@ -97,7 +94,7 @@ class SequentialReceiver(object):
|
|
while True:
|
|
try:
|
|
part = self._socket.recv(copy=False)
|
|
- except (gevent.GreenletExit, gevent.Timeout) as e:
|
|
+ except (greenlet.GreenletExit, eventlet.Timeout) as e:
|
|
if len(parts) == 0:
|
|
raise
|
|
part = self._socket.recv(copy=False)
|
|
@@ -110,7 +107,7 @@ class SequentialReceiver(object):
|
|
|
|
def __call__(self, timeout=None):
|
|
if timeout:
|
|
- with gevent.Timeout(timeout):
|
|
+ with eventlet.Timeout(timeout):
|
|
return self._recv()
|
|
else:
|
|
return self._recv()
|
|
@@ -120,21 +117,22 @@ class Sender(SequentialSender):
|
|
|
|
def __init__(self, socket):
|
|
self._socket = socket
|
|
- self._send_queue = gevent.queue.Channel()
|
|
- self._send_task = gevent.spawn(self._sender)
|
|
+ self._send_queue = eventlet.queue.Queue(maxsize=0) # Channel
|
|
+ self._send_task = eventlet.spawn(self._sender)
|
|
|
|
def close(self):
|
|
if self._send_task:
|
|
self._send_task.kill()
|
|
|
|
def _sender(self):
|
|
- for parts in self._send_queue:
|
|
+ while True:
|
|
+ parts = self._send_queue.get()
|
|
super(Sender, self)._send(parts)
|
|
|
|
def __call__(self, parts, timeout=None):
|
|
try:
|
|
self._send_queue.put(parts, timeout=timeout)
|
|
- except gevent.queue.Full:
|
|
+ except eventlet.queue.Full:
|
|
raise TimeoutExpired(timeout)
|
|
|
|
|
|
@@ -142,8 +140,8 @@ class Receiver(SequentialReceiver):
|
|
|
|
def __init__(self, socket):
|
|
self._socket = socket
|
|
- self._recv_queue = gevent.queue.Channel()
|
|
- self._recv_task = gevent.spawn(self._recver)
|
|
+ self._recv_queue = eventlet.queue.Queue(maxsize=0) # Channel
|
|
+ self._recv_task = eventlet.spawn(self._recver)
|
|
|
|
def close(self):
|
|
if self._recv_task:
|
|
@@ -158,7 +156,7 @@ class Receiver(SequentialReceiver):
|
|
def __call__(self, timeout=None):
|
|
try:
|
|
return self._recv_queue.get(timeout=timeout)
|
|
- except gevent.queue.Empty:
|
|
+ except eventlet.queue.Empty:
|
|
raise TimeoutExpired(timeout)
|
|
|
|
|
|
@@ -281,11 +279,11 @@ class Events(ChannelBase):
|
|
def close(self):
|
|
try:
|
|
self._send.close()
|
|
- except (AttributeError, TypeError, gevent.GreenletExit):
|
|
+ except (AttributeError, TypeError, greenlet.GreenletExit):
|
|
pass
|
|
try:
|
|
self._recv.close()
|
|
- except (AttributeError, TypeError, gevent.GreenletExit):
|
|
+ except (AttributeError, TypeError, greenlet.GreenletExit):
|
|
pass
|
|
self._socket.close()
|
|
|
|
diff --git a/zerorpc/gevent_zmq.py b/zerorpc/gevent_zmq.py
|
|
index 9430695..bac9a48 100644
|
|
--- a/zerorpc/gevent_zmq.py
|
|
+++ b/zerorpc/gevent_zmq.py
|
|
@@ -22,10 +22,11 @@
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
# SOFTWARE.
|
|
#
|
|
-# Based on https://github.com/traviscline/gevent-zeromq/
|
|
+# Based on https://github.com/eventlet/eventlet/blob/37fca06d7466a698cf53a1ae6e4b3d840a3ced7a/eventlet/green/zmq.py
|
|
|
|
# We want to act like zmq
|
|
from zmq import * # noqa
|
|
+from zmq.constants import * # noqa
|
|
|
|
# Explicit import to please flake8
|
|
from zmq import ZMQError
|
|
@@ -33,13 +34,112 @@ from zmq import ZMQError
|
|
# A way to access original zmq
|
|
import zmq as _zmq
|
|
|
|
-import gevent.event
|
|
-import gevent.core
|
|
+import eventlet
|
|
+import eventlet.hubs
|
|
+from eventlet.support import greenlets as greenlet
|
|
import errno
|
|
from logging import getLogger
|
|
+from collections import deque
|
|
|
|
logger = getLogger(__name__)
|
|
|
|
+class LockReleaseError(Exception):
|
|
+ pass
|
|
+
|
|
+class _QueueLock(object):
|
|
+ """A Lock that can be acquired by at most one thread. Any other
|
|
+ thread calling acquire will be blocked in a queue. When release
|
|
+ is called, the threads are awoken in the order they blocked,
|
|
+ one at a time. This lock can be required recursively by the same
|
|
+ thread."""
|
|
+
|
|
+ def __init__(self):
|
|
+ self._waiters = deque()
|
|
+ self._count = 0
|
|
+ self._holder = None
|
|
+ self._hub = eventlet.hubs.get_hub()
|
|
+
|
|
+ def __nonzero__(self):
|
|
+ return bool(self._count)
|
|
+
|
|
+ __bool__ = __nonzero__
|
|
+
|
|
+ def __enter__(self):
|
|
+ self.acquire()
|
|
+
|
|
+ def __exit__(self, type, value, traceback):
|
|
+ self.release()
|
|
+
|
|
+ def acquire(self):
|
|
+ current = greenlet.getcurrent()
|
|
+ if (self._waiters or self._count > 0) and self._holder is not current:
|
|
+ # block until lock is free
|
|
+ self._waiters.append(current)
|
|
+ self._hub.switch()
|
|
+ w = self._waiters.popleft()
|
|
+
|
|
+ assert w is current, 'Waiting threads woken out of order'
|
|
+ assert self._count == 0, 'After waking a thread, the lock must be unacquired'
|
|
+
|
|
+ self._holder = current
|
|
+ self._count += 1
|
|
+
|
|
+ def release(self):
|
|
+ if self._count <= 0:
|
|
+ raise LockReleaseError("Cannot release unacquired lock")
|
|
+
|
|
+ self._count -= 1
|
|
+ if self._count == 0:
|
|
+ self._holder = None
|
|
+ if self._waiters:
|
|
+ # wake next
|
|
+ self._hub.schedule_call_global(0, self._waiters[0].switch)
|
|
+
|
|
+class _BlockedThread(object):
|
|
+ """Is either empty, or represents a single blocked thread that
|
|
+ blocked itself by calling the block() method. The thread can be
|
|
+ awoken by calling wake(). Wake() can be called multiple times and
|
|
+ all but the first call will have no effect."""
|
|
+
|
|
+ def __init__(self):
|
|
+ self._blocked_thread = None
|
|
+ self._wakeupper = None
|
|
+ self._hub = eventlet.hubs.get_hub()
|
|
+
|
|
+ def __nonzero__(self):
|
|
+ return self._blocked_thread is not None
|
|
+
|
|
+ __bool__ = __nonzero__
|
|
+
|
|
+ def block(self, deadline=None):
|
|
+ if self._blocked_thread is not None:
|
|
+ raise Exception("Cannot block more than one thread on one BlockedThread")
|
|
+ self._blocked_thread = greenlet.getcurrent()
|
|
+
|
|
+ if deadline is not None:
|
|
+ self._hub.schedule_call_local(deadline - self._hub.clock(), self.wake)
|
|
+
|
|
+ try:
|
|
+ self._hub.switch()
|
|
+ finally:
|
|
+ self._blocked_thread = None
|
|
+ # cleanup the wakeup task
|
|
+ if self._wakeupper is not None:
|
|
+ # Important to cancel the wakeup task so it doesn't
|
|
+ # spuriously wake this greenthread later on.
|
|
+ self._wakeupper.cancel()
|
|
+ self._wakeupper = None
|
|
+
|
|
+ def wake(self):
|
|
+ """Schedules the blocked thread to be awoken and return
|
|
+ True. If wake has already been called or if there is no
|
|
+ blocked thread, then this call has no effect and returns
|
|
+ False."""
|
|
+ if self._blocked_thread is not None and self._wakeupper is None:
|
|
+ self._wakeupper = self._hub.schedule_call_global(0, self._blocked_thread.switch)
|
|
+ return True
|
|
+ return False
|
|
+
|
|
|
|
class Context(_zmq.Context):
|
|
|
|
@@ -57,47 +157,43 @@ class Socket(_zmq.Socket):
|
|
# NOTE: pyzmq 13.0.0 messed up with setattr (they turned it into a
|
|
# non-op) and you can't assign attributes normally anymore, hence the
|
|
# tricks with self.__dict__ here
|
|
- self.__dict__["_readable"] = gevent.event.Event()
|
|
- self.__dict__["_writable"] = gevent.event.Event()
|
|
- try:
|
|
- # gevent>=1.0
|
|
- self.__dict__["_state_event"] = gevent.hub.get_hub().loop.io(
|
|
- on_state_changed_fd, gevent.core.READ)
|
|
- self._state_event.start(self._on_state_changed)
|
|
- except AttributeError:
|
|
- # gevent<1.0
|
|
- self.__dict__["_state_event"] = \
|
|
- gevent.core.read_event(on_state_changed_fd,
|
|
- self._on_state_changed, persist=True)
|
|
-
|
|
- def _on_state_changed(self, event=None, _evtype=None):
|
|
- if self.closed:
|
|
- self._writable.set()
|
|
- self._readable.set()
|
|
- return
|
|
|
|
- while True:
|
|
- try:
|
|
+ self.__dict__['_eventlet_send_event'] = _BlockedThread()
|
|
+ self.__dict__['_eventlet_recv_event'] = _BlockedThread()
|
|
+ self.__dict__['_eventlet_send_lock'] = _QueueLock()
|
|
+ self.__dict__['_eventlet_recv_lock'] = _QueueLock()
|
|
+
|
|
+ def event(fd):
|
|
+ # Some events arrived at the zmq socket. This may mean
|
|
+ # there's a message that can be read or there's space for
|
|
+ # a message to be written.
|
|
+ send_wake = self._eventlet_send_event.wake()
|
|
+ recv_wake = self._eventlet_recv_event.wake()
|
|
+ if not send_wake and not recv_wake:
|
|
+ # if no waiting send or recv thread was woken up, then
|
|
+ # force the zmq socket's events to be processed to
|
|
+ # avoid repeated wakeups
|
|
events = self.getsockopt(_zmq.EVENTS)
|
|
- break
|
|
- except ZMQError as e:
|
|
- if e.errno not in (_zmq.EAGAIN, errno.EINTR):
|
|
- raise
|
|
+ if events & _zmq.POLLOUT:
|
|
+ self._eventlet_send_event.wake()
|
|
+ if events & _zmq.POLLIN:
|
|
+ self._eventlet_recv_event.wake()
|
|
|
|
- if events & _zmq.POLLOUT:
|
|
- self._writable.set()
|
|
- if events & _zmq.POLLIN:
|
|
- self._readable.set()
|
|
+ hub = eventlet.hubs.get_hub()
|
|
+ self.__dict__['_eventlet_listener'] = hub.add(hub.READ,
|
|
+ self.getsockopt(_zmq.FD),
|
|
+ event,
|
|
+ lambda _: None,
|
|
+ lambda: None)
|
|
+ self.__dict__['_eventlet_clock'] = hub.clock
|
|
|
|
- def close(self):
|
|
- if not self.closed and getattr(self, '_state_event', None):
|
|
- try:
|
|
- # gevent>=1.0
|
|
- self._state_event.stop()
|
|
- except AttributeError:
|
|
- # gevent<1.0
|
|
- self._state_event.cancel()
|
|
- super(Socket, self).close()
|
|
+ def close(self, linger=None):
|
|
+ super(Socket, self).close(linger)
|
|
+ if self._eventlet_listener is not None:
|
|
+ eventlet.hubs.get_hub().remove(self._state_event)
|
|
+ self.__dict__['_eventlet_listener'] = None
|
|
+ self._eventlet_send_event.wake()
|
|
+ self._eventlet_recv_event.wake()
|
|
|
|
def connect(self, *args, **kwargs):
|
|
while True:
|
|
@@ -109,80 +205,71 @@ class Socket(_zmq.Socket):
|
|
|
|
def send(self, data, flags=0, copy=True, track=False):
|
|
if flags & _zmq.NOBLOCK:
|
|
- return super(Socket, self).send(data, flags, copy, track)
|
|
+ result = super(Socket, self).send(data, flags, copy, track)
|
|
+ # Instead of calling both wake methods, could call
|
|
+ # self.getsockopt(EVENTS) which would trigger wakeups if
|
|
+ # needed.
|
|
+ self._eventlet_send_event.wake()
|
|
+ self._eventlet_recv_event.wake()
|
|
+ return result
|
|
+
|
|
+ # TODO: pyzmq will copy the message buffer and create Message
|
|
+ # objects under some circumstances. We could do that work here
|
|
+ # once to avoid doing it every time the send is retried.
|
|
flags |= _zmq.NOBLOCK
|
|
- while True:
|
|
- try:
|
|
- msg = super(Socket, self).send(data, flags, copy, track)
|
|
- # The following call, force polling the state of the zmq socket
|
|
- # (POLLIN and/or POLLOUT). It seems that a POLLIN event is often
|
|
- # missed when the socket is used to send at the same time,
|
|
- # forcing to poll at this exact moment seems to reduce the
|
|
- # latencies when a POLLIN event is missed. The drawback is a
|
|
- # reduced throughput (roughly 8.3%) in exchange of a normal
|
|
- # concurrency. In other hand, without the following line, you
|
|
- # loose 90% of the performances as soon as there is simultaneous
|
|
- # send and recv on the socket.
|
|
- self._on_state_changed()
|
|
- return msg
|
|
- except _zmq.ZMQError as e:
|
|
- if e.errno not in (_zmq.EAGAIN, errno.EINTR):
|
|
- raise
|
|
- self._writable.clear()
|
|
- # The following sleep(0) force gevent to switch out to another
|
|
- # coroutine and seems to refresh the notion of time that gevent may
|
|
- # have. This definitively eliminate the gevent bug that can trigger
|
|
- # a timeout too soon under heavy load. In theory it will incur more
|
|
- # CPU usage, but in practice it balance even with the extra CPU used
|
|
- # when the timeout triggers too soon in the following loop. So for
|
|
- # the same CPU load, you get a better throughput (roughly 18.75%).
|
|
- gevent.sleep(0)
|
|
- while not self._writable.wait(timeout=1):
|
|
+ with self._eventlet_send_lock:
|
|
+ while True:
|
|
try:
|
|
- if self.getsockopt(_zmq.EVENTS) & _zmq.POLLOUT:
|
|
- logger.error("/!\\ gevent_zeromq BUG /!\\ "
|
|
- "catching up after missing event (SEND) /!\\")
|
|
- break
|
|
+ return super(Socket, self).send(data, flags, copy, track)
|
|
except ZMQError as e:
|
|
- if e.errno not in (_zmq.EAGAIN, errno.EINTR):
|
|
+ if e.errno == _zmq.EAGAIN:
|
|
+ self._eventlet_send_event.block()
|
|
+ else:
|
|
raise
|
|
+ finally:
|
|
+ # The call to send processes 0mq events and may
|
|
+ # make the socket ready to recv. Wake the next
|
|
+ # receiver. (Could check EVENTS for POLLIN here)
|
|
+ self._eventlet_recv_event.wake()
|
|
|
|
def recv(self, flags=0, copy=True, track=False):
|
|
if flags & _zmq.NOBLOCK:
|
|
- return super(Socket, self).recv(flags, copy, track)
|
|
+ msg = super(Socket, self).recv(flags, copy, track)
|
|
+ # Instead of calling both wake methods, could call
|
|
+ # self.getsockopt(EVENTS) which would trigger wakeups if
|
|
+ # needed.
|
|
+ self._eventlet_send_event.wake()
|
|
+ self._eventlet_recv_event.wake()
|
|
+ return msg
|
|
+
|
|
+ deadline = None
|
|
+ if hasattr(_zmq, 'RCVTIMEO'):
|
|
+ sock_timeout = self.getsockopt(_zmq.RCVTIMEO)
|
|
+ if sock_timeout == -1:
|
|
+ pass
|
|
+ elif sock_timeout > 0:
|
|
+ deadline = self._eventlet_clock() + sock_timeout / 1000.0
|
|
+ else:
|
|
+ raise ValueError(sock_timeout)
|
|
+
|
|
flags |= _zmq.NOBLOCK
|
|
- while True:
|
|
- try:
|
|
- msg = super(Socket, self).recv(flags, copy, track)
|
|
- # The following call, force polling the state of the zmq socket
|
|
- # (POLLIN and/or POLLOUT). It seems that a POLLOUT event is
|
|
- # often missed when the socket is used to receive at the same
|
|
- # time, forcing to poll at this exact moment seems to reduce the
|
|
- # latencies when a POLLOUT event is missed. The drawback is a
|
|
- # reduced throughput (roughly 8.3%) in exchange of a normal
|
|
- # concurrency. In other hand, without the following line, you
|
|
- # loose 90% of the performances as soon as there is simultaneous
|
|
- # send and recv on the socket.
|
|
- self._on_state_changed()
|
|
- return msg
|
|
- except _zmq.ZMQError as e:
|
|
- if e.errno not in (_zmq.EAGAIN, errno.EINTR):
|
|
- raise
|
|
- self._readable.clear()
|
|
- # The following sleep(0) force gevent to switch out to another
|
|
- # coroutine and seems to refresh the notion of time that gevent may
|
|
- # have. This definitively eliminate the gevent bug that can trigger
|
|
- # a timeout too soon under heavy load. In theory it will incur more
|
|
- # CPU usage, but in practice it balance even with the extra CPU used
|
|
- # when the timeout triggers too soon in the following loop. So for
|
|
- # the same CPU load, you get a better throughput (roughly 18.75%).
|
|
- gevent.sleep(0)
|
|
- while not self._readable.wait(timeout=1):
|
|
+ with self._eventlet_recv_lock:
|
|
+ while True:
|
|
try:
|
|
- if self.getsockopt(_zmq.EVENTS) & _zmq.POLLIN:
|
|
- logger.error("/!\\ gevent_zeromq BUG /!\\ "
|
|
- "catching up after missing event (RECV) /!\\")
|
|
- break
|
|
- except ZMQError as e:
|
|
- if e.errno not in (_zmq.EAGAIN, errno.EINTR):
|
|
+ return super(Socket, self).recv(flags, copy, track)
|
|
+ except _zmq.ZMQError as e:
|
|
+ if e.errno == _zmq.EAGAIN:
|
|
+ # zmq in its wisdom decided to reuse EAGAIN for timeouts
|
|
+ if deadline is not None and self._eventlet_clock() > deadline:
|
|
+ e.is_timeout = True
|
|
+ raise
|
|
+
|
|
+ self._eventlet_recv_event.block(deadline=deadline)
|
|
+ else:
|
|
raise
|
|
+ finally:
|
|
+ # The call to recv processes 0mq events and may
|
|
+ # make the socket ready to send. Wake the next
|
|
+ # receiver. (Could check EVENTS for POLLOUT here)
|
|
+ self._eventlet_send_event.wake()
|
|
+
|
|
diff --git a/zerorpc/heartbeat.py b/zerorpc/heartbeat.py
|
|
index 23b974d..daa7d50 100644
|
|
--- a/zerorpc/heartbeat.py
|
|
+++ b/zerorpc/heartbeat.py
|
|
@@ -24,11 +24,7 @@
|
|
|
|
|
|
import time
|
|
-import gevent.pool
|
|
-import gevent.queue
|
|
-import gevent.event
|
|
-import gevent.local
|
|
-import gevent.lock
|
|
+import eventlet
|
|
|
|
from .exceptions import LostRemote, TimeoutExpired
|
|
from .channel_base import ChannelBase
|
|
@@ -40,12 +36,12 @@ class HeartBeatOnChannel(ChannelBase):
|
|
self._closed = False
|
|
self._channel = channel
|
|
self._heartbeat_freq = freq
|
|
- self._input_queue = gevent.queue.Channel()
|
|
+ self._input_queue = eventlet.queue.Queue(maxsize=0) # Channel
|
|
self._remote_last_hb = None
|
|
self._lost_remote = False
|
|
- self._recv_task = gevent.spawn(self._recver)
|
|
+ self._recv_task = eventlet.spawn(self._recver)
|
|
self._heartbeat_task = None
|
|
- self._parent_coroutine = gevent.getcurrent()
|
|
+ self._parent_coroutine = eventlet.getcurrent()
|
|
self._compat_v2 = None
|
|
if not passive:
|
|
self._start_heartbeat()
|
|
@@ -72,20 +68,20 @@ class HeartBeatOnChannel(ChannelBase):
|
|
|
|
def _heartbeat(self):
|
|
while True:
|
|
- gevent.sleep(self._heartbeat_freq)
|
|
+ eventlet.sleep(self._heartbeat_freq)
|
|
if self._remote_last_hb is None:
|
|
self._remote_last_hb = time.time()
|
|
if time.time() > self._remote_last_hb + self._heartbeat_freq * 2:
|
|
self._lost_remote = True
|
|
if not self._closed:
|
|
- gevent.kill(self._parent_coroutine,
|
|
+ eventlet.kill(self._parent_coroutine,
|
|
self._lost_remote_exception())
|
|
break
|
|
self._channel.emit(u'_zpc_hb', (0,)) # 0 -> compat with protocol v2
|
|
|
|
def _start_heartbeat(self):
|
|
if self._heartbeat_task is None and self._heartbeat_freq is not None and not self._closed:
|
|
- self._heartbeat_task = gevent.spawn(self._heartbeat)
|
|
+ self._heartbeat_task = eventlet.spawn(self._heartbeat)
|
|
|
|
def _recver(self):
|
|
while True:
|
|
@@ -120,7 +116,7 @@ class HeartBeatOnChannel(ChannelBase):
|
|
raise self._lost_remote_exception()
|
|
try:
|
|
return self._input_queue.get(timeout=timeout)
|
|
- except gevent.queue.Empty:
|
|
+ except eventlet.queue.Empty:
|
|
raise TimeoutExpired(timeout)
|
|
|
|
@property
|
|
--
|
|
2.25.1
|
|
|