Commit | Line | Data |
---|---|---|
86530b38 AT |
1 | #! /usr/bin/env python |
2 | """Test script for the bsddb C module by Roger E. Masse | |
3 | Adapted to unittest format and expanded scope by Raymond Hettinger | |
4 | """ | |
5 | import os, sys | |
6 | import copy | |
7 | import bsddb | |
8 | import dbhash # Just so we know it's imported | |
9 | import unittest | |
10 | from test import test_support | |
11 | from sets import Set | |
12 | ||
13 | class TestBSDDB(unittest.TestCase): | |
14 | ||
15 | def setUp(self): | |
16 | self.f = self.openmethod[0](self.fname, 'c') | |
17 | self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='') | |
18 | for k, v in self.d.iteritems(): | |
19 | self.f[k] = v | |
20 | ||
21 | def tearDown(self): | |
22 | self.f.sync() | |
23 | self.f.close() | |
24 | if self.fname is None: | |
25 | return | |
26 | try: | |
27 | os.remove(self.fname) | |
28 | except os.error: | |
29 | pass | |
30 | ||
31 | def test_getitem(self): | |
32 | for k, v in self.d.iteritems(): | |
33 | self.assertEqual(self.f[k], v) | |
34 | ||
35 | def test_len(self): | |
36 | self.assertEqual(len(self.f), len(self.d)) | |
37 | ||
38 | def test_change(self): | |
39 | self.f['r'] = 'discovered' | |
40 | self.assertEqual(self.f['r'], 'discovered') | |
41 | self.assert_('r' in self.f.keys()) | |
42 | self.assert_('discovered' in self.f.values()) | |
43 | ||
44 | def test_close_and_reopen(self): | |
45 | if self.fname is None: | |
46 | # if we're using an in-memory only db, we can't reopen it | |
47 | # so finish here. | |
48 | return | |
49 | self.f.close() | |
50 | self.f = self.openmethod[0](self.fname, 'w') | |
51 | for k, v in self.d.iteritems(): | |
52 | self.assertEqual(self.f[k], v) | |
53 | ||
54 | def assertSetEquals(self, seqn1, seqn2): | |
55 | self.assertEqual(Set(seqn1), Set(seqn2)) | |
56 | ||
57 | def test_mapping_iteration_methods(self): | |
58 | f = self.f | |
59 | d = self.d | |
60 | self.assertSetEquals(d, f) | |
61 | self.assertSetEquals(d.keys(), f.keys()) | |
62 | self.assertSetEquals(d.values(), f.values()) | |
63 | self.assertSetEquals(d.items(), f.items()) | |
64 | self.assertSetEquals(d.iterkeys(), f.iterkeys()) | |
65 | self.assertSetEquals(d.itervalues(), f.itervalues()) | |
66 | self.assertSetEquals(d.iteritems(), f.iteritems()) | |
67 | ||
68 | def test_iter_while_modifying_values(self): | |
69 | if not hasattr(self.f, '__iter__'): | |
70 | return | |
71 | ||
72 | di = iter(self.d) | |
73 | while 1: | |
74 | try: | |
75 | key = di.next() | |
76 | self.d[key] = 'modified '+key | |
77 | except StopIteration: | |
78 | break | |
79 | ||
80 | # it should behave the same as a dict. modifying values | |
81 | # of existing keys should not break iteration. (adding | |
82 | # or removing keys should) | |
83 | fi = iter(self.f) | |
84 | while 1: | |
85 | try: | |
86 | key = fi.next() | |
87 | self.f[key] = 'modified '+key | |
88 | except StopIteration: | |
89 | break | |
90 | ||
91 | self.test_mapping_iteration_methods() | |
92 | ||
93 | def test_iteritems_while_modifying_values(self): | |
94 | if not hasattr(self.f, 'iteritems'): | |
95 | return | |
96 | ||
97 | di = self.d.iteritems() | |
98 | while 1: | |
99 | try: | |
100 | k, v = di.next() | |
101 | self.d[k] = 'modified '+v | |
102 | except StopIteration: | |
103 | break | |
104 | ||
105 | # it should behave the same as a dict. modifying values | |
106 | # of existing keys should not break iteration. (adding | |
107 | # or removing keys should) | |
108 | fi = self.f.iteritems() | |
109 | while 1: | |
110 | try: | |
111 | k, v = fi.next() | |
112 | self.f[k] = 'modified '+v | |
113 | except StopIteration: | |
114 | break | |
115 | ||
116 | self.test_mapping_iteration_methods() | |
117 | ||
118 | def test_first_next_looping(self): | |
119 | items = [self.f.first()] | |
120 | for i in xrange(1, len(self.f)): | |
121 | items.append(self.f.next()) | |
122 | self.assertSetEquals(items, self.d.items()) | |
123 | ||
124 | def test_previous_last_looping(self): | |
125 | items = [self.f.last()] | |
126 | for i in xrange(1, len(self.f)): | |
127 | items.append(self.f.previous()) | |
128 | self.assertSetEquals(items, self.d.items()) | |
129 | ||
130 | def test_set_location(self): | |
131 | self.assertEqual(self.f.set_location('e'), ('e', self.d['e'])) | |
132 | ||
133 | def test_contains(self): | |
134 | for k in self.d: | |
135 | self.assert_(k in self.f) | |
136 | self.assert_('not here' not in self.f) | |
137 | ||
138 | def test_has_key(self): | |
139 | for k in self.d: | |
140 | self.assert_(self.f.has_key(k)) | |
141 | self.assert_(not self.f.has_key('not here')) | |
142 | ||
143 | def test_clear(self): | |
144 | self.f.clear() | |
145 | self.assertEqual(len(self.f), 0) | |
146 | ||
147 | def test__no_deadlock_first(self, debug=0): | |
148 | # do this so that testers can see what function we're in in | |
149 | # verbose mode when we deadlock. | |
150 | sys.stdout.flush() | |
151 | ||
152 | # in pybsddb's _DBWithCursor this causes an internal DBCursor | |
153 | # object is created. Other test_ methods in this class could | |
154 | # inadvertently cause the deadlock but an explicit test is needed. | |
155 | if debug: print "A" | |
156 | k,v = self.f.first() | |
157 | if debug: print "B", k | |
158 | self.f[k] = "deadlock. do not pass go. do not collect $200." | |
159 | if debug: print "C" | |
160 | # if the bsddb implementation leaves the DBCursor open during | |
161 | # the database write and locking+threading support is enabled | |
162 | # the cursor's read lock will deadlock the write lock request.. | |
163 | ||
164 | # test the iterator interface (if present) | |
165 | if hasattr(self.f, 'iteritems'): | |
166 | if debug: print "D" | |
167 | i = self.f.iteritems() | |
168 | k,v = i.next() | |
169 | if debug: print "E" | |
170 | self.f[k] = "please don't deadlock" | |
171 | if debug: print "F" | |
172 | while 1: | |
173 | try: | |
174 | k,v = i.next() | |
175 | except StopIteration: | |
176 | break | |
177 | if debug: print "F2" | |
178 | ||
179 | i = iter(self.f) | |
180 | if debug: print "G" | |
181 | while i: | |
182 | try: | |
183 | if debug: print "H" | |
184 | k = i.next() | |
185 | if debug: print "I" | |
186 | self.f[k] = "deadlocks-r-us" | |
187 | if debug: print "J" | |
188 | except StopIteration: | |
189 | i = None | |
190 | if debug: print "K" | |
191 | ||
192 | # test the legacy cursor interface mixed with writes | |
193 | self.assert_(self.f.first()[0] in self.d) | |
194 | k = self.f.next()[0] | |
195 | self.assert_(k in self.d) | |
196 | self.f[k] = "be gone with ye deadlocks" | |
197 | self.assert_(self.f[k], "be gone with ye deadlocks") | |
198 | ||
199 | def test_for_cursor_memleak(self): | |
200 | if not hasattr(self.f, 'iteritems'): | |
201 | return | |
202 | ||
203 | # do the bsddb._DBWithCursor _iter_mixin internals leak cursors? | |
204 | nc1 = len(self.f._cursor_refs) | |
205 | # create iterator | |
206 | i = self.f.iteritems() | |
207 | nc2 = len(self.f._cursor_refs) | |
208 | # use the iterator (should run to the first yeild, creating the cursor) | |
209 | k, v = i.next() | |
210 | nc3 = len(self.f._cursor_refs) | |
211 | # destroy the iterator; this should cause the weakref callback | |
212 | # to remove the cursor object from self.f._cursor_refs | |
213 | del i | |
214 | nc4 = len(self.f._cursor_refs) | |
215 | ||
216 | self.assertEqual(nc1, nc2) | |
217 | self.assertEqual(nc1, nc4) | |
218 | self.assert_(nc3 == nc1+1) | |
219 | ||
220 | def test_popitem(self): | |
221 | k, v = self.f.popitem() | |
222 | self.assert_(k in self.d) | |
223 | self.assert_(v in self.d.values()) | |
224 | self.assert_(k not in self.f) | |
225 | self.assertEqual(len(self.d)-1, len(self.f)) | |
226 | ||
227 | def test_pop(self): | |
228 | k = 'w' | |
229 | v = self.f.pop(k) | |
230 | self.assertEqual(v, self.d[k]) | |
231 | self.assert_(k not in self.f) | |
232 | self.assert_(v not in self.f.values()) | |
233 | self.assertEqual(len(self.d)-1, len(self.f)) | |
234 | ||
235 | def test_get(self): | |
236 | self.assertEqual(self.f.get('NotHere'), None) | |
237 | self.assertEqual(self.f.get('NotHere', 'Default'), 'Default') | |
238 | self.assertEqual(self.f.get('q', 'Default'), self.d['q']) | |
239 | ||
240 | def test_setdefault(self): | |
241 | self.assertEqual(self.f.setdefault('new', 'dog'), 'dog') | |
242 | self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r']) | |
243 | ||
244 | def test_update(self): | |
245 | new = dict(y='life', u='of', i='brian') | |
246 | self.f.update(new) | |
247 | self.d.update(new) | |
248 | for k, v in self.d.iteritems(): | |
249 | self.assertEqual(self.f[k], v) | |
250 | ||
251 | def test_keyordering(self): | |
252 | if self.openmethod[0] is not bsddb.btopen: | |
253 | return | |
254 | keys = self.d.keys() | |
255 | keys.sort() | |
256 | self.assertEqual(self.f.first()[0], keys[0]) | |
257 | self.assertEqual(self.f.next()[0], keys[1]) | |
258 | self.assertEqual(self.f.last()[0], keys[-1]) | |
259 | self.assertEqual(self.f.previous()[0], keys[-2]) | |
260 | self.assertEqual(list(self.f), keys) | |
261 | ||
262 | class TestBTree(TestBSDDB): | |
263 | fname = test_support.TESTFN | |
264 | openmethod = [bsddb.btopen] | |
265 | ||
266 | class TestBTree_InMemory(TestBSDDB): | |
267 | fname = None | |
268 | openmethod = [bsddb.btopen] | |
269 | ||
270 | class TestHashTable(TestBSDDB): | |
271 | fname = test_support.TESTFN | |
272 | openmethod = [bsddb.hashopen] | |
273 | ||
274 | class TestHashTable_InMemory(TestBSDDB): | |
275 | fname = None | |
276 | openmethod = [bsddb.hashopen] | |
277 | ||
278 | ## # (bsddb.rnopen,'Record Numbers'), 'put' for RECNO for bsddb 1.85 | |
279 | ## # appears broken... at least on | |
280 | ## # Solaris Intel - rmasse 1/97 | |
281 | ||
282 | def test_main(verbose=None): | |
283 | test_support.run_unittest( | |
284 | TestBTree, | |
285 | TestHashTable, | |
286 | TestBTree_InMemory, | |
287 | TestHashTable_InMemory, | |
288 | ) | |
289 | ||
290 | if __name__ == "__main__": | |
291 | test_main(verbose=True) |