Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(420)

Delta Between Two Patch Sets: services/auth_service/tests/replication_smoke_test.py

Issue 106310043: Protocol, UI and smoke test for Primary <-> Replica linking process. (Closed) Base URL: https://code.google.com/p/swarming/@master
Left Patch Set: Created 10 years, 9 months ago
Right Patch Set: Created 10 years, 9 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « services/auth_service/tests/replica_app/main.py ('k') | services/auth_service/tools/compile_proto.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2014 The Swarming Authors. All rights reserved. 2 # Copyright 2014 The Swarming Authors. All rights reserved.
3 # Use of this source code is governed by the Apache v2.0 license that can be 3 # Use of this source code is governed by the Apache v2.0 license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 """High level test for Primary <-> Replica replication logic. 6 """High level test for Primary <-> Replica replication logic.
7 7
8 It launches two local services (Primary and Replica) via dev_appserver and sets 8 It launches two local services (Primary and Replica) via dev_appserver and sets
9 up auth db replication between them. 9 up auth db replication between them.
10 """ 10 """
11 11
12 import collections 12 import collections
13 import cookielib 13 import cookielib
14 import ctypes 14 import ctypes
15 import json 15 import json
16 import logging 16 import logging
17 import os 17 import os
18 import shutil 18 import shutil
19 import socket
19 import subprocess 20 import subprocess
20 import sys 21 import sys
21 import tempfile 22 import tempfile
22 import time 23 import time
23 import unittest 24 import unittest
24 import urllib2 25 import urllib2
25 26
26 27
27 # services/auth_service/tests/. 28 # services/auth_service/tests/.
28 THIS_DIR = os.path.dirname(os.path.abspath(__file__)) 29 THIS_DIR = os.path.dirname(os.path.abspath(__file__))
(...skipping 18 matching lines...) Expand all
47 except OSError: 48 except OSError:
48 return 49 return
49 PR_SET_PDEATHSIG = 1 50 PR_SET_PDEATHSIG = 1
50 SIGTERM = 15 51 SIGTERM = 15
51 try: 52 try:
52 libc.prctl(PR_SET_PDEATHSIG, SIGTERM) 53 libc.prctl(PR_SET_PDEATHSIG, SIGTERM)
53 except AttributeError: 54 except AttributeError:
54 return 55 return
55 56
56 57
58 def is_port_free(host, port):
59 """Returns True if the listening port number is available."""
60 s = socket.socket()
61 try:
62 # connect_ex returns 0 on success (i.e. port is being listened to).
63 return bool(s.connect_ex((host, port)))
64 finally:
65 s.close()
66
67
68 def find_free_ports(host, base_port, count):
69 """Finds several consecutive listening ports free to listen to."""
70 while base_port < (2<<16):
71 candidates = range(base_port, base_port + count)
72 if all(is_port_free(host, port) for port in candidates):
73 return candidates
74 base_port += len(candidates)
75 assert False, (
76 'Failed to find %d available ports starting at %d' % (count, base_port))
77
78
57 class DevServerApplication(object): 79 class DevServerApplication(object):
58 """GAE application running via dev_appserver.py.""" 80 """GAE application running via dev_appserver.py."""
59 81
60 # Return value of request(...) and json_request. 82 def __init__(self, app_dir, base_port):
61 HttpResponse = collections.namedtuple(
62 'HttpResponse', ['http_code', 'body', 'headers'])
63
64 def __init__(self, app_dir, port, admin_port):
65 self._admin_port = admin_port
66 self._app = gae_sdk_utils.Application(app_dir) 83 self._app = gae_sdk_utils.Application(app_dir)
84 self._base_port = base_port
85 self._client = None
67 self._exit_code = None 86 self._exit_code = None
68 self._log = None 87 self._log = None
69 self._opener = None 88 self._port = None
70 self._port = port
71 self._proc = None 89 self._proc = None
72 self._serving = False 90 self._serving = False
73 self._temp_root = None 91 self._temp_root = None
74 self._xsrf_token = None
75 92
76 @property 93 @property
77 def app_id(self): 94 def app_id(self):
78 """Application ID as specified in app.yaml.""" 95 """Application ID as specified in app.yaml."""
79 return self._app.app_id 96 return self._app.app_id
80 97
81 @property 98 @property
82 def port(self): 99 def port(self):
83 """Main HTTP port as passed in __init__.""" 100 """Main HTTP port that serves requests to 'default' module.
101
102 Valid only after app has started.
103 """
84 return self._port 104 return self._port
85 105
86 @property 106 @property
87 def url(self): 107 def url(self):
88 """Host URL.""" 108 """Host URL."""
89 return 'http://localhost:%d' % self._port 109 return 'http://localhost:%d' % self._port
90 110
111 @property
112 def client(self):
113 """HttpClient that can be used to make requests to the instance."""
114 return self._client
115
91 def start(self): 116 def start(self):
92 """Starts dev_appserver process.""" 117 """Starts dev_appserver process."""
93 assert not self._proc, 'Already running' 118 assert not self._proc, 'Already running'
94 119
95 # Ensure nothing is serving on |port|. 120 # Clear state.
96 port_free = False 121 self._client = None
97 try:
98 urllib2.urlopen(self.url + '/_ah/warmup')
99 except urllib2.URLError as exc:
100 # HTTPError means somebody replied (but with weird status code like 405).
101 # Any other exception means connection failed.
102 port_free = not isinstance(exc, urllib2.HTTPError)
103 if not port_free:
104 raise Exception('Something is already serving HTTP on %s' % self.url)
105
106 # Clear state (including cookies).
107 self._exit_code = None 122 self._exit_code = None
108 self._log = None 123 self._log = None
109 self._serving = False 124 self._serving = False
110 self._xsrf_token = None 125
111 self._opener = urllib2.build_opener( 126 # Find available ports, one per module + one for app admin.
112 urllib2.HTTPCookieProcessor(cookielib.CookieJar())) 127 free_ports = find_free_ports(
128 'localhost', self._base_port, len(self._app.modules) + 1)
129 self._port = free_ports[0]
113 130
114 # Create temp directories where dev_server keeps its state. 131 # Create temp directories where dev_server keeps its state.
115 self._temp_root = tempfile.mkdtemp(prefix=self.app_id) 132 self._temp_root = tempfile.mkdtemp(prefix=self.app_id)
116 os.makedirs(os.path.join(self._temp_root, 'storage')) 133 os.makedirs(os.path.join(self._temp_root, 'storage'))
117 134
118 # Launch the process. 135 # Launch the process.
119 log_file = os.path.join(self._temp_root, 'dev_appserver.log') 136 log_file = os.path.join(self._temp_root, 'dev_appserver.log')
120 logging.info( 137 logging.info(
121 'Launching %s at %s, log is %s', self.app_id, self.url, log_file) 138 'Launching %s at %s, log is %s', self.app_id, self.url, log_file)
122 with open(log_file, 'wb') as f: 139 with open(log_file, 'wb') as f:
123 self._proc = self._app.spawn_dev_appserver( 140 self._proc = self._app.spawn_dev_appserver(
124 [ 141 [
125 '--port', str(self._port), 142 '--port', str(self._port),
126 '--admin_port', str(self._admin_port), 143 '--admin_port', str(free_ports[-1]),
127 '--storage_path', os.path.join(self._temp_root, 'storage'), 144 '--storage_path', os.path.join(self._temp_root, 'storage'),
128 '--automatic_restart', 'no', 145 '--automatic_restart', 'no',
129 # Note: The random policy will provide the same consistency every 146 # Note: The random policy will provide the same consistency every
130 # time the test is run because the random generator is always given 147 # time the test is run because the random generator is always given
131 # the same seed. 148 # the same seed.
132 '--datastore_consistency_policy', 'random', 149 '--datastore_consistency_policy', 'random',
133 ], 150 ],
134 stdout=f, 151 stdout=f,
135 stderr=subprocess.STDOUT, 152 stderr=subprocess.STDOUT,
136 preexec_fn=terminate_with_parent) 153 preexec_fn=terminate_with_parent)
154
155 # Create a client that can talk to the service.
156 self._client = HttpClient(self.url)
137 157
138 def ensure_serving(self, timeout=5): 158 def ensure_serving(self, timeout=5):
139 """Waits for the service to start responding.""" 159 """Waits for the service to start responding."""
140 if self._serving: 160 if self._serving:
141 return 161 return
142 if not self._proc: 162 if not self._proc:
143 self.start() 163 self.start()
144 logging.info('Waiting for %s to become ready...', self.app_id) 164 logging.info('Waiting for %s to become ready...', self.app_id)
145 deadline = time.time() + timeout 165 deadline = time.time() + timeout
146 alive = False 166 alive = False
147 while self._proc.poll() is None and time.time() < deadline: 167 while self._proc.poll() is None and time.time() < deadline:
148 try: 168 try:
149 self._opener.open(self.url + '/_ah/warmup') 169 urllib2.urlopen(self.url + '/_ah/warmup')
150 alive = True 170 alive = True
151 break 171 break
152 except urllib2.URLError as exc: 172 except urllib2.URLError as exc:
153 if isinstance(exc, urllib2.HTTPError): 173 if isinstance(exc, urllib2.HTTPError):
154 alive = True 174 alive = True
155 break 175 break
156 time.sleep(0.05) 176 time.sleep(0.05)
157 if not alive: 177 if not alive:
158 logging.error('Service %s did\'t come online', self.app_id) 178 logging.error('Service %s did\'t come online', self.app_id)
159 self.stop() 179 self.stop()
(...skipping 17 matching lines...) Expand all
177 deadline = time.time() + 5 197 deadline = time.time() + 5
178 while self._proc.poll() is None and time.time() < deadline: 198 while self._proc.poll() is None and time.time() < deadline:
179 time.sleep(0.05) 199 time.sleep(0.05)
180 self._exit_code = self._proc.poll() 200 self._exit_code = self._proc.poll()
181 if self._exit_code is None: 201 if self._exit_code is None:
182 logging.error('Leaking PID %d', self._proc.pid) 202 logging.error('Leaking PID %d', self._proc.pid)
183 finally: 203 finally:
184 with open(os.path.join(self._temp_root, 'dev_appserver.log'), 'r') as f: 204 with open(os.path.join(self._temp_root, 'dev_appserver.log'), 'r') as f:
185 self._log = f.read() 205 self._log = f.read()
186 shutil.rmtree(self._temp_root) 206 shutil.rmtree(self._temp_root)
207 self._client = None
208 self._port = None
187 self._proc = None 209 self._proc = None
188 self._serving = False 210 self._serving = False
189 self._temp_root = None 211 self._temp_root = None
190 212
191 def dump_log(self): 213 def dump_log(self):
192 """Prints dev_appserver log to stderr, works only if app is stopped.""" 214 """Prints dev_appserver log to stderr, works only if app is stopped."""
193 assert self._log is not None 215 assert self._log is not None
216 print >> sys.stderr, '-' * 60
217 print >> sys.stderr, 'dev_appserver.py log for %s' % self.app_id
218 print >> sys.stderr, '-' * 60
194 print >> sys.stderr, self._log 219 print >> sys.stderr, self._log
220 print >> sys.stderr, '-' * 60
221
222
223 class HttpClient(object):
224 """Makes HTTP requests to some instance of dev_appserver."""
225
226 # Return value of request(...) and json_request.
227 HttpResponse = collections.namedtuple(
228 'HttpResponse', ['http_code', 'body', 'headers'])
229
230 def __init__(self, url):
231 self._url = url
232 self._opener = urllib2.build_opener(
233 urllib2.HTTPCookieProcessor(cookielib.CookieJar()))
234 self._xsrf_token = None
195 235
196 def login_as_admin(self, user='test@example.com'): 236 def login_as_admin(self, user='test@example.com'):
197 """Performs dev_appserver login as admin, modifies cookies.""" 237 """Performs dev_appserver login as admin, modifies cookies."""
198 self.request('/_ah/login?email=%s&admin=True&action=Login' % user) 238 self.request('/_ah/login?email=%s&admin=True&action=Login' % user)
199 239
200 def request(self, url, body=None, headers=None): 240 def request(self, resource, body=None, headers=None):
201 """Sends HTTP request to this instance.""" 241 """Sends HTTP request."""
202 assert self._proc, 'Not running' 242 if not resource.startswith(self._url):
203 if not url.startswith(self.url): 243 assert resource.startswith('/')
204 assert url.startswith('/') 244 resource = self._url + resource
205 url = self.url + url 245 req = urllib2.Request(resource, body, headers=(headers or {}))
206 req = urllib2.Request(url, body, headers=(headers or {}))
207 resp = self._opener.open(req) 246 resp = self._opener.open(req)
208 return self.HttpResponse(resp.getcode(), resp.read(), resp.info()) 247 return self.HttpResponse(resp.getcode(), resp.read(), resp.info())
209 248
210 def json_request(self, url, body=None, headers=None): 249 def json_request(self, resource, body=None, headers=None):
211 """Sends HTTP request and returns deserialized JSON.""" 250 """Sends HTTP request and returns deserialized JSON."""
212 if body is not None: 251 if body is not None:
213 body = json.dumps(body) 252 body = json.dumps(body)
214 headers = (headers or {}).copy() 253 headers = (headers or {}).copy()
215 headers['Content-Type'] = 'application/json; charset=UTF-8' 254 headers['Content-Type'] = 'application/json; charset=UTF-8'
216 resp = self.request(url, body, headers=headers) 255 resp = self.request(resource, body, headers=headers)
217 return self.HttpResponse( 256 return self.HttpResponse(
218 resp.http_code, json.loads(resp.body), resp.headers) 257 resp.http_code, json.loads(resp.body), resp.headers)
219 258
220 @property 259 @property
221 def xsrf_token(self): 260 def xsrf_token(self):
222 """Returns XSRF token for this service, fetching it if necessary.""" 261 """Returns XSRF token for the service, fetching it if necessary."""
223 if self._xsrf_token is None: 262 if self._xsrf_token is None:
224 resp = self.json_request( 263 resp = self.json_request(
225 '/auth/api/v1/accounts/self/xsrf_token', 264 '/auth/api/v1/accounts/self/xsrf_token',
226 body={}, 265 body={},
227 headers={'X-XSRF-Token-Request': '1'}) 266 headers={'X-XSRF-Token-Request': '1'})
228 self._xsrf_token = resp.body['xsrf_token'].encode('ascii') 267 self._xsrf_token = resp.body['xsrf_token'].encode('ascii')
229 return self._xsrf_token 268 return self._xsrf_token
230 269
231 270
232 class ReplicationTest(unittest.TestCase): 271 class ReplicationTest(unittest.TestCase):
233 auth_service = None 272 def setUp(self):
234 replica = None 273 super(ReplicationTest, self).setUp()
235 274 self.auth_service = DevServerApplication(SERVICE_APP_DIR, 9500)
236 @classmethod 275 self.replica = DevServerApplication(REPLICA_APP_DIR, 9600)
237 def setUpClass(cls):
238 cls.auth_service = DevServerApplication(SERVICE_APP_DIR, 9555, 9560)
239 cls.replica = DevServerApplication(REPLICA_APP_DIR, 9575, 9580)
240 # Launch both first, only then wait for them to come online. 276 # Launch both first, only then wait for them to come online.
241 apps = [cls.auth_service, cls.replica] 277 apps = [self.auth_service, self.replica]
242 for app in apps: 278 for app in apps:
243 app.start() 279 app.start()
244 for app in apps: 280 for app in apps:
245 app.ensure_serving() 281 app.ensure_serving()
246 app.login_as_admin() 282 app.client.login_as_admin()
247 283
248 @classmethod 284 def tearDown(self):
249 def tearDownClass(cls): 285 self.auth_service.stop()
250 cls.auth_service.stop() 286 self.replica.stop()
251 cls.auth_service = None 287 if self.has_failed():
252 cls.replica.stop() 288 self.auth_service.dump_log()
253 cls.replica = None 289 self.replica.dump_log()
290 super(ReplicationTest, self).tearDown()
291
292 def has_failed(self):
293 # pylint: disable=E1101
294 return not self._resultForDoCleanups.wasSuccessful()
254 295
255 def test_replication(self): 296 def test_replication(self):
256 """Tests Replica <-> Primary linking flow.""" 297 """Tests Replica <-> Primary linking flow."""
257 # Verify initial state: no linked services on primary. 298 # Verify initial state: no linked services on primary.
258 linked_services = self.auth_service.json_request( 299 linked_services = self.auth_service.client.json_request(
259 '/auth_service/api/v1/services').body 300 '/auth_service/api/v1/services').body
260 self.assertEqual([], linked_services['services']) 301 self.assertEqual([], linked_services['services'])
261 302
262 # Step 1. Generate a link to associate |replica| to |auth_service|. 303 # Step 1. Generate a link to associate |replica| to |auth_service|.
263 app_id = '%s@localhost:%d' % (self.replica.app_id, self.replica.port) 304 app_id = '%s@localhost:%d' % (self.replica.app_id, self.replica.port)
264 response = self.auth_service.json_request( 305 response = self.auth_service.client.json_request(
265 url='/auth_service/api/v1/services/%s/linking_url' % app_id, 306 resource='/auth_service/api/v1/services/%s/linking_url' % app_id,
266 body={}, 307 body={},
267 headers={'X-XSRF-Token': self.auth_service.xsrf_token}) 308 headers={'X-XSRF-Token': self.auth_service.client.xsrf_token})
268 self.assertEqual(201, response.http_code) 309 self.assertEqual(201, response.http_code)
269 310
270 # URL points to HTML page on the replica. 311 # URL points to HTML page on the replica.
271 linking_url = response.body['url'] 312 linking_url = response.body['url']
272 self.assertTrue( 313 self.assertTrue(
273 linking_url.startswith('%s/auth/link?t=' % self.replica.url)) 314 linking_url.startswith('%s/auth/link?t=' % self.replica.url))
274 315
275 # Step 2. "Click" this link. It should associates Replica with Primary via 316 # Step 2. "Click" this link. It should associates Replica with Primary via
276 # behind-the-scenes service <-> service URLFetch call. 317 # behind-the-scenes service <-> service URLFetch call.
277 response = self.replica.request( 318 response = self.replica.client.request(
278 url=linking_url, 319 resource=linking_url,
279 body='', 320 body='',
280 headers={'X-XSRF-Token': self.replica.xsrf_token}) 321 headers={'X-XSRF-Token': self.replica.client.xsrf_token})
281 self.assertEqual(200, response.http_code) 322 self.assertEqual(200, response.http_code)
282 self.assertIn('Success!', response.body) 323 self.assertIn('Success!', response.body)
283 324
284 # Verify primary knows about new replica now. 325 # Verify primary knows about new replica now.
285 linked_services = self.auth_service.json_request( 326 linked_services = self.auth_service.client.json_request(
286 '/auth_service/api/v1/services').body 327 '/auth_service/api/v1/services').body
287 self.assertEqual(1, len(linked_services['services'])) 328 self.assertEqual(1, len(linked_services['services']))
288 service = linked_services['services'][0] 329 service = linked_services['services'][0]
289 self.assertEqual(self.replica.app_id, service['app_id']) 330 self.assertEqual(self.replica.app_id, service['app_id'])
290 self.assertEqual(self.replica.url, service['replica_url']) 331 self.assertEqual(self.replica.url, service['replica_url'])
291 332
292 # Verify replica knows about the primary now. 333 # Verify replica knows about the primary now.
293 # TODO(vadimsh): Test once implemented. 334 # TODO(vadimsh): Test once implemented.
294 335
295 336
296 if __name__ == '__main__': 337 if __name__ == '__main__':
297 sdk_path = gae_sdk_utils.find_gae_sdk() 338 sdk_path = gae_sdk_utils.find_gae_sdk()
298 if not sdk_path: 339 if not sdk_path:
299 print >> sys.stderr, 'Couldn\'t find GAE SDK.' 340 print >> sys.stderr, 'Couldn\'t find GAE SDK.'
300 sys.exit(1) 341 sys.exit(1)
301 gae_sdk_utils.setup_gae_sdk(sdk_path) 342 gae_sdk_utils.setup_gae_sdk(sdk_path)
302 343
303 if '-v' in sys.argv: 344 if '-v' in sys.argv:
304 unittest.TestCase.maxDiff = None 345 unittest.TestCase.maxDiff = None
305 logging.basicConfig(level=logging.DEBUG) 346 logging.basicConfig(level=logging.DEBUG)
306 else: 347 else:
307 logging.basicConfig(level=logging.FATAL) 348 logging.basicConfig(level=logging.FATAL)
308 unittest.main() 349 unittest.main()
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b