Commit | Line | Data |
---|---|---|
920dae64 AT |
1 | """ |
2 | Test cases for checking dbshelve objects. | |
3 | """ | |
4 | ||
5 | import sys, os, string | |
6 | import tempfile, random | |
7 | from pprint import pprint | |
8 | from types import * | |
9 | import unittest | |
10 | ||
11 | try: | |
12 | # For Pythons w/distutils pybsddb | |
13 | from bsddb3 import db, dbshelve | |
14 | except ImportError: | |
15 | # For Python 2.3 | |
16 | from bsddb import db, dbshelve | |
17 | ||
18 | from test_all import verbose | |
19 | ||
20 | ||
21 | #---------------------------------------------------------------------- | |
22 | ||
23 | # We want the objects to be comparable so we can test dbshelve.values | |
24 | # later on. | |
class DataClass:
    """Picklable payload object carrying a random ``value`` to order by.

    Instances are stored in the shelf and later sorted alongside the
    other stored values, so they have to be comparable.
    """

    def __init__(self):
        self.value = random.random()

    def __cmp__(self, other):
        """Three-way compare on ``value``; return -1, 0 or 1.

        ``other`` may be another DataClass (compared value to value) or a
        bare number (compared against directly).  The original handed the
        instance itself to ``cmp`` and relied on reflected ``__cmp__``
        dispatch to unwrap it; unwrapping here makes that explicit.
        """
        mine = self.value
        theirs = getattr(other, 'value', other)
        # Same result as cmp(mine, theirs), without needing the builtin.
        return (mine > theirs) - (mine < theirs)
31 | ||
class DBShelveTestCase(unittest.TestCase):
    """Exercise a shelf obtained from ``dbshelve.open`` with default arguments.

    Subclasses override ``do_open``/``do_close`` to run the same tests
    against shelves that are created in other ways.

    Checks use ``assertEqual``/``fail`` rather than bare ``assert`` so that
    they still run under ``python -O`` and report the mismatching values.
    """

    def setUp(self):
        """Choose a scratch file name and open the shelf on it."""
        # NOTE(review): mktemp() only picks a name and is race-prone; it is
        # kept because the environment-based subclasses reuse the bare name.
        self.filename = tempfile.mktemp()
        self.do_open()

    def tearDown(self):
        """Close the shelf, then remove its backing file (best effort)."""
        try:
            self.do_close()
        finally:
            # Remove the file even when closing failed.
            try:
                os.remove(self.filename)
            except os.error:
                # Deliberately ignored: the file may never have been created.
                pass

    def populateDB(self, d):
        """Store a string, an integer, a list and an instance per letter.

        Keys are a one-character type tag ('S', 'I', 'L' or 'O') followed by
        the letter; ``checkrec`` relies on this layout.
        """
        for x in string.letters:
            d['S' + x] = 10 * x     # add a string
            d['I' + x] = ord(x)     # add an integer
            d['L' + x] = [x] * 10   # add a list

            inst = DataClass()      # add an instance
            inst.S = 10 * x
            inst.I = ord(x)
            inst.L = [x] * 10
            d['O' + x] = inst

    # overridable in derived classes to affect how the shelf is created/opened
    def do_open(self):
        """Open the shelf under test and bind it to ``self.d``."""
        self.d = dbshelve.open(self.filename)

    # and closed...
    def do_close(self):
        """Close the shelf bound to ``self.d``."""
        self.d.close()

    def test01_basics(self):
        """Round-trip data through close/reopen, then check the mapping API."""
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test01_basics..." % self.__class__.__name__)

        self.populateDB(self.d)
        self.d.sync()
        # Reopen so that everything below is read back from disk.
        self.do_close()
        self.do_open()
        d = self.d

        l = len(d)
        k = d.keys()
        s = d.stat()
        d.fd()  # only checked for being callable; the result is unused

        if verbose:
            print("length: %s" % (l,))
            print("keys: %s" % (k,))
            print("stats: %s" % (s,))

        self.assertEqual(0, d.has_key('bad key'))
        self.assertEqual(1, d.has_key('IA'))
        self.assertEqual(1, d.has_key('OA'))

        # Both deletion spellings must work.
        d.delete('IA')
        del d['OA']
        self.assertEqual(0, d.has_key('IA'))
        self.assertEqual(0, d.has_key('OA'))
        self.assertEqual(len(d), l - 2)

        values = []
        for key in d.keys():
            value = d[key]
            values.append(value)
            if verbose:
                print("%s: %s" % (key, value))
            self.checkrec(key, value)

        dbvalues = d.values()
        self.assertEqual(len(dbvalues), len(d.keys()))
        # Sort both sides so the comparison ignores iteration order.
        values.sort()
        dbvalues.sort()
        self.assertEqual(values, dbvalues)

        items = d.items()
        self.assertEqual(len(items), len(values))

        for key, value in items:
            self.checkrec(key, value)

        self.assertEqual(d.get('bad key'), None)
        self.assertEqual(d.get('bad key', None), None)
        self.assertEqual(d.get('bad key', 'a string'), 'a string')
        self.assertEqual(d.get('bad key', [1, 2, 3]), [1, 2, 3])

        # With "get returns None" switched off a missing key must raise;
        # the setting is restored afterwards.
        d.set_get_returns_none(0)
        self.assertRaises(db.DBNotFoundError, d.get, 'bad key')
        d.set_get_returns_none(1)

        d.put('new key', 'new data')
        self.assertEqual(d.get('new key'), 'new data')
        self.assertEqual(d['new key'], 'new data')

    def test02_cursors(self):
        """Walk the shelf forwards and backwards with a cursor."""
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test02_cursors..." % self.__class__.__name__)

        self.populateDB(self.d)
        d = self.d

        # Forward pass: first() then next() until the cursor runs out.
        count = 0
        c = d.cursor()
        rec = c.first()
        while rec is not None:
            count = count + 1
            if verbose:
                print(rec)
            key, value = rec
            self.checkrec(key, value)
            rec = c.next()
        del c

        self.assertEqual(count, len(d))

        # Backward pass: last() then prev().
        count = 0
        c = d.cursor()
        rec = c.last()
        while rec is not None:
            count = count + 1
            if verbose:
                print(rec)
            key, value = rec
            self.checkrec(key, value)
            rec = c.prev()

        self.assertEqual(count, len(d))

        # Position the same cursor on a known key and read the record back.
        c.set('SS')
        key, value = c.current()
        self.checkrec(key, value)
        del c

    def checkrec(self, key, value):
        """Verify that ``value`` is what ``populateDB`` stored under ``key``.

        ``key[0]`` is the type tag and ``key[1]`` the letter the value was
        derived from.  An unknown tag fails the test.
        """
        tag = key[0]
        x = key[1]
        if tag == 'S':
            self.assertEqual(type(value), str)
            self.assertEqual(value, 10 * x)

        elif tag == 'I':
            self.assertEqual(type(value), int)
            self.assertEqual(value, ord(x))

        elif tag == 'L':
            self.assertEqual(type(value), list)
            self.assertEqual(value, [x] * 10)

        elif tag == 'O':
            if not isinstance(value, DataClass):
                self.fail('expected a DataClass instance, got %r' % (value,))
            self.assertEqual(value.S, 10 * x)
            self.assertEqual(value.I, ord(x))
            self.assertEqual(value.L, [x] * 10)

        else:
            self.fail('Unknown key type, fix the test')
197 | ||
198 | #---------------------------------------------------------------------- | |
199 | ||
class BasicShelveTestCase(DBShelveTestCase):
    """Shelf tests that construct the ``DBShelf`` object directly.

    Concrete subclasses provide the ``dbtype`` and ``dbflags`` class
    attributes that are passed to ``DBShelf.open``.
    """

    def do_open(self):
        """Create a DBShelf, bind it to ``self.d`` and open it on the file."""
        # Bind before opening so self.d refers to the new shelf even if
        # open() raises.
        self.d = shelf = dbshelve.DBShelf()
        shelf.open(self.filename, self.dbtype, self.dbflags)

    def do_close(self):
        """Close the shelf bound to ``self.d``."""
        shelf = self.d
        shelf.close()
207 | ||
208 | ||
class BTreeShelveTestCase(BasicShelveTestCase):
    """Shelf tests on a DB_BTREE database opened with DB_CREATE."""
    dbflags = db.DB_CREATE
    dbtype = db.DB_BTREE
212 | ||
213 | ||
class HashShelveTestCase(BasicShelveTestCase):
    """Shelf tests on a DB_HASH database opened with DB_CREATE."""
    dbflags = db.DB_CREATE
    dbtype = db.DB_HASH
217 | ||
218 | ||
class ThreadBTreeShelveTestCase(BasicShelveTestCase):
    """Shelf tests on a DB_BTREE database opened with DB_CREATE | DB_THREAD."""
    dbflags = db.DB_CREATE | db.DB_THREAD
    dbtype = db.DB_BTREE
222 | ||
223 | ||
class ThreadHashShelveTestCase(BasicShelveTestCase):
    """Shelf tests on a DB_HASH database opened with DB_CREATE | DB_THREAD."""
    dbflags = db.DB_CREATE | db.DB_THREAD
    dbtype = db.DB_HASH
227 | ||
228 | ||
229 | #---------------------------------------------------------------------- | |
230 | ||
class BasicEnvShelveTestCase(DBShelveTestCase):
    """Shelf tests that open the shelf inside a ``DBEnv``.

    The environment lives in a ``db_home`` directory next to the running
    script.  Concrete subclasses provide the ``envflags``, ``dbtype`` and
    ``dbflags`` class attributes.
    """

    def do_open(self):
        """Open the environment, then open the shelf inside it."""
        self.homeDir = homeDir = os.path.join(
            os.path.dirname(sys.argv[0]), 'db_home')
        try:
            os.mkdir(homeDir)
        except os.error:
            # Best effort: tearDown empties the directory but never removes
            # it, so it may already exist.
            pass
        self.env = db.DBEnv()
        self.env.open(homeDir, self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE)

        # Keep only the base name of the scratch file for the open() call.
        self.filename = os.path.split(self.filename)[1]
        self.d = dbshelve.DBShelf(self.env)
        self.d.open(self.filename, self.dbtype, self.dbflags)

    def do_close(self):
        """Close the shelf and then the environment.

        The environment is closed even if closing the shelf raises, so it
        is not left open behind a failing test.
        """
        try:
            self.d.close()
        finally:
            self.env.close()

    def tearDown(self):
        """Close everything and delete all files in the environment home.

        The files are removed even when closing raises, so a failure does
        not leave stale files behind for the next test.
        """
        import glob
        try:
            self.do_close()
        finally:
            for path in glob.glob(os.path.join(self.homeDir, '*')):
                os.remove(path)
256 | ||
257 | ||
258 | ||
class EnvBTreeShelveTestCase(BasicEnvShelveTestCase):
    """DB_BTREE shelf in an environment opened with no extra env flags."""
    dbtype = db.DB_BTREE
    dbflags = db.DB_CREATE
    envflags = 0
263 | ||
264 | ||
class EnvHashShelveTestCase(BasicEnvShelveTestCase):
    """DB_HASH shelf in an environment opened with no extra env flags."""
    dbtype = db.DB_HASH
    dbflags = db.DB_CREATE
    envflags = 0
269 | ||
270 | ||
class EnvThreadBTreeShelveTestCase(BasicEnvShelveTestCase):
    """DB_BTREE shelf with DB_THREAD set on both environment and database."""
    dbtype = db.DB_BTREE
    dbflags = db.DB_CREATE | db.DB_THREAD
    envflags = db.DB_THREAD
275 | ||
276 | ||
class EnvThreadHashShelveTestCase(BasicEnvShelveTestCase):
    """DB_HASH shelf with DB_THREAD set on both environment and database."""
    dbtype = db.DB_HASH
    dbflags = db.DB_CREATE | db.DB_THREAD
    envflags = db.DB_THREAD
281 | ||
282 | ||
283 | #---------------------------------------------------------------------- | |
284 | # TODO: Add test cases for a DBShelf in a RECNO DB. | |
285 | ||
286 | ||
287 | #---------------------------------------------------------------------- | |
288 | ||
def test_suite():
    """Return a suite holding every shelf test case defined in this module."""
    # Order matters only for reporting; it matches the definition order.
    case_classes = [
        DBShelveTestCase,
        BTreeShelveTestCase,
        HashShelveTestCase,
        ThreadBTreeShelveTestCase,
        ThreadHashShelveTestCase,
        EnvBTreeShelveTestCase,
        EnvHashShelveTestCase,
        EnvThreadBTreeShelveTestCase,
        EnvThreadHashShelveTestCase,
    ]

    all_tests = unittest.TestSuite()
    for case_class in case_classes:
        all_tests.addTest(unittest.makeSuite(case_class))
    return all_tests
303 | ||
304 | ||
if __name__ == '__main__':
    # Executed as a script: run whatever test_suite() returns.
    unittest.main(defaultTest='test_suite')