Commit | Line | Data |
---|---|---|
86530b38 AT |
1 | """TestCases for multi-threaded access to a DB. |
2 | """ | |
3 | ||
4 | import os | |
5 | import sys | |
6 | import time | |
7 | import errno | |
8 | import shutil | |
9 | import tempfile | |
10 | from pprint import pprint | |
11 | from random import random | |
12 | ||
13 | try: | |
14 | True, False | |
15 | except NameError: | |
16 | True = 1 | |
17 | False = 0 | |
18 | ||
19 | DASH = '-' | |
20 | ||
21 | try: | |
22 | from threading import Thread, currentThread | |
23 | have_threads = True | |
24 | except ImportError: | |
25 | have_threads = False | |
26 | ||
27 | import unittest | |
28 | from test_all import verbose | |
29 | ||
30 | try: | |
31 | # For Pythons w/distutils pybsddb | |
32 | from bsddb3 import db, dbutils | |
33 | except ImportError: | |
34 | # For Python 2.3 | |
35 | from bsddb import db, dbutils | |
36 | ||
37 | ||
38 | #---------------------------------------------------------------------- | |
39 | ||
class BaseThreadedTestCase(unittest.TestCase):
    """Common fixture for the threaded DB test cases.

    setUp() creates a DBEnv rooted at a ``db_home`` directory located next
    to the running script and opens one DB inside it, named after the
    concrete test class.  Derived classes pick the access method and the
    flags through the class attributes below.
    """

    dbtype = db.DB_UNKNOWN  # must be set in derived class
    dbopenflags = 0         # OR-ed with DB_CREATE when opening the DB
    dbsetflags = 0          # handed to DB.set_flags() when non-zero
    envflags = 0            # OR-ed with DB_CREATE when opening the DBEnv

    def setUp(self):
        """Create the environment directory, the DBEnv and the DB."""
        if verbose:
            dbutils._deadlock_VerboseFile = sys.stdout

        homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
        self.homeDir = homeDir
        try:
            os.mkdir(homeDir)
        except OSError:
            # An already existing directory is fine; anything else is not.
            # The exception is fetched through sys.exc_info() so the
            # handler does not depend on version-specific binding syntax.
            if sys.exc_info()[1].errno != errno.EEXIST:
                raise
        self.env = db.DBEnv()
        self.setEnvOpts()
        self.env.open(homeDir, self.envflags | db.DB_CREATE)

        self.filename = self.__class__.__name__ + '.db'
        self.d = db.DB(self.env)
        if self.dbsetflags:
            self.d.set_flags(self.dbsetflags)
        self.d.open(self.filename, self.dbtype,
                    self.dbopenflags | db.DB_CREATE)

    def tearDown(self):
        """Close the DB and the DBEnv, then remove the home directory.

        Every step runs even when an earlier one raises: setUp() accepts
        an existing ``db_home``, so a directory left behind by a failed
        close would otherwise be reused by the next test.
        """
        try:
            try:
                self.d.close()
            finally:
                self.env.close()
        finally:
            shutil.rmtree(self.homeDir)

    def setEnvOpts(self):
        """Hook called after the DBEnv is created and before it is opened."""
        pass

    def makeData(self, key):
        """Return the value stored under *key*: five copies joined by DASH."""
        return DASH.join([key] * 5)
