split runner into synchronous and asynchronous varieties

Joseph Weston authored on 14/02/2018 17:45:15
Showing 2 changed files
... ...
@@ -7,6 +7,6 @@ from . import runner
 
 from .learner import (Learner1D, Learner2D, AverageLearner,
                       BalancingLearner, DataSaver, IntegratorLearner)
-from .runner import Runner
+from .runner import Runner, BlockingRunner
 
 del notebook_integration  # to avoid confusion with `notebook_extension`
... ...
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 import asyncio
+import functools
 import inspect
 import concurrent.futures as concurrent
 import warnings
... ...
@@ -23,26 +24,24 @@ except ModuleNotFoundError:
     pass
 
 
-class Runner:
-    """Runs a learning algorithm in an executor.
+class BaseRunner:
+    """Base class for runners that use concurrent.futures.Executors.
 
     Parameters
     ----------
-    learner : Learner
+    learner : adaptive.learner.BaseLearner
+    goal : callable
+        The end condition for the calculation. This function must take
+        the learner as its sole argument, and return True when we should
+        stop requesting more points.
     executor : concurrent.futures.Executor, or ipyparallel.Client, optional
         The executor in which to evaluate the function to be learned.
         If not provided, a new ProcessPoolExecutor is used.
-    goal : callable, optional
-        The end condition for the calculation. This function must take the
-        learner as its sole argument, and return True if we should stop.
     ntasks : int, optional
         The number of concurrent function evaluations. Defaults to the number
-        of cores available in `executor`.
+        of cores available in 'executor'.
     log : bool, default: False
         If True, record the method calls made to the learner by this runner
-    ioloop : asyncio.AbstractEventLoop, optional
-        The ioloop in which to run the learning algorithm. If not provided,
-        the default event loop is used.
     shutdown_executor : Bool, default: True
         If True, shutdown the executor when the runner has completed. If
         'executor' is not provided then the executor created internally
... ...
@@ -50,8 +49,6 @@ class Runner:
 
     Attributes
     ----------
-    task : asyncio.Task
-        The underlying task. May be cancelled to stop the runner.
     learner : Learner
         The underlying learner. May be queried for its state
     log : list or None
... ...
@@ -59,39 +56,191 @@ class Runner:
         '(method_name, *args)'.
     """
 
-    def __init__(self, learner, executor=None, goal=None, *,
-                 ntasks=None, log=False, ioloop=None,
+    def __init__(self, learner, goal, *,
+                 executor=None, ntasks=None, log=False,
                  shutdown_executor=True):
 
-        self.ioloop = ioloop or asyncio.get_event_loop()
-
-        if in_ipynb() and not self.ioloop.is_running():
-            warnings.warn('Run adaptive.notebook_extension() to use '
-                          'the Runner in a Jupyter notebook.')
-
-        if inspect.iscoroutinefunction(learner.function):
-            self.executor = _DummyExecutor()
-        else:
-            self.executor = ensure_async_executor(executor, self.ioloop)
+        self.executor = _ensure_executor(executor)
+        self.goal = goal
 
-        self.ntasks = ntasks or self.executor.ncores
+        self.ntasks = ntasks or _get_ncores(self.executor)
         # if we instantiate our own executor, then we are also responsible
         # for calling 'shutdown'
         self.shutdown_executor = shutdown_executor or (executor is None)
 
         self.learner = learner
         self.log = [] if log else None
+        self.task = None
+
+
+class BlockingRunner(BaseRunner):
+    """Run a learner synchronously in an executor.
+
+    Parameters
+    ----------
+    learner : adaptive.learner.BaseLearner
+    goal : callable
+        The end condition for the calculation. This function must take
+        the learner as its sole argument, and return True when we should
+        stop requesting more points.
+    executor : concurrent.futures.Executor, or ipyparallel.Client, optional
+        The executor in which to evaluate the function to be learned.
+        If not provided, a new ProcessPoolExecutor is used.
+    ntasks : int, optional
+        The number of concurrent function evaluations. Defaults to the number
+        of cores available in 'executor'.
+    log : bool, default: False
+        If True, record the method calls made to the learner by this runner
+    shutdown_executor : Bool, default: True
+        If True, shutdown the executor when the runner has completed. If
+        'executor' is not provided then the executor created internally
+        by the runner is shut down, regardless of this parameter.
+
+    Attributes
+    ----------
+    learner : Learner
+        The underlying learner. May be queried for its state
+    log : list or None
+        Record of the method calls made to the learner, in the format
+        '(method_name, *args)'.
+    """
+
+    def __init__(self, learner, goal, *,
+                 executor=None, ntasks=None, log=False,
+                 shutdown_executor=True):
+        if inspect.iscoroutinefunction(learner.function):
+            raise ValueError("Coroutine functions can only be used "
+                             "with 'AsyncRunner'.")
+        super().__init__(learner, goal, executor=executor, ntasks=ntasks,
+                         log=log, shutdown_executor=shutdown_executor)
+        self._run()
+
+    def _submit(self, x):
+        return self.executor.submit(self.learner.function, x)
+
+    def _run(self):
+        first_completed = concurrent.FIRST_COMPLETED
+        xs = dict()
+        done = [None] * self.ntasks
+        do_log = self.log is not None
+
+        if len(done) == 0:
+            raise RuntimeError('Executor has no workers')
+
+        try:
+            while not self.goal(self.learner):
+                # Launch tasks to replace the ones that completed
+                # on the last iteration.
+                if do_log:
+                    self.log.append(('choose_points', len(done)))
+
+                points, _ = self.learner.choose_points(len(done))
+                for x in points:
+                    xs[self._submit(x)] = x
+
+                # Collect any results and add them to the learner
+                futures = list(xs.keys())
+                done, _ = concurrent.wait(futures,
+                                          return_when=first_completed)
+                for fut in done:
+                    x = xs.pop(fut)
+                    y = fut.result()
+                    if do_log:
+                        self.log.append(('add_point', x, y))
+                    self.learner.add_point(x, y)
+
+        finally:
+            # remove points with 'None' values from the learner
+            self.learner.remove_unfinished()
+            # cancel any outstanding tasks
+            remaining = list(xs.keys())
+            if remaining:
+                for fut in remaining:
+                    fut.cancel()
+                concurrent.wait(remaining)
+            if self.shutdown_executor:
+                self.executor.shutdown()
+
+
+class AsyncRunner(BaseRunner):
+    """Run a learner asynchronously in an executor using asyncio.
+
+    This runner assumes that an asyncio event loop is available to run the
+    learning algorithm in; in a Jupyter notebook this means running
+    'adaptive.notebook_extension()' first.
+
+    Parameters
+    ----------
+    learner : adaptive.learner.BaseLearner
+    goal : callable, optional
+        The end condition for the calculation. This function must take
+        the learner as its sole argument, and return True when we should
+        stop requesting more points. If not provided, the runner will run
+        forever, or until 'self.task.cancel()' is called.
+    executor : concurrent.futures.Executor, or ipyparallel.Client, optional
+        The executor in which to evaluate the function to be learned.
+        If not provided, a new ProcessPoolExecutor is used.
+    ntasks : int, optional
+        The number of concurrent function evaluations. Defaults to the number
+        of cores available in 'executor'.
+    log : bool, default: False
+        If True, record the method calls made to the learner by this runner
+    shutdown_executor : Bool, default: True
+        If True, shutdown the executor when the runner has completed. If
+        'executor' is not provided then the executor created internally
+        by the runner is shut down, regardless of this parameter.
+    ioloop : asyncio.AbstractEventLoop, optional
+        The ioloop in which to run the learning algorithm. If not provided,
+        the default event loop is used.
+
+    Attributes
+    ----------
+    task : asyncio.Task
+        The underlying task. May be cancelled in order to stop the runner.
+    learner : Learner
+        The underlying learner. May be queried for its state.
+    log : list or None
+        Record of the method calls made to the learner, in the format
+        '(method_name, *args)'.
+
+    Notes
+    -----
+    This runner can be used when an async function (defined with
+    'async def') has to be learned. In this case the function will be
+    run directly on the event loop (and not in the executor).
+    """
+
+    def __init__(self, learner, goal=None, *,
+                 executor=None, ntasks=None, log=False,
+                 ioloop=None, shutdown_executor=True):
 
         if goal is None:
             def goal(_):
                 return False
-        self.goal = goal
 
-        coro = self._run()
-        self.task = self.ioloop.create_task(coro)
+        super().__init__(learner, goal, executor=executor, ntasks=ntasks,
+                         log=log, shutdown_executor=shutdown_executor)
+        self.ioloop = ioloop or asyncio.get_event_loop()
+        self.task = None
 
-    def run_sync(self):
-        return self.ioloop.run_until_complete(self.task)
+        # When the learned function is 'async def', we run it
+        # directly on the event loop, and not in the executor.
+        if inspect.iscoroutinefunction(learner.function):
+            if executor:  # what the user provided
+                raise RuntimeError('Executor is unused when learning '
+                                   'an async function')
+            self.executor.shutdown()  # Make sure we don't accidentally use it later
+
+            self._submit = lambda x: self.ioloop.create_task(learner.function(x))
+        else:
+            self._submit = functools.partial(self.ioloop.run_in_executor,
+                                             self.executor,
+                                             self.learner.function)
+
+        if in_ipynb() and not self.ioloop.is_running():
+            warnings.warn('Run adaptive.notebook_extension() to use '
+                          'the Runner in a Jupyter notebook.')
+        self.task = self.ioloop.create_task(self._run())
 
     async def _run(self):
         first_completed = asyncio.FIRST_COMPLETED
... ...
@@ -111,7 +260,7 @@ class Runner:
 
                 points, _ = self.learner.choose_points(len(done))
                 for x in points:
-                    xs[self.executor.submit(self.learner.function, x)] = x
+                    xs[self._submit(x)] = x
 
                 # Collect and results and add them to the learner
                 futures = list(xs.keys())
... ...
@@ -120,7 +269,7 @@ class Runner:
                                              loop=self.ioloop)
                 for fut in done:
                     x = xs.pop(fut)
-                    y = await fut
+                    y = fut.result()
                     if do_log:
                         self.log.append(('add_point', x, y))
                     self.learner.add_point(x, y)
... ...
@@ -137,6 +286,10 @@ class Runner:
                 self.executor.shutdown()
 
 
+# Default runner
+Runner = AsyncRunner
+
+
 def replay_log(learner, log):
     """Apply a sequence of method calls to a learner.
 
... ...
@@ -152,22 +305,6 @@ def replay_log(learner, log):
         getattr(learner, method)(*args)
 
 
-def ensure_async_executor(executor, ioloop):
-    if executor is None:
-        executor = concurrent.ProcessPoolExecutor()
-    elif isinstance(executor, concurrent.Executor):
-        pass
-    elif with_ipyparallel and isinstance(executor, ipyparallel.Client):
-        executor = executor.executor()
-    elif with_distributed and isinstance(executor, distributed.Client):
-        executor = executor.get_executor()
-    else:
-        raise TypeError('Only a concurrent.futures.Executor, distributed.Client,'
-                        ' or ipyparallel.Client can be used.')
-
-    return _AsyncExecutor(executor, ioloop)
-
-
 class SequentialExecutor(concurrent.Executor):
     """A trivial executor that runs functions synchronously.
 
... ...
@@ -188,53 +325,35 @@ class SequentialExecutor(concurrent.Executor):
         pass
 
 
-# Internal functionality
-
-class _DummyExecutor:
-    """Compatibility layer for running async functions."""
+def _ensure_executor(executor):
+    if executor is None:
+        return concurrent.ProcessPoolExecutor()
+    elif isinstance(executor, concurrent.Executor):
+        return executor
+    elif with_ipyparallel and isinstance(executor, ipyparallel.Client):
+        return executor.executor()
+    elif with_distributed and isinstance(executor, distributed.Client):
+        return executor.get_executor()
+    else:
+        raise TypeError('Only a concurrent.futures.Executor, distributed.Client,'
+                        ' or ipyparallel.Client can be used.')
 
-    def submit(self, f, *args, **kwargs):
-        assert inspect.iscoroutinefunction(f)
-        return asyncio.ensure_future(f(*args, **kwargs))
 
-    def shutdown(self):
-        pass
-
-    @property
-    def ncores(self):
+def _get_ncores(ex):
+    """Return the maximum number of cores that an executor can use."""
+    if with_ipyparallel and isinstance(ex, ipyparallel.client.view.ViewExecutor):
+        return len(ex.view)
+    elif isinstance(ex, (concurrent.ProcessPoolExecutor,
+                         concurrent.ThreadPoolExecutor)):
+        return ex._max_workers  # not public API!
+    elif isinstance(ex, SequentialExecutor):
         return 1
-
-
-class _AsyncExecutor:
-    """Interface between concurrent.futures.Executor and asyncio."""
-
-    def __init__(self, executor, ioloop):
-        assert isinstance(executor, concurrent.Executor)
-        self.executor = executor
-        self.ioloop = ioloop
-
-    def submit(self, f, *args, **kwargs):
-        return self.ioloop.run_in_executor(self.executor, f, *args, **kwargs)
-
-    def shutdown(self, wait=True):
-        self.executor.shutdown(wait=wait)
-
-    @property
-    def ncores(self):
-        ex = self.executor
-        if with_ipyparallel and isinstance(ex, ipyparallel.client.view.ViewExecutor):
-            return len(ex.view)
-        elif isinstance(ex, (concurrent.ProcessPoolExecutor,
-                             concurrent.ThreadPoolExecutor)):
-            return ex._max_workers  # not public API!
-        elif isinstance(ex, SequentialExecutor):
-            return 1
-        elif with_distributed and isinstance(ex, distributed.cfexecutor.ClientExecutor):
-            # XXX: check if not sum(n for n in ex._client.ncores().values())
-            return len(ex._client.ncores())
-        else:
-            raise TypeError('Cannot get number of cores for {}'
-                            .format(ex.__class__))
+    elif with_distributed and isinstance(ex, distributed.cfexecutor.ClientExecutor):
+        # XXX: check if not sum(n for n in ex._client.ncores().values())
+        return len(ex._client.ncores())
+    else:
+        raise TypeError('Cannot get number of cores for {}'
+                        .format(ex.__class__))
 
 
 def in_ipynb():
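
For reference, this is roughly how the two runner varieties are used after this commit. It is a minimal sketch and not part of the change itself; it assumes adaptive's Learner1D, a cheap example function f, and a loss-based goal threshold chosen here for illustration.

import adaptive

def f(x):
    # assumed example function; any picklable callable of one variable works
    return x ** 2

# Synchronous variety: the constructor blocks until the goal is reached.
learner = adaptive.Learner1D(f, bounds=(-1, 1))
adaptive.BlockingRunner(learner, goal=lambda l: l.loss() < 0.01)

# Asynchronous variety: 'Runner' is now an alias for AsyncRunner. It returns
# immediately and evaluates points in the background on the event loop; in a
# Jupyter notebook, run adaptive.notebook_extension() first.
learner = adaptive.Learner1D(f, bounds=(-1, 1))
runner = adaptive.Runner(learner, goal=lambda l: l.loss() < 0.01, log=True)
# runner.task may be cancelled to stop the runner; with log=True the recorded
# method calls can later be reapplied to a fresh learner via replay_log.

Passing log=True is optional; it only matters if the sequence of learner calls needs to be replayed later with replay_log.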