Basic TestCases for BTree and hash DBs, with and without a DBEnv.
from pprint
import pprint
# For Pythons w/distutils pybsddb
from test_all
import verbose
#----------------------------------------------------------------------
# VersionTestCase: prints and checks the version information exposed by db.
# NOTE(review): this view of the block is incomplete -- `info` is never
# assigned here and the tuple in the final assert is left unclosed.
class VersionTestCase(unittest
.TestCase
):
# Prints `info` and db.DB_VERSION_STRING, then compares `info` with the
# DB_VERSION_MAJOR / DB_VERSION_MINOR constants.
def test00_version(self
):
print 'bsddb.db.version(): %s' % (info
, )
print db
.DB_VERSION_STRING
assert info
== (db
.DB_VERSION_MAJOR
, db
.DB_VERSION_MINOR
,
#----------------------------------------------------------------------
# Shared base for the DB test cases in this file: holds the class-level
# defaults plus the statements that set up and tear down the environment
# and database.
# NOTE(review): the def/try/except/if lines that originally framed the setup
# and teardown statements are not present in this view of the file.
class BasicTestCase(unittest
.TestCase
):
dbtype
= db
.DB_UNKNOWN
# must be set in derived class
_numKeys
= 1002 # PRIVATE. NOTE: must be an even value
# Environment home: a 'db_home' directory next to the running script
# (derived from sys.argv[0]).
homeDir
= os
.path
.join(os
.path
.dirname(sys
.argv
[0]), 'db_home')
# unix returns ENOENT, windows returns ESRCH
if e
.errno
not in (errno
.ENOENT
, errno
.ESRCH
): raise
# Configure self.env (set_lg_max, set_flags) and open it on homeDir with
# DB_CREATE added to the class's envflags.
self
.env
.set_lg_max(1024*1024)
self
.env
.set_flags(self
.envsetflags
, 1)
self
.env
.open(homeDir
, self
.envflags | db
.DB_CREATE
)
# Point tempfile at homeDir and keep only the base name of the generated
# temporary path as the database filename.
tempfile
.tempdir
= homeDir
self
.filename
= os
.path
.split(tempfile
.mktemp())[1]
# Yes, a bare except is intended, since we're re-raising the exc.
# Alternative assignment: use the full temporary path as the filename.
self
.filename
= tempfile
.mktemp()
# Apply the per-class DB flags before opening.
self
.d
.set_flags(self
.dbsetflags
)
# Open with an explicit database name, type, flags (plus DB_CREATE) and mode.
self
.d
.open(self
.filename
, self
.dbname
, self
.dbtype
,
self
.dbopenflags|db
.DB_CREATE
, self
.dbmode
)
self
.d
.open(self
.filename
, # try out keyword args
flags
= self
.dbopenflags|db
.DB_CREATE
)
# Remove the whole environment home directory.
shutil
.rmtree(self
.homeDir
)
## Make a new DBEnv to remove the env files from the home dir.
## (It can't be done while the env is open, nor after it has been
## closed, so we make a new one to do it.)
#os.remove(os.path.join(self.homeDir, self.filename))
# Fill the database with test records; _txn (default None) is passed through
# to put().
# NOTE(review): the assignment of `d`, the put() calls inside the loops and
# the computation of `num` are not present in this view.
def populateDB(self
, _txn
=None):
# First batch: keys counted down from _numKeys.
for x
in range(self
._numKeys
/2):
key
= '%04d' % (self
._numKeys
- x
) # insert keys in reverse order
data
= self
.makeData(key
)
# One record whose value is the empty string.
d
.put('empty value', '', _txn
)
# Second batch: keys counted up from 0.
for x
in range(self
._numKeys
/2-1):
key
= '%04d' % x
# and now some in forward order
data
= self
.makeData(key
)
print "created %d records" % num
# NOTE(review): the return below joins five copies of `key` with DASH; its
# own def line is not visible here (presumably makeData -- confirm).
return DASH
.join([key
] * 5)
#----------------------------------------
# Exercises DB.get/put: plain lookups, overwriting a key, DB_NOOVERWRITE,
# reopening the database, get_both, get() with a default, and a dict check
# printed as the d.stat() result.
# NOTE(review): several lines (the try:/else: statements, the assignment of
# `d`, the close before reopening, the assignment of `s`) are not present in
# this view.
def test01_GetsAndPuts(self
):
print "Running %s.test01_GetsAndPuts..." % self
.__class
__.__name
__
for key
in ['0001', '0100', '0400', '0700', '0999']:
assert d
.get('0321') == '0321-0321-0321-0321-0321'
# By default nonexistent keys return None...
assert d
.get('abcd') == None
# ...but they raise exceptions in other situations. Call
# set_get_returns_none() to change it.
except db
.DBNotFoundError
, val
:
assert val
[0] == db
.DB_NOTFOUND
self
.fail("expected exception")
d
.put('abcd', 'a new record')
assert d
.get('abcd') == 'a new record'
# Write the same key a second time; the first assert below is guarded by the
# DB_DUP check (the second presumably sat in a missing else branch -- confirm).
d
.put('abcd', 'same key')
if self
.dbsetflags
& db
.DB_DUP
:
assert d
.get('abcd') == 'a new record'
assert d
.get('abcd') == 'same key'
# put() with DB_NOOVERWRITE on an existing key; the handler below checks for
# DBKeyExistError carrying DB_KEYEXIST.
d
.put('abcd', 'this should fail', flags
=db
.DB_NOOVERWRITE
)
except db
.DBKeyExistError
, val
:
assert val
[0] == db
.DB_KEYEXIST
self
.fail("expected exception")
if self
.dbsetflags
& db
.DB_DUP
:
assert d
.get('abcd') == 'a new record'
assert d
.get('abcd') == 'same key'
# Reopen the database (with and without a database name) and re-check the
# stored values.
self
.d
.open(self
.filename
, self
.dbname
)
self
.d
.open(self
.filename
)
assert d
.get('0321') == '0321-0321-0321-0321-0321'
if self
.dbsetflags
& db
.DB_DUP
:
assert d
.get('abcd') == 'a new record'
assert d
.get('abcd') == 'same key'
# get_both: a matching key/data pair, then a pair with wrong data.
rec
= d
.get_both('0555', '0555-0555-0555-0555-0555')
assert d
.get_both('0555', 'bad data') == None
# get() with a default value for a missing key.
data
= d
.get('bad key', 'bad data')
assert data
== 'bad data'
# any object can pass through
data
= d
.get('bad key', self
)
assert type(s
) == type({})
print 'd.stat() returned this dictionary:'
#----------------------------------------
# Exercises the dictionary-style interface of the DB object: item access and
# assignment, len(), has_key(), and list checks on `keys`, `items` and
# `values`.
# NOTE(review): the assignments of `d`, `data`, `keys`, `items` and `values`
# are not present in this view, nor is the end of the first print statement.
def test02_DictionaryMethods(self
):
print "Running %s.test02_DictionaryMethods..." % \
for key
in ['0002', '0101', '0401', '0701', '0998']:
assert data
== self
.makeData(key
)
assert len(d
) == self
._numKeys
assert len(keys
) == self
._numKeys
assert type(keys
) == type([])
# Adding a new key is expected to grow the length by one...
d
['new record'] = 'a new record'
assert len(d
) == self
._numKeys
+1
assert len(keys
) == self
._numKeys
+1
# ...while replacing its value is expected to leave the length unchanged.
d
['new record'] = 'a replacement record'
assert len(d
) == self
._numKeys
+1
assert len(keys
) == self
._numKeys
+1
print "the first 10 keys are:"
assert d
['new record'] == 'a replacement record'
assert d
.has_key('0001') == 1
assert d
.has_key('spam') == 0
# `items` is expected to be a list of 2-tuples.
assert len(items
) == self
._numKeys
+1
assert type(items
) == type([])
assert type(items
[0]) == type(())
assert len(items
[0]) == 2
print "the first 10 items are:"
assert len(values
) == self
._numKeys
+1
assert type(values
) == type([])
print "the first 10 values are:"
#----------------------------------------
# Exercises DBCursor operations on self.d: counting records, set / get_both /
# set_range lookups, dup() and put() through a duplicated cursor, and use of
# cursors after close.
# get_raises_error / set_raises_error (both default 0) are printed in the
# banner; the sibling test03b/test03c methods pass them after calling
# set_get_returns_none().
# NOTE(review): many lines (the traversal loops, try:/else: statements, the
# methods_to_test dict header, the close() calls) are not present in this view.
def test03_SimpleCursorStuff(self
, get_raises_error
=0, set_raises_error
=0):
print "Running %s.test03_SimpleCursorStuff (get_error %s, set_error %s)..." % \
(self
.__class
__.__name
__, get_raises_error
, set_raises_error
)
# Begin a transaction when there is an env and DB_AUTO_COMMIT is among the
# open flags; the cursor is created with txn=txn.
if self
.env
and self
.dbopenflags
& db
.DB_AUTO_COMMIT
:
txn
= self
.env
.txn_begin()
c
= self
.d
.cursor(txn
=txn
)
if verbose
and count
% 100 == 0:
except db
.DBNotFoundError
, val
:
assert val
[0] == db
.DB_NOTFOUND
self
.fail("unexpected DBNotFoundError")
# get_current_size() must agree with the length of the current record's data.
assert c
.get_current_size() == len(c
.current()[1]), "%s != len(%r)" % (c
.get_current_size(), c
.current()[1])
assert count
== self
._numKeys
if verbose
and count
% 100 == 0:
except db
.DBNotFoundError
, val
:
assert val
[0] == db
.DB_NOTFOUND
self
.fail("unexpected DBNotFoundError")
assert count
== self
._numKeys
assert rec
[1] == self
.makeData('0505')
assert c
.get_current_size() == len(rec
[1])
# make sure we get empty values properly
rec
= c
.set('empty value')
assert c
.get_current_size() == 0
except db
.DBNotFoundError
, val
:
assert val
[0] == db
.DB_NOTFOUND
self
.fail("expected exception")
self
.fail("expected None: %r" % (n
,))
# get_both with matching data, then with data that does not match.
rec
= c
.get_both('0404', self
.makeData('0404'))
assert rec
== ('0404', self
.makeData('0404'))
n
= c
.get_both('0404', 'bad data')
except db
.DBNotFoundError
, val
:
assert val
[0] == db
.DB_NOTFOUND
self
.fail("expected exception")
self
.fail("expected None: %r" % (n
,))
# Range searches are guarded by a check that the database type is DB_BTREE.
if self
.d
.get_type() == db
.DB_BTREE
:
print "searched for '011', found: ", rec
rec
= c
.set_range('011',dlen
=0,doff
=0)
print "searched (partial) for '011', found: ", rec
if rec
[1] != '': self
.fail('expected empty data portion')
ev
= c
.set_range('empty value')
print "search for 'empty value' returned", ev
if ev
[1] != '': self
.fail('empty value lookup failed')
except db
.DBKeyEmptyError
, val
:
assert val
[0] == db
.DB_KEYEMPTY
self
.fail('exception expected')
# Duplicate the cursor at the same position and write through the duplicate.
c2
= c
.dup(db
.DB_POSITION
)
assert c
.current() == c2
.current()
c2
.put('', 'a new value', db
.DB_CURRENT
)
assert c
.current() == c2
.current()
assert c
.current()[1] == 'a new value'
# Partial put at offset 5: 'a new value' is expected to become 'a newer value'.
c2
.put('', 'er', db
.DB_CURRENT
, dlen
=0, doff
=5)
assert c2
.current()[1] == 'a newer value'
# time to abuse the closed cursors and hope we don't crash
'dup': (db
.DB_POSITION
,),
'put':('', 'spam', db
.DB_CURRENT
),
for method
, args
in methods_to_test
.items():
print "attempting to use a closed cursor's %s method" % \
# a bug may cause a NULL pointer dereference...
apply(getattr(c
, method
), args
)
self
.fail("no exception raised when using a buggy cursor's"
# free cursor referencing a closed database, it should not barf:
oldcursor
= self
.d
.cursor(txn
=txn
)
# this would originally cause a segfault when the cursor for a
# closed database was cleaned up. it should not anymore.
# SF pybsddb bug id 667343
# Re-runs test03_SimpleCursorStuff after set_get_returns_none(0), passing
# get_raises_error=1 and set_raises_error=1.
# NOTE(review): the tail of the print statement is not present in this view,
# and `old` is not used in the visible lines.
def test03b_SimpleCursorWithoutGetReturnsNone0(self
):
# same test but raise exceptions instead of returning None
print "Running %s.test03b_SimpleCursorStuffWithoutGetReturnsNone..." % \
old
= self
.d
.set_get_returns_none(0)
self
.test03_SimpleCursorStuff(get_raises_error
=1, set_raises_error
=1)
# Re-runs test03_SimpleCursorStuff after set_get_returns_none(1), passing
# get_raises_error=0 and set_raises_error=1.
# NOTE(review): the printed banner names "...WithoutGetReturnsNone", which is
# the sibling test's name, not this one; the tail of the print statement is
# not present in this view.
def test03b_SimpleCursorWithGetReturnsNone1(self
):
# same test, run with get_raises_error=0 and set_raises_error=1
print "Running %s.test03b_SimpleCursorStuffWithoutGetReturnsNone..." % \
old
= self
.d
.set_get_returns_none(1)
self
.test03_SimpleCursorStuff(get_raises_error
=0, set_raises_error
=1)
# Re-runs test03_SimpleCursorStuff after calling set_get_returns_none(1) and
# then set_get_returns_none(2), passing get_raises_error=0 and
# set_raises_error=0.
# NOTE(review): the tail of the print statement is not present in this view.
def test03c_SimpleCursorGetReturnsNone2(self
):
# same test, run with get_raises_error=0 and set_raises_error=0
print "Running %s.test03c_SimpleCursorStuffWithoutSetReturnsNone..." % \
old
= self
.d
.set_get_returns_none(1)
old
= self
.d
.set_get_returns_none(2)
self
.test03_SimpleCursorStuff(get_raises_error
=0, set_raises_error
=0)
#----------------------------------------
# Exercises partial reads and writes through the dlen/doff keyword arguments
# of DB.get and DB.put.
# NOTE(review): the assignments of `d` and `key`, the initial put of `data`
# and the tail of the print statement are not present in this view.
def test04_PartialGetAndPut(self
):
print "Running %s.test04_PartialGetAndPut..." % \
# 2000-byte value: 1000 '1' characters followed by 1000 '2' characters.
data
= "1" * 1000 + "2" * 1000
assert d
.get(key
) == data
# A 20-byte window starting at offset 990 straddles the '1'/'2' boundary.
assert d
.get(key
, dlen
=20, doff
=990) == ("1" * 10) + ("2" * 10)
d
.put("partialtest2", ("1" * 30000) + "robin" )
assert d
.get("partialtest2", dlen
=5, doff
=30000) == "robin"
# There seems to be a bug in DB here... The test below is commented out.
##assert d.get("partialtest2", dlen=5, doff=30010) == ""
if self
.dbsetflags
!= db
.DB_DUP
:
# Partial put with duplicate records requires a cursor
d
.put(key
, "0000", dlen
=2000, doff
=0)
assert d
.get(key
) == "0000"
d
.put(key
, "1111", dlen
=1, doff
=2)
assert d
.get(key
) == "0011110"
#----------------------------------------
# Checks that DB.get_size(key) equals i for sizes 1, 501, 1001, ... below
# 50000.
# NOTE(review): the assignments of `d` and `key` and the put() that stores
# each value are not present in this view.
def test05_GetSize(self
):
print "Running %s.test05_GetSize..." % self
.__class
__.__name
__
for i
in range(1, 50000, 500):
assert d
.get_size(key
) == i
#----------------------------------------
# Checks the value `num` twice: at least 1 for a non-empty database, then 0
# for an empty one (per the assertion messages, the result of truncate).
# NOTE(review): the banner prints "test99_Truncate" although the method is
# test06_Truncate; the statements that assign `num` are not present in this
# view.
def test06_Truncate(self
):
# truncate is a feature of BerkeleyDB 3.3 and above
print "Running %s.test99_Truncate..." % self
.__class
__.__name
__
assert num
>= 1, "truncate returned <= 0 on non-empty database"
assert num
== 0, "truncate on empty DB returned nonzero (%r)" % (num
,)
#----------------------------------------------------------------------
class BasicBTreeTestCase(BasicTestCase):
    """BasicTestCase subclass registered in the suite under the BTree name."""
    # NOTE(review): no attribute overrides are visible for this class here.
class BasicHashTestCase(BasicTestCase):
    """BasicTestCase subclass registered in the suite under the Hash name."""
    # NOTE(review): no attribute overrides are visible for this class here.
class BasicBTreeWithThreadFlagTestCase(BasicTestCase):
    """BasicTestCase variant that opens the database with DB_THREAD."""

    dbopenflags = db.DB_THREAD
class BasicHashWithThreadFlagTestCase(BasicTestCase):
    """BasicTestCase variant that opens the database with DB_THREAD."""

    dbopenflags = db.DB_THREAD
class BasicBTreeWithEnvTestCase(BasicTestCase):
    """BasicTestCase variant that also sets environment open flags."""

    # Flags used when opening the database.
    dbopenflags = db.DB_THREAD
    # Flags used when opening the environment: threading, memory pool, locking.
    envflags = (
        db.DB_THREAD
        | db.DB_INIT_MPOOL
        | db.DB_INIT_LOCK
    )
class BasicHashWithEnvTestCase(BasicTestCase):
    """BasicTestCase variant that also sets environment open flags."""

    # Flags used when opening the database.
    dbopenflags = db.DB_THREAD
    # Flags used when opening the environment: threading, memory pool, locking.
    envflags = (
        db.DB_THREAD
        | db.DB_INIT_MPOOL
        | db.DB_INIT_LOCK
    )
#----------------------------------------------------------------------
# Transactional variant of BasicTestCase: opens the database with DB_THREAD
# and DB_AUTO_COMMIT, and begins transactions on self.env.
# NOTE(review): the envflags expression below is left unclosed in this view,
# and the def lines that framed the tearDown / populateDB / txn_begin
# statements are not present.
class BasicTransactionTestCase(BasicTestCase
):
dbopenflags
= db
.DB_THREAD | db
.DB_AUTO_COMMIT
envflags
= (db
.DB_THREAD | db
.DB_INIT_MPOOL | db
.DB_INIT_LOCK |
envsetflags
= db
.DB_AUTO_COMMIT
# Delegate to the base-class tearDown.
BasicTestCase
.tearDown(self
)
# Populate the database through the base-class helper inside a transaction.
txn
= self
.env
.txn_begin()
BasicTestCase
.populateDB(self
, _txn
=txn
)
# Start the transaction kept on self.txn for use by the tests.
self
.txn
= self
.env
.txn_begin()
# Exercises reads and writes of 'new rec' inside self.txn, a record count via
# a cursor, a checkpoint and a log_archive call.
# NOTE(review): the abort()/commit() calls between the sections, the cursor
# loop, the try: line and the assignment of `d` are not present in this view.
def test06_Transactions(self
):
print "Running %s.test06_Transactions..." % self
.__class
__.__name
__
# First pass: the record is visible through self.txn after the put...
assert d
.get('new rec', txn
=self
.txn
) == None
d
.put('new rec', 'this is a new record', self
.txn
)
assert d
.get('new rec', txn
=self
.txn
) == 'this is a new record'
# ...and is expected to be absent here, before a new transaction starts.
assert d
.get('new rec') == None
self
.txn
= self
.env
.txn_begin()
# Second pass: same put, but the record is expected to persist afterwards.
assert d
.get('new rec', txn
=self
.txn
) == None
d
.put('new rec', 'this is a new record', self
.txn
)
assert d
.get('new rec', txn
=self
.txn
) == 'this is a new record'
assert d
.get('new rec') == 'this is a new record'
self
.txn
= self
.env
.txn_begin()
if verbose
and count
% 100 == 0:
# One more record than _numKeys is expected.
assert count
== self
._numKeys
+1
c
.close() # Cursors *MUST* be closed before commit!
self
.env
.txn_checkpoint (0, 0, 0)
except db
.DBIncompleteError
:
# must have at least one log file present:
logs
= self
.env
.log_archive(db
.DB_ARCH_ABS | db
.DB_ARCH_LOG
)
self
.txn
= self
.env
.txn_begin()
#----------------------------------------
# Begins a transaction on self.env and checks `num` twice: at least 1 for a
# non-empty database, then 0 for an empty one (per the assertion messages,
# the result of truncate).
# NOTE(review): the statements that assign `num` and finish the transaction
# are not present in this view.
def test07_TxnTruncate(self
):
# truncate is a feature of BerkeleyDB 3.3 and above
print "Running %s.test07_TxnTruncate..." % self
.__class
__.__name
__
txn
= self
.env
.txn_begin()
assert num
>= 1, "truncate returned <= 0 on non-empty database"
assert num
== 0, "truncate on empty DB returned nonzero (%r)" % (num
,)
#----------------------------------------
# Begins two transactions in turn; each is followed by a RuntimeError raise
# whose message says abort() / commit() was accepted on a DB_TXN that was no
# longer valid.
# NOTE(review): the abort()/commit() calls and the try/except/else framing
# that decide whether each raise is reached are not present in this view.
def test08_TxnLateUse(self
):
txn
= self
.env
.txn_begin()
raise RuntimeError, "DBTxn.abort() called after DB_TXN no longer valid w/o an exception"
txn
= self
.env
.txn_begin()
raise RuntimeError, "DBTxn.commit() called after DB_TXN no longer valid w/o an exception"
class BTreeTransactionTestCase(BasicTransactionTestCase):
    """BasicTransactionTestCase subclass registered under the BTree name."""
    # NOTE(review): no attribute overrides are visible for this class here.
class HashTransactionTestCase(BasicTransactionTestCase):
    """BasicTransactionTestCase subclass registered under the Hash name."""
    # NOTE(review): no attribute overrides are visible for this class here.
#----------------------------------------------------------------------
# BasicTestCase variant that sets the DB_RECNUM flag on the database and adds
# a record-number test.
# NOTE(review): the assignments of `rec`, `num` and `c` are not present in
# this view.
class BTreeRecnoTestCase(BasicTestCase
):
dbsetflags
= db
.DB_RECNUM
# Checks that `rec` is a tuple and `num` an int, and that positioning the
# cursor with set_recno(num) yields `rec`.
def test07_RecnoInBTree(self
):
print "Running %s.test07_RecnoInBTree..." % self
.__class
__.__name
__
assert type(rec
) == type(())
print "Record #200 is ", rec
assert type(num
) == type(1)
print "recno of d['0200'] is ", num
assert c
.set_recno(num
) == rec
class BTreeRecnoWithThreadFlagTestCase(BTreeRecnoTestCase):
    """BTreeRecnoTestCase variant that opens the database with DB_THREAD."""

    dbopenflags = db.DB_THREAD
#----------------------------------------------------------------------
# BasicTestCase variant with a duplicate-key test: the asserts expect the
# words of a sentence to be found under the single key 'dup1'.
# NOTE(review): the class attributes, the put() inside the loop, the cursor
# calls and the tail of the print statement are not present in this view.
class BasicDUPTestCase(BasicTestCase
):
def test08_DuplicateKeys(self
):
print "Running %s.test08_DuplicateKeys..." % \
for x
in "The quick brown fox jumped over the lazy dog.".split():
# The first record under 'dup1' is expected to be the first word...
assert rec
== ('dup1', 'The')
# ...and both `next` and `next_dup` are expected to be the second word.
assert next
== ('dup1', 'quick')
assert next_dup
== ('dup1', 'quick')
class BTreeDUPTestCase(BasicDUPTestCase):
    """BasicDUPTestCase subclass registered under the BTree name."""
    # NOTE(review): no attribute overrides are visible for this class here.
class HashDUPTestCase(BasicDUPTestCase):
    """BasicDUPTestCase subclass registered under the Hash name."""
    # NOTE(review): no attribute overrides are visible for this class here.
class BTreeDUPWithThreadTestCase(BasicDUPTestCase):
    """BasicDUPTestCase variant that opens the database with DB_THREAD."""

    dbopenflags = db.DB_THREAD
class HashDUPWithThreadTestCase(BasicDUPTestCase):
    """BasicDUPTestCase variant that opens the database with DB_THREAD."""

    dbopenflags = db.DB_THREAD
#----------------------------------------------------------------------
# BasicTestCase variant that opens several named databases ("second",
# "third", plus self.dbname) in the same file.
# NOTE(review): the class attributes, the def line and return statements
# around the dbtype check, the creation of d2/d3, the close() calls and the
# cursor loops are not present in this view.
class BasicMultiDBTestCase(BasicTestCase
):
# Branch on whether this case's dbtype is DB_BTREE (self.otherType() is
# called below when opening the "third" database).
if self
.dbtype
== db
.DB_BTREE
:
def test09_MultiDB(self
):
print "Running %s.test09_MultiDB..." % self
.__class
__.__name
__
# "second" uses this case's dbtype; "third" uses self.otherType().
d2
.open(self
.filename
, "second", self
.dbtype
,
self
.dbopenflags|db
.DB_CREATE
)
d3
.open(self
.filename
, "third", self
.otherType(),
self
.dbopenflags|db
.DB_CREATE
)
for x
in "The quick brown fox jumped over the lazy dog".split():
d2
.put(x
, self
.makeData(x
))
# Drop every reference to the three databases, then reopen them by name
# without DB_CREATE.
self
.d
= d1
= d2
= d3
= None
self
.d
= d1
= db
.DB(self
.env
)
d1
.open(self
.filename
, self
.dbname
, flags
= self
.dbopenflags
)
d2
.open(self
.filename
, "second", flags
= self
.dbopenflags
)
d3
.open(self
.filename
, "third", flags
= self
.dbopenflags
)
if verbose
and (count
% 50) == 0:
assert count
== self
._numKeys
# Strange things happen if you try to use Multiple DBs per file without a
# DBEnv with MPOOL and LOCKing...
class BTreeMultiDBTestCase(BasicMultiDBTestCase):
    """BasicMultiDBTestCase variant that also sets environment open flags."""

    # Flags used when opening the database.
    dbopenflags = db.DB_THREAD
    # Flags used when opening the environment: threading, memory pool, locking.
    envflags = (
        db.DB_THREAD
        | db.DB_INIT_MPOOL
        | db.DB_INIT_LOCK
    )
class HashMultiDBTestCase(BasicMultiDBTestCase):
    """BasicMultiDBTestCase variant that also sets environment open flags."""

    # Flags used when opening the database.
    dbopenflags = db.DB_THREAD
    # Flags used when opening the environment: threading, memory pool, locking.
    envflags = (
        db.DB_THREAD
        | db.DB_INIT_MPOOL
        | db.DB_INIT_LOCK
    )
#----------------------------------------------------------------------
#----------------------------------------------------------------------
# Build one suite holding every concrete test-case class, in a fixed order.
suite = unittest.TestSuite()
suite.addTests(map(unittest.makeSuite, (
    VersionTestCase,
    BasicBTreeTestCase,
    BasicHashTestCase,
    BasicBTreeWithThreadFlagTestCase,
    BasicHashWithThreadFlagTestCase,
    BasicBTreeWithEnvTestCase,
    BasicHashWithEnvTestCase,
    BTreeTransactionTestCase,
    HashTransactionTestCase,
    BTreeRecnoTestCase,
    BTreeRecnoWithThreadFlagTestCase,
    BTreeDUPTestCase,
    HashDUPTestCase,
    BTreeDUPWithThreadTestCase,
    HashDUPWithThreadTestCase,
    BTreeMultiDBTestCase,
    HashMultiDBTestCase,
)))

# Script entry point: hand control to unittest, naming 'test_suite' as the
# default test to run.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')