76 | ||
77 | ||
78 | #---------------------------------------------------------------------- | |
79 | ||
80 | ||
class ConcurrentDataStoreBase(BaseThreadedTestCase):
    """Writers and readers sharing one DB in a DB_INIT_CDB environment.

    Derived classes set ``dbtype`` and the number of reader and writer
    threads; each writer stores ``records`` consecutive keys.
    """

    dbopenflags = db.DB_THREAD
    envflags = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
    readers = 0  # derived class should set
    writers = 0
    records = 1000

    def test01_1WriterMultiReaders(self):
        # Start every writer and reader thread, then wait for all of them.
        # (Deliberately a comment, not a docstring, so unittest keeps
        # reporting the test by its method name.)
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test01_1WriterMultiReaders..." %
                  self.__class__.__name__)

        threads = []
        for x in range(self.writers):
            wt = Thread(target=self.writerThread,
                        args=(self.d, self.records, x),
                        name='writer %d' % x,
                        )  # verbose = verbose)
            threads.append(wt)

        for x in range(self.readers):
            rt = Thread(target=self.readerThread,
                        args=(self.d, x),
                        name='reader %d' % x,
                        )  # verbose = verbose)
            threads.append(rt)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def writerThread(self, d, howMany, writerNum):
        """Store *howMany* records in *d*, in the key range of *writerNum*.

        Writer N owns the keys ``howMany * N`` .. ``howMany * (N + 1) - 1``,
        formatted as four-digit strings.
        """
        #time.sleep(0.01 * writerNum + 0.01)
        name = currentThread().getName()
        start = howMany * writerNum
        # Exclusive upper bound.  The original computed an inclusive bound
        # and then used it as the end of range(), which silently dropped
        # the last record of every writer.
        stop = start + howMany
        if verbose:
            print("%s: creating records %d - %d" % (name, start, stop - 1))

        for x in range(start, stop):
            key = '%04d' % x
            dbutils.DeadlockWrap(d.put, key, self.makeData(key),
                                 max_retries=12)
            if verbose and x % 100 == 0:
                print("%s: records %d - %d finished" % (name, start, x))

        if verbose:
            print("%s: finished creating records" % name)

##         # Each write-cursor will be exclusive, the only one that can update the DB...
##         if verbose: print "%s: deleting a few records" % name
##         c = d.cursor(flags = db.DB_WRITECURSOR)
##         for x in range(10):
##             key = int(random() * howMany) + start
##             key = '%04d' % key
##             if d.has_key(key):
##                 c.set(key)
##                 c.delete()

##         c.close()
        if verbose:
            print("%s: thread finished" % name)

    def readerThread(self, d, readerNum):
        """Scan *d* five times with a cursor, checking every record seen.

        The start is staggered by ``0.01 * readerNum`` seconds.
        """
        time.sleep(0.01 * readerNum)
        name = currentThread().getName()

        for loop in range(5):
            c = d.cursor()
            try:
                count = 0
                rec = c.first()
                while rec:
                    count += 1
                    key, data = rec
                    self.assertEqual(self.makeData(key), data)
                    rec = c.next()
                if verbose:
                    print("%s: found %d records" % (name, count))
            finally:
                # Close the cursor even when a check fails, so a failing
                # reader does not leave an open cursor behind while the
                # other threads are still running.
                c.close()
            time.sleep(0.05)

        if verbose:
            print("%s: thread finished" % name)
166 | ||
167 | ||
class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
    """Concurrent Data Store test on a B-tree: 2 writers, 10 readers."""

    dbtype = db.DB_BTREE
    readers = 10
    writers = 2
    records = 1000
173 | ||
174 | ||
class HashConcurrentDataStore(ConcurrentDataStoreBase):
    """Concurrent Data Store test on a hash DB: 2 writers, 10 readers."""

    dbtype = db.DB_HASH
    readers = 10
    writers = 2
    records = 1000
