Browse code

leave loky as deafult, change back in different PR

Bas Nijholt authored on 17/09/2020 23:34:38
Showing 1 changed files
... ...
@@ -46,7 +46,9 @@ with suppress(ModuleNotFoundError):
46 46
     asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
47 47
 
48 48
 
49
-_default_executor = concurrent.ProcessPoolExecutor
49
+_default_executor = (
50
+    loky.get_reusable_executor if with_loky else concurrent.ProcessPoolExecutor
51
+)
50 52
 
51 53
 
52 54
 class BaseRunner(metaclass=abc.ABCMeta):
Browse code

make concurrent.ProcessPoolExecutor the default again because loky gives issues on MacOS

Bas Nijholt authored on 17/09/2020 22:33:57
Showing 1 changed files
... ...
@@ -46,9 +46,7 @@ with suppress(ModuleNotFoundError):
46 46
     asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
47 47
 
48 48
 
49
-_default_executor = (
50
-    loky.get_reusable_executor if with_loky else concurrent.ProcessPoolExecutor
51
-)
49
+_default_executor = concurrent.ProcessPoolExecutor
52 50
 
53 51
 
54 52
 class BaseRunner(metaclass=abc.ABCMeta):
... ...
@@ -62,7 +60,8 @@ class BaseRunner(metaclass=abc.ABCMeta):
62 60
         the learner as its sole argument, and return True when we should
63 61
         stop requesting more points.
64 62
     executor : `concurrent.futures.Executor`, `distributed.Client`,\
65
-               `mpi4py.futures.MPIPoolExecutor`, or `ipyparallel.Client`, optional
63
+               `mpi4py.futures.MPIPoolExecutor`, `ipyparallel.Client` or\
64
+               `loky.get_reusable_executor`, optional
66 65
         The executor in which to evaluate the function to be learned.
67 66
         If not provided, a new `~concurrent.futures.ProcessPoolExecutor`.
68 67
     ntasks : int, optional
Browse code

no longer skip ipyparallel tests on Python 3.8

Bas Nijholt authored on 12/05/2020 15:35:04
Showing 1 changed files
... ...
@@ -5,7 +5,6 @@ import functools
5 5
 import inspect
6 6
 import itertools
7 7
 import pickle
8
-import sys
9 8
 import time
10 9
 import traceback
11 10
 import warnings
... ...
@@ -14,13 +13,9 @@ from contextlib import suppress
14 13
 from adaptive.notebook_integration import in_ipynb, live_info, live_plot
15 14
 
16 15
 try:
17
-    if sys.version_info < (3, 8):
18
-        # XXX: remove when ipyparallel 6.2.5 is released
19
-        import ipyparallel
16
+    import ipyparallel
20 17
 
21
-        with_ipyparallel = True
22
-    else:
23
-        with_ipyparallel = False
18
+    with_ipyparallel = True
24 19
 except ModuleNotFoundError:
25 20
     with_ipyparallel = False
26 21
 
Browse code

remove " "

Bas Nijholt authored on 05/05/2020 20:53:13
Showing 1 changed files
... ...
@@ -379,9 +379,7 @@ class BlockingRunner(BaseRunner):
379 379
         raise_if_retries_exceeded=True,
380 380
     ):
381 381
         if inspect.iscoroutinefunction(learner.function):
382
-            raise ValueError(
383
-                "Coroutine functions can only be used " "with 'AsyncRunner'."
384
-            )
382
+            raise ValueError("Coroutine functions can only be used with 'AsyncRunner'.")
385 383
         super().__init__(
386 384
             learner,
387 385
             goal,
... ...
@@ -549,7 +547,7 @@ class AsyncRunner(BaseRunner):
549 547
         if inspect.iscoroutinefunction(learner.function):
550 548
             if executor:  # user-provided argument
551 549
                 raise RuntimeError(
552
-                    "Cannot use an executor when learning an " "async function."
550
+                    "Cannot use an executor when learning an async function."
553 551
                 )
554 552
             self.executor.shutdown()  # Make sure we don't shoot ourselves later
555 553
 
Browse code

add a comment about using a generator expression

Bas Nijholt authored on 24/04/2020 18:05:45
Showing 1 changed files
... ...
@@ -171,6 +171,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
171 171
 
172 172
     def _ask(self, n):
173 173
         pending_ids = self._pending_tasks.values()
174
+        # using generator here because we only need until `n`
174 175
         pids_gen = (pid for pid in self._to_retry.keys() if pid not in pending_ids)
175 176
         pids = list(itertools.islice(pids_gen, n))
176 177
 
Browse code

rename _pending_pids → _pending_tasks

Bas Nijholt authored on 23/04/2020 12:42:12
Showing 1 changed files
... ...
@@ -128,7 +128,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
128 128
 
129 129
         self._max_tasks = ntasks
130 130
 
131
-        self._pending_pids = {}
131
+        self._pending_tasks = {}  # mapping from concurrent.futures.Future → point id
132 132
 
133 133
         # if we instantiate our own executor, then we are also responsible
134 134
         # for calling 'shutdown'
... ...
@@ -170,7 +170,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
170 170
         return self.log is not None
171 171
 
172 172
     def _ask(self, n):
173
-        pending_ids = self._pending_pids.values()
173
+        pending_ids = self._pending_tasks.values()
174 174
         pids_gen = (pid for pid in self._to_retry.keys() if pid not in pending_ids)
175 175
         pids = list(itertools.islice(pids_gen, n))
176 176
 
... ...
@@ -210,7 +210,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
210 210
 
211 211
     def _process_futures(self, done_futs):
212 212
         for fut in done_futs:
213
-            pid = self._pending_pids.pop(fut)
213
+            pid = self._pending_tasks.pop(fut)
214 214
             try:
215 215
                 y = fut.result()
216 216
                 t = time.time() - fut.start_time  # total execution time
... ...
@@ -234,7 +234,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
234 234
         # Launch tasks to replace the ones that completed
235 235
         # on the last iteration, making sure to fill workers
236 236
         # that have started since the last iteration.
237
-        n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_pids))
237
+        n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_tasks))
238 238
 
