Browse code

Use asyncio and create a user interface

Bas Nijholt authored on 16/08/2017 18:18:11
Showing 2 changed files
... ...
@@ -25,12 +25,43 @@
25 25
    "outputs": [],
26 26
    "source": [
27 27
     "import numpy as np\n",
28
-    "import learner\n",
28
+    "import adalearner\n",
29 29
     "from time import sleep\n",
30 30
     "from random import randint\n",
31 31
     "from functools import partial\n",
32
+    "import ipyparallel\n",
33
+    "import concurrent.futures\n",
32 34
     "import importlib\n",
33
-    "importlib.reload(learner)"
35
+    "importlib.reload(adalearner)"
36
+   ]
37
+  },
38
+  {
39
+   "cell_type": "code",
40
+   "execution_count": null,
41
+   "metadata": {
42
+    "collapsed": true
43
+   },
44
+   "outputs": [],
45
+   "source": [
46
+    "import asyncio\n",
47
+    "from ipykernel.eventloops import register_integration\n",
48
+    "\n",
49
+    "@register_integration('asyncio')\n",
50
+    "def loop_asyncio(kernel):\n",
51
+    "    '''Start a kernel with asyncio event loop support.'''\n",
52
+    "    loop = asyncio.get_event_loop()\n",
53
+    "\n",
54
+    "    def kernel_handler():\n",
55
+    "        loop.call_soon(kernel.do_one_iteration)\n",
56
+    "        loop.call_later(kernel._poll_interval, kernel_handler)\n",
57
+    "\n",
58
+    "    loop.call_soon(kernel_handler)\n",
59
+    "    try:\n",
60
+    "        if not loop.is_running():\n",
61
+    "            loop.run_forever()\n",
62
+    "    finally:\n",
63
+    "        loop.run_until_complete(loop.shutdown_asyncgens())\n",
64
+    "        loop.close()"
34 65
    ]
35 66
   },
36 67
   {
... ...
@@ -38,13 +69,27 @@
38 69
    "execution_count": null,
39 70
    "metadata": {},
40 71
    "outputs": [],
72
+   "source": [
73
+    "%gui asyncio"
74
+   ]
75
+  },
76
+  {
77
+   "cell_type": "code",
78
+   "execution_count": null,
79
+   "metadata": {
80
+    "collapsed": true
81
+   },
82
+   "outputs": [],
41 83
    "source": [
42 84
     "def func(x, wait=True):\n",
85
+    "    \"\"\"Function with a sharp peak on a smooth background\"\"\"\n",
86
+    "    import numpy as np\n",
87
+    "    from time import sleep\n",
43 88
     "    x = np.asarray(x)\n",
44
-    "    a = 10\n",
89
+    "    a = 0.001\n",
45 90
     "    if wait:\n",
46
-    "        sleep(randint(1, 3))\n",
47
-    "    return np.sin(x) + 0.0001/(0.0001 + x**2)"
91
+    "        sleep(np.random.randint(1, 3))\n",
92
+    "    return x + a**2/(a**2 + (x)**2)"
48 93
    ]
49 94
   },
50 95
   {
... ...
@@ -60,33 +105,21 @@
60 105
    "metadata": {},
61 106
    "outputs": [],
62 107
    "source": [
63
-    "import tornado\n",
64
-    "from distributed import Client\n",
108
+    "learner = adalearner.Learner1D(func, client=ipyparallel.Client())\n",
65 109
     "\n",
66
-    "io = tornado.ioloop.IOLoop.current()\n",
67
-    "\n",
68
-    "# Initialize the learner\n",
69
-    "learner1d = learner.Learner1D()\n",
70
-    "learner1d.add_point(-1, func(-1))\n",
71
-    "learner1d.add_point(1, func(1))"
110
+    "learner.add_point(-1, func(-1))\n",
111
+    "learner.add_point(1, func(1))"
72 112
    ]
73 113
   },