180 | ||
181 | ||
182 | #---------------------------------------------------------------------- | |
183 | ||
class SimpleThreadedBase(BaseThreadedTestCase):
    """Writers and readers sharing one DB in a DB_INIT_LOCK environment.

    Every DB call made by the worker threads goes through
    dbutils.DeadlockWrap.  Derived classes only set ``dbtype``.
    """

    dbopenflags = db.DB_THREAD
    envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
    readers = 5
    writers = 3
    records = 1000

    def setEnvOpts(self):
        """Select the default lock-detection policy for the environment."""
        self.env.set_lk_detect(db.DB_LOCK_DEFAULT)

    def test02_SimpleLocks(self):
        # Start every writer and reader thread, then wait for all of them.
        # (Deliberately a comment, not a docstring, so unittest keeps
        # reporting the test by its method name.)
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test02_SimpleLocks..." %
                  self.__class__.__name__)

        threads = []
        for x in range(self.writers):
            wt = Thread(target=self.writerThread,
                        args=(self.d, self.records, x),
                        name='writer %d' % x,
                        )  # verbose = verbose)
            threads.append(wt)
        for x in range(self.readers):
            rt = Thread(target=self.readerThread,
                        args=(self.d, x),
                        name='reader %d' % x,
                        )  # verbose = verbose)
            threads.append(rt)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def writerThread(self, d, howMany, writerNum):
        """Write, re-read, sync and partially delete this writer's records.

        Writer N owns the keys ``howMany * N`` .. ``howMany * (N + 1) - 1``,
        formatted as four-digit strings.
        """
        name = currentThread().getName()
        start = howMany * writerNum
        # Exclusive upper bound.  The original computed an inclusive bound
        # and then used it as the end of the loops, which silently skipped
        # the last record of every writer.
        stop = start + howMany
        if verbose:
            print("%s: creating records %d - %d" % (name, start, stop - 1))

        # create a bunch of records
        for x in range(start, stop):
            key = '%04d' % x
            dbutils.DeadlockWrap(d.put, key, self.makeData(key),
                                 max_retries=12)

            if verbose and x % 100 == 0:
                print("%s: records %d - %d finished" % (name, start, x))

            # do a bit of reading too
            if random() <= 0.05:
                for y in range(start, x):
                    # Key built from y: the original used x here and so
                    # fetched the record just written over and over
                    # instead of re-checking the earlier ones.
                    key = '%04d' % y
                    data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
                    self.assertEqual(data, self.makeData(key))

        # flush them
        try:
            dbutils.DeadlockWrap(d.sync, max_retries=12)
        except db.DBIncompleteError:
            if verbose:
                print("could not complete sync()...")

        # read them back, deleting a few
        for x in range(start, stop):
            key = '%04d' % x
            data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
            if verbose and x % 100 == 0:
                print("%s: fetched record (%s, %s)" % (name, key, data))
            self.assertEqual(data, self.makeData(key))
            if random() <= 0.10:
                dbutils.DeadlockWrap(d.delete, key, max_retries=12)
                if verbose:
                    print("%s: deleted record %s" % (name, key))

        if verbose:
            print("%s: thread finished" % name)

    def readerThread(self, d, readerNum):
        """Scan *d* five times with a cursor, checking every record seen.

        The start is staggered by ``0.01 * readerNum`` seconds.
        """
        time.sleep(0.01 * readerNum)
        name = currentThread().getName()

        for loop in range(5):
            c = d.cursor()
            try:
                count = 0
                rec = dbutils.DeadlockWrap(c.first, max_retries=10)
                while rec:
                    count += 1
                    key, data = rec
                    self.assertEqual(self.makeData(key), data)
                    rec = dbutils.DeadlockWrap(c.next, max_retries=10)
                if verbose:
                    print("%s: found %d records" % (name, count))
            finally:
                # Close the cursor even when a check or a DB call fails,
                # so a failing reader does not leave an open cursor behind
                # while the other threads are still running.
                c.close()
            time.sleep(0.05)

        if verbose:
            print("%s: thread finished" % name)
283 | ||
284 | ||
class BTreeSimpleThreaded(SimpleThreadedBase):
    """Simple-locks threaded test run against a B-tree database."""

    dbtype = db.DB_BTREE
287 | ||
288 | ||
class HashSimpleThreaded(SimpleThreadedBase):
    """Simple-locks threaded test run against a hash database."""

    dbtype = db.DB_HASH
