Browse code

factor out glue logic between learners and executors

Also add an example notebook that shows the learner running and
updating a live plot.

Joseph Weston authored on 21/07/2017 12:29:46
Showing 2 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,169 @@
1
+{
2
+ "cells": [
3
+  {
4
+   "cell_type": "markdown",
5
+   "metadata": {
6
+    "deletable": true,
7
+    "editable": true
8
+   },
9
+   "source": [
10
+    "# Adaptive"
11
+   ]
12
+  },
13
+  {
14
+   "cell_type": "code",
15
+   "execution_count": null,
16
+   "metadata": {
17
+    "collapsed": false,
18
+    "deletable": true,
19
+    "editable": true
20
+   },
21
+   "outputs": [],
22
+   "source": [
23
+    "import holoviews as hv\n",
24
+    "from holoviews.streams import Stream, param\n",
25
+    "hv.notebook_extension('bokeh')"
26
+   ]
27
+  },
28
+  {
29
+   "cell_type": "code",
30
+   "execution_count": null,
31
+   "metadata": {
32
+    "collapsed": false,
33
+    "deletable": true,
34
+    "editable": true
35
+   },
36
+   "outputs": [],
37
+   "source": [
38
+    "import numpy as np\n",
39
+    "import learner1D\n",
40
+    "from time import sleep\n",
41
+    "from random import randint\n",
42
+    "from functools import partial\n",
43
+    "import importlib\n",
44
+    "importlib.reload(learner1D)"
45
+   ]
46
+  },
47
+  {
48
+   "cell_type": "code",
49
+   "execution_count": null,
50
+   "metadata": {
51
+    "collapsed": true
52
+   },
53
+   "outputs": [],
54
+   "source": [
55
+    "def func(x, wait=True):\n",
56
+    "    x = np.asarray(x)\n",
57
+    "    a = 10\n",
58
+    "    if wait:\n",
59
+    "        sleep(randint(1, 3))\n",
60
+    "    return np.sin(x * a)"
61
+   ]
62
+  },
63
+  {
64
+   "cell_type": "markdown",
65
+   "metadata": {
66
+    "deletable": true,
67
+    "editable": true
68
+   },
69
+   "source": [
70
+    "# Parallel"
71
+   ]
72
+  },
73
+  {
74
+   "cell_type": "code",
75
+   "execution_count": null,
76
+   "metadata": {
77
+    "collapsed": false,
78
+    "deletable": true,
79
+    "editable": true
80
+   },
81
+   "outputs": [],
82
+   "source": [
83
+    "import tornado\n",
84
+    "from distributed import Client\n",
85
+    "\n",
86
+    "io = tornado.ioloop.IOLoop.current()\n",
87
+    "\n",
88
+    "# Initialize the learner\n",
89
+    "learner = learner1D.Learner1D()\n",
90
+    "learner.add_point(-1, func(-1))\n",
91
+    "learner.add_point(1, func(1))"
92
+   ]
93
+  },
94
+  {
95
+   "cell_type": "code",
96
+   "execution_count": null,
97
+   "metadata": {
98
+    "collapsed": false,
99
+    "deletable": true,
100
+    "editable": true
101
+   },
102
+   "outputs": [],
103
+   "source": [
104
+    "async def dask_run(learner):\n",
105
+    "    async with Client(asynchronous=True) as client:\n",
106
+    "        await learner1D.run(func, client, learner,goal=lambda learner: learner.loss() < 0.1)\n",
107
+    "\n",
108
+    "def plot(data):\n",
109
+    "        xy = [(k, v) for k, v in sorted(data.items()) if v is not None]\n",
110
+    "        if not xy:\n",
111
+    "            return hv.Scatter([])\n",
112
+    "        x, y  = np.array(xy, dtype=float).T\n",
113
+    "        return hv.Scatter((x, y))"
114
+   ]
115
+  },
116
+  {
117
+   "cell_type": "code",
118
+   "execution_count": null,
119
+   "metadata": {
120
+    "collapsed": false,
121
+    "deletable": true,
122
+    "editable": true
123
+   },
124
+   "outputs": [],
125
+   "source": [
126
+    "data_stream = Stream.define('data', data=param.ObjectSelector(default=dict()))\n",
127
+    "dm = hv.DynamicMap(plot, streams=[data_stream()])\n",
128
+    "dm"
129
+   ]
130
+  },
131
+  {
132
+   "cell_type": "code",
133
+   "execution_count": null,
134
+   "metadata": {
135
+    "collapsed": false,
136
+    "deletable": true,
137
+    "editable": true
138
+   },
139
+   "outputs": [],
140
+   "source": [
141
+    "pc = tornado.ioloop.PeriodicCallback(lambda: dm.event(data=learner.data), 100)\n",
142
+    "pc.start()\n",
143
+    "io.add_callback(dask_run, learner)"
144
+   ]
145
+  }
146
+ ],
147
+ "metadata": {
148
+  "anaconda-cloud": {},
149
+  "kernelspec": {
150
+   "display_name": "Python 3",
151
+   "language": "python",
152
+   "name": "python3"
153
+  },
154
+  "language_info": {
155
+   "codemirror_mode": {
156
+    "name": "ipython",
157
+    "version": 3
158
+   },
159
+   "file_extension": ".py",
160
+   "mimetype": "text/x-python",
161
+   "name": "python",
162
+   "nbconvert_exporter": "python",
163
+   "pygments_lexer": "ipython3",
164
+   "version": "3.5.3"
165
+  }
166
+ },
167
+ "nbformat": 4,
168
+ "nbformat_minor": 1
169
+}
... ...
@@ -6,8 +6,10 @@
6 6
 import heapq