239 239
         if self.do_log:
240 240
             self.log.append(("ask", n_new_tasks))
... ...
@@ -246,17 +246,17 @@ class BaseRunner(metaclass=abc.ABCMeta):
246 246
             point = self._id_to_point[pid]
247 247
             fut = self._submit(point)
248 248
             fut.start_time = start_time
249
-            self._pending_pids[fut] = pid
249
+            self._pending_tasks[fut] = pid
250 250
 
251 251
         # Collect and results and add them to the learner
252
-        futures = list(self._pending_pids.keys())
252
+        futures = list(self._pending_tasks.keys())
253 253
         return futures
254 254
 
255 255
     def _remove_unfinished(self):
256 256
         # remove points with 'None' values from the learner
257 257
         self.learner.remove_unfinished()
258 258
         # cancel any outstanding tasks
259
-        remaining = list(self._pending_pids.keys())
259
+        remaining = list(self._pending_tasks.keys())
260 260
         for fut in remaining:
261 261
             fut.cancel()
262 262
         return remaining
... ...
@@ -302,7 +302,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
302 302
     @property
303 303
     def pending_points(self):
304 304
         return [
305
-            (fut, self._id_to_point[pid]) for fut, pid in self._pending_pids.items()
305
+            (fut, self._id_to_point[pid]) for fut, pid in self._pending_tasks.items()
306 306
         ]
307 307
 
308 308
 
Browse code

rename _pending_points -> _pending_pids

Bas Nijholt authored on 23/04/2020 12:07:39
Showing 1 changed files
... ...
@@ -128,7 +128,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
128 128
 
129 129
         self._max_tasks = ntasks
130 130
 
131
-        self._pending_points = {}
131
+        self._pending_pids = {}
132 132
 
133 133
         # if we instantiate our own executor, then we are also responsible
134 134
         # for calling 'shutdown'
... ...
@@ -170,7 +170,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
170 170
         return self.log is not None
171 171
 
172 172
     def _ask(self, n):
173
-        pending_ids = self._pending_points.values()
173
+        pending_ids = self._pending_pids.values()
174 174
         pids_gen = (pid for pid in self._to_retry.keys() if pid not in pending_ids)
175 175
         pids = list(itertools.islice(pids_gen, n))
176 176
 
... ...
@@ -210,7 +210,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
210 210
 
211 211
     def _process_futures(self, done_futs):
212 212
         for fut in done_futs:
213
-            pid = self._pending_points.pop(fut)
213
+            pid = self._pending_pids.pop(fut)
214 214
             try:
215 215
                 y = fut.result()
216 216
                 t = time.time() - fut.start_time  # total execution time
... ...
@@ -234,7 +234,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
234 234
         # Launch tasks to replace the ones that completed
235 235
         # on the last iteration, making sure to fill workers
236 236
         # that have started since the last iteration.
237
-        n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_points))
237
+        n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_pids))
238 238
 
239 239
         if self.do_log:
240 240
             self.log.append(("ask", n_new_tasks))
... ...
@@ -246,17 +246,17 @@ class BaseRunner(metaclass=abc.ABCMeta):
246 246
             point = self._id_to_point[pid]
247 247
             fut = self._submit(point)
248 248
             fut.start_time = start_time
249
-            self._pending_points[fut] = pid
249
+            self._pending_pids[fut] = pid
250 250
 
251 251
         # Collect and results and add them to the learner
252
-        futures = list(self._pending_points.keys())
252
+        futures = list(self._pending_pids.keys())
253 253
         return futures
254 254
 
255 255
     def _remove_unfinished(self):
256 256
         # remove points with 'None' values from the learner
257 257
         self.learner.remove_unfinished()
258 258
         # cancel any outstanding tasks
259
-        remaining = list(self._pending_points.keys())
259
+        remaining = list(self._pending_pids.keys())
260 260
         for fut in remaining:
261 261
             fut.cancel()
262 262
         return remaining
... ...
@@ -302,7 +302,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
302 302
     @property
303 303
     def pending_points(self):
304 304
         return [
305
-            (fut, self._id_to_point[pid]) for fut, pid in self._pending_points.items()
305
+            (fut, self._id_to_point[pid]) for fut, pid in self._pending_pids.items()
306 306
         ]
307 307
 
308 308
 
Browse code

do not do the entire loop but only until "n"

Bas Nijholt authored on 22/04/2020 23:17:45
Showing 1 changed files
... ...
@@ -170,11 +170,10 @@ class BaseRunner(metaclass=abc.ABCMeta):
170 170
         return self.log is not None
171 171
 
172 172
     def _ask(self, n):
