Initial commit of OpenSPARC T2 design and verification files.
[OpenSPARC-T2-DV] / tools / src / nas,5.n2.os.2 / lib / python / lib / python2.4 / bsddb / test / test_thread.py
CommitLineData
86530b38
AT
1"""TestCases for multi-threaded access to a DB.
2"""
3
4import os
5import sys
6import time
7import errno
8import shutil
9import tempfile
10from pprint import pprint
11from random import random
12
13try:
14 True, False
15except NameError:
16 True = 1
17 False = 0
18
19DASH = '-'
20
21try:
22 from threading import Thread, currentThread
23 have_threads = True
24except ImportError:
25 have_threads = False
26
27import unittest
28from test_all import verbose
29
30try:
31 # For Pythons w/distutils pybsddb
32 from bsddb3 import db, dbutils
33except ImportError:
34 # For Python 2.3
35 from bsddb import db, dbutils
36
37
38#----------------------------------------------------------------------
39
40class BaseThreadedTestCase(unittest.TestCase):
41 dbtype = db.DB_UNKNOWN # must be set in derived class
42 dbopenflags = 0
43 dbsetflags = 0
44 envflags = 0
45
46 def setUp(self):
47 if verbose:
48 dbutils._deadlock_VerboseFile = sys.stdout
49
50 homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
51 self.homeDir = homeDir
52 try:
53 os.mkdir(homeDir)
54 except OSError, e:
55 if e.errno <> errno.EEXIST: raise
56 self.env = db.DBEnv()
57 self.setEnvOpts()
58 self.env.open(homeDir, self.envflags | db.DB_CREATE)
59
60 self.filename = self.__class__.__name__ + '.db'
61 self.d = db.DB(self.env)
62 if self.dbsetflags:
63 self.d.set_flags(self.dbsetflags)
64 self.d.open(self.filename, self.dbtype, self.dbopenflags|db.DB_CREATE)
65
66 def tearDown(self):
67 self.d.close()
68 self.env.close()
69 shutil.rmtree(self.homeDir)
70
71 def setEnvOpts(self):
72 pass
73
74 def makeData(self, key):
75 return DASH.join([key] * 5)
76
77
78#----------------------------------------------------------------------
79
80
81class ConcurrentDataStoreBase(BaseThreadedTestCase):
82 dbopenflags = db.DB_THREAD
83 envflags = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
84 readers = 0 # derived class should set
85 writers = 0
86 records = 1000
87
88 def test01_1WriterMultiReaders(self):
89 if verbose:
90 print '\n', '-=' * 30
91 print "Running %s.test01_1WriterMultiReaders..." % \
92 self.__class__.__name__
93
94 threads = []
95 for x in range(self.writers):
96 wt = Thread(target = self.writerThread,
97 args = (self.d, self.records, x),
98 name = 'writer %d' % x,
99 )#verbose = verbose)
100 threads.append(wt)
101
102 for x in range(self.readers):
103 rt = Thread(target = self.readerThread,
104 args = (self.d, x),
105 name = 'reader %d' % x,
106 )#verbose = verbose)
107 threads.append(rt)
108
109 for t in threads:
110 t.start()
111 for t in threads:
112 t.join()
113
114 def writerThread(self, d, howMany, writerNum):
115 #time.sleep(0.01 * writerNum + 0.01)
116 name = currentThread().getName()
117 start = howMany * writerNum
118 stop = howMany * (writerNum + 1) - 1
119 if verbose:
120 print "%s: creating records %d - %d" % (name, start, stop)
121
122 for x in range(start, stop):
123 key = '%04d' % x
124 dbutils.DeadlockWrap(d.put, key, self.makeData(key),
125 max_retries=12)
126 if verbose and x % 100 == 0:
127 print "%s: records %d - %d finished" % (name, start, x)
128
129 if verbose:
130 print "%s: finished creating records" % name
131
132## # Each write-cursor will be exclusive, the only one that can update the DB...
133## if verbose: print "%s: deleting a few records" % name
134## c = d.cursor(flags = db.DB_WRITECURSOR)
135## for x in range(10):
136## key = int(random() * howMany) + start
137## key = '%04d' % key
138## if d.has_key(key):
139## c.set(key)
140## c.delete()
141
142## c.close()
143 if verbose:
144 print "%s: thread finished" % name
145
146 def readerThread(self, d, readerNum):
147 time.sleep(0.01 * readerNum)
148 name = currentThread().getName()
149
150 for loop in range(5):
151 c = d.cursor()
152 count = 0
153 rec = c.first()
154 while rec:
155 count += 1
156 key, data = rec
157 self.assertEqual(self.makeData(key), data)
158 rec = c.next()
159 if verbose:
160 print "%s: found %d records" % (name, count)
161 c.close()
162 time.sleep(0.05)
163
164 if verbose:
165 print "%s: thread finished" % name
166
167
168class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
169 dbtype = db.DB_BTREE
170 writers = 2
171 readers = 10
172 records = 1000
173
174
175class HashConcurrentDataStore(ConcurrentDataStoreBase):
176 dbtype = db.DB_HASH
177 writers = 2
178 readers = 10
179 records = 1000
180
181
182#----------------------------------------------------------------------
183
184class SimpleThreadedBase(BaseThreadedTestCase):
185 dbopenflags = db.DB_THREAD
186 envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
187 readers = 5
188 writers = 3
189 records = 1000
190
191 def setEnvOpts(self):
192 self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
193
194 def test02_SimpleLocks(self):
195 if verbose:
196 print '\n', '-=' * 30
197 print "Running %s.test02_SimpleLocks..." % self.__class__.__name__
198
199 threads = []
200 for x in range(self.writers):
201 wt = Thread(target = self.writerThread,
202 args = (self.d, self.records, x),
203 name = 'writer %d' % x,
204 )#verbose = verbose)
205 threads.append(wt)
206 for x in range(self.readers):
207 rt = Thread(target = self.readerThread,
208 args = (self.d, x),
209 name = 'reader %d' % x,
210 )#verbose = verbose)
211 threads.append(rt)
212
213 for t in threads:
214 t.start()
215 for t in threads:
216 t.join()
217
218 def writerThread(self, d, howMany, writerNum):
219 name = currentThread().getName()
220 start = howMany * writerNum
221 stop = howMany * (writerNum + 1) - 1
222 if verbose:
223 print "%s: creating records %d - %d" % (name, start, stop)
224
225 # create a bunch of records
226 for x in xrange(start, stop):
227 key = '%04d' % x
228 dbutils.DeadlockWrap(d.put, key, self.makeData(key),
229 max_retries=12)
230
231 if verbose and x % 100 == 0:
232 print "%s: records %d - %d finished" % (name, start, x)
233
234 # do a bit or reading too
235 if random() <= 0.05:
236 for y in xrange(start, x):
237 key = '%04d' % x
238 data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
239 self.assertEqual(data, self.makeData(key))
240
241 # flush them
242 try:
243 dbutils.DeadlockWrap(d.sync, max_retries=12)
244 except db.DBIncompleteError, val:
245 if verbose:
246 print "could not complete sync()..."
247
248 # read them back, deleting a few
249 for x in xrange(start, stop):
250 key = '%04d' % x
251 data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
252 if verbose and x % 100 == 0:
253 print "%s: fetched record (%s, %s)" % (name, key, data)
254 self.assertEqual(data, self.makeData(key))
255 if random() <= 0.10:
256 dbutils.DeadlockWrap(d.delete, key, max_retries=12)
257 if verbose:
258 print "%s: deleted record %s" % (name, key)
259
260 if verbose:
261 print "%s: thread finished" % name
262
263 def readerThread(self, d, readerNum):
264 time.sleep(0.01 * readerNum)
265 name = currentThread().getName()
266
267 for loop in range(5):
268 c = d.cursor()
269 count = 0
270 rec = dbutils.DeadlockWrap(c.first, max_retries=10)
271 while rec:
272 count += 1
273 key, data = rec
274 self.assertEqual(self.makeData(key), data)
275 rec = dbutils.DeadlockWrap(c.next, max_retries=10)
276 if verbose:
277 print "%s: found %d records" % (name, count)
278 c.close()
279 time.sleep(0.05)
280
281 if verbose:
282 print "%s: thread finished" % name
283
284
285class BTreeSimpleThreaded(SimpleThreadedBase):
286 dbtype = db.DB_BTREE
287
288
289class HashSimpleThreaded(SimpleThreadedBase):
290 dbtype = db.DB_HASH
291
292
293#----------------------------------------------------------------------
294
295
296class ThreadedTransactionsBase(BaseThreadedTestCase):
297 dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
298 envflags = (db.DB_THREAD |
299 db.DB_INIT_MPOOL |
300 db.DB_INIT_LOCK |
301 db.DB_INIT_LOG |
302 db.DB_INIT_TXN
303 )
304 readers = 0
305 writers = 0
306 records = 2000
307 txnFlag = 0
308
309 def setEnvOpts(self):
310 #self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
311 pass
312
313 def test03_ThreadedTransactions(self):
314 if verbose:
315 print '\n', '-=' * 30
316 print "Running %s.test03_ThreadedTransactions..." % \
317 self.__class__.__name__
318
319 threads = []
320 for x in range(self.writers):
321 wt = Thread(target = self.writerThread,
322 args = (self.d, self.records, x),
323 name = 'writer %d' % x,
324 )#verbose = verbose)
325 threads.append(wt)
326
327 for x in range(self.readers):
328 rt = Thread(target = self.readerThread,
329 args = (self.d, x),
330 name = 'reader %d' % x,
331 )#verbose = verbose)
332 threads.append(rt)
333
334 dt = Thread(target = self.deadlockThread)
335 dt.start()
336
337 for t in threads:
338 t.start()
339 for t in threads:
340 t.join()
341
342 self.doLockDetect = False
343 dt.join()
344
345 def doWrite(self, d, name, start, stop):
346 finished = False
347 while not finished:
348 try:
349 txn = self.env.txn_begin(None, self.txnFlag)
350 for x in range(start, stop):
351 key = '%04d' % x
352 d.put(key, self.makeData(key), txn)
353 if verbose and x % 100 == 0:
354 print "%s: records %d - %d finished" % (name, start, x)
355 txn.commit()
356 finished = True
357 except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
358 if verbose:
359 print "%s: Aborting transaction (%s)" % (name, val[1])
360 txn.abort()
361 time.sleep(0.05)
362
363 def writerThread(self, d, howMany, writerNum):
364 name = currentThread().getName()
365 start = howMany * writerNum
366 stop = howMany * (writerNum + 1) - 1
367 if verbose:
368 print "%s: creating records %d - %d" % (name, start, stop)
369
370 step = 100
371 for x in range(start, stop, step):
372 self.doWrite(d, name, x, min(stop, x+step))
373
374 if verbose:
375 print "%s: finished creating records" % name
376 if verbose:
377 print "%s: deleting a few records" % name
378
379 finished = False
380 while not finished:
381 try:
382 recs = []
383 txn = self.env.txn_begin(None, self.txnFlag)
384 for x in range(10):
385 key = int(random() * howMany) + start
386 key = '%04d' % key
387 data = d.get(key, None, txn, db.DB_RMW)
388 if data is not None:
389 d.delete(key, txn)
390 recs.append(key)
391 txn.commit()
392 finished = True
393 if verbose:
394 print "%s: deleted records %s" % (name, recs)
395 except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
396 if verbose:
397 print "%s: Aborting transaction (%s)" % (name, val[1])
398 txn.abort()
399 time.sleep(0.05)
400
401 if verbose:
402 print "%s: thread finished" % name
403
404 def readerThread(self, d, readerNum):
405 time.sleep(0.01 * readerNum + 0.05)
406 name = currentThread().getName()
407
408 for loop in range(5):
409 finished = False
410 while not finished:
411 try:
412 txn = self.env.txn_begin(None, self.txnFlag)
413 c = d.cursor(txn)
414 count = 0
415 rec = c.first()
416 while rec:
417 count += 1
418 key, data = rec
419 self.assertEqual(self.makeData(key), data)
420 rec = c.next()
421 if verbose: print "%s: found %d records" % (name, count)
422 c.close()
423 txn.commit()
424 finished = True
425 except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
426 if verbose:
427 print "%s: Aborting transaction (%s)" % (name, val[1])
428 c.close()
429 txn.abort()
430 time.sleep(0.05)
431
432 time.sleep(0.05)
433
434 if verbose:
435 print "%s: thread finished" % name
436
437 def deadlockThread(self):
438 self.doLockDetect = True
439 while self.doLockDetect:
440 time.sleep(0.5)
441 try:
442 aborted = self.env.lock_detect(
443 db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
444 if verbose and aborted:
445 print "deadlock: Aborted %d deadlocked transaction(s)" \
446 % aborted
447 except db.DBError:
448 pass
449
450
451class BTreeThreadedTransactions(ThreadedTransactionsBase):
452 dbtype = db.DB_BTREE
453 writers = 3
454 readers = 5
455 records = 2000
456
457class HashThreadedTransactions(ThreadedTransactionsBase):
458 dbtype = db.DB_HASH
459 writers = 1
460 readers = 5
461 records = 2000
462
463class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
464 dbtype = db.DB_BTREE
465 writers = 3
466 readers = 5
467 records = 2000
468 txnFlag = db.DB_TXN_NOWAIT
469
470class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
471 dbtype = db.DB_HASH
472 writers = 1
473 readers = 5
474 records = 2000
475 txnFlag = db.DB_TXN_NOWAIT
476
477
478#----------------------------------------------------------------------
479
480def test_suite():
481 suite = unittest.TestSuite()
482
483 if have_threads:
484 suite.addTest(unittest.makeSuite(BTreeConcurrentDataStore))
485 suite.addTest(unittest.makeSuite(HashConcurrentDataStore))
486 suite.addTest(unittest.makeSuite(BTreeSimpleThreaded))
487 suite.addTest(unittest.makeSuite(HashSimpleThreaded))
488 suite.addTest(unittest.makeSuite(BTreeThreadedTransactions))
489 suite.addTest(unittest.makeSuite(HashThreadedTransactions))
490 suite.addTest(unittest.makeSuite(BTreeThreadedNoWaitTransactions))
491 suite.addTest(unittest.makeSuite(HashThreadedNoWaitTransactions))
492
493 else:
494 print "Threads not available, skipping thread tests."
495
496 return suite
497
498
499if __name__ == '__main__':
500 unittest.main(defaultTest='test_suite')