Initial commit of OpenSPARC T2 architecture model.
[OpenSPARC-T2-SAM] / sam-t2 / devtools / v9 / lib / python2.4 / bsddb / test / test_queue.py
CommitLineData
920dae64
AT
1"""
2TestCases for exercising a Queue DB.
3"""
4
5import sys, os, string
6import tempfile
7from pprint import pprint
8import unittest
9
10try:
11 # For Pythons w/distutils pybsddb
12 from bsddb3 import db
13except ImportError:
14 # For Python 2.3
15 from bsddb import db
16
17from test_all import verbose
18
19
20#----------------------------------------------------------------------
21
22class SimpleQueueTestCase(unittest.TestCase):
23 def setUp(self):
24 self.filename = tempfile.mktemp()
25
26 def tearDown(self):
27 try:
28 os.remove(self.filename)
29 except os.error:
30 pass
31
32
33 def test01_basic(self):
34 # Basic Queue tests using the deprecated DBCursor.consume method.
35
36 if verbose:
37 print '\n', '-=' * 30
38 print "Running %s.test01_basic..." % self.__class__.__name__
39
40 d = db.DB()
41 d.set_re_len(40) # Queues must be fixed length
42 d.open(self.filename, db.DB_QUEUE, db.DB_CREATE)
43
44 if verbose:
45 print "before appends" + '-' * 30
46 pprint(d.stat())
47
48 for x in string.letters:
49 d.append(x * 40)
50
51 assert len(d) == 52
52
53 d.put(100, "some more data")
54 d.put(101, "and some more ")
55 d.put(75, "out of order")
56 d.put(1, "replacement data")
57
58 assert len(d) == 55
59
60 if verbose:
61 print "before close" + '-' * 30
62 pprint(d.stat())
63
64 d.close()
65 del d
66 d = db.DB()
67 d.open(self.filename)
68
69 if verbose:
70 print "after open" + '-' * 30
71 pprint(d.stat())
72
73 d.append("one more")
74 c = d.cursor()
75
76 if verbose:
77 print "after append" + '-' * 30
78 pprint(d.stat())
79
80 rec = c.consume()
81 while rec:
82 if verbose:
83 print rec
84 rec = c.consume()
85 c.close()
86
87 if verbose:
88 print "after consume loop" + '-' * 30
89 pprint(d.stat())
90
91 assert len(d) == 0, \
92 "if you see this message then you need to rebuild " \
93 "BerkeleyDB 3.1.17 with the patch in patches/qam_stat.diff"
94
95 d.close()
96
97
98
99 def test02_basicPost32(self):
100 # Basic Queue tests using the new DB.consume method in DB 3.2+
101 # (No cursor needed)
102
103 if verbose:
104 print '\n', '-=' * 30
105 print "Running %s.test02_basicPost32..." % self.__class__.__name__
106
107 if db.version() < (3, 2, 0):
108 if verbose:
109 print "Test not run, DB not new enough..."
110 return
111
112 d = db.DB()
113 d.set_re_len(40) # Queues must be fixed length
114 d.open(self.filename, db.DB_QUEUE, db.DB_CREATE)
115
116 if verbose:
117 print "before appends" + '-' * 30
118 pprint(d.stat())
119
120 for x in string.letters:
121 d.append(x * 40)
122
123 assert len(d) == 52
124
125 d.put(100, "some more data")
126 d.put(101, "and some more ")
127 d.put(75, "out of order")
128 d.put(1, "replacement data")
129
130 assert len(d) == 55
131
132 if verbose:
133 print "before close" + '-' * 30
134 pprint(d.stat())
135
136 d.close()
137 del d
138 d = db.DB()
139 d.open(self.filename)
140 #d.set_get_returns_none(true)
141
142 if verbose:
143 print "after open" + '-' * 30
144 pprint(d.stat())
145
146 d.append("one more")
147
148 if verbose:
149 print "after append" + '-' * 30
150 pprint(d.stat())
151
152 rec = d.consume()
153 while rec:
154 if verbose:
155 print rec
156 rec = d.consume()
157
158 if verbose:
159 print "after consume loop" + '-' * 30
160 pprint(d.stat())
161
162 d.close()
163
164
165
166#----------------------------------------------------------------------
167
def test_suite():
    """Return the unittest suite built from SimpleQueueTestCase."""
    suite = unittest.makeSuite(SimpleQueueTestCase)
    return suite
170
171
# When this file is executed directly, hand control to unittest and name
# the module-level test_suite callable as the default test.
if __name__ == '__main__':
    unittest.main(
        defaultTest='test_suite',
    )