Browse code

add tests against different executors

Joseph Weston authored on 25/10/2018 16:01:01
Showing 2 changed files
... ...
@@ -5,7 +5,8 @@ import asyncio
5 5
 import pytest
6 6
 
7 7
 from ..learner import Learner1D, Learner2D
8
-from ..runner import simple, BlockingRunner, AsyncRunner, SequentialExecutor
8
+from ..runner import (simple, BlockingRunner, AsyncRunner, SequentialExecutor,
9
+                      with_ipyparallel, with_distributed)
9 10
 
10 11
 
11 12
 def blocking_runner(learner, goal):
... ...
@@ -19,15 +20,18 @@ def async_runner(learner, goal):
19 20
 
20 21
 runners = [simple, blocking_runner, async_runner]
21 22
 
23
+
22 24
 def trivial_goal(learner):
23 25
     return learner.npoints > 10
24 26
 
27
+
25 28
 @pytest.mark.parametrize('runner', runners)
26 29
 def test_simple(runner):
27 30
     """Test that the runners actually run."""
28 31
 
29 32
     def f(x):
30 33
         return x
34
+
31 35
     learner = Learner1D(f, (-1, 1))
32 36
     runner(learner, lambda l: l.npoints > 10)
33 37
     assert len(learner.data) > 10
... ...
@@ -54,3 +58,52 @@ def test_aync_def_function():
54 58
     learner = Learner1D(f, (-1, 1))
55 59
     runner = AsyncRunner(learner, trivial_goal)
56 60
     asyncio.get_event_loop().run_until_complete(runner.task)
61
+
62
+
63
+### Test with different executors
64
+
65
+@pytest.fixture(scope="session")
66
+def ipyparallel_executor():
67
+    from ipyparallel import Client
68
+    import pexpect
69
+
70
+    child = pexpect.spawn('ipcluster start -n 1')
71
+    child.expect('Engines appear to have started successfully', timeout=35)
72
+    yield Client()
73
+    if not child.terminate(force=True):
74
+        raise RuntimeError('Could not stop ipcluster')
75
+
76
+
77
+@pytest.fixture(scope="session")
78
+def dask_executor():
79
+    from distributed import LocalCluster, Client
80
+
81
+    client = Client(n_workers=1)
82
+    yield client
83
+    client.close()
84
+
85
+
86
+def linear(x):
87
+    return x
88
+
89
+
90
+def test_concurrent_futures_executor():
91
+    from concurrent.futures import ProcessPoolExecutor
92
+    BlockingRunner(Learner1D(linear, (-1, 1)), trivial_goal,
93
+                   executor=ProcessPoolExecutor(max_workers=1))
94
+
95
+
96
+@pytest.mark.skipif(not with_ipyparallel, reason='IPyparallel is not installed')
97
+def test_ipyparallel_executor(ipyparallel_executor):
98
+    learner = Learner1D(linear, (-1, 1))
99
+    BlockingRunner(learner, trivial_goal,
100
+                   executor=ipyparallel_executor)
101
+    assert learner.npoints > 0
102
+
103
+
104
+@pytest.mark.skipif(not with_distributed, reason='dask.distributed is not installed')
105
+def test_distributed_executor(dask_executor):
106
+    learner = Learner1D(linear, (-1, 1))
107
+    BlockingRunner(learner, trivial_goal,
108
+                   executor=dask_executor)
109
+    assert learner.npoints > 0
... ...
@@ -1,3 +1,4 @@
1 1
 pytest
2 2
 pytest-randomly
3 3
 pytest-cov
4
+pexpect