Browse code

make tracebacks and to_retry properties and lists of tuples

Bas Nijholt authored on 16/04/2020 20:08:40
Showing 1 changed files
... ...
@@ -97,12 +97,12 @@ class BaseRunner(metaclass=abc.ABCMeta):
97 97
     log : list or None
98 98
         Record of the method calls made to the learner, in the format
99 99
         ``(method_name, *args)``.
100
-    to_retry : dict
101
-        Mapping of ``{point: n_fails, ...}``. When a point has failed
100
+    to_retry : list of tuples
101
+        List of ``(point, n_fails)``. When a point has failed
102 102
         ``runner.retries`` times it is removed but will be present
103 103
         in ``runner.tracebacks``.
104
-    tracebacks : dict
105
-        A mapping of point to the traceback if that point failed.
104
+    tracebacks : list of tuples
105
+        List of of ``(point, tb)`` for points that failed.
106 106
     pending_points : dict
107 107
         A mapping of `~concurrent.futures.Future`\s to points.
108 108
 
... ...
@@ -149,19 +149,19 @@ class BaseRunner(metaclass=abc.ABCMeta):
149 149
         # Error handling attributes
150 150
         self.retries = retries
151 151
         self.raise_if_retries_exceeded = raise_if_retries_exceeded
152
-        self.to_retry = {}
153
-        self.tracebacks = {}
152
+        self._to_retry = {}
153
+        self._tracebacks = {}
154 154
 
155 155
         # Keeping track of index -> point
156
-        self.index_to_point = {}
156
+        self._index_to_point = {}
157 157
         self._i = 0  # some unique index to be associated with each point
158 158
 
159 159
     def _get_max_tasks(self):
160 160
         return self._max_tasks or _get_ncores(self.executor)
161 161
 
162 162
     def _do_raise(self, e, i):
163
-        tb = self.tracebacks[i]
164
-        x = self.index_to_point[i]
163
+        tb = self._tracebacks[i]
164
+        x = self._index_to_point[i]
165 165
         raise RuntimeError(
166 166
             "An error occured while evaluating "
167 167
             f'"learner.function({x})". '
... ...
@@ -174,10 +174,10 @@ class BaseRunner(metaclass=abc.ABCMeta):
174 174
 
175 175
     def _ask(self, n):
176 176
         points = []
177
-        for i, index in enumerate(self.to_retry.keys()):
177
+        for i, index in enumerate(self._to_retry.keys()):
178 178
             if i == n:
179 179
                 break
180
-            point = self.index_to_point[index]
180
+            point = self._index_to_point[index]
181 181
             if point not in self.pending_points.values():
182 182
                 points.append(point)
183 183
 
... ...
@@ -214,22 +214,22 @@ class BaseRunner(metaclass=abc.ABCMeta):
214 214
     def _process_futures(self, done_futs):
215 215
         for fut in done_futs:
216 216
             x = self.pending_points.pop(fut)
217
-            i = _key_by_value(self.index_to_point, x)  # O(N)
217
+            i = _key_by_value(self._index_to_point, x)  # O(N)
218 218
             try:
219 219
                 y = fut.result()
220 220
                 t = time.time() - fut.start_time  # total execution time
221 221
             except Exception as e:
222
-                self.tracebacks[i] = traceback.format_exc()
223
-                self.to_retry[i] = self.to_retry.get(i, 0) + 1
224
-                if self.to_retry[i] > self.retries:
225
-                    self.to_retry.pop(i)
222
+                self._tracebacks[i] = traceback.format_exc()
223
+                self._to_retry[i] = self._to_retry.get(i, 0) + 1
224
+                if self._to_retry[i] > self.retries:
225
+                    self._to_retry.pop(i)
226 226
                     if self.raise_if_retries_exceeded:
227 227
                         self._do_raise(e, i)
228 228
             else:
229 229
                 self._elapsed_function_time += t / self._get_max_tasks()
230
-                self.to_retry.pop(i, None)
231
-                self.tracebacks.pop(i, None)
232
-                self.index_to_point.pop(i)
230
+                self._to_retry.pop(i, None)
231
+                self._tracebacks.pop(i, None)
232
+                self._index_to_point.pop(i)
233 233
                 if self.do_log:
234 234
                     self.log.append(("tell", x, y))
235 235
                 self.learner.tell(x, y)
... ...
@@ -250,12 +250,12 @@ class BaseRunner(metaclass=abc.ABCMeta):
250 250
             fut = self._submit(x)
251 251
             fut.start_time = start_time
252 252
             self.pending_points[fut] = x
253
-            i = _key_by_value(self.index_to_point, x)  # O(N)
253
+            i = _key_by_value(self._index_to_point, x)  # O(N)
254 254
             if i is None:
255
-                # `x` is not a value in `self.index_to_point`
255
+                # `x` is not a value in `self._index_to_point`
256 256
                 self._i += 1
257 257
                 i = self._i
258
-            self.index_to_point[i] = x
258
+            self._index_to_point[i] = x
259 259
 
260 260
         # Collect and results and add them to the learner
261 261
         futures = list(self.pending_points.keys())
... ...
@@ -284,7 +284,7 @@ class BaseRunner(metaclass=abc.ABCMeta):
284 284
     @property
285 285
     def failed(self):
286 286
         """Set of points that failed ``runner.retries`` times."""
287
-        return set(self.tracebacks) - set(self.to_retry)
287
+        return set(self._tracebacks) - set(self._to_retry)
288 288
 
289 289
     @abc.abstractmethod
290 290
     def elapsed_time(self):
... ...
@@ -300,6 +300,14 @@ class BaseRunner(metaclass=abc.ABCMeta):
300 300
         """Is called in `_get_futures`."""
301 301
         pass
302 302
 
303
+    @property
304
+    def tracebacks(self):
305
+        return [(self._index_to_point[i], tb) for i, tb in self._tracebacks.items()]
306
+
307
+    @property
308
+    def to_retry(self):
309
+        return [(self._index_to_point[i], n) for i, n in self._to_retry.items()]
310
+
303 311
 
304 312
 class BlockingRunner(BaseRunner):
305 313
     """Run a learner synchronously in an executor.