291 | ||
292 | ||
293 | #---------------------------------------------------------------------- | |
294 | ||
295 | ||
class ThreadedTransactionsBase(BaseThreadedTestCase):
    """Writers and readers sharing one DB, each working in transactions.

    A separate thread runs DBEnv.lock_detect() periodically while the
    workers are active.  Workers whose transaction fails with a deadlock
    or lock-not-granted error abort it and start over.  Derived classes
    set ``dbtype``, the thread counts and optionally ``txnFlag``.
    """

    dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
    envflags = (db.DB_THREAD |
                db.DB_INIT_MPOOL |
                db.DB_INIT_LOCK |
                db.DB_INIT_LOG |
                db.DB_INIT_TXN
                )
    readers = 0
    writers = 0
    records = 2000
    txnFlag = 0          # flags passed to DBEnv.txn_begin()
    doLockDetect = True  # deadlockThread() keeps polling while this is true

    def setEnvOpts(self):
        #self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
        pass

    def test03_ThreadedTransactions(self):
        # Run the workers next to the lock-detector thread, then stop it.
        # (Deliberately a comment, not a docstring, so unittest keeps
        # reporting the test by its method name.)
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test03_ThreadedTransactions..." %
                  self.__class__.__name__)

        threads = []
        for x in range(self.writers):
            wt = Thread(target=self.writerThread,
                        args=(self.d, self.records, x),
                        name='writer %d' % x,
                        )  # verbose = verbose)
            threads.append(wt)

        for x in range(self.readers):
            rt = Thread(target=self.readerThread,
                        args=(self.d, x),
                        name='reader %d' % x,
                        )  # verbose = verbose)
            threads.append(rt)

        # Raise the flag here, before the detector starts.  When the
        # detector thread raised it itself, a late start could overwrite
        # the "stop" request below and leave the thread polling forever.
        self.doLockDetect = True
        dt = Thread(target=self.deadlockThread)
        dt.start()

        try:
            for t in threads:
                t.start()
            for t in threads:
                t.join()
        finally:
            # Always stop the detector, even if a worker could not be
            # started or joined; otherwise it would keep running.
            self.doLockDetect = False
            dt.join()

    def doWrite(self, d, name, start, stop):
        """Store the records start .. stop-1 in *d* within one transaction.

        The whole batch is retried in a fresh transaction after a
        deadlock or lock-not-granted error.
        """
        finished = False
        while not finished:
            txn = None  # no transaction to abort until txn_begin() succeeds
            try:
                txn = self.env.txn_begin(None, self.txnFlag)
                for x in range(start, stop):
                    key = '%04d' % x
                    d.put(key, self.makeData(key), txn)
                    if verbose and x % 100 == 0:
                        print("%s: records %d - %d finished" %
                              (name, start, x))
                txn.commit()
                finished = True
            except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
                val = sys.exc_info()[1]
                if verbose:
                    print("%s: Aborting transaction (%s)" %
                          (name, val.args[1]))
                if txn is not None:
                    txn.abort()
                time.sleep(0.05)

    def writerThread(self, d, howMany, writerNum):
        """Write this writer's records in batches, then delete a few.

        Writer N owns the keys ``howMany * N`` .. ``howMany * (N + 1) - 1``,
        formatted as four-digit strings.
        """
        name = currentThread().getName()
        start = howMany * writerNum
        # Exclusive upper bound.  The original computed an inclusive bound
        # and then used it as the end of range(), which silently dropped
        # the last record of every writer.
        stop = start + howMany
        if verbose:
            print("%s: creating records %d - %d" % (name, start, stop - 1))

        step = 100
        for x in range(start, stop, step):
            self.doWrite(d, name, x, min(stop, x + step))

        if verbose:
            print("%s: finished creating records" % name)
        if verbose:
            print("%s: deleting a few records" % name)

        finished = False
        while not finished:
            txn = None  # no transaction to abort until txn_begin() succeeds
            try:
                recs = []
                txn = self.env.txn_begin(None, self.txnFlag)
                for x in range(10):
                    key = int(random() * howMany) + start
                    key = '%04d' % key
                    data = d.get(key, None, txn, db.DB_RMW)
                    if data is not None:
                        d.delete(key, txn)
                        recs.append(key)
                txn.commit()
                finished = True
                if verbose:
                    print("%s: deleted records %s" % (name, recs))
            except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
                val = sys.exc_info()[1]
                if verbose:
                    print("%s: Aborting transaction (%s)" %
                          (name, val.args[1]))
                if txn is not None:
                    txn.abort()
                time.sleep(0.05)

        if verbose:
            print("%s: thread finished" % name)

    def readerThread(self, d, readerNum):
        """Scan *d* five times inside a transaction, checking every record.

        The start is staggered by ``0.01 * readerNum + 0.05`` seconds.  A
        scan is repeated from the beginning after a deadlock or
        lock-not-granted error.
        """
        time.sleep(0.01 * readerNum + 0.05)
        name = currentThread().getName()

        for loop in range(5):
            finished = False
            while not finished:
                # Reset both handles for every attempt so the error path
                # never touches a cursor or transaction that belongs to an
                # earlier attempt (or does not exist yet).
                txn = None
                c = None
                try:
                    txn = self.env.txn_begin(None, self.txnFlag)
                    c = d.cursor(txn)
                    count = 0
                    rec = c.first()
                    while rec:
                        count += 1
                        key, data = rec
                        self.assertEqual(self.makeData(key), data)
                        rec = c.next()
                    if verbose:
                        print("%s: found %d records" % (name, count))
                    c.close()
                    txn.commit()
                    finished = True
                except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
                    val = sys.exc_info()[1]
                    if verbose:
                        print("%s: Aborting transaction (%s)" %
                              (name, val.args[1]))
                    if c is not None:
                        c.close()
                    if txn is not None:
                        txn.abort()
                    time.sleep(0.05)

            time.sleep(0.05)

        if verbose:
            print("%s: thread finished" % name)

    def deadlockThread(self):
        """Call DBEnv.lock_detect() every 0.5 s while doLockDetect is true.

        The caller raises ``self.doLockDetect`` before starting this
        thread and clears it to make the loop end.
        """
        while self.doLockDetect:
            time.sleep(0.5)
            try:
                aborted = self.env.lock_detect(
                    db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
                if verbose and aborted:
                    print("deadlock: Aborted %d deadlocked transaction(s)"
                          % aborted)
            except db.DBError:
                # Best effort: a failed detection pass is simply retried
                # on the next iteration.
                pass
449 | ||
450 | ||
class BTreeThreadedTransactions(ThreadedTransactionsBase):
    """Transactional threaded test on a B-tree: 3 writers, 5 readers."""

    dbtype = db.DB_BTREE
    readers = 5
    writers = 3
    records = 2000
456 | ||
class HashThreadedTransactions(ThreadedTransactionsBase):
    """Transactional threaded test on a hash DB: 1 writer, 5 readers."""

    dbtype = db.DB_HASH
    readers = 5
    writers = 1
    records = 2000
462 | ||
class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """B-tree transactional test whose transactions use DB_TXN_NOWAIT."""

    dbtype = db.DB_BTREE
    txnFlag = db.DB_TXN_NOWAIT
    readers = 5
    writers = 3
    records = 2000
469 | ||
class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """Hash transactional test whose transactions use DB_TXN_NOWAIT."""

    dbtype = db.DB_HASH
    txnFlag = db.DB_TXN_NOWAIT
    readers = 5
    writers = 1
    records = 2000
476 | ||
477 | ||
478 | #---------------------------------------------------------------------- | |
479 | ||
def test_suite():
    """Return a TestSuite holding every threaded test case of this module.

    When the threading module could not be imported the suite is returned
    empty and a notice is printed instead.
    """
    suite = unittest.TestSuite()

    if not have_threads:
        print("Threads not available, skipping thread tests.")
        return suite

    # Same registration order as the concrete classes are defined above.
    cases = (
        BTreeConcurrentDataStore,
        HashConcurrentDataStore,
        BTreeSimpleThreaded,
        HashSimpleThreaded,
        BTreeThreadedTransactions,
        HashThreadedTransactions,
        BTreeThreadedNoWaitTransactions,
        HashThreadedNoWaitTransactions,
    )
    for case in cases:
        suite.addTest(unittest.makeSuite(case))

    return suite
497 | ||
498 | ||
# Running the module as a script executes the suite built by test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')