173
-        pids = [
174
-            pid
175
-            for pid in self._to_retry.keys()
176
-            if pid not in self._pending_points.values()
177
-        ][:n]
173
+        pending_ids = self._pending_points.values()
174
+        pids_gen = (pid for pid in self._to_retry.keys() if pid not in pending_ids)
175
+        pids = list(itertools.islice(pids_gen, n))
176
+
178 177
         loss_improvements = len(pids) * [float("inf")]
179 178
 
180 179
         if len(pids) < n:
Browse code

remove _key_by_value

Bas Nijholt authored on 22/04/2020 23:00:23
Showing 1 changed files
... ...
@@ -56,10 +56,6 @@ _default_executor = (
56 56
 )
57 57
 
58 58
 
59
-def _key_by_value(dct, value):
60
-    return next(k for k, v in dct.items() if v == value)
61
-
62
-
63 59
 class BaseRunner(metaclass=abc.ABCMeta):
64 60
     r"""Base class for runners that use `concurrent.futures.Executors`.
65 61
 
Browse code

introduce a property pending_points which is a list of tuples and fix docs

Bas Nijholt authored on 22/04/2020 22:57:22
Showing 1 changed files
... ...
@@ -103,8 +103,8 @@ class BaseRunner(metaclass=abc.ABCMeta):
103 103
         in ``runner.tracebacks``.
104 104
     tracebacks : list of tuples
105 105
         List of of ``(point, tb)`` for points that failed.
106
-    pending_points : dict
107
-        A mapping of `~concurrent.futures.Future`\s to points.
106
+    pending_points : list of tuples
107
+        A list of tuples with ``(concurrent.futures.Future, point)``.
108 108
 
109 109
     Methods
110 110
     -------
... ...
@@ -132,7 +132,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
132 132
 
133 133
         self._max_tasks = ntasks
134 134
 
135
-        self.pending_points = {}
135
+        self._pending_points = {}
136 136
 
137 137
         # if we instantiate our own executor, then we are also responsible
138 138
         # for calling 'shutdown'
... ...
@@ -177,7 +177,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
177 177
         pids = [
178 178
             pid
179 179
             for pid in self._to_retry.keys()
180
-            if pid not in self.pending_points.values()
180
+            if pid not in self._pending_points.values()
181 181
         ][:n]
182 182
         loss_improvements = len(pids) * [float("inf")]
183 183
 
... ...
@@ -215,7 +215,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
215 215
 
216 216
     def _process_futures(self, done_futs):
217 217
         for fut in done_futs:
218
-            pid = self.pending_points.pop(fut)
218
+            pid = self._pending_points.pop(fut)
219 219
             try:
220 220
                 y = fut.result()
221 221
                 t = time.time() - fut.start_time  # total execution time
... ...
@@ -239,7 +239,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
239 239
         # Launch tasks to replace the ones that completed
240 240
         # on the last iteration, making sure to fill workers
241 241
         # that have started since the last iteration.
242
-        n_new_tasks = max(0, self._get_max_tasks() - len(self.pending_points))
242
+        n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_points))
243 243
 
244 244
         if self.do_log:
245 245
             self.log.append(("ask", n_new_tasks))
... ...
@@ -251,17 +251,17 @@ class BaseRunner(metaclass=abc.ABCMeta):
251 251
             point = self._id_to_point[pid]
252 252
             fut = self._submit(point)
253 253
             fut.start_time = start_time
254
-            self.pending_points[fut] = pid
254
+            self._pending_points[fut] = pid
255 255
 
256 256
         # Collect and results and add them to the learner
257
-        futures = list(self.pending_points.keys())
257
+        futures = list(self._pending_points.keys())
258 258
         return futures
259 259
 
260 260
     def _remove_unfinished(self):
261 261
         # remove points with 'None' values from the learner
262 262
         self.learner.remove_unfinished()
263 263
         # cancel any outstanding tasks
264
-        remaining = list(self.pending_points.keys())
264
+        remaining = list(self._pending_points.keys())
265 265
         for fut in remaining:
266 266
             fut.cancel()
267 267
         return remaining
... ...
@@ -298,11 +298,17 @@ class BaseRunner(metaclass=abc.ABCMeta):
298 298
 
299 299
     @property
300 300
     def tracebacks(self):
301
-        return [(self._id_to_point[i], tb) for i, tb in self._tracebacks.items()]
301
+        return [(self._id_to_point[pid], tb) for pid, tb in self._tracebacks.items()]
302 302
 
303 303
     @property
304 304
     def to_retry(self):
305
-        return [(self._id_to_point[i], n) for i, n in self._to_retry.items()]
305
+        return [(self._id_to_point[pid], n) for pid, n in self._to_retry.items()]
306
+
307
+    @property
308
+    def pending_points(self):
309
+        return [
310
+            (fut, self._id_to_point[pid]) for fut, pid in self._pending_points.items()
311
+        ]
306 312
 
307 313
 
308 314
 class BlockingRunner(BaseRunner):
... ...
@@ -343,14 +349,14 @@ class BlockingRunner(BaseRunner):
343 349
     log : list or None
344 350
         Record of the method calls made to the learner, in the format
345 351
         ``(method_name, *args)``.
346
-    to_retry : dict
347
-        Mapping of ``{point: n_fails, ...}``. When a point has failed
352
+    to_retry : list of tuples
353
+        List of ``(point, n_fails)``. When a point has failed
348 354
         ``runner.retries`` times it is removed but will be present
349 355
         in ``runner.tracebacks``.
350
-    tracebacks : dict
351
-        A mapping of point to the traceback if that point failed.
352
-    pending_points : dict
353
-        A mapping of `~concurrent.futures.Future`\to points.
356
+    tracebacks : list of tuples
357
+        List of of ``(point, tb)`` for points that failed.
358
+    pending_points : list of tuples
359
+        A list of tuples with ``(concurrent.futures.Future, point)``.
354 360
 
355 361
     Methods
356 362
     -------
... ...
@@ -466,14 +472,14 @@ class AsyncRunner(BaseRunner):
466 472
     log : list or None
467 473
         Record of the method calls made to the learner, in the format
468 474
         ``(method_name, *args)``.
469
-    to_retry : dict
470
-        Mapping of ``{point: n_fails, ...}``. When a point has failed
475
+    to_retry : list of tuples
476
+        List of ``(point, n_fails)``. When a point has failed
471 477
         ``runner.retries`` times it is removed but will be present
472 478
         in ``runner.tracebacks``.
473
-    tracebacks : dict
474
-        A mapping of point to the traceback if that point failed.
475
-    pending_points : dict
476
-        A mapping of `~concurrent.futures.Future`\s to points.
479
+    tracebacks : list of tuples
480
+        List of of ``(point, tb)`` for points that failed.
481
+    pending_points : list of tuples
482
+        A list of tuples with ``(concurrent.futures.Future, point)``.
477 483
 
478 484
     Methods
479 485
     -------
Browse code

make pending_points a mapping of future -> pid

Bas Nijholt authored on 22/04/2020 22:52:17
Showing 1 changed files
... ...
@@ -174,22 +174,21 @@ class BaseRunner(metaclass=abc.ABCMeta):
174 174
         return self.log is not None
175 175
 
176 176
     def _ask(self, n):
177
-        points = []
178
-        for i, pid in enumerate(self._to_retry.keys()):
179
-            if i == n:
180
-                break
181
-            point = self._id_to_point[pid]
182
-            if point not in self.pending_points.values():
183
-                points.append(point)
184
-
185
-        loss_improvements = len(points) * [float("inf")]
186
-        if len(points) < n:
187
-            new_points, new_losses = self.learner.ask(n - len(points))
188
-            points += new_points
177
+        pids = [
178
+            pid
179
+            for pid in self._to_retry.keys()
180
+            if pid not in self.pending_points.values()
181
+        ][:n]
182
+        loss_improvements = len(pids) * [float("inf")]
183
+
184
+        if len(pids) < n:
185
+            new_points, new_losses = self.learner.ask(n - len(pids))
189 186
             loss_improvements += new_losses
190
-            for p in new_points:
191
-                self._id_to_point[self._next_id()] = p
192
-        return points, loss_improvements
187
+            for point in new_points:
188
+                pid = self._next_id()
189
+                self._id_to_point[pid] = point
190
+                pids.append(pid)
191
+        return pids, loss_improvements
193 192
 
194 193
     def overhead(self):
195 194
         """Overhead of using Adaptive and the executor in percent.
