Commit | Line | Data |
---|---|---|
920dae64 AT |
1 | """TestCases for exercising a Recno DB. |
2 | """ | |
3 | ||
4 | import os | |
5 | import sys | |
6 | import errno | |
7 | import tempfile | |
8 | from pprint import pprint | |
9 | import unittest | |
10 | ||
11 | from test_all import verbose | |
12 | ||
13 | try: | |
14 | # For Pythons w/distutils pybsddb | |
15 | from bsddb3 import db | |
16 | except ImportError: | |
17 | # For Python 2.3 | |
18 | from bsddb import db | |
19 | ||
20 | letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' | |
21 | ||
22 | ||
23 | #---------------------------------------------------------------------- | |
24 | ||
25 | class SimpleRecnoTestCase(unittest.TestCase): | |
26 | def setUp(self): | |
27 | self.filename = tempfile.mktemp() | |
28 | ||
29 | def tearDown(self): | |
30 | try: | |
31 | os.remove(self.filename) | |
32 | except OSError, e: | |
33 | if e.errno <> errno.EEXIST: raise | |
34 | ||
35 | def test01_basic(self): | |
36 | d = db.DB() | |
37 | d.open(self.filename, db.DB_RECNO, db.DB_CREATE) | |
38 | ||
39 | for x in letters: | |
40 | recno = d.append(x * 60) | |
41 | assert type(recno) == type(0) | |
42 | assert recno >= 1 | |
43 | if verbose: | |
44 | print recno, | |
45 | ||
46 | if verbose: print | |
47 | ||
48 | stat = d.stat() | |
49 | if verbose: | |
50 | pprint(stat) | |
51 | ||
52 | for recno in range(1, len(d)+1): | |
53 | data = d[recno] | |
54 | if verbose: | |
55 | print data | |
56 | ||
57 | assert type(data) == type("") | |
58 | assert data == d.get(recno) | |
59 | ||
60 | try: | |
61 | data = d[0] # This should raise a KeyError!?!?! | |
62 | except db.DBInvalidArgError, val: | |
63 | assert val[0] == db.EINVAL | |
64 | if verbose: print val | |
65 | else: | |
66 | self.fail("expected exception") | |
67 | ||
68 | try: | |
69 | data = d[100] | |
70 | except KeyError: | |
71 | pass | |
72 | else: | |
73 | self.fail("expected exception") | |
74 | ||
75 | data = d.get(100) | |
76 | assert data == None | |
77 | ||
78 | keys = d.keys() | |
79 | if verbose: | |
80 | print keys | |
81 | assert type(keys) == type([]) | |
82 | assert type(keys[0]) == type(123) | |
83 | assert len(keys) == len(d) | |
84 | ||
85 | items = d.items() | |
86 | if verbose: | |
87 | pprint(items) | |
88 | assert type(items) == type([]) | |
89 | assert type(items[0]) == type(()) | |
90 | assert len(items[0]) == 2 | |
91 | assert type(items[0][0]) == type(123) | |
92 | assert type(items[0][1]) == type("") | |
93 | assert len(items) == len(d) | |
94 | ||
95 | assert d.has_key(25) | |
96 | ||
97 | del d[25] | |
98 | assert not d.has_key(25) | |
99 | ||
100 | d.delete(13) | |
101 | assert not d.has_key(13) | |
102 | ||
103 | data = d.get_both(26, "z" * 60) | |
104 | assert data == "z" * 60 | |
105 | if verbose: | |
106 | print data | |
107 | ||
108 | fd = d.fd() | |
109 | if verbose: | |
110 | print fd | |
111 | ||
112 | c = d.cursor() | |
113 | rec = c.first() | |
114 | while rec: | |
115 | if verbose: | |
116 | print rec | |
117 | rec = c.next() | |
118 | ||
119 | c.set(50) | |
120 | rec = c.current() | |
121 | if verbose: | |
122 | print rec | |
123 | ||
124 | c.put(-1, "a replacement record", db.DB_CURRENT) | |
125 | ||
126 | c.set(50) | |
127 | rec = c.current() | |
128 | assert rec == (50, "a replacement record") | |
129 | if verbose: | |
130 | print rec | |
131 | ||
132 | rec = c.set_range(30) | |
133 | if verbose: | |
134 | print rec | |
135 | ||
136 | # test that non-existant key lookups work (and that | |
137 | # DBC_set_range doesn't have a memleak under valgrind) | |
138 | rec = c.set_range(999999) | |
139 | assert rec == None | |
140 | if verbose: | |
141 | print rec | |
142 | ||
143 | c.close() | |
144 | d.close() | |
145 | ||
146 | d = db.DB() | |
147 | d.open(self.filename) | |
148 | c = d.cursor() | |
149 | ||
150 | # put a record beyond the consecutive end of the recno's | |
151 | d[100] = "way out there" | |
152 | assert d[100] == "way out there" | |
153 | ||
154 | try: | |
155 | data = d[99] | |
156 | except KeyError: | |
157 | pass | |
158 | else: | |
159 | self.fail("expected exception") | |
160 | ||
161 | try: | |
162 | d.get(99) | |
163 | except db.DBKeyEmptyError, val: | |
164 | assert val[0] == db.DB_KEYEMPTY | |
165 | if verbose: print val | |
166 | else: | |
167 | self.fail("expected exception") | |
168 | ||
169 | rec = c.set(40) | |
170 | while rec: | |
171 | if verbose: | |
172 | print rec | |
173 | rec = c.next() | |
174 | ||
175 | c.close() | |
176 | d.close() | |
177 | ||
178 | def test02_WithSource(self): | |
179 | """ | |
180 | A Recno file that is given a "backing source file" is essentially a | |
181 | simple ASCII file. Normally each record is delimited by \n and so is | |
182 | just a line in the file, but you can set a different record delimiter | |
183 | if needed. | |
184 | """ | |
185 | source = os.path.join(os.path.dirname(sys.argv[0]), | |
186 | 'db_home/test_recno.txt') | |
187 | if not os.path.isdir('db_home'): | |
188 | os.mkdir('db_home') | |
189 | f = open(source, 'w') # create the file | |
190 | f.close() | |
191 | ||
192 | d = db.DB() | |
193 | # This is the default value, just checking if both int | |
194 | d.set_re_delim(0x0A) | |
195 | d.set_re_delim('\n') # and char can be used... | |
196 | d.set_re_source(source) | |
197 | d.open(self.filename, db.DB_RECNO, db.DB_CREATE) | |
198 | ||
199 | data = "The quick brown fox jumped over the lazy dog".split() | |
200 | for datum in data: | |
201 | d.append(datum) | |
202 | d.sync() | |
203 | d.close() | |
204 | ||
205 | # get the text from the backing source | |
206 | text = open(source, 'r').read() | |
207 | text = text.strip() | |
208 | if verbose: | |
209 | print text | |
210 | print data | |
211 | print text.split('\n') | |
212 | ||
213 | assert text.split('\n') == data | |
214 | ||
215 | # open as a DB again | |
216 | d = db.DB() | |
217 | d.set_re_source(source) | |
218 | d.open(self.filename, db.DB_RECNO) | |
219 | ||
220 | d[3] = 'reddish-brown' | |
221 | d[8] = 'comatose' | |
222 | ||
223 | d.sync() | |
224 | d.close() | |
225 | ||
226 | text = open(source, 'r').read() | |
227 | text = text.strip() | |
228 | if verbose: | |
229 | print text | |
230 | print text.split('\n') | |
231 | ||
232 | assert text.split('\n') == \ | |
233 | "The quick reddish-brown fox jumped over the comatose dog".split() | |
234 | ||
235 | def test03_FixedLength(self): | |
236 | d = db.DB() | |
237 | d.set_re_len(40) # fixed length records, 40 bytes long | |
238 | d.set_re_pad('-') # sets the pad character... | |
239 | d.set_re_pad(45) # ...test both int and char | |
240 | d.open(self.filename, db.DB_RECNO, db.DB_CREATE) | |
241 | ||
242 | for x in letters: | |
243 | d.append(x * 35) # These will be padded | |
244 | ||
245 | d.append('.' * 40) # this one will be exact | |
246 | ||
247 | try: # this one will fail | |
248 | d.append('bad' * 20) | |
249 | except db.DBInvalidArgError, val: | |
250 | assert val[0] == db.EINVAL | |
251 | if verbose: print val | |
252 | else: | |
253 | self.fail("expected exception") | |
254 | ||
255 | c = d.cursor() | |
256 | rec = c.first() | |
257 | while rec: | |
258 | if verbose: | |
259 | print rec | |
260 | rec = c.next() | |
261 | ||
262 | c.close() | |
263 | d.close() | |
264 | ||
265 | ||
266 | #---------------------------------------------------------------------- | |
267 | ||
268 | ||
269 | def test_suite(): | |
270 | return unittest.makeSuite(SimpleRecnoTestCase) | |
271 | ||
272 | ||
273 | if __name__ == '__main__': | |
274 | unittest.main(defaultTest='test_suite') |