Initial commit of OpenSPARC T2 architecture model.
[OpenSPARC-T2-SAM] / sam-t2 / devtools / amd64 / lib / python2.4 / bsddb / test / test_lock.py
CommitLineData
920dae64
AT
1"""
Test cases for the locking subsystem.
3"""
4
5import sys, os, string
6import tempfile
7import time
8from pprint import pprint
9
10try:
11 from threading import Thread, currentThread
12 have_threads = 1
13except ImportError:
14 have_threads = 0
15
16
17import unittest
18from test_all import verbose
19
20try:
21 # For Pythons w/distutils pybsddb
22 from bsddb3 import db
23except ImportError:
24 # For Python 2.3
25 from bsddb import db
26
27
28#----------------------------------------------------------------------
29
30class LockingTestCase(unittest.TestCase):
31
32 def setUp(self):
33 homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
34 self.homeDir = homeDir
35 try: os.mkdir(homeDir)
36 except os.error: pass
37 self.env = db.DBEnv()
38 self.env.open(homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
39 db.DB_INIT_LOCK | db.DB_CREATE)
40
41
42 def tearDown(self):
43 self.env.close()
44 import glob
45 files = glob.glob(os.path.join(self.homeDir, '*'))
46 for file in files:
47 os.remove(file)
48
49
50 def test01_simple(self):
51 if verbose:
52 print '\n', '-=' * 30
53 print "Running %s.test01_simple..." % self.__class__.__name__
54
55 anID = self.env.lock_id()
56 if verbose:
57 print "locker ID: %s" % anID
58 lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
59 if verbose:
60 print "Aquired lock: %s" % lock
61 time.sleep(1)
62 self.env.lock_put(lock)
63 if verbose:
64 print "Released lock: %s" % lock
65
66
67
68
69 def test02_threaded(self):
70 if verbose:
71 print '\n', '-=' * 30
72 print "Running %s.test02_threaded..." % self.__class__.__name__
73
74 threads = []
75 threads.append(Thread(target = self.theThread,
76 args=(5, db.DB_LOCK_WRITE)))
77 threads.append(Thread(target = self.theThread,
78 args=(1, db.DB_LOCK_READ)))
79 threads.append(Thread(target = self.theThread,
80 args=(1, db.DB_LOCK_READ)))
81 threads.append(Thread(target = self.theThread,
82 args=(1, db.DB_LOCK_WRITE)))
83 threads.append(Thread(target = self.theThread,
84 args=(1, db.DB_LOCK_READ)))
85 threads.append(Thread(target = self.theThread,
86 args=(1, db.DB_LOCK_READ)))
87 threads.append(Thread(target = self.theThread,
88 args=(1, db.DB_LOCK_WRITE)))
89 threads.append(Thread(target = self.theThread,
90 args=(1, db.DB_LOCK_WRITE)))
91 threads.append(Thread(target = self.theThread,
92 args=(1, db.DB_LOCK_WRITE)))
93
94 for t in threads:
95 t.start()
96 for t in threads:
97 t.join()
98
99 def test03_set_timeout(self):
100 # test that the set_timeout call works
101 if hasattr(self.env, 'set_timeout'):
102 self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
103 self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
104 self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
105 self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
106
107 def theThread(self, sleepTime, lockType):
108 name = currentThread().getName()
109 if lockType == db.DB_LOCK_WRITE:
110 lt = "write"
111 else:
112 lt = "read"
113
114 anID = self.env.lock_id()
115 if verbose:
116 print "%s: locker ID: %s" % (name, anID)
117
118 lock = self.env.lock_get(anID, "some locked thing", lockType)
119 if verbose:
120 print "%s: Aquired %s lock: %s" % (name, lt, lock)
121
122 time.sleep(sleepTime)
123
124 self.env.lock_put(lock)
125 if verbose:
126 print "%s: Released %s lock: %s" % (name, lt, lock)
127
128
129#----------------------------------------------------------------------
130
def test_suite():
    """Return a TestSuite holding the LockingTestCase tests.

    When the threading import succeeded (have_threads is true) every test
    in the class is included.  Otherwise only the tests that do not start
    threads are added.
    """
    suite = unittest.TestSuite()

    if have_threads:
        suite.addTest(unittest.makeSuite(LockingTestCase))
    else:
        # Only test02_threaded uses Thread; test01 and test03 can still run.
        for prefix in ('test01', 'test03'):
            suite.addTest(unittest.makeSuite(LockingTestCase, prefix))

    return suite
140
141
if __name__ == '__main__':
    # When executed as a script, run the suite built by test_suite().
    unittest.main(defaultTest='test_suite')