... ...
@@ -216,23 +215,22 @@ class BaseRunner(metaclass=abc.ABCMeta):
216 215
 
217 216
     def _process_futures(self, done_futs):
218 217
         for fut in done_futs:
219
-            x = self.pending_points.pop(fut)
220
-            i = _key_by_value(self._id_to_point, x)  # O(N)
218
+            pid = self.pending_points.pop(fut)
221 219
             try:
222 220
                 y = fut.result()
223 221
                 t = time.time() - fut.start_time  # total execution time
224 222
             except Exception as e:
225
-                self._tracebacks[i] = traceback.format_exc()
226
-                self._to_retry[i] = self._to_retry.get(i, 0) + 1
227
-                if self._to_retry[i] > self.retries:
228
-                    self._to_retry.pop(i)
223
+                self._tracebacks[pid] = traceback.format_exc()
224
+                self._to_retry[pid] = self._to_retry.get(pid, 0) + 1
225
+                if self._to_retry[pid] > self.retries:
226
+                    self._to_retry.pop(pid)
229 227
                     if self.raise_if_retries_exceeded:
230
-                        self._do_raise(e, i)
228
+                        self._do_raise(e, pid)
231 229
             else:
232 230
                 self._elapsed_function_time += t / self._get_max_tasks()
233
-                self._to_retry.pop(i, None)
234
-                self._tracebacks.pop(i, None)
235
-                self._id_to_point.pop(i)
231
+                self._to_retry.pop(pid, None)
232
+                self._tracebacks.pop(pid, None)
233
+                x = self._id_to_point.pop(pid)
236 234
                 if self.do_log:
237 235
                     self.log.append(("tell", x, y))
238 236
                 self.learner.tell(x, y)
... ...
@@ -246,13 +244,14 @@ class BaseRunner(metaclass=abc.ABCMeta):
246 244
         if self.do_log:
247 245
             self.log.append(("ask", n_new_tasks))
248 246
 
249
-        points, _ = self._ask(n_new_tasks)
247
+        pids, _ = self._ask(n_new_tasks)
250 248
 
251
-        for x in points:
249
+        for pid in pids:
252 250
             start_time = time.time()  # so we can measure execution time
