... | ... |
@@ -103,6 +103,18 @@ class BaseRunner: |
103 | 103 |
self.log = [] if log else None |
104 | 104 |
self.task = None |
105 | 105 |
|
106 |
+ def _get_n(self, done): |
|
107 |
+ n = len(done) |
|
108 |
+ if not self._ntasks_is_user_specified: |
|
109 |
+ ntasks_new = _get_ncores(self.executor) |
|
110 |
+ dn = ntasks_new - self.ntasks |
|
111 |
+ if dn < 0: |
|
112 |
+ # Cores have died or stopped |
|
113 |
+ n = 0 |
|
114 |
+ n += dn |
|
115 |
+ self.ntasks = ntasks_new |
|
116 |
+ return n |
|
117 |
+ |
|
106 | 118 |
|
107 | 119 |
class BlockingRunner(BaseRunner): |
108 | 120 |
"""Run a learner synchronously in an executor. |
... | ... |
@@ -166,15 +178,7 @@ class BlockingRunner(BaseRunner): |
166 | 178 |
# Launch tasks to replace the ones that completed |
167 | 179 |
# on the last iteration and check if new workers |
168 | 180 |
# have started since the last iteration |
169 |
- n = len(done) |
|
170 |
- if not self._ntasks_is_user_specified: |
|
171 |
- ntasks_new = _get_ncores(self.executor) |
|
172 |
- dn = ntasks_new - self.ntasks |
|
173 |
- if dn < 0: |
|
174 |
- # Cores have died or stopped |
|
175 |
- n = 0 |
|
176 |
- n += dn |
|
177 |
- self.ntasks = ntasks_new |
|
181 |
+ n = self._get_n(done) |
|
178 | 182 |
|
179 | 183 |
if do_log: |
180 | 184 |
self.log.append(('ask', n)) |
... | ... |
@@ -381,15 +385,7 @@ class AsyncRunner(BaseRunner): |
381 | 385 |
# Launch tasks to replace the ones that completed |
382 | 386 |
# on the last iteration and check if new workers |
383 | 387 |
# have started since the last iteration |
384 |
- n = len(done) |
|
385 |
- if not self._ntasks_is_user_specified: |
|
386 |
- ntasks_new = _get_ncores(self.executor) |
|
387 |
- dn = ntasks_new - self.ntasks |
|
388 |
- if dn < 0: |
|
389 |
- # Cores have died or stopped |
|
390 |
- n = 0 |
|
391 |
- n += dn |
|
392 |
- self.ntasks = ntasks_new |
|
388 |
+ n = self._get_n(done) |
|
393 | 389 |
|
394 | 390 |
if do_log: |
395 | 391 |
self.log.append(('ask', n)) |