7 7
 from math import sqrt
8 8
 import itertools
9
+import multiprocessing
10
+
9 11
 import numpy as np
10
-from wrapt import synchronized
12
+import tornado
11 13
 
12 14
 
13 15
 def add_arg(func):
... ...
@@ -30,7 +32,7 @@ class Learner1D(object):
30 32
 
31 33
     """
32 34
 
33
-    def __init__(self, xdata=None, ydata=None, client=None):
35
+    def __init__(self, xdata=None, ydata=None):
34 36
         """Initialize the learner.
35 37
 
36 38
         Parameters
... ...
@@ -62,15 +64,7 @@ class Learner1D(object):
62 64
         if xdata is not None:
63 65
             self.add_data(xdata, ydata)
64 66
 
65
-        self.client = client
66
-
67
-        self.smallest_interval = np.inf
68
-
69
-        self.num_done = 0
70
-
71
-        self.futures = {}
72
-
73
-    def loss(self, x_left, x_right):
67
+    def interval_loss(self, x_left, x_right):
74 68
         """Calculate loss in the interval x_left, x_right.
75 69
 
76 70
         Currently returns the rescaled length of the interval. If one of the
... ...
@@ -81,6 +75,12 @@ class Learner1D(object):
81 75
         return sqrt(((x_right - x_left) / self._scale[0])**2 +
82 76
                     ((y_right - y_left) / self._scale[1])**2)
83 77
 
78
+    def loss(self):
79
+        if len(self.losses) == 0:
80
+            return float('inf')
81
+        else:
82
+            return max(self.losses.values())
83
+
84 84
     def add_data(self, xvalues, yvalues):
85 85
         """Add data to the intervals.
86 86
 
... ...
@@ -112,7 +112,7 @@ class Learner1D(object):
112 112
         self._scale = [self._bbox[0][1] - self._bbox[0][0],
113 113
                        self._bbox[1][1] - self._bbox[1][0]]
114 114
 
115
-    def choose_points(self, n=10, add_to_data=True):
115
+    def choose_points(self, n=10):
116 116
         """Return n points that are expected to maximally reduce the loss."""
117 117
         # Find out how to divide the n points over the intervals
118 118
         # by finding  positive integer n_i that minimize max(L_i / n_i) subject
... ...
@@ -121,7 +121,6 @@ class Learner1D(object):
121 121
 
122 122
         # Return equally spaced points within each interval to which points
123 123
         # will be added.
124
-        # self.get_results()  # Insert finished results into self.data
125 124
         self.interpolate()  # Apply new interpolation step if new results
126 125
 
127 126
         def points(x, n):
... ...
@@ -140,19 +139,16 @@ class Learner1D(object):
140 139
 
141 140
         # Add `None`s to data because then the same point will not be returned
142 141
         # upon a next request. This can be used for parallelization.
143
-        if add_to_data:
144
-            self.add_data(xs, itertools.repeat(None))
142
+        self.add_data(xs, itertools.repeat(None))
145 143
 
146 144
         return xs
147 145
 
148 146
     def get_largest_interval(self):
149 147
         xs = sorted(x for x, y in self.data.items() if y is not None)
150
-
151 148
         if len(xs) < 2:
152 149
             return np.inf
153 150
         else:
154
-            self.largest_interval = np.diff(xs).max()
155
-            return self.largest_interval
151
+            return np.diff(xs).max()
156 152
 
157 153
     def interpolate(self):
158 154
         xdata = []
... ...
@@ -189,40 +185,51 @@ class Learner1D(object):
189 185
         self.losses = {}
190 186
         for x, (x_left, x_right) in self.neighbors.items():
191 187
             if x_left is not None:
192
-                self.losses[(x_left, x)] = self.loss(x_left, x)
188
+                self.losses[(x_left, x)] = self.interval_loss(x_left, x)
193 189
             if x_right is not None:
194
-                self.losses[x, x_right] = self.loss(x, x_right)
190
+                self.losses[x, x_right] = self.interval_loss(x, x_right)
195 191
             try:
196 192
                 del self.losses[x_left, x_right]
197 193
             except KeyError:
198 194
                 pass
199 195
 
200
-    def get_done(self):
201
-        done = {x: y for x, y in self.data.items() if y is not None}
202
-        return done
203 196
 
204
-    def add_futures(self, xs, ys):
205
-        """Add concurrent.futures to the self.futures dict."""
206
-        try:
207
-            for x, y in zip(xs, ys):
208
-                self.futures[x] = y
209
-        except TypeError:
210
-            self.futures[xs] = ys
211
-
212
-    def done_callback(self, n, tol):
213
-        @synchronized
214
-        def wrapped(future):
215
-            x, y = future.result()
216
-            self.futures.pop(x)
217
-            return self.add_data(x, y)
218
-        return wrapped
219
-
220
-    def map(self, func, xs, n=1, tol=0.01):
221
-        ys = self.client.map(add_arg(func), xs)
222
-        for y in ys:
223
-            y.add_done_callback(self.done_callback(tol, n))
224
-        self.add_futures(xs, ys)
225
-
226
-    def initialize(self, func, xmin, xmax):
227
-        self.map(func, [xmin, xmax])
228
-        self.add_data([xmin, xmax], [None, None])
197
+# We can't use API that is specific to any particular asynchronous
198
+# framework, so we have to roll our own utility functions.
199
+
200
+async def any_complete(futures):
201
+    total = tornado.concurrent.Future()
202
+    for f in futures:
203
+        f.add_done_callback(total.set_result)
204
+    await total
205
+    return [f for f in futures if f.done()]
206
+
207
+
208
+async def run(f, executor, learner, goal, ncores=multiprocessing.cpu_count()):
209
+    xs = dict()
210
+    done = [None] * ncores
211
+
212
+    while not goal(learner):
213
+        # Launch tasks to replace the ones that completed
214
+        # on the last iteration.
215
+        for x in learner.choose_points(len(done)):
216
+            xs[executor.submit(f, x)] = x
217
+
218
+        # Collect results and add them to the learner
219
+        futures = list(xs.keys())
220
+        await any_complete(futures)
221
+        done = [fut for fut in futures if fut.done()]
222
+        for fut in done:
223
+            x = xs.pop(fut)
224
+            # Need to explicitly await the future (even though we know the
225
+            # result is there) to be compatible with Dask, whose futures'
226
+            # 'result' method returns a future itself.
227
+            y = await fut
228
+            learner.add_point(x, y)
229
+
230
+    # cancel any outstanding tasks
231
+    for fut in xs.keys():
232
+        fut.cancel()
233
+    # XXX: we should introduce an API for removing data points, and remove all
234
+    #      the data points with a 'None' value from the learner, or add a
235
+    #      method to simply remove all "unfinished" points from the learner.