253
-            fut = self._submit(x)
251
+            point = self._id_to_point[pid]
252
+            fut = self._submit(point)
254 253
             fut.start_time = start_time
255
-            self.pending_points[fut] = x
254
+            self.pending_points[fut] = pid
256 255
 
257 256
         # Collect and results and add them to the learner
258 257
         futures = list(self.pending_points.keys())
Browse code

move pid logic to _ask

Bas Nijholt authored on 22/04/2020 20:36:56
Showing 1 changed files
... ...
@@ -175,10 +175,10 @@ class BaseRunner(metaclass=abc.ABCMeta):
175 175
 
176 176
     def _ask(self, n):
177 177
         points = []
178
-        for i, _id in enumerate(self._to_retry.keys()):
178
+        for i, pid in enumerate(self._to_retry.keys()):
179 179
             if i == n:
180 180
                 break
181
-            point = self._id_to_point[_id]
181
+            point = self._id_to_point[pid]
182 182
             if point not in self.pending_points.values():
183 183
                 points.append(point)
184 184
 
... ...
@@ -187,6 +187,8 @@ class BaseRunner(metaclass=abc.ABCMeta):
187 187
             new_points, new_losses = self.learner.ask(n - len(points))
188 188
             points += new_points
189 189
             loss_improvements += new_losses
190
+            for p in new_points:
191
+                self._id_to_point[self._next_id()] = p
190 192
         return points, loss_improvements
191 193
 
192 194
     def overhead(self):
... ...
@@ -251,11 +253,6 @@ class BaseRunner(metaclass=abc.ABCMeta):
251 253
             fut = self._submit(x)
252 254
             fut.start_time = start_time
253 255
             self.pending_points[fut] = x
254
-            try:
255
-                _id = _key_by_value(self._id_to_point, x)  # O(N)
256
-            except StopIteration:  # `x` is not a value in `self._id_to_point`
257
-                _id = self._next_id()
258
-            self._id_to_point[_id] = x
259 256
 
260 257
         # Collect and results and add them to the learner
261 258
         futures = list(self.pending_points.keys())
Browse code

use partial(next, itertools.count()) to generate unique ids

Bas Nijholt authored on 22/04/2020 20:14:51
Showing 1 changed files
... ...
@@ -1,7 +1,9 @@
1 1
 import abc
2 2
 import asyncio
3 3
 import concurrent.futures as concurrent
4
+import functools
4 5
 import inspect
6
+import itertools
5 7
 import pickle
6 8
 import sys
7 9
 import time
... ...
@@ -150,9 +152,10 @@ class BaseRunner(metaclass=abc.ABCMeta):
150 152
         self._to_retry = {}
151 153
         self._tracebacks = {}
152 154
 
153
-        # Keeping track of index -> point
154 155
         self._id_to_point = {}
155
-        self._i = 0  # some unique index to be associated with each point
156
+        self._next_id = functools.partial(
157
+            next, itertools.count()
158
+        )  # some unique id to be associated with each point
156 159
 
157 160
     def _get_max_tasks(self):
158 161
         return self._max_tasks or _get_ncores(self.executor)
... ...
@@ -172,10 +175,10 @@ class BaseRunner(metaclass=abc.ABCMeta):
172 175
 
173 176
     def _ask(self, n):
174 177
         points = []
175
-        for i, index in enumerate(self._to_retry.keys()):
178
+        for i, _id in enumerate(self._to_retry.keys()):
176 179
             if i == n:
177 180
                 break
178
-            point = self._id_to_point[index]
181
+            point = self._id_to_point[_id]
179 182
             if point not in self.pending_points.values():
180 183
                 points.append(point)
181 184
 
... ...
@@ -249,11 +252,10 @@ class BaseRunner(metaclass=abc.ABCMeta):
249 252
             fut.start_time = start_time
250 253
             self.pending_points[fut] = x
251 254
             try:
252
-                i = _key_by_value(self._id_to_point, x)  # O(N)
255
+                _id = _key_by_value(self._id_to_point, x)  # O(N)
253 256
             except StopIteration:  # `x` is not a value in `self._id_to_point`
254
-                self._i += 1
255
-                i = self._i
256
-            self._id_to_point[i] = x
257
+                _id = self._next_id()
258
+            self._id_to_point[_id] = x
257 259
 
258 260
         # Collect and results and add them to the learner
259 261
         futures = list(self.pending_points.keys())
Browse code

rename _index_to_point -> _id_to_point

Bas Nijholt authored on 22/04/2020 20:09:03
Showing 1 changed files
... ...
@@ -151,7 +151,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
151 151
         self._tracebacks = {}
152 152
 
153 153
         # Keeping track of index -> point
154
-        self._index_to_point = {}
154
+        self._id_to_point = {}
155 155
         self._i = 0  # some unique index to be associated with each point
156 156
 
157 157
     def _get_max_tasks(self):
... ...
@@ -159,7 +159,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
159 159
 
160 160
     def _do_raise(self, e, i):
161 161
         tb = self._tracebacks[i]
162
-        x = self._index_to_point[i]
162
+        x = self._id_to_point[i]
163 163
         raise RuntimeError(
164 164
             "An error occured while evaluating "
165 165
             f'"learner.function({x})". '
... ...
@@ -175,7 +175,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
175 175
         for i, index in enumerate(self._to_retry.keys()):
176 176
             if i == n:
177 177
                 break
178
-            point = self._index_to_point[index]
178
+            point = self._id_to_point[index]
179 179
             if point not in self.pending_points.values():
180 180
                 points.append(point)
181 181
 
... ...
@@ -212,7 +212,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
212 212
     def _process_futures(self, done_futs):
213 213
         for fut in done_futs:
214 214
             x = self.pending_points.pop(fut)
215
-            i = _key_by_value(self._index_to_point, x)  # O(N)
215
+            i = _key_by_value(self._id_to_point, x)  # O(N)
216 216
             try:
217 217
                 y = fut.result()
218 218
                 t = time.time() - fut.start_time  # total execution time
... ...
@@ -227,7 +227,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
227 227
                 self._elapsed_function_time += t / self._get_max_tasks()
228 228
                 self._to_retry.pop(i, None)
229 229
                 self._tracebacks.pop(i, None)
230
-                self._index_to_point.pop(i)
230
+                self._id_to_point.pop(i)
231 231
                 if self.do_log:
232 232
                     self.log.append(("tell", x, y))
233 233
                 self.learner.tell(x, y)
... ...
@@ -249,11 +249,11 @@ class BaseRunner(metaclass=abc.ABCMeta):
249 249
             fut.start_time = start_time
250 250
             self.pending_points[fut] = x
251 251
             try:
252
-                i = _key_by_value(self._index_to_point, x)  # O(N)
253
-            except StopIteration:  # `x` is not a value in `self._index_to_point`
252
+                i = _key_by_value(self._id_to_point, x)  # O(N)
253
+            except StopIteration:  # `x` is not a value in `self._id_to_point`
254 254
                 self._i += 1
255 255
                 i = self._i
256
-            self._index_to_point[i] = x
256
+            self._id_to_point[i] = x
257 257
 
258 258
         # Collect and results and add them to the learner
259 259
         futures = list(self.pending_points.keys())
... ...
@@ -300,11 +300,11 @@ class BaseRunner(metaclass=abc.ABCMeta):
300 300
 
301 301
     @property
302 302
     def tracebacks(self):
303
-        return [(self._index_to_point[i], tb) for i, tb in self._tracebacks.items()]
303
+        return [(self._id_to_point[i], tb) for i, tb in self._tracebacks.items()]
304 304
 
305 305
     @property
306 306
     def to_retry(self):
307
-        return [(self._index_to_point[i], n) for i, n in self._to_retry.items()]
307
+        return [(self._id_to_point[i], n) for i, n in self._to_retry.items()]
308 308
 
309 309
 
310 310
 class BlockingRunner(BaseRunner):
Browse code

raise StopIteration because it is clearer than checking for None

Bas Nijholt authored on 17/04/2020 10:30:24
Showing 1 changed files
... ...
@@ -55,7 +55,7 @@ _default_executor = (
55 55
 
56 56
 
57 57
 def _key_by_value(dct, value):
58
-    return next((k for k, v in dct.items() if v == value), None)
58
+    return next(k for k, v in dct.items() if v == value)
59 59
 
60 60
 
61 61
 class BaseRunner(metaclass=abc.ABCMeta):
... ...
@@ -248,9 +248,9 @@ class BaseRunner(metaclass=abc.ABCMeta):
248 248
             fut = self._submit(x)
249 249
             fut.start_time = start_time
250 250
             self.pending_points[fut] = x
251
-            i = _key_by_value(self._index_to_point, x)  # O(N)
252
-            if i is None:
253
-                # `x` is not a value in `self._index_to_point`
251
+            try:
252
+                i = _key_by_value(self._index_to_point, x)  # O(N)
253
+            except StopIteration:  # `x` is not a value in `self._index_to_point`
254 254
                 self._i += 1
255 255
                 i = self._i
256 256
             self._index_to_point[i] = x
Browse code

use next in _key_by_value

Bas Nijholt authored on 17/04/2020 10:27:06
Showing 1 changed files
... ...
@@ -55,9 +55,7 @@ _default_executor = (
55 55
 
56 56
 
57 57
 def _key_by_value(dct, value):
58
-    for k, v in dct.items():
59
-        if v == value:
60
-            return k
58
+    return next((k for k, v in dct.items() if v == value), None)
61 59
 
62 60
 
63 61
 class BaseRunner(metaclass=abc.ABCMeta):
Browse code

make tracebacks and to_retry properties and lists of tuples

Bas Nijholt authored on 16/04/2020 20:08:40
Showing 1 changed files
... ...
@@ -97,12 +97,12 @@ class BaseRunner(metaclass=abc.ABCMeta):
97 97
     log : list or None
98 98
         Record of the method calls made to the learner, in the format
99 99
         ``(method_name, *args)``.
100
-    to_retry : dict
101
-        Mapping of ``{point: n_fails, ...}``. When a point has failed
100
+    to_retry : list of tuples
101
+        List of ``(point, n_fails)``. When a point has failed
102 102
         ``runner.retries`` times it is removed but will be present
103 103
         in ``runner.tracebacks``.
104
-    tracebacks : dict
105
-        A mapping of point to the traceback if that point failed.
104
+    tracebacks : list of tuples
105
+        List of of ``(point, tb)`` for points that failed.
106 106
     pending_points : dict
107 107
         A mapping of `~concurrent.futures.Future`\s to points.
108 108
 
... ...
@@ -149,19 +149,19 @@ class BaseRunner(metaclass=abc.ABCMeta):
149 149
         # Error handling attributes
150 150
         self.retries = retries
151 151
         self.raise_if_retries_exceeded = raise_if_retries_exceeded
152
-        self.to_retry = {}
153
-        self.tracebacks = {}
152
+        self._to_retry = {}
153
+        self._tracebacks = {}
154 154
 
155 155
         # Keeping track of index -> point
156
-        self.index_to_point = {}
156
+        self._index_to_point = {}
157 157
         self._i = 0  # some unique index to be associated with each point
158 158
 
159 159
     def _get_max_tasks(self):
160 160
         return self._max_tasks or _get_ncores(self.executor)
161 161
 
162 162
     def _do_raise(self, e, i):
163
-        tb = self.tracebacks[i]
164
-        x = self.index_to_point[i]
163
+        tb = self._tracebacks[i]
164
+        x = self._index_to_point[i]
165 165
         raise RuntimeError(
166 166
             "An error occured while evaluating "
167 167
             f'"learner.function({x})". '
... ...
@@ -174,10 +174,10 @@ class BaseRunner(metaclass=abc.ABCMeta):
174 174
 
175 175
     def _ask(self, n):
176 176
         points = []
177
-        for i, index in enumerate(self.to_retry.keys()):
177
+        for i, index in enumerate(self._to_retry.keys()):
178 178
             if i == n:
179 179
                 break
180
-            point = self.index_to_point[index]
180
+            point = self._index_to_point[index]
181 181
             if point not in self.pending_points.values():
182 182
                 points.append(point)
183 183
 
... ...
@@ -214,22 +214,22 @@ class BaseRunner(metaclass=abc.ABCMeta):
214 214
     def _process_futures(self, done_futs):
215 215
         for fut in done_futs:
216 216
             x = self.pending_points.pop(fut)
217
-            i = _key_by_value(self.index_to_point, x)  # O(N)
217
+            i = _key_by_value(self._index_to_point, x)  # O(N)
218 218
             try:
219 219
                 y = fut.result()
220 220
                 t = time.time() - fut.start_time  # total execution time
221 221
             except Exception as e:
222
-                self.tracebacks[i] = traceback.format_exc()
223
-                self.to_retry[i] = self.to_retry.get(i, 0) + 1
224
-                if self.to_retry[i] > self.retries:
225
-                    self.to_retry.pop(i)
222
+                self._tracebacks[i] = traceback.format_exc()
223
+                self._to_retry[i] = self._to_retry.get(i, 0) + 1
224
+                if self._to_retry[i] > self.retries:
225
+                    self._to_retry.pop(i)
226 226
                     if self.raise_if_retries_exceeded:
227 227
                         self._do_raise(e, i)
228 228
             else:
229 229
                 self._elapsed_function_time += t / self._get_max_tasks()
230
-                self.to_retry.pop(i, None)
231
-                self.tracebacks.pop(i, None)
232
-                self.index_to_point.pop(i)
230
+                self._to_retry.pop(i, None)
231
+                self._tracebacks.pop(i, None)
232
+                self._index_to_point.pop(i)
233 233
                 if self.do_log:
234 234
                     self.log.append(("tell", x, y))
235 235
                 self.learner.tell(x, y)
... ...
@@ -250,12 +250,12 @@ class BaseRunner(metaclass=abc.ABCMeta):
250 250
             fut = self._submit(x)
251 251
             fut.start_time = start_time
252 252
             self.pending_points[fut] = x
253
-            i = _key_by_value(self.index_to_point, x)  # O(N)
253
+            i = _key_by_value(self._index_to_point, x)  # O(N)
254 254
             if i is None:
255
-                # `x` is not a value in `self.index_to_point`
255
+                # `x` is not a value in `self._index_to_point`
256 256
                 self._i += 1
257 257
                 i = self._i
258
-            self.index_to_point[i] = x
258
+            self._index_to_point[i] = x
259 259
 
260 260
         # Collect and results and add them to the learner
261 261
         futures = list(self.pending_points.keys())
... ...
@@ -284,7 +284,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
284 284
     @property
285 285
     def failed(self):
286 286
         """Set of points that failed ``runner.retries`` times."""
287
-        return set(self.tracebacks) - set(self.to_retry)
287
+        return set(self._tracebacks) - set(self._to_retry)
288 288
 
289 289
     @abc.abstractmethod
290 290
     def elapsed_time(self):
... ...
@@ -300,6 +300,14 @@ class BaseRunner(metaclass=abc.ABCMeta):
300 300
         """Is called in `_get_futures`."""
301 301
         pass
302 302
 
303
+    @property
304
+    def tracebacks(self):
305
+        return [(self._index_to_point[i], tb) for i, tb in self._tracebacks.items()]
306
+
307
+    @property
308
+    def to_retry(self):
309
+        return [(self._index_to_point[i], n) for i, n in self._to_retry.items()]
310
+
303 311
 
304 312
 class BlockingRunner(BaseRunner):
305 313
     """Run a learner synchronously in an executor.
Browse code

make the Runner work with unhashable points

Bas Nijholt authored on 16/04/2020 18:38:03
Showing 1 changed files
... ...
@@ -54,6 +54,12 @@ _default_executor = (
54 54
 )
55 55
 
56 56
 
57
+def _key_by_value(dct, value):
58
+    for k, v in dct.items():
59
+        if v == value:
60
+            return k
61
+
62
+
57 63
 class BaseRunner(metaclass=abc.ABCMeta):
58 64
     r"""Base class for runners that use `concurrent.futures.Executors`.
59 65
 
... ...
@@ -146,11 +152,16 @@ class BaseRunner(metaclass=abc.ABCMeta):
146 152
         self.to_retry = {}
147 153
         self.tracebacks = {}
148 154
 
155
+        # Keeping track of index -> point
156
+        self.index_to_point = {}
157
+        self._i = 0  # some unique index to be associated with each point
158
+
149 159
     def _get_max_tasks(self):
150 160
         return self._max_tasks or _get_ncores(self.executor)
151 161
 
152
-    def _do_raise(self, e, x):
153
-        tb = self.tracebacks[x]
162
+    def _do_raise(self, e, i):
163
+        tb = self.tracebacks[i]
164
+        x = self.index_to_point[i]
154 165
         raise RuntimeError(
155 166
             "An error occured while evaluating "
156 167
             f'"learner.function({x})". '
... ...
@@ -162,9 +173,14 @@ class BaseRunner(metaclass=abc.ABCMeta):
162 173
         return self.log is not None
163 174
 
164 175
     def _ask(self, n):
165
-        points = [
166
-            p for p in self.to_retry.keys() if p not in self.pending_points.values()
167
-        ][:n]
176
+        points = []
177
+        for i, index in enumerate(self.to_retry.keys()):
178
+            if i == n:
179
+                break
180
+            point = self.index_to_point[index]
181
+            if point not in self.pending_points.values():
182
+                points.append(point)
183
+
168 184
         loss_improvements = len(points) * [float("inf")]
169 185
         if len(points) < n:
170 186
             new_points, new_losses = self.learner.ask(n - len(points))
... ...
@@ -198,20 +214,22 @@ class BaseRunner(metaclass=abc.ABCMeta):
198 214
     def _process_futures(self, done_futs):
199 215
         for fut in done_futs:
200 216
             x = self.pending_points.pop(fut)
217
+            i = _key_by_value(self.index_to_point, x)  # O(N)
201 218
             try:
202 219
                 y = fut.result()
203 220
                 t = time.time() - fut.start_time  # total execution time
204 221
             except Exception as e:
205
-                self.tracebacks[x] = traceback.format_exc()
206
-                self.to_retry[x] = self.to_retry.get(x, 0) + 1
207
-                if self.to_retry[x] > self.retries:
208
-                    self.to_retry.pop(x)
222
+                self.tracebacks[i] = traceback.format_exc()
223
+                self.to_retry[i] = self.to_retry.get(i, 0) + 1
224
+                if self.to_retry[i] > self.retries:
225
+                    self.to_retry.pop(i)
209 226
                     if self.raise_if_retries_exceeded:
210
-                        self._do_raise(e, x)
227
+                        self._do_raise(e, i)
211 228
             else:
212 229
                 self._elapsed_function_time += t / self._get_max_tasks()
213
-                self.to_retry.pop(x, None)
214
-                self.tracebacks.pop(x, None)
230
+                self.to_retry.pop(i, None)
231
+                self.tracebacks.pop(i, None)
232
+                self.index_to_point.pop(i)
215 233
                 if self.do_log:
216 234
                     self.log.append(("tell", x, y))
217 235
                 self.learner.tell(x, y)
... ...
@@ -232,6 +250,12 @@ class BaseRunner(metaclass=abc.ABCMeta):
232 250
             fut = self._submit(x)
233 251
             fut.start_time = start_time
234 252
             self.pending_points[fut] = x
253
+            i = _key_by_value(self.index_to_point, x)  # O(N)
254
+            if i is None:
255
+                # `x` is not a value in `self.index_to_point`
256
+                self._i += 1
257
+                i = self._i
258
+            self.index_to_point[i] = x
235 259
 
236 260
         # Collect and results and add them to the learner
237 261
         futures = list(self.pending_points.keys())
Browse code

fix test_default_executor

Bas Nijholt authored on 10/04/2020 16:09:07
Showing 1 changed files
... ...
@@ -49,6 +49,11 @@ with suppress(ModuleNotFoundError):
49 49
     asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
50 50
 
51 51
 
52
+_default_executor = (
53
+    loky.get_reusable_executor if with_loky else concurrent.ProcessPoolExecutor
54
+)
55
+
56
+
52 57
 class BaseRunner(metaclass=abc.ABCMeta):
53 58
     r"""Base class for runners that use `concurrent.futures.Executors`.
54 59
 
... ...
@@ -478,7 +483,11 @@ class AsyncRunner(BaseRunner):
478 483
             def goal(_):
479 484
                 return False
480 485
 
481
-        if executor is None and not inspect.iscoroutinefunction(learner.function):
486
+        if (
487
+            executor is None
488
+            and _default_executor is concurrent.ProcessPoolExecutor
489
+            and not inspect.iscoroutinefunction(learner.function)
490
+        ):
482 491
             try:
483 492
                 pickle.dumps(learner.function)
484 493
             except pickle.PicklingError:
... ...
@@ -756,13 +765,6 @@ class SequentialExecutor(concurrent.Executor):
756 765
         pass
757 766
 
758 767
 
759
-def _default_executor():