Browse code

add utilities and module for controlling OpenVPN processes

Joseph Weston authored on 05/09/2017 18:43:53
Showing 3 changed files
... ...
@@ -14,3 +14,6 @@
14 14
 #
15 15
 # You should have received a copy of the GNU General Public License
16 16
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
+"""NordVPN client."""
18
+
19
+from . import vpn
17 20
new file mode 100644
... ...
@@ -0,0 +1,148 @@
1
+# -*- coding: utf-8 -*-
2
+#
3
+# copyright 2017 joseph weston
4
+#
5
+# this program is free software: you can redistribute it and/or modify
6
+# it under the terms of the gnu general public license as published by
7
+# the free software foundation, either version 3 of the license, or
8
+# (at your option) any later version.
9
+#
10
+# this program is distributed in the hope that it will be useful,
11
+# but without any warranty; without even the implied warranty of
12
+# merchantability or fitness for a particular purpose.  see the
13
+# gnu general public license for more details.
14
+#
15
+# you should have received a copy of the gnu general public license
16
+# along with this program.  if not, see <http://www.gnu.org/licenses/>.
17
+"""Miscellaneous utilities."""
18
+
19
+import os
20
+import fcntl
21
+import tempfile
22
+import asyncio
23
+from collections import OrderedDict
24
+
25
+from decorator import decorator
26
+
27
+
28
+def silence(*exceptions_to_silence):
29
+    """Catch and discard selected exception types.
30
+
31
+    this is a coroutine decorator.
32
+    """
33
+    async def wrapper(func, *args, **kwargs):
34
+        try:
35
+            return await func(*args, **kwargs)
36
+        except Exception as error:
37
+            if not isinstance(error, exceptions_to_silence):
38
+                raise
39
+
40
+    return decorator(wrapper)
41
+
42
+
43
+# Created by Github user 'jaredlunde':
44
+# https://gist.github.com/jaredlunde/7a118c03c3e9b925f2bf
45
+# with minor modifications.
46
+def async_lru_cache(size=float('inf')):
47
+    """LRU cache for coroutines."""
48
+    cache = OrderedDict()
49
+
50
+    async def memoized(func, *args, **kwargs):
51
+        key = str((args, kwargs))
52
+        try:
53
+            cache[key] = cache.pop(key)
54
+        except KeyError:
55
+            if len(cache) >= size:
56
+                cache.popitem(last=False)
57
+            cache[key] = await func(*args, **kwargs)
58
+        return cache[key]
59
+
60
+    return decorator(memoized)
61
+
62
+
63
+def write_to_tmp(content):
64
+    """Write text content to a temporary file a return a handle to it."""
65
+    tmp = tempfile.NamedTemporaryFile(mode='w+t')
66
+    tmp.write(content)
67
+    tmp.flush()
68
+    return tmp
69
+
70
+
71
+def _secure_opener(path, flags):
72
+    return os.open(path, flags, mode=0o600)
73
+
74
+
75
+class LockError(BlockingIOError):
76
+    """Exception raised on failure to acquire a file lock/"""
77
+    pass
78
+
79
+
80
+async def lock_subprocess(*args, lockfile, **kwargs):
81
+    """Acquire a lock for launching a subprocess.
82
+
83
+    Acquires a lock on a lockfile, launches a subprocess
84
+    and schedules unlocking the lockfile for when the
85
+    subprocess is dead.
86
+
87
+    Parameters
88
+    ----------
89
+    *args, **kwargs
90
+        Arguments to pass to 'asyncio.subprocess.create_subprocess_exec'.
91
+    lockfile : str
92
+        Filename of the lockfile.
93
+
94
+    Returns
95
+    -------
96
+    proc : asyncio.subprocess.Process
97
+
98
+    Raises
99
+    ------
100
+    LockError if the lock could not be acquired.
101
+    """
102
+
103
+    file = open(lockfile, 'w', opener=_secure_opener)
104
+    try:
105
+        fcntl.flock(file, fcntl.LOCK_EX | fcntl.LOCK_NB)
106
+    except BlockingIOError as error:
107
+        raise LockError(lockfile) from error
108
+    file.write(str(os.getpid()))  # write pid to lockfile
109
+    file.flush()
110
+
111
+    def unlock(*_):
112
+        fcntl.flock(file, fcntl.LOCK_UN)
113
+        file.truncate(0)
114
+        file.close()
115
+
116
+    try:
117
+        proc = await asyncio.create_subprocess_exec(*args, **kwargs)
118
+    except:
119
+        unlock()
120
+        raise
121
+    else:
122
+        when_dead = asyncio.ensure_future(proc.wait())
123
+        when_dead.add_done_callback(unlock)
124
+
125
+    return proc
126
+
127
+
128
+@silence(ProcessLookupError)
129
+async def kill_process(proc, timeout=None):
130
+    """Terminate a process as gracefully as possible.
131
+
132
+    sends sigterm and follows up with a sigkill if the process is not
133
+    dead after 'timeout' seconds, and flushes stdout and stderr
134
+    by calling 'proc.communicate()'.
135
+
136
+    Parameters
137
+    ----------
138
+    proc : asyncio.subprocess.process
139
+    timeout : int
140
+    """
141
+    try:
142
+        proc.terminate()
143
+        await asyncio.wait_for(proc.wait(), timeout)
144
+    except asyncio.TimeoutError:  # process didn't die in time
145
+        proc.kill()
146
+    finally:
147
+        # flush buffers
148
+        await proc.communicate()
0 149
new file mode 100644
... ...
@@ -0,0 +1,146 @@
1
+# -*- coding: utf-8 -*-
2
+#
3
+# Copyright 2017 Joseph Weston
4
+#
5
+# This program is free software: you can redistribute it and/or modify
6
+# it under the terms of the GNU General Public License as published by
7
+# the Free Software Foundation, either version 3 of the License, or
8
+# (at your option) any later version.
9
+#
10
+# This program is distributed in the hope that it will be useful,
11
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
+# GNU General Public License for more details.
14
+#
15
+# You should have received a copy of the GNU General Public License
16
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
+"""Tools for starting and supervising OpenVPN clients."""
18
+
19
+import asyncio
20
+
21
+from structlog import get_logger
22
+
23
+from ._utils import write_to_tmp, lock_subprocess, kill_process
24
+
25
+
26
+OPENVPN_EXECUTABLE = '/usr/sbin/openvpn'
27
+LOCKFILE = '/run/lock/nordvpn.lockfile'
28
+_OPENVPN_UP = b'Initialization Sequence Completed'
29
+
30
+
31
+async def start(config, username, password):
32
+    """Start an OpenVPN client with the given configuration.
33
+
34
+    Parameters
35
+    ----------
36
+    config : str
37
+        The contents of the OpenVPN config file.
38
+    username, password : str
39
+        Credentials for the OpenVPN connection.
40
+
41
+    Returns
42
+    -------
43
+    proc : asyncio.subprocess.Process
44
+
45
+    Raises
46
+    ------
47
+    RuntimeError if the OpenVPN process does not start correctly.
48
+    LockError if a lock could not be obtained for the lockfile
49
+
50
+    Notes
51
+    -----
52
+    Obtains a lock on a global lockfile before launching an OpenVPN
53
+    client in a subprocess. The lock is released when the process
54
+    dies.
55
+    """
56
+    logger = get_logger(__name__)
57
+
58
+    config_file = write_to_tmp(config)
59
+    credentials_file = write_to_tmp(f'{username}\n{password}')
60
+
61
+    cmd = ['sudo', '-n', OPENVPN_EXECUTABLE,
62
+           '--suppress-timestamps',
63
+           '--config', config_file.name,
64
+           '--auth-user-pass', credentials_file.name
65
+          ]
66
+
67
+    proc = None
68
+    try:
69
+        proc = await lock_subprocess(*cmd, stdout=asyncio.subprocess.PIPE,
70
+                                     lockfile=LOCKFILE)
71
+        logger = logger.bind(pid=proc.pid)
72
+
73
+        # Wait until OpenVPN comes up, as indicated by a particular line in stdout
74
+        stdout = b''
75
+        while _OPENVPN_UP not in stdout:
76
+            stdout = await proc.stdout.readline()
77
+            if not stdout:
78
+                # 'readline' returned empty; stdout is closed.
79
+                # Even if OpenVPN is not dead, we have no way of knowing
80
+                # whether the connection is up or not, so we kill it anyway.
81
+                raise RuntimeError('OpenVPN failed to start')
82
+            logger.info(stdout.decode().rstrip(), stream='stdout')
83
+
84
+    except Exception:
85
+        logger.error('failed to start')
86
+        if proc:
87
+            await asyncio.shield(kill_process(proc))
88
+        raise
89
+
90
+    finally:
91
+        config_file.close()
92
+        credentials_file.close()
93
+
94
+    logger.info('up')
95
+
96
+    return proc
97
+
98
+
99
+async def supervise(proc):
100
+    """Supervise a process.
101
+
102
+    This coroutine supervises a process and writes its stdout to
103
+    a logger until it dies, or until the coroutine is cancelled,
104
+    when the process will be killed.
105
+
106
+    Parameters
107
+    ----------
108
+    proc : asyncio.subprocess.Process
109
+
110
+    Returns
111
+    -------
112
+    returncode : int
113
+         'proc.returncode'.
114
+    """
115
+    logger = get_logger(__name__).bind(pid=proc.pid)
116
+    try:
117
+        stdout = await proc.stdout.readline()
118
+        while stdout:
119
+            logger.info(stdout.decode().rstrip(), stream='stdout')
120
+            stdout = await proc.stdout.readline()
121
+        # stdout is closed -- wait for the process to terminate
122
+        await proc.wait()
123
+    except asyncio.CancelledError:
124
+        logger.debug('received cancellation')
125
+    else:
126
+        stdout, _ = await proc.communicate()
127
+        stdout = (l.rstrip() for l in stdout.decode().split('\n'))
128
+        for line in (l for l in stdout if l):
129
+            logger.info(line, stream='stdout')
130
+        logger.warn('unexpected exit', return_code=proc.returncode)
131
+    finally:
132
+        logger.debug('cleaning up process')
133
+        await asyncio.shield(kill_process(proc))
134
+        logger.info('down')
135
+
136
+    return proc.returncode
137
+
138
+
139
+async def run(config, username, password):
140
+    """Run an OpenVPN client until it dies.
141
+
142
+    A description of the parameters can be found
143
+    in the documentation for `start`.
144
+    """
145
+    proc = await start(config, username, password)
146
+    await supervise(proc)