... | ... |
@@ -46,7 +46,9 @@ with suppress(ModuleNotFoundError): |
46 | 46 |
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) |
47 | 47 |
|
48 | 48 |
|
49 |
-_default_executor = concurrent.ProcessPoolExecutor |
|
49 |
+_default_executor = ( |
|
50 |
+ loky.get_reusable_executor if with_loky else concurrent.ProcessPoolExecutor |
|
51 |
+) |
|
50 | 52 |
|
51 | 53 |
|
52 | 54 |
class BaseRunner(metaclass=abc.ABCMeta): |
... | ... |
@@ -46,9 +46,7 @@ with suppress(ModuleNotFoundError): |
46 | 46 |
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) |
47 | 47 |
|
48 | 48 |
|
49 |
-_default_executor = ( |
|
50 |
- loky.get_reusable_executor if with_loky else concurrent.ProcessPoolExecutor |
|
51 |
-) |
|
49 |
+_default_executor = concurrent.ProcessPoolExecutor |
|
52 | 50 |
|
53 | 51 |
|
54 | 52 |
class BaseRunner(metaclass=abc.ABCMeta): |
... | ... |
@@ -62,7 +60,8 @@ class BaseRunner(metaclass=abc.ABCMeta): |
62 | 60 |
the learner as its sole argument, and return True when we should |
63 | 61 |
stop requesting more points. |
64 | 62 |
executor : `concurrent.futures.Executor`, `distributed.Client`,\ |
65 |
- `mpi4py.futures.MPIPoolExecutor`, or `ipyparallel.Client`, optional |
|
63 |
+ `mpi4py.futures.MPIPoolExecutor`, `ipyparallel.Client` or\ |
|
64 |
+ `loky.get_reusable_executor`, optional |
|
66 | 65 |
The executor in which to evaluate the function to be learned. |
67 | 66 |
If not provided, a new `~concurrent.futures.ProcessPoolExecutor`. |
68 | 67 |
ntasks : int, optional |
... | ... |
@@ -5,7 +5,6 @@ import functools |
5 | 5 |
import inspect |
6 | 6 |
import itertools |
7 | 7 |
import pickle |
8 |
-import sys |
|
9 | 8 |
import time |
10 | 9 |
import traceback |
11 | 10 |
import warnings |
... | ... |
@@ -14,13 +13,9 @@ from contextlib import suppress |
14 | 13 |
from adaptive.notebook_integration import in_ipynb, live_info, live_plot |
15 | 14 |
|
16 | 15 |
try: |
17 |
- if sys.version_info < (3, 8): |
|
18 |
- # XXX: remove when ipyparallel 6.2.5 is released |
|
19 |
- import ipyparallel |
|
16 |
+ import ipyparallel |
|
20 | 17 |
|
21 |
- with_ipyparallel = True |
|
22 |
- else: |
|
23 |
- with_ipyparallel = False |
|
18 |
+ with_ipyparallel = True |
|
24 | 19 |
except ModuleNotFoundError: |
25 | 20 |
with_ipyparallel = False |
26 | 21 |
|
... | ... |
@@ -379,9 +379,7 @@ class BlockingRunner(BaseRunner): |
379 | 379 |
raise_if_retries_exceeded=True, |
380 | 380 |
): |
381 | 381 |
if inspect.iscoroutinefunction(learner.function): |
382 |
- raise ValueError( |
|
383 |
- "Coroutine functions can only be used " "with 'AsyncRunner'." |
|
384 |
- ) |
|
382 |
+ raise ValueError("Coroutine functions can only be used with 'AsyncRunner'.") |
|
385 | 383 |
super().__init__( |
386 | 384 |
learner, |
387 | 385 |
goal, |
... | ... |
@@ -549,7 +547,7 @@ class AsyncRunner(BaseRunner): |
549 | 547 |
if inspect.iscoroutinefunction(learner.function): |
550 | 548 |
if executor: # user-provided argument |
551 | 549 |
raise RuntimeError( |
552 |
- "Cannot use an executor when learning an " "async function." |
|
550 |
+ "Cannot use an executor when learning an async function." |
|
553 | 551 |
) |
554 | 552 |
self.executor.shutdown() # Make sure we don't shoot ourselves later |
555 | 553 |
|
... | ... |
@@ -171,6 +171,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
171 | 171 |
|
172 | 172 |
def _ask(self, n): |
173 | 173 |
pending_ids = self._pending_tasks.values() |
174 |
+ # using generator here because we only need until `n` |
|
174 | 175 |
pids_gen = (pid for pid in self._to_retry.keys() if pid not in pending_ids) |
175 | 176 |
pids = list(itertools.islice(pids_gen, n)) |
176 | 177 |
|
... | ... |
@@ -128,7 +128,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
128 | 128 |
|
129 | 129 |
self._max_tasks = ntasks |
130 | 130 |
|
131 |
- self._pending_pids = {} |
|
131 |
+ self._pending_tasks = {} # mapping from concurrent.futures.Future → point id |
|
132 | 132 |
|
133 | 133 |
# if we instantiate our own executor, then we are also responsible |
134 | 134 |
# for calling 'shutdown' |
... | ... |
@@ -170,7 +170,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
170 | 170 |
return self.log is not None |
171 | 171 |
|
172 | 172 |
def _ask(self, n): |
173 |
- pending_ids = self._pending_pids.values() |
|
173 |
+ pending_ids = self._pending_tasks.values() |
|
174 | 174 |
pids_gen = (pid for pid in self._to_retry.keys() if pid not in pending_ids) |
175 | 175 |
pids = list(itertools.islice(pids_gen, n)) |
176 | 176 |
|
... | ... |
@@ -210,7 +210,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
210 | 210 |
|
211 | 211 |
def _process_futures(self, done_futs): |
212 | 212 |
for fut in done_futs: |
213 |
- pid = self._pending_pids.pop(fut) |
|
213 |
+ pid = self._pending_tasks.pop(fut) |
|
214 | 214 |
try: |
215 | 215 |
y = fut.result() |
216 | 216 |
t = time.time() - fut.start_time # total execution time |
... | ... |
@@ -234,7 +234,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
234 | 234 |
# Launch tasks to replace the ones that completed |
235 | 235 |
# on the last iteration, making sure to fill workers |
236 | 236 |
# that have started since the last iteration. |
237 |
- n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_pids)) |
|
237 |
+ n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_tasks)) |
|
238 | 238 |
|
239 | 239 |
if self.do_log: |
240 | 240 |
self.log.append(("ask", n_new_tasks)) |
... | ... |
@@ -246,17 +246,17 @@ class BaseRunner(metaclass=abc.ABCMeta): |
246 | 246 |
point = self._id_to_point[pid] |
247 | 247 |
fut = self._submit(point) |
248 | 248 |
fut.start_time = start_time |
249 |
- self._pending_pids[fut] = pid |
|
249 |
+ self._pending_tasks[fut] = pid |
|
250 | 250 |
|
251 | 251 |
# Collect the results and add them to the learner |
252 |
- futures = list(self._pending_pids.keys()) |
|
252 |
+ futures = list(self._pending_tasks.keys()) |
|
253 | 253 |
return futures |
254 | 254 |
|
255 | 255 |
def _remove_unfinished(self): |
256 | 256 |
# remove points with 'None' values from the learner |
257 | 257 |
self.learner.remove_unfinished() |
258 | 258 |
# cancel any outstanding tasks |
259 |
- remaining = list(self._pending_pids.keys()) |
|
259 |
+ remaining = list(self._pending_tasks.keys()) |
|
260 | 260 |
for fut in remaining: |
261 | 261 |
fut.cancel() |
262 | 262 |
return remaining |
... | ... |
@@ -302,7 +302,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
302 | 302 |
@property |
303 | 303 |
def pending_points(self): |
304 | 304 |
return [ |
305 |
- (fut, self._id_to_point[pid]) for fut, pid in self._pending_pids.items() |
|
305 |
+ (fut, self._id_to_point[pid]) for fut, pid in self._pending_tasks.items() |
|
306 | 306 |
] |
307 | 307 |
|
308 | 308 |
|
... | ... |
@@ -128,7 +128,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
128 | 128 |
|
129 | 129 |
self._max_tasks = ntasks |
130 | 130 |
|
131 |
- self._pending_points = {} |
|
131 |
+ self._pending_pids = {} |
|
132 | 132 |
|
133 | 133 |
# if we instantiate our own executor, then we are also responsible |
134 | 134 |
# for calling 'shutdown' |
... | ... |
@@ -170,7 +170,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
170 | 170 |
return self.log is not None |
171 | 171 |
|
172 | 172 |
def _ask(self, n): |
173 |
- pending_ids = self._pending_points.values() |
|
173 |
+ pending_ids = self._pending_pids.values() |
|
174 | 174 |
pids_gen = (pid for pid in self._to_retry.keys() if pid not in pending_ids) |
175 | 175 |
pids = list(itertools.islice(pids_gen, n)) |
176 | 176 |
|
... | ... |
@@ -210,7 +210,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
210 | 210 |
|
211 | 211 |
def _process_futures(self, done_futs): |
212 | 212 |
for fut in done_futs: |
213 |
- pid = self._pending_points.pop(fut) |
|
213 |
+ pid = self._pending_pids.pop(fut) |
|
214 | 214 |
try: |
215 | 215 |
y = fut.result() |
216 | 216 |
t = time.time() - fut.start_time # total execution time |
... | ... |
@@ -234,7 +234,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
234 | 234 |
# Launch tasks to replace the ones that completed |
235 | 235 |
# on the last iteration, making sure to fill workers |
236 | 236 |
# that have started since the last iteration. |
237 |
- n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_points)) |
|
237 |
+ n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_pids)) |
|
238 | 238 |
|
239 | 239 |
if self.do_log: |
240 | 240 |
self.log.append(("ask", n_new_tasks)) |
... | ... |
@@ -246,17 +246,17 @@ class BaseRunner(metaclass=abc.ABCMeta): |
246 | 246 |
point = self._id_to_point[pid] |
247 | 247 |
fut = self._submit(point) |
248 | 248 |
fut.start_time = start_time |
249 |
- self._pending_points[fut] = pid |
|
249 |
+ self._pending_pids[fut] = pid |
|
250 | 250 |
|
251 | 251 |
# Collect the results and add them to the learner |
252 |
- futures = list(self._pending_points.keys()) |
|
252 |
+ futures = list(self._pending_pids.keys()) |
|
253 | 253 |
return futures |
254 | 254 |
|
255 | 255 |
def _remove_unfinished(self): |
256 | 256 |
# remove points with 'None' values from the learner |
257 | 257 |
self.learner.remove_unfinished() |
258 | 258 |
# cancel any outstanding tasks |
259 |
- remaining = list(self._pending_points.keys()) |
|
259 |
+ remaining = list(self._pending_pids.keys()) |
|
260 | 260 |
for fut in remaining: |
261 | 261 |
fut.cancel() |
262 | 262 |
return remaining |
... | ... |
@@ -302,7 +302,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
302 | 302 |
@property |
303 | 303 |
def pending_points(self): |
304 | 304 |
return [ |
305 |
- (fut, self._id_to_point[pid]) for fut, pid in self._pending_points.items() |
|
305 |
+ (fut, self._id_to_point[pid]) for fut, pid in self._pending_pids.items() |
|
306 | 306 |
] |
307 | 307 |
|
308 | 308 |
|
... | ... |
@@ -170,11 +170,10 @@ class BaseRunner(metaclass=abc.ABCMeta): |
170 | 170 |
return self.log is not None |
171 | 171 |
|
172 | 172 |
def _ask(self, n): |
173 |
- pids = [ |
|
174 |
- pid |
|
175 |
- for pid in self._to_retry.keys() |
|
176 |
- if pid not in self._pending_points.values() |
|
177 |
- ][:n] |
|
173 |
+ pending_ids = self._pending_points.values() |
|
174 |
+ pids_gen = (pid for pid in self._to_retry.keys() if pid not in pending_ids) |
|
175 |
+ pids = list(itertools.islice(pids_gen, n)) |
|
176 |
+ |
|
178 | 177 |
loss_improvements = len(pids) * [float("inf")] |
179 | 178 |
|
180 | 179 |
if len(pids) < n: |
... | ... |
@@ -103,8 +103,8 @@ class BaseRunner(metaclass=abc.ABCMeta): |
103 | 103 |
in ``runner.tracebacks``. |
104 | 104 |
tracebacks : list of tuples |
105 | 105 |
List of ``(point, tb)`` for points that failed. |
106 |
- pending_points : dict |
|
107 |
- A mapping of `~concurrent.futures.Future`\s to points. |
|
106 |
+ pending_points : list of tuples |
|
107 |
+ A list of tuples with ``(concurrent.futures.Future, point)``. |
|
108 | 108 |
|
109 | 109 |
Methods |
110 | 110 |
------- |
... | ... |
@@ -132,7 +132,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
132 | 132 |
|
133 | 133 |
self._max_tasks = ntasks |
134 | 134 |
|
135 |
- self.pending_points = {} |
|
135 |
+ self._pending_points = {} |
|
136 | 136 |
|
137 | 137 |
# if we instantiate our own executor, then we are also responsible |
138 | 138 |
# for calling 'shutdown' |
... | ... |
@@ -177,7 +177,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
177 | 177 |
pids = [ |
178 | 178 |
pid |
179 | 179 |
for pid in self._to_retry.keys() |
180 |
- if pid not in self.pending_points.values() |
|
180 |
+ if pid not in self._pending_points.values() |
|
181 | 181 |
][:n] |
182 | 182 |
loss_improvements = len(pids) * [float("inf")] |
183 | 183 |
|
... | ... |
@@ -215,7 +215,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
215 | 215 |
|
216 | 216 |
def _process_futures(self, done_futs): |
217 | 217 |
for fut in done_futs: |
218 |
- pid = self.pending_points.pop(fut) |
|
218 |
+ pid = self._pending_points.pop(fut) |
|
219 | 219 |
try: |
220 | 220 |
y = fut.result() |
221 | 221 |
t = time.time() - fut.start_time # total execution time |
... | ... |
@@ -239,7 +239,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
239 | 239 |
# Launch tasks to replace the ones that completed |
240 | 240 |
# on the last iteration, making sure to fill workers |
241 | 241 |
# that have started since the last iteration. |
242 |
- n_new_tasks = max(0, self._get_max_tasks() - len(self.pending_points)) |
|
242 |
+ n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_points)) |
|
243 | 243 |
|
244 | 244 |
if self.do_log: |
245 | 245 |
self.log.append(("ask", n_new_tasks)) |
... | ... |
@@ -251,17 +251,17 @@ class BaseRunner(metaclass=abc.ABCMeta): |
251 | 251 |
point = self._id_to_point[pid] |
252 | 252 |
fut = self._submit(point) |
253 | 253 |
fut.start_time = start_time |
254 |
- self.pending_points[fut] = pid |
|
254 |
+ self._pending_points[fut] = pid |
|
255 | 255 |
|
256 | 256 |
# Collect the results and add them to the learner |
257 |
- futures = list(self.pending_points.keys()) |
|
257 |
+ futures = list(self._pending_points.keys()) |
|
258 | 258 |
return futures |
259 | 259 |
|
260 | 260 |
def _remove_unfinished(self): |
261 | 261 |
# remove points with 'None' values from the learner |
262 | 262 |
self.learner.remove_unfinished() |
263 | 263 |
# cancel any outstanding tasks |
264 |
- remaining = list(self.pending_points.keys()) |
|
264 |
+ remaining = list(self._pending_points.keys()) |
|
265 | 265 |
for fut in remaining: |
266 | 266 |
fut.cancel() |
267 | 267 |
return remaining |
... | ... |
@@ -298,11 +298,17 @@ class BaseRunner(metaclass=abc.ABCMeta): |
298 | 298 |
|
299 | 299 |
@property |
300 | 300 |
def tracebacks(self): |
301 |
- return [(self._id_to_point[i], tb) for i, tb in self._tracebacks.items()] |
|
301 |
+ return [(self._id_to_point[pid], tb) for pid, tb in self._tracebacks.items()] |
|
302 | 302 |
|
303 | 303 |
@property |
304 | 304 |
def to_retry(self): |
305 |
- return [(self._id_to_point[i], n) for i, n in self._to_retry.items()] |
|
305 |
+ return [(self._id_to_point[pid], n) for pid, n in self._to_retry.items()] |
|
306 |
+ |
|
307 |
+ @property |
|
308 |
+ def pending_points(self): |
|
309 |
+ return [ |
|
310 |
+ (fut, self._id_to_point[pid]) for fut, pid in self._pending_points.items() |
|
311 |
+ ] |
|
306 | 312 |
|
307 | 313 |
|
308 | 314 |
class BlockingRunner(BaseRunner): |
... | ... |
@@ -343,14 +349,14 @@ class BlockingRunner(BaseRunner): |
343 | 349 |
log : list or None |
344 | 350 |
Record of the method calls made to the learner, in the format |
345 | 351 |
``(method_name, *args)``. |
346 |
- to_retry : dict |
|
347 |
- Mapping of ``{point: n_fails, ...}``. When a point has failed |
|
352 |
+ to_retry : list of tuples |
|
353 |
+ List of ``(point, n_fails)``. When a point has failed |
|
348 | 354 |
``runner.retries`` times it is removed but will be present |
349 | 355 |
in ``runner.tracebacks``. |
350 |
- tracebacks : dict |
|
351 |
- A mapping of point to the traceback if that point failed. |
|
352 |
- pending_points : dict |
|
353 |
- A mapping of `~concurrent.futures.Future`\to points. |
|
356 |
+ tracebacks : list of tuples |
|
357 |
+ List of ``(point, tb)`` for points that failed. |
|
358 |
+ pending_points : list of tuples |
|
359 |
+ A list of tuples with ``(concurrent.futures.Future, point)``. |
|
354 | 360 |
|
355 | 361 |
Methods |
356 | 362 |
------- |
... | ... |
@@ -466,14 +472,14 @@ class AsyncRunner(BaseRunner): |
466 | 472 |
log : list or None |
467 | 473 |
Record of the method calls made to the learner, in the format |
468 | 474 |
``(method_name, *args)``. |
469 |
- to_retry : dict |
|
470 |
- Mapping of ``{point: n_fails, ...}``. When a point has failed |
|
475 |
+ to_retry : list of tuples |
|
476 |
+ List of ``(point, n_fails)``. When a point has failed |
|
471 | 477 |
``runner.retries`` times it is removed but will be present |
472 | 478 |
in ``runner.tracebacks``. |
473 |
- tracebacks : dict |
|
474 |
- A mapping of point to the traceback if that point failed. |
|
475 |
- pending_points : dict |
|
476 |
- A mapping of `~concurrent.futures.Future`\s to points. |
|
479 |
+ tracebacks : list of tuples |
|
480 |
+ List of ``(point, tb)`` for points that failed. |
|
481 |
+ pending_points : list of tuples |
|
482 |
+ A list of tuples with ``(concurrent.futures.Future, point)``. |
|
477 | 483 |
|
478 | 484 |
Methods |
479 | 485 |
------- |
... | ... |
@@ -174,22 +174,21 @@ class BaseRunner(metaclass=abc.ABCMeta): |
174 | 174 |
return self.log is not None |
175 | 175 |
|
176 | 176 |
def _ask(self, n): |
177 |
- points = [] |
|
178 |
- for i, pid in enumerate(self._to_retry.keys()): |
|
179 |
- if i == n: |
|
180 |
- break |
|
181 |
- point = self._id_to_point[pid] |
|
182 |
- if point not in self.pending_points.values(): |
|
183 |
- points.append(point) |
|
184 |
- |
|
185 |
- loss_improvements = len(points) * [float("inf")] |
|
186 |
- if len(points) < n: |
|
187 |
- new_points, new_losses = self.learner.ask(n - len(points)) |
|
188 |
- points += new_points |
|
177 |
+ pids = [ |
|
178 |
+ pid |
|
179 |
+ for pid in self._to_retry.keys() |
|
180 |
+ if pid not in self.pending_points.values() |
|
181 |
+ ][:n] |
|
182 |
+ loss_improvements = len(pids) * [float("inf")] |
|
183 |
+ |
|
184 |
+ if len(pids) < n: |
|
185 |
+ new_points, new_losses = self.learner.ask(n - len(pids)) |
|
189 | 186 |
loss_improvements += new_losses |
190 |
- for p in new_points: |
|
191 |
- self._id_to_point[self._next_id()] = p |
|
192 |
- return points, loss_improvements |
|
187 |
+ for point in new_points: |
|
188 |
+ pid = self._next_id() |
|
189 |
+ self._id_to_point[pid] = point |
|
190 |
+ pids.append(pid) |
|
191 |
+ return pids, loss_improvements |
|
193 | 192 |
|
194 | 193 |
def overhead(self): |
195 | 194 |
"""Overhead of using Adaptive and the executor in percent. |
... | ... |
@@ -216,23 +215,22 @@ class BaseRunner(metaclass=abc.ABCMeta): |
216 | 215 |
|
217 | 216 |
def _process_futures(self, done_futs): |
218 | 217 |
for fut in done_futs: |
219 |
- x = self.pending_points.pop(fut) |
|
220 |
- i = _key_by_value(self._id_to_point, x) # O(N) |
|
218 |
+ pid = self.pending_points.pop(fut) |
|
221 | 219 |
try: |
222 | 220 |
y = fut.result() |
223 | 221 |
t = time.time() - fut.start_time # total execution time |
224 | 222 |
except Exception as e: |
225 |
- self._tracebacks[i] = traceback.format_exc() |
|
226 |
- self._to_retry[i] = self._to_retry.get(i, 0) + 1 |
|
227 |
- if self._to_retry[i] > self.retries: |
|
228 |
- self._to_retry.pop(i) |
|
223 |
+ self._tracebacks[pid] = traceback.format_exc() |
|
224 |
+ self._to_retry[pid] = self._to_retry.get(pid, 0) + 1 |
|
225 |
+ if self._to_retry[pid] > self.retries: |
|
226 |
+ self._to_retry.pop(pid) |
|
229 | 227 |
if self.raise_if_retries_exceeded: |
230 |
- self._do_raise(e, i) |
|
228 |
+ self._do_raise(e, pid) |
|
231 | 229 |
else: |
232 | 230 |
self._elapsed_function_time += t / self._get_max_tasks() |
233 |
- self._to_retry.pop(i, None) |
|
234 |
- self._tracebacks.pop(i, None) |
|
235 |
- self._id_to_point.pop(i) |
|
231 |
+ self._to_retry.pop(pid, None) |
|
232 |
+ self._tracebacks.pop(pid, None) |
|
233 |
+ x = self._id_to_point.pop(pid) |
|
236 | 234 |
if self.do_log: |
237 | 235 |
self.log.append(("tell", x, y)) |
238 | 236 |
self.learner.tell(x, y) |
... | ... |
@@ -246,13 +244,14 @@ class BaseRunner(metaclass=abc.ABCMeta): |
246 | 244 |
if self.do_log: |
247 | 245 |
self.log.append(("ask", n_new_tasks)) |
248 | 246 |
|
249 |
- points, _ = self._ask(n_new_tasks) |
|
247 |
+ pids, _ = self._ask(n_new_tasks) |
|
250 | 248 |
|
251 |
- for x in points: |
|
249 |
+ for pid in pids: |
|
252 | 250 |
start_time = time.time() # so we can measure execution time |
253 |
- fut = self._submit(x) |
|
251 |
+ point = self._id_to_point[pid] |
|
252 |
+ fut = self._submit(point) |
|
254 | 253 |
fut.start_time = start_time |
255 |
- self.pending_points[fut] = x |
|
254 |
+ self.pending_points[fut] = pid |
|
256 | 255 |
|
257 | 256 |
# Collect the results and add them to the learner |
258 | 257 |
futures = list(self.pending_points.keys()) |
... | ... |
@@ -175,10 +175,10 @@ class BaseRunner(metaclass=abc.ABCMeta): |
175 | 175 |
|
176 | 176 |
def _ask(self, n): |
177 | 177 |
points = [] |
178 |
- for i, _id in enumerate(self._to_retry.keys()): |
|
178 |
+ for i, pid in enumerate(self._to_retry.keys()): |
|
179 | 179 |
if i == n: |
180 | 180 |
break |
181 |
- point = self._id_to_point[_id] |
|
181 |
+ point = self._id_to_point[pid] |
|
182 | 182 |
if point not in self.pending_points.values(): |
183 | 183 |
points.append(point) |
184 | 184 |
|
... | ... |
@@ -187,6 +187,8 @@ class BaseRunner(metaclass=abc.ABCMeta): |
187 | 187 |
new_points, new_losses = self.learner.ask(n - len(points)) |
188 | 188 |
points += new_points |
189 | 189 |
loss_improvements += new_losses |
190 |
+ for p in new_points: |
|
191 |
+ self._id_to_point[self._next_id()] = p |
|
190 | 192 |
return points, loss_improvements |
191 | 193 |
|
192 | 194 |
def overhead(self): |
... | ... |
@@ -251,11 +253,6 @@ class BaseRunner(metaclass=abc.ABCMeta): |
251 | 253 |
fut = self._submit(x) |
252 | 254 |
fut.start_time = start_time |
253 | 255 |
self.pending_points[fut] = x |
254 |
- try: |
|
255 |
- _id = _key_by_value(self._id_to_point, x) # O(N) |
|
256 |
- except StopIteration: # `x` is not a value in `self._id_to_point` |
|
257 |
- _id = self._next_id() |
|
258 |
- self._id_to_point[_id] = x |
|
259 | 256 |
|
260 | 257 |
# Collect the results and add them to the learner |
261 | 258 |
futures = list(self.pending_points.keys()) |
... | ... |
@@ -1,7 +1,9 @@ |
1 | 1 |
import abc |
2 | 2 |
import asyncio |
3 | 3 |
import concurrent.futures as concurrent |
4 |
+import functools |
|
4 | 5 |
import inspect |
6 |
+import itertools |
|
5 | 7 |
import pickle |
6 | 8 |
import sys |
7 | 9 |
import time |
... | ... |
@@ -150,9 +152,10 @@ class BaseRunner(metaclass=abc.ABCMeta): |
150 | 152 |
self._to_retry = {} |
151 | 153 |
self._tracebacks = {} |
152 | 154 |
|
153 |
- # Keeping track of index -> point |
|
154 | 155 |
self._id_to_point = {} |
155 |
- self._i = 0 # some unique index to be associated with each point |
|
156 |
+ self._next_id = functools.partial( |
|
157 |
+ next, itertools.count() |
|
158 |
+ ) # some unique id to be associated with each point |
|
156 | 159 |
|
157 | 160 |
def _get_max_tasks(self): |
158 | 161 |
return self._max_tasks or _get_ncores(self.executor) |
... | ... |
@@ -172,10 +175,10 @@ class BaseRunner(metaclass=abc.ABCMeta): |
172 | 175 |
|
173 | 176 |
def _ask(self, n): |
174 | 177 |
points = [] |
175 |
- for i, index in enumerate(self._to_retry.keys()): |
|
178 |
+ for i, _id in enumerate(self._to_retry.keys()): |
|
176 | 179 |
if i == n: |
177 | 180 |
break |
178 |
- point = self._id_to_point[index] |
|
181 |
+ point = self._id_to_point[_id] |
|
179 | 182 |
if point not in self.pending_points.values(): |
180 | 183 |
points.append(point) |
181 | 184 |
|
... | ... |
@@ -249,11 +252,10 @@ class BaseRunner(metaclass=abc.ABCMeta): |
249 | 252 |
fut.start_time = start_time |
250 | 253 |
self.pending_points[fut] = x |
251 | 254 |
try: |
252 |
- i = _key_by_value(self._id_to_point, x) # O(N) |
|
255 |
+ _id = _key_by_value(self._id_to_point, x) # O(N) |
|
253 | 256 |
except StopIteration: # `x` is not a value in `self._id_to_point` |
254 |
- self._i += 1 |
|
255 |
- i = self._i |
|
256 |
- self._id_to_point[i] = x |
|
257 |
+ _id = self._next_id() |
|
258 |
+ self._id_to_point[_id] = x |
|
257 | 259 |
|
258 | 260 |
# Collect the results and add them to the learner |
259 | 261 |
futures = list(self.pending_points.keys()) |
... | ... |
@@ -151,7 +151,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
151 | 151 |
self._tracebacks = {} |
152 | 152 |
|
153 | 153 |
# Keeping track of index -> point |
154 |
- self._index_to_point = {} |
|
154 |
+ self._id_to_point = {} |
|
155 | 155 |
self._i = 0 # some unique index to be associated with each point |
156 | 156 |
|
157 | 157 |
def _get_max_tasks(self): |
... | ... |
@@ -159,7 +159,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
159 | 159 |
|
160 | 160 |
def _do_raise(self, e, i): |
161 | 161 |
tb = self._tracebacks[i] |
162 |
- x = self._index_to_point[i] |
|
162 |
+ x = self._id_to_point[i] |
|
163 | 163 |
raise RuntimeError( |
164 | 164 |
"An error occured while evaluating " |
165 | 165 |
f'"learner.function({x})". ' |
... | ... |
@@ -175,7 +175,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
175 | 175 |
for i, index in enumerate(self._to_retry.keys()): |
176 | 176 |
if i == n: |
177 | 177 |
break |
178 |
- point = self._index_to_point[index] |
|
178 |
+ point = self._id_to_point[index] |
|
179 | 179 |
if point not in self.pending_points.values(): |
180 | 180 |
points.append(point) |
181 | 181 |
|
... | ... |
@@ -212,7 +212,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
212 | 212 |
def _process_futures(self, done_futs): |
213 | 213 |
for fut in done_futs: |
214 | 214 |
x = self.pending_points.pop(fut) |
215 |
- i = _key_by_value(self._index_to_point, x) # O(N) |
|
215 |
+ i = _key_by_value(self._id_to_point, x) # O(N) |
|
216 | 216 |
try: |
217 | 217 |
y = fut.result() |
218 | 218 |
t = time.time() - fut.start_time # total execution time |
... | ... |
@@ -227,7 +227,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
227 | 227 |
self._elapsed_function_time += t / self._get_max_tasks() |
228 | 228 |
self._to_retry.pop(i, None) |
229 | 229 |
self._tracebacks.pop(i, None) |
230 |
- self._index_to_point.pop(i) |
|
230 |
+ self._id_to_point.pop(i) |
|
231 | 231 |
if self.do_log: |
232 | 232 |
self.log.append(("tell", x, y)) |
233 | 233 |
self.learner.tell(x, y) |
... | ... |
@@ -249,11 +249,11 @@ class BaseRunner(metaclass=abc.ABCMeta): |
249 | 249 |
fut.start_time = start_time |
250 | 250 |
self.pending_points[fut] = x |
251 | 251 |
try: |
252 |
- i = _key_by_value(self._index_to_point, x) # O(N) |
|
253 |
- except StopIteration: # `x` is not a value in `self._index_to_point` |
|
252 |
+ i = _key_by_value(self._id_to_point, x) # O(N) |
|
253 |
+ except StopIteration: # `x` is not a value in `self._id_to_point` |
|
254 | 254 |
self._i += 1 |
255 | 255 |
i = self._i |
256 |
- self._index_to_point[i] = x |
|
256 |
+ self._id_to_point[i] = x |
|
257 | 257 |
|
258 | 258 |
# Collect the results and add them to the learner |
259 | 259 |
futures = list(self.pending_points.keys()) |
... | ... |
@@ -300,11 +300,11 @@ class BaseRunner(metaclass=abc.ABCMeta): |
300 | 300 |
|
301 | 301 |
@property |
302 | 302 |
def tracebacks(self): |
303 |
- return [(self._index_to_point[i], tb) for i, tb in self._tracebacks.items()] |
|
303 |
+ return [(self._id_to_point[i], tb) for i, tb in self._tracebacks.items()] |
|
304 | 304 |
|
305 | 305 |
@property |
306 | 306 |
def to_retry(self): |
307 |
- return [(self._index_to_point[i], n) for i, n in self._to_retry.items()] |
|
307 |
+ return [(self._id_to_point[i], n) for i, n in self._to_retry.items()] |
|
308 | 308 |
|
309 | 309 |
|
310 | 310 |
class BlockingRunner(BaseRunner): |
... | ... |
@@ -55,7 +55,7 @@ _default_executor = ( |
55 | 55 |
|
56 | 56 |
|
57 | 57 |
def _key_by_value(dct, value): |
58 |
- return next((k for k, v in dct.items() if v == value), None) |
|
58 |
+ return next(k for k, v in dct.items() if v == value) |
|
59 | 59 |
|
60 | 60 |
|
61 | 61 |
class BaseRunner(metaclass=abc.ABCMeta): |
... | ... |
@@ -248,9 +248,9 @@ class BaseRunner(metaclass=abc.ABCMeta): |
248 | 248 |
fut = self._submit(x) |
249 | 249 |
fut.start_time = start_time |
250 | 250 |
self.pending_points[fut] = x |
251 |
- i = _key_by_value(self._index_to_point, x) # O(N) |
|
252 |
- if i is None: |
|
253 |
- # `x` is not a value in `self._index_to_point` |
|
251 |
+ try: |
|
252 |
+ i = _key_by_value(self._index_to_point, x) # O(N) |
|
253 |
+ except StopIteration: # `x` is not a value in `self._index_to_point` |
|
254 | 254 |
self._i += 1 |
255 | 255 |
i = self._i |
256 | 256 |
self._index_to_point[i] = x |
... | ... |
@@ -97,12 +97,12 @@ class BaseRunner(metaclass=abc.ABCMeta): |
97 | 97 |
log : list or None |
98 | 98 |
Record of the method calls made to the learner, in the format |
99 | 99 |
``(method_name, *args)``. |
100 |
- to_retry : dict |
|
101 |
- Mapping of ``{point: n_fails, ...}``. When a point has failed |
|
100 |
+ to_retry : list of tuples |
|
101 |
+ List of ``(point, n_fails)``. When a point has failed |
|
102 | 102 |
``runner.retries`` times it is removed but will be present |
103 | 103 |
in ``runner.tracebacks``. |
104 |
- tracebacks : dict |
|
105 |
- A mapping of point to the traceback if that point failed. |
|
104 |
+ tracebacks : list of tuples |
|
105 |
+ List of ``(point, tb)`` for points that failed. |
|
106 | 106 |
pending_points : dict |
107 | 107 |
A mapping of `~concurrent.futures.Future`\s to points. |
108 | 108 |
|
... | ... |
@@ -149,19 +149,19 @@ class BaseRunner(metaclass=abc.ABCMeta): |
149 | 149 |
# Error handling attributes |
150 | 150 |
self.retries = retries |
151 | 151 |
self.raise_if_retries_exceeded = raise_if_retries_exceeded |
152 |
- self.to_retry = {} |
|
153 |
- self.tracebacks = {} |
|
152 |
+ self._to_retry = {} |
|
153 |
+ self._tracebacks = {} |
|
154 | 154 |
|
155 | 155 |
# Keeping track of index -> point |
156 |
- self.index_to_point = {} |
|
156 |
+ self._index_to_point = {} |
|
157 | 157 |
self._i = 0 # some unique index to be associated with each point |
158 | 158 |
|
159 | 159 |
def _get_max_tasks(self): |
160 | 160 |
return self._max_tasks or _get_ncores(self.executor) |
161 | 161 |
|
162 | 162 |
def _do_raise(self, e, i): |
163 |
- tb = self.tracebacks[i] |
|
164 |
- x = self.index_to_point[i] |
|
163 |
+ tb = self._tracebacks[i] |
|
164 |
+ x = self._index_to_point[i] |
|
165 | 165 |
raise RuntimeError( |
166 | 166 |
"An error occured while evaluating " |
167 | 167 |
f'"learner.function({x})". ' |
... | ... |
@@ -174,10 +174,10 @@ class BaseRunner(metaclass=abc.ABCMeta): |
174 | 174 |
|
175 | 175 |
def _ask(self, n): |
176 | 176 |
points = [] |
177 |
- for i, index in enumerate(self.to_retry.keys()): |
|
177 |
+ for i, index in enumerate(self._to_retry.keys()): |
|
178 | 178 |
if i == n: |
179 | 179 |
break |
180 |
- point = self.index_to_point[index] |
|
180 |
+ point = self._index_to_point[index] |
|
181 | 181 |
if point not in self.pending_points.values(): |
182 | 182 |
points.append(point) |
183 | 183 |
|
... | ... |
@@ -214,22 +214,22 @@ class BaseRunner(metaclass=abc.ABCMeta): |
214 | 214 |
def _process_futures(self, done_futs): |
215 | 215 |
for fut in done_futs: |
216 | 216 |
x = self.pending_points.pop(fut) |
217 |
- i = _key_by_value(self.index_to_point, x) # O(N) |
|
217 |
+ i = _key_by_value(self._index_to_point, x) # O(N) |
|
218 | 218 |
try: |
219 | 219 |
y = fut.result() |
220 | 220 |
t = time.time() - fut.start_time # total execution time |
221 | 221 |
except Exception as e: |
222 |
- self.tracebacks[i] = traceback.format_exc() |
|
223 |
- self.to_retry[i] = self.to_retry.get(i, 0) + 1 |
|
224 |
- if self.to_retry[i] > self.retries: |
|
225 |
- self.to_retry.pop(i) |
|
222 |
+ self._tracebacks[i] = traceback.format_exc() |
|
223 |
+ self._to_retry[i] = self._to_retry.get(i, 0) + 1 |
|
224 |
+ if self._to_retry[i] > self.retries: |
|
225 |
+ self._to_retry.pop(i) |
|
226 | 226 |
if self.raise_if_retries_exceeded: |
227 | 227 |
self._do_raise(e, i) |
228 | 228 |
else: |
229 | 229 |
self._elapsed_function_time += t / self._get_max_tasks() |
230 |
- self.to_retry.pop(i, None) |
|
231 |
- self.tracebacks.pop(i, None) |
|
232 |
- self.index_to_point.pop(i) |
|
230 |
+ self._to_retry.pop(i, None) |
|
231 |
+ self._tracebacks.pop(i, None) |
|
232 |
+ self._index_to_point.pop(i) |
|
233 | 233 |
if self.do_log: |
234 | 234 |
self.log.append(("tell", x, y)) |
235 | 235 |
self.learner.tell(x, y) |
... | ... |
@@ -250,12 +250,12 @@ class BaseRunner(metaclass=abc.ABCMeta): |
250 | 250 |
fut = self._submit(x) |
251 | 251 |
fut.start_time = start_time |
252 | 252 |
self.pending_points[fut] = x |
253 |
- i = _key_by_value(self.index_to_point, x) # O(N) |
|
253 |
+ i = _key_by_value(self._index_to_point, x) # O(N) |
|
254 | 254 |
if i is None: |
255 |
- # `x` is not a value in `self.index_to_point` |
|
255 |
+ # `x` is not a value in `self._index_to_point` |
|
256 | 256 |
self._i += 1 |
257 | 257 |
i = self._i |
258 |
- self.index_to_point[i] = x |
|
258 |
+ self._index_to_point[i] = x |
|
259 | 259 |
|
260 | 260 |
# Collect and results and add them to the learner |
261 | 261 |
futures = list(self.pending_points.keys()) |
... | ... |
@@ -284,7 +284,7 @@ class BaseRunner(metaclass=abc.ABCMeta): |
284 | 284 |
@property |
285 | 285 |
def failed(self): |
286 | 286 |
"""Set of points that failed ``runner.retries`` times.""" |
287 |
- return set(self.tracebacks) - set(self.to_retry) |
|
287 |
+ return set(self._tracebacks) - set(self._to_retry) |
|
288 | 288 |
|
289 | 289 |
@abc.abstractmethod |
290 | 290 |
def elapsed_time(self): |
... | ... |
@@ -300,6 +300,14 @@ class BaseRunner(metaclass=abc.ABCMeta): |
300 | 300 |
"""Is called in `_get_futures`.""" |
301 | 301 |
pass |
302 | 302 |
|
303 |
+ @property |
|
304 |
+ def tracebacks(self): |
|
305 |
+ return [(self._index_to_point[i], tb) for i, tb in self._tracebacks.items()] |
|
306 |
+ |
|
307 |
+ @property |
|
308 |
+ def to_retry(self): |
|
309 |
+ return [(self._index_to_point[i], n) for i, n in self._to_retry.items()] |
|
310 |
+ |
|
303 | 311 |
|
304 | 312 |
class BlockingRunner(BaseRunner): |
305 | 313 |
"""Run a learner synchronously in an executor. |
... | ... |
@@ -54,6 +54,12 @@ _default_executor = ( |
54 | 54 |
) |
55 | 55 |
|
56 | 56 |
|
57 |
+def _key_by_value(dct, value): |
|
58 |
+ for k, v in dct.items(): |
|
59 |
+ if v == value: |
|
60 |
+ return k |
|
61 |
+ |
|
62 |
+ |
|
57 | 63 |
class BaseRunner(metaclass=abc.ABCMeta): |
58 | 64 |
r"""Base class for runners that use `concurrent.futures.Executors`. |
59 | 65 |
|
... | ... |
@@ -146,11 +152,16 @@ class BaseRunner(metaclass=abc.ABCMeta): |
146 | 152 |
self.to_retry = {} |
147 | 153 |
self.tracebacks = {} |
148 | 154 |
|
155 |
+ # Keeping track of index -> point |
|
156 |
+ self.index_to_point = {} |
|
157 |
+ self._i = 0 # some unique index to be associated with each point |
|
158 |
+ |
|
149 | 159 |
def _get_max_tasks(self): |
150 | 160 |
return self._max_tasks or _get_ncores(self.executor) |
151 | 161 |
|
152 |
- def _do_raise(self, e, x): |
|
153 |
- tb = self.tracebacks[x] |
|
162 |
+ def _do_raise(self, e, i): |
|
163 |
+ tb = self.tracebacks[i] |
|
164 |
+ x = self.index_to_point[i] |
|
154 | 165 |
raise RuntimeError( |
155 | 166 |
"An error occured while evaluating " |
156 | 167 |
f'"learner.function({x})". ' |
... | ... |
@@ -162,9 +173,14 @@ class BaseRunner(metaclass=abc.ABCMeta): |
162 | 173 |
return self.log is not None |
163 | 174 |
|
164 | 175 |
def _ask(self, n): |
165 |
- points = [ |
|
166 |
- p for p in self.to_retry.keys() if p not in self.pending_points.values() |
|
167 |
- ][:n] |
|
176 |
+ points = [] |
|
177 |
+ for i, index in enumerate(self.to_retry.keys()): |
|
178 |
+ if i == n: |
|
179 |
+ break |
|
180 |
+ point = self.index_to_point[index] |
|
181 |
+ if point not in self.pending_points.values(): |
|
182 |
+ points.append(point) |
|
183 |
+ |
|
168 | 184 |
loss_improvements = len(points) * [float("inf")] |
169 | 185 |
if len(points) < n: |
170 | 186 |
new_points, new_losses = self.learner.ask(n - len(points)) |
... | ... |
@@ -198,20 +214,22 @@ class BaseRunner(metaclass=abc.ABCMeta): |
198 | 214 |
def _process_futures(self, done_futs): |
199 | 215 |
for fut in done_futs: |
200 | 216 |
x = self.pending_points.pop(fut) |
217 |
+ i = _key_by_value(self.index_to_point, x) # O(N) |
|
201 | 218 |
try: |
202 | 219 |
y = fut.result() |
203 | 220 |
t = time.time() - fut.start_time # total execution time |
204 | 221 |
except Exception as e: |
205 |
- self.tracebacks[x] = traceback.format_exc() |
|
206 |
- self.to_retry[x] = self.to_retry.get(x, 0) + 1 |
|
207 |
- if self.to_retry[x] > self.retries: |
|
208 |
- self.to_retry.pop(x) |
|
222 |
+ self.tracebacks[i] = traceback.format_exc() |
|
223 |
+ self.to_retry[i] = self.to_retry.get(i, 0) + 1 |
|
224 |
+ if self.to_retry[i] > self.retries: |
|
225 |
+ self.to_retry.pop(i) |
|
209 | 226 |
if self.raise_if_retries_exceeded: |
210 |
- self._do_raise(e, x) |
|
227 |
+ self._do_raise(e, i) |
|
211 | 228 |
else: |
212 | 229 |
self._elapsed_function_time += t / self._get_max_tasks() |
213 |
- self.to_retry.pop(x, None) |
|
214 |
- self.tracebacks.pop(x, None) |
|
230 |
+ self.to_retry.pop(i, None) |
|
231 |
+ self.tracebacks.pop(i, None) |
|
232 |
+ self.index_to_point.pop(i) |
|
215 | 233 |
if self.do_log: |
216 | 234 |
self.log.append(("tell", x, y)) |
217 | 235 |
self.learner.tell(x, y) |
... | ... |
@@ -232,6 +250,12 @@ class BaseRunner(metaclass=abc.ABCMeta): |
232 | 250 |
fut = self._submit(x) |
233 | 251 |
fut.start_time = start_time |
234 | 252 |
self.pending_points[fut] = x |
253 |
+ i = _key_by_value(self.index_to_point, x) # O(N) |
|
254 |
+ if i is None: |
|
255 |
+ # `x` is not a value in `self.index_to_point` |
|
256 |
+ self._i += 1 |
|
257 |
+ i = self._i |
|
258 |
+ self.index_to_point[i] = x |
|
235 | 259 |
|
236 | 260 |
# Collect and results and add them to the learner |
237 | 261 |
futures = list(self.pending_points.keys()) |
... | ... |
@@ -49,6 +49,11 @@ with suppress(ModuleNotFoundError): |
49 | 49 |
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) |
50 | 50 |
|
51 | 51 |
|
52 |
+_default_executor = ( |
|
53 |
+ loky.get_reusable_executor if with_loky else concurrent.ProcessPoolExecutor |
|
54 |
+) |
|
55 |
+ |
|
56 |
+ |
|
52 | 57 |
class BaseRunner(metaclass=abc.ABCMeta): |
53 | 58 |
r"""Base class for runners that use `concurrent.futures.Executors`. |
54 | 59 |
|
... | ... |
@@ -478,7 +483,11 @@ class AsyncRunner(BaseRunner): |
478 | 483 |
def goal(_): |
479 | 484 |
return False |
480 | 485 |
|
481 |
- if executor is None and not inspect.iscoroutinefunction(learner.function): |
|
486 |
+ if ( |
|
487 |
+ executor is None |
|
488 |
+ and _default_executor is concurrent.ProcessPoolExecutor |
|
489 |
+ and not inspect.iscoroutinefunction(learner.function) |
|
490 |
+ ): |
|
482 | 491 |
try: |
483 | 492 |
pickle.dumps(learner.function) |
484 | 493 |
except pickle.PicklingError: |
... | ... |
@@ -756,13 +765,6 @@ class SequentialExecutor(concurrent.Executor): |
756 | 765 |
pass |
757 | 766 |
|
758 | 767 |
|
759 |
-def _default_executor(): |
|