74 114
   {
75 115
    "cell_type": "code",
76 116
    "execution_count": null,
77
-   "metadata": {},
117
+   "metadata": {
118
+    "collapsed": true
119
+   },
78 120
    "outputs": [],
79 121
    "source": [
80
-    "async def dask_run(learner):\n",
81
-    "    async with Client(asynchronous=True) as client:\n",
82
-    "        await learner.run(func, client, learner1d ,goal=lambda learner1d: learner1d.loss() < 0.000001)\n",
83
-    "\n",
84
-    "def plot(data):\n",
85
-    "        xy = [(k, v) for k, v in sorted(data.items()) if v is not None]\n",
86
-    "        if not xy:\n",
87
-    "            return hv.Scatter([])\n",
88
-    "        x, y  = np.array(xy, dtype=float).T\n",
89
-    "        return hv.Scatter((x, y))"
122
+    "learner.start()"
90 123
    ]
91 124
   },
92 125
   {
... ...
@@ -96,7 +129,7 @@
96 129
    "outputs": [],
97 130
    "source": [
98 131
     "data_stream = Stream.define('data', data=param.ObjectSelector(default=dict()))\n",
99
-    "dm = hv.DynamicMap(plot, streams=[data_stream()])\n",
132
+    "dm = hv.DynamicMap(learner.plot, streams=[data_stream()])\n",
100 133
     "dm"
101 134
    ]
102 135
   },
... ...
@@ -106,9 +139,12 @@
106 139
    "metadata": {},
107 140
    "outputs": [],
108 141
    "source": [
109
-    "pc = tornado.ioloop.PeriodicCallback(lambda: dm.event(data=learner1d.data), 100)\n",
110
-    "pc.start()\n",
111
-    "io.add_callback(dask_run, learner)"
142
+    "async def monitor(delay=1):\n",
143
+    "    while True:\n",
144
+    "        dm.event(data=learner.data)\n",
145
+    "        await asyncio.sleep(delay)\n",
146
+    "        \n",
147
+    "monitor_task = learner.ioloop.create_task(monitor())"
112 148
    ]
113 149
   },
114 150
   {
... ...
@@ -116,7 +152,9 @@
116 152
    "execution_count": null,
117 153
    "metadata": {},
118 154
    "outputs": [],
119
-   "source": []
155
+   "source": [
156
+    "learner.task.print_stack()"
157
+   ]
120 158
   }
121 159
  ],
122 160
  "metadata": {
123 161
similarity index 67%
124 162
rename from learner.py
125 163
rename to adalearner.py
... ...
@@ -1,20 +1,74 @@
1
+
2
+import abc
3
+import asyncio
1 4
 import heapq
2
-from math import sqrt
3 5
 import itertools
4
-import multiprocessing
6
+import os
7
+from math import sqrt
5 8
 
9
+import concurrent
10
+import distributed
11
+import holoviews as hv
12
+import ipyparallel
6 13
 import numpy as np
7
-import tornado
8 14
 
9 15
 
10
-def add_arg(func):
11
-    """Make func return (arg, func(arg))."""
12
-    def wrapper(*args):
13
-        return (args[0], func(*args))
14
-    return wrapper
16
+class BaseLearner(metaclass=abc.ABCMeta):
17
+    def __init__(self, xdata=None, ydata=None):
18
+        """Initialize the learner.
19
+
20
+        Parameters
21
+        ----------
22
+        xdata, ydata :
23
+           Possibly empty iterables of x and y values describing the initial
24
+           data; forwarded to `add_data` when `xdata` is not None.
25
+        """
26
+        # A dict {x_n: y_n} for quick checking of local
27
+        # properties.
28
+        self.data = {}
29
+
30
+        # Add initial data if provided
31
+        if xdata is not None:
32
+            self.add_data(xdata, ydata)
33
+
34
+    def add_data(self, xvalues, yvalues):
35
+        """Add data to the intervals.
36
+
37
+        Parameters
38
+        ----------
39
+        xvalues : iterable of numbers
40
+            Values of the x coordinate.
41
+        yvalues : iterable of numbers and None
42
+            Values of the y coordinate. `None` means that the value will be
43
+            provided later.
44
+        """
45
+        try:
46
+            for x, y in zip(xvalues, yvalues):
47
+                self.add_point(x, y)
48
+        except TypeError:
49
+            self.add_point(xvalues, yvalues)
50
+
51
+    def add_point(self, x, y):
52
+        """Update the data."""
53
+        self.data[x] = y
54
+
55
+    def remove_unfinished(self):
56
+        self.data = {k: v for k, v in self.data.items() if v is not None}
57
+
58
+    @abc.abstractmethod
59
+    def loss(self):
60
+        pass
61
+
62
+    @abc.abstractmethod
63
+    def choose_points(self, n=10):
64
+        pass
65
+
66
+    @abc.abstractmethod
67
+    def interpolate(self):
68
+        pass
15 69
 
16 70
 
17
-class Learner1D(object):
71
+class _Learner1D(BaseLearner):
18 72
     """ Learns and predicts a 1D function.
19 73
 
20 74
     Description
... ...
@@ -38,6 +92,7 @@ class Learner1D(object):
38 92
         """
39 93
 
40 94
         # Set internal variables
95
+        super().__init__(xdata, ydata)
41 96
 
42 97
         # A dict storing the loss function for each interval x_n.
43 98
         self.losses = {}
... ...
@@ -45,9 +100,6 @@ class Learner1D(object):
45 100
         # A dict {x_n: [x_{n-1}, x_{n+1}]} for quick checking of local
46 101
         # properties.
47 102
         self.neighbors = {}
48
-        # A dict {x_n: y_n} for quick checking of local
49
-        # properties.
50
-        self.data = {}
51 103
 
52 104
         # Bounding box [[minx, maxx], [miny, maxy]].
53 105
         self._bbox = [[np.inf, -np.inf], [np.inf, -np.inf]]
... ...
@@ -55,10 +107,6 @@ class Learner1D(object):
55 107
         self._scale = [0, 0]
56 108
         self._oldscale = [0, 0]
57 109
 
58
-        # Add initial data if provided
59
-        if xdata is not None:
60
-            self.add_data(xdata, ydata)
61
-
62 110
     def interval_loss(self, x_left, x_right):
63 111
         """Calculate loss in the interval x_left, x_right.
64 112
 
... ...
@@ -76,26 +124,10 @@ class Learner1D(object):
76 124
         else:
77 125
             return max(self.losses.values())
78 126
 
79
-    def add_data(self, xvalues, yvalues):
80
-        """Add data to the intervals.
81
-
82
-        Parameters
83
-        ----------
84
-        xvalues : iterable of numbers
85
-            Values of the x coordinate.
86
-        yvalues : iterable of numbers and None
87
-            Values of the y coordinate. `None` means that the value will be
88
-            provided later.
89
-        """
90
-        try:
91
-            for x, y in zip(xvalues, yvalues):
92
-                self.add_point(x, y)
93
-        except TypeError:
94
-            self.add_point(xvalues, yvalues)
95 127
 
96 128
     def add_point(self, x, y):
97 129
         """Update the data."""
98
-        self.data[x] = y
130
+        super().add_point(x, y)
99 131
 
100 132
         # Update the scale.
101 133
         self._bbox[0][0] = min(self._bbox[0][0], x)
... ...
@@ -138,7 +170,7 @@ class Learner1D(object):
138 170
         return xs
139 171
 
140 172
     def remove_unfinished(self):
141
-        self.data = {k: v for k, v in self.data.items() if v is not None}
173
+        super().remove_unfinished()
142 174
         # Update the scale.
143 175
         self._bbox[0][0] = min(self.data.keys())
144 176
         self._bbox[0][1] = max(self.data.keys())
... ...
@@ -149,13 +181,6 @@ class Learner1D(object):
149 181
 
150 182
         self.interpolate()
151 183
 
152
-    def get_largest_interval(self):
153
-        xs = sorted(x for x, y in self.data.items() if y is not None)
154
-        if len(xs) < 2:
155
-            return np.inf
156
-        else:
157
-            return np.diff(xs).max()
158
-
159 184
     def interpolate(self):
160 185
         xdata = []
161 186
         ydata = []
... ...
@@ -200,19 +225,79 @@ class Learner1D(object):
200 225
                 pass
201 226
 
202 227
 
203
-# We can't use API that is specific to any particular asynchronous
204
-# framework, so we have to roll our own utility functions.
228
+class AsyncExecutor:
229
+
230
+    def __init__(self, executor, ioloop):
231
+        self.executor = executor
232
+        self.ioloop = ioloop
233
+
234
+    def submit(self, f, *args, **kwargs):
235
+        return self.ioloop.run_in_executor(self.executor, f, *args, **kwargs)
236
+
237
+
238
+def ensure_async_executor(client, ioloop):
239
+    if isinstance(client, ipyparallel.Client):
240
+        async_executor = AsyncExecutor(client.executor(), ioloop)
241
+    elif isinstance(client, distributed.Client):
242
+        async_executor = async_executor
243
+    elif client is None:
244
+        client = concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count())
245
+        async_executor = AsyncExecutor(client, ioloop)
246
+    else:
247
+        raise NotImplementedError('Blabla')
248
+
249
+    return async_executor
250
+
251
+
252
+def runner(learner):
253
+    if isinstance(learner.client, ipyparallel.Client):
254
+        ncores = len(learner.client)
255
+    elif isinstance(learner.client, distributed.Client):
256
+        ncores = sum(learner.client.ncores().values())
257
+    elif learner.client is None:
258
+        ncores = os.cpu_count()
259
+    else:
260
+        raise NotImplementedError('Blabla')
205 261
 
206
-async def any_complete(futures):
207
-    total = tornado.concurrent.Future()
208
-    for f in futures:
209
-        f.add_done_callback(lambda f: total.set_result(None)
210
-                            if not total.done() else None)
211
-    await total
212
-    return [f for f in futures if f.done()]
262
+    return run_asyncio(learner.func, learner.executor, learner, ncores=ncores,
263
+                       goal=lambda learner: learner.loss() < 0.1)
213 264
 
214 265
 
215
-async def run(f, executor, learner, goal, ncores=multiprocessing.cpu_count()):
266
+class LearnerMixin:
267
+
268
+    def __init__(self, func, *, client=None, goal=None, ioloop=None, **learner_kwargs):
269
+        self.ioloop = ioloop if ioloop else asyncio.get_event_loop()
270
+        self.executor = ensure_async_executor(client, self.ioloop)  # wraps in `run_in_executor` if concurrent.futures.Executor compatible
271
+        self.client = client
272
+        self.func = func
273
+        self.task = None
274
+        super().__init__(**learner_kwargs)
275
+
276
+    def start(self):
277
+        self.task = self.ioloop.create_task(runner(self))
278
+
279
+    def cancel(self):
280
+        if self.task:
281
+            return self.task.cancel()
282
+        else:
283
+            return False
284
+
285
+
286
+class Learner1D(LearnerMixin, _Learner1D):
287
+
288
+    def plot(self, data=None):
289
+        "Plot `data`, or this learner's own data when `data` is None."
290
+        if data is None:
291
+            data = self.data
292
+        xy = [(k, v) for k, v in sorted(data.items()) if v is not None]
293
+        if not xy:
294
+            return hv.Scatter([])[-1.1:1.1, -1.1:1.1]
295
+        x, y  = np.array(xy, dtype=float).T
296
+        return hv.Scatter((x, y))[-1.1:1.1, -1.1:1.1]
297
+
298
+
299
+async def run_asyncio(f, executor, learner, goal,
300
+                      ncores=os.cpu_count()):
216 301
     xs = dict()
217 302
     done = [None] * ncores
218 303
 
... ...
@@ -224,8 +309,7 @@ async def run(f, executor, learner, goal, ncores=multiprocessing.cpu_count()):
224 309
 
225 310
         # Collect the results and add them to the learner
226 311
         futures = list(xs.keys())
227
-        await any_complete(futures)
228
-        done = [fut for fut in futures if fut.done()]
312
+        done, _ = await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)
229 313
         for fut in done:
230 314
             x = xs.pop(fut)
231 315
             # Need to explicitly await the future (even though we know the
... ...
@@ -234,7 +318,8 @@ async def run(f, executor, learner, goal, ncores=multiprocessing.cpu_count()):
234 318
             y = await fut
235 319
             learner.add_point(x, y)
236 320
 
237
-    # cancel any outstanding tasks
238
-    for fut in xs.keys():
239
-        fut.cancel()
240 321
     learner.remove_unfinished()
322
+    # cancel any outstanding tasks
323
+    cancelled = all(fut.cancel() for fut in xs.keys())
324
+    if not cancelled:
325
+        raise RuntimeError('Some futures remain uncancelled')