| 1 | """ |
| 2 | TestCases for checking dbShelve objects. |
| 3 | """ |
| 4 | |
| 5 | import sys, os, string |
| 6 | import tempfile, random |
| 7 | from pprint import pprint |
| 8 | from types import * |
| 9 | import unittest |
| 10 | |
| 11 | try: |
| 12 | # For Pythons w/distutils pybsddb |
| 13 | from bsddb3 import db, dbshelve |
| 14 | except ImportError: |
| 15 | # For Python 2.3 |
| 16 | from bsddb import db, dbshelve |
| 17 | |
| 18 | from test_all import verbose |
| 19 | |
| 20 | |
| 21 | #---------------------------------------------------------------------- |
| 22 | |
| 23 | # We want the objects to be comparable so we can test dbshelve.values |
| 24 | # later on. |
| 25 | class DataClass: |
| 26 | def __init__(self): |
| 27 | self.value = random.random() |
| 28 | |
| 29 | def __cmp__(self, other): |
| 30 | return cmp(self.value, other) |
| 31 | |
| 32 | class DBShelveTestCase(unittest.TestCase): |
| 33 | def setUp(self): |
| 34 | self.filename = tempfile.mktemp() |
| 35 | self.do_open() |
| 36 | |
| 37 | def tearDown(self): |
| 38 | self.do_close() |
| 39 | try: |
| 40 | os.remove(self.filename) |
| 41 | except os.error: |
| 42 | pass |
| 43 | |
| 44 | def populateDB(self, d): |
| 45 | for x in string.letters: |
| 46 | d['S' + x] = 10 * x # add a string |
| 47 | d['I' + x] = ord(x) # add an integer |
| 48 | d['L' + x] = [x] * 10 # add a list |
| 49 | |
| 50 | inst = DataClass() # add an instance |
| 51 | inst.S = 10 * x |
| 52 | inst.I = ord(x) |
| 53 | inst.L = [x] * 10 |
| 54 | d['O' + x] = inst |
| 55 | |
| 56 | |
| 57 | # overridable in derived classes to affect how the shelf is created/opened |
| 58 | def do_open(self): |
| 59 | self.d = dbshelve.open(self.filename) |
| 60 | |
| 61 | # and closed... |
| 62 | def do_close(self): |
| 63 | self.d.close() |
| 64 | |
| 65 | |
| 66 | |
| 67 | def test01_basics(self): |
| 68 | if verbose: |
| 69 | print '\n', '-=' * 30 |
| 70 | print "Running %s.test01_basics..." % self.__class__.__name__ |
| 71 | |
| 72 | self.populateDB(self.d) |
| 73 | self.d.sync() |
| 74 | self.do_close() |
| 75 | self.do_open() |
| 76 | d = self.d |
| 77 | |
| 78 | l = len(d) |
| 79 | k = d.keys() |
| 80 | s = d.stat() |
| 81 | f = d.fd() |
| 82 | |
| 83 | if verbose: |
| 84 | print "length:", l |
| 85 | print "keys:", k |
| 86 | print "stats:", s |
| 87 | |
| 88 | assert 0 == d.has_key('bad key') |
| 89 | assert 1 == d.has_key('IA') |
| 90 | assert 1 == d.has_key('OA') |
| 91 | |
| 92 | d.delete('IA') |
| 93 | del d['OA'] |
| 94 | assert 0 == d.has_key('IA') |
| 95 | assert 0 == d.has_key('OA') |
| 96 | assert len(d) == l-2 |
| 97 | |
| 98 | values = [] |
| 99 | for key in d.keys(): |
| 100 | value = d[key] |
| 101 | values.append(value) |
| 102 | if verbose: |
| 103 | print "%s: %s" % (key, value) |
| 104 | self.checkrec(key, value) |
| 105 | |
| 106 | dbvalues = d.values() |
| 107 | assert len(dbvalues) == len(d.keys()) |
| 108 | values.sort() |
| 109 | dbvalues.sort() |
| 110 | assert values == dbvalues |
| 111 | |
| 112 | items = d.items() |
| 113 | assert len(items) == len(values) |
| 114 | |
| 115 | for key, value in items: |
| 116 | self.checkrec(key, value) |
| 117 | |
| 118 | assert d.get('bad key') == None |
| 119 | assert d.get('bad key', None) == None |
| 120 | assert d.get('bad key', 'a string') == 'a string' |
| 121 | assert d.get('bad key', [1, 2, 3]) == [1, 2, 3] |
| 122 | |
| 123 | d.set_get_returns_none(0) |
| 124 | self.assertRaises(db.DBNotFoundError, d.get, 'bad key') |
| 125 | d.set_get_returns_none(1) |
| 126 | |
| 127 | d.put('new key', 'new data') |
| 128 | assert d.get('new key') == 'new data' |
| 129 | assert d['new key'] == 'new data' |
| 130 | |
| 131 | |
| 132 | |
| 133 | def test02_cursors(self): |
| 134 | if verbose: |
| 135 | print '\n', '-=' * 30 |
| 136 | print "Running %s.test02_cursors..." % self.__class__.__name__ |
| 137 | |
| 138 | self.populateDB(self.d) |
| 139 | d = self.d |
| 140 | |
| 141 | count = 0 |
| 142 | c = d.cursor() |
| 143 | rec = c.first() |
| 144 | while rec is not None: |
| 145 | count = count + 1 |
| 146 | if verbose: |
| 147 | print rec |
| 148 | key, value = rec |
| 149 | self.checkrec(key, value) |
| 150 | rec = c.next() |
| 151 | del c |
| 152 | |
| 153 | assert count == len(d) |
| 154 | |
| 155 | count = 0 |
| 156 | c = d.cursor() |
| 157 | rec = c.last() |
| 158 | while rec is not None: |
| 159 | count = count + 1 |
| 160 | if verbose: |
| 161 | print rec |
| 162 | key, value = rec |
| 163 | self.checkrec(key, value) |
| 164 | rec = c.prev() |
| 165 | |
| 166 | assert count == len(d) |
| 167 | |
| 168 | c.set('SS') |
| 169 | key, value = c.current() |
| 170 | self.checkrec(key, value) |
| 171 | del c |
| 172 | |
| 173 | |
| 174 | |
| 175 | def checkrec(self, key, value): |
| 176 | x = key[1] |
| 177 | if key[0] == 'S': |
| 178 | assert type(value) == StringType |
| 179 | assert value == 10 * x |
| 180 | |
| 181 | elif key[0] == 'I': |
| 182 | assert type(value) == IntType |
| 183 | assert value == ord(x) |
| 184 | |
| 185 | elif key[0] == 'L': |
| 186 | assert type(value) == ListType |
| 187 | assert value == [x] * 10 |
| 188 | |
| 189 | elif key[0] == 'O': |
| 190 | assert type(value) == InstanceType |
| 191 | assert value.S == 10 * x |
| 192 | assert value.I == ord(x) |
| 193 | assert value.L == [x] * 10 |
| 194 | |
| 195 | else: |
| 196 | raise AssertionError, 'Unknown key type, fix the test' |
| 197 | |
| 198 | #---------------------------------------------------------------------- |
| 199 | |
| 200 | class BasicShelveTestCase(DBShelveTestCase): |
| 201 | def do_open(self): |
| 202 | self.d = dbshelve.DBShelf() |
| 203 | self.d.open(self.filename, self.dbtype, self.dbflags) |
| 204 | |
| 205 | def do_close(self): |
| 206 | self.d.close() |
| 207 | |
| 208 | |
| 209 | class BTreeShelveTestCase(BasicShelveTestCase): |
| 210 | dbtype = db.DB_BTREE |
| 211 | dbflags = db.DB_CREATE |
| 212 | |
| 213 | |
| 214 | class HashShelveTestCase(BasicShelveTestCase): |
| 215 | dbtype = db.DB_HASH |
| 216 | dbflags = db.DB_CREATE |
| 217 | |
| 218 | |
| 219 | class ThreadBTreeShelveTestCase(BasicShelveTestCase): |
| 220 | dbtype = db.DB_BTREE |
| 221 | dbflags = db.DB_CREATE | db.DB_THREAD |
| 222 | |
| 223 | |
| 224 | class ThreadHashShelveTestCase(BasicShelveTestCase): |
| 225 | dbtype = db.DB_HASH |
| 226 | dbflags = db.DB_CREATE | db.DB_THREAD |
| 227 | |
| 228 | |
| 229 | #---------------------------------------------------------------------- |
| 230 | |
| 231 | class BasicEnvShelveTestCase(DBShelveTestCase): |
| 232 | def do_open(self): |
| 233 | self.homeDir = homeDir = os.path.join( |
| 234 | os.path.dirname(sys.argv[0]), 'db_home') |
| 235 | try: os.mkdir(homeDir) |
| 236 | except os.error: pass |
| 237 | self.env = db.DBEnv() |
| 238 | self.env.open(homeDir, self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE) |
| 239 | |
| 240 | self.filename = os.path.split(self.filename)[1] |
| 241 | self.d = dbshelve.DBShelf(self.env) |
| 242 | self.d.open(self.filename, self.dbtype, self.dbflags) |
| 243 | |
| 244 | |
| 245 | def do_close(self): |
| 246 | self.d.close() |
| 247 | self.env.close() |
| 248 | |
| 249 | |
| 250 | def tearDown(self): |
| 251 | self.do_close() |
| 252 | import glob |
| 253 | files = glob.glob(os.path.join(self.homeDir, '*')) |
| 254 | for file in files: |
| 255 | os.remove(file) |
| 256 | |
| 257 | |
| 258 | |
| 259 | class EnvBTreeShelveTestCase(BasicEnvShelveTestCase): |
| 260 | envflags = 0 |
| 261 | dbtype = db.DB_BTREE |
| 262 | dbflags = db.DB_CREATE |
| 263 | |
| 264 | |
| 265 | class EnvHashShelveTestCase(BasicEnvShelveTestCase): |
| 266 | envflags = 0 |
| 267 | dbtype = db.DB_HASH |
| 268 | dbflags = db.DB_CREATE |
| 269 | |
| 270 | |
| 271 | class EnvThreadBTreeShelveTestCase(BasicEnvShelveTestCase): |
| 272 | envflags = db.DB_THREAD |
| 273 | dbtype = db.DB_BTREE |
| 274 | dbflags = db.DB_CREATE | db.DB_THREAD |
| 275 | |
| 276 | |
| 277 | class EnvThreadHashShelveTestCase(BasicEnvShelveTestCase): |
| 278 | envflags = db.DB_THREAD |
| 279 | dbtype = db.DB_HASH |
| 280 | dbflags = db.DB_CREATE | db.DB_THREAD |
| 281 | |
| 282 | |
| 283 | #---------------------------------------------------------------------- |
| 284 | # TODO: Add test cases for a DBShelf in a RECNO DB. |
| 285 | |
| 286 | |
| 287 | #---------------------------------------------------------------------- |
| 288 | |
| 289 | def test_suite(): |
| 290 | suite = unittest.TestSuite() |
| 291 | |
| 292 | suite.addTest(unittest.makeSuite(DBShelveTestCase)) |
| 293 | suite.addTest(unittest.makeSuite(BTreeShelveTestCase)) |
| 294 | suite.addTest(unittest.makeSuite(HashShelveTestCase)) |
| 295 | suite.addTest(unittest.makeSuite(ThreadBTreeShelveTestCase)) |
| 296 | suite.addTest(unittest.makeSuite(ThreadHashShelveTestCase)) |
| 297 | suite.addTest(unittest.makeSuite(EnvBTreeShelveTestCase)) |
| 298 | suite.addTest(unittest.makeSuite(EnvHashShelveTestCase)) |
| 299 | suite.addTest(unittest.makeSuite(EnvThreadBTreeShelveTestCase)) |
| 300 | suite.addTest(unittest.makeSuite(EnvThreadHashShelveTestCase)) |
| 301 | |
| 302 | return suite |
| 303 | |
| 304 | |
| 305 | if __name__ == '__main__': |
| 306 | unittest.main(defaultTest='test_suite') |