"""TestCases for multi-threaded access to a DB.
from pprint
import pprint
from random
import random
from threading
import Thread
, currentThread
from test_all
import verbose
# For Pythons w/distutils pybsddb
from bsddb3
import db
, dbutils
from bsddb
import db
, dbutils
#----------------------------------------------------------------------
class BaseThreadedTestCase(unittest
.TestCase
):
dbtype
= db
.DB_UNKNOWN
# must be set in derived class
dbutils
._deadlock
_VerboseFile
= sys
.stdout
homeDir
= os
.path
.join(os
.path
.dirname(sys
.argv
[0]), 'db_home')
if e
.errno
<> errno
.EEXIST
: raise
self
.env
.open(homeDir
, self
.envflags | db
.DB_CREATE
)
self
.filename
= self
.__class
__.__name
__ + '.db'
self
.d
.set_flags(self
.dbsetflags
)
self
.d
.open(self
.filename
, self
.dbtype
, self
.dbopenflags|db
.DB_CREATE
)
shutil
.rmtree(self
.homeDir
)
return DASH
.join([key
] * 5)
#----------------------------------------------------------------------
class ConcurrentDataStoreBase(BaseThreadedTestCase
):
dbopenflags
= db
.DB_THREAD
envflags
= db
.DB_THREAD | db
.DB_INIT_CDB | db
.DB_INIT_MPOOL
readers
= 0 # derived class should set
def test01_1WriterMultiReaders(self
):
print "Running %s.test01_1WriterMultiReaders..." % \
for x
in range(self
.writers
):
wt
= Thread(target
= self
.writerThread
,
args
= (self
.d
, self
.records
, x
),
for x
in range(self
.readers
):
rt
= Thread(target
= self
.readerThread
,
def writerThread(self
, d
, howMany
, writerNum
):
#time.sleep(0.01 * writerNum + 0.01)
name
= currentThread().getName()
start
= howMany
* writerNum
stop
= howMany
* (writerNum
+ 1) - 1
print "%s: creating records %d - %d" % (name
, start
, stop
)
for x
in range(start
, stop
):
dbutils
.DeadlockWrap(d
.put
, key
, self
.makeData(key
),
if verbose
and x
% 100 == 0:
print "%s: records %d - %d finished" % (name
, start
, x
)
print "%s: finished creating records" % name
## # Each write-cursor will be exclusive, the only one that can update the DB...
## if verbose: print "%s: deleting a few records" % name
## c = d.cursor(flags = db.DB_WRITECURSOR)
## key = int(random() * howMany) + start
print "%s: thread finished" % name
def readerThread(self
, d
, readerNum
):
time
.sleep(0.01 * readerNum
)
name
= currentThread().getName()
self
.assertEqual(self
.makeData(key
), data
)
print "%s: found %d records" % (name
, count
)
print "%s: thread finished" % name
class BTreeConcurrentDataStore(ConcurrentDataStoreBase
):
class HashConcurrentDataStore(ConcurrentDataStoreBase
):
#----------------------------------------------------------------------
class SimpleThreadedBase(BaseThreadedTestCase
):
dbopenflags
= db
.DB_THREAD
envflags
= db
.DB_THREAD | db
.DB_INIT_MPOOL | db
.DB_INIT_LOCK
self
.env
.set_lk_detect(db
.DB_LOCK_DEFAULT
)
def test02_SimpleLocks(self
):
print "Running %s.test02_SimpleLocks..." % self
.__class
__.__name
__
for x
in range(self
.writers
):
wt
= Thread(target
= self
.writerThread
,
args
= (self
.d
, self
.records
, x
),
for x
in range(self
.readers
):
rt
= Thread(target
= self
.readerThread
,
def writerThread(self
, d
, howMany
, writerNum
):
name
= currentThread().getName()
start
= howMany
* writerNum
stop
= howMany
* (writerNum
+ 1) - 1
print "%s: creating records %d - %d" % (name
, start
, stop
)
# create a bunch of records
for x
in xrange(start
, stop
):
dbutils
.DeadlockWrap(d
.put
, key
, self
.makeData(key
),
if verbose
and x
% 100 == 0:
print "%s: records %d - %d finished" % (name
, start
, x
)
# do a bit or reading too
for y
in xrange(start
, x
):
data
= dbutils
.DeadlockWrap(d
.get
, key
, max_retries
=12)
self
.assertEqual(data
, self
.makeData(key
))
dbutils
.DeadlockWrap(d
.sync
, max_retries
=12)
except db
.DBIncompleteError
, val
:
print "could not complete sync()..."
# read them back, deleting a few
for x
in xrange(start
, stop
):
data
= dbutils
.DeadlockWrap(d
.get
, key
, max_retries
=12)
if verbose
and x
% 100 == 0:
print "%s: fetched record (%s, %s)" % (name
, key
, data
)
self
.assertEqual(data
, self
.makeData(key
))
dbutils
.DeadlockWrap(d
.delete
, key
, max_retries
=12)
print "%s: deleted record %s" % (name
, key
)
print "%s: thread finished" % name
def readerThread(self
, d
, readerNum
):
time
.sleep(0.01 * readerNum
)
name
= currentThread().getName()
rec
= dbutils
.DeadlockWrap(c
.first
, max_retries
=10)
self
.assertEqual(self
.makeData(key
), data
)
rec
= dbutils
.DeadlockWrap(c
.next
, max_retries
=10)
print "%s: found %d records" % (name
, count
)
print "%s: thread finished" % name
class BTreeSimpleThreaded(SimpleThreadedBase
):
class HashSimpleThreaded(SimpleThreadedBase
):
#----------------------------------------------------------------------
class ThreadedTransactionsBase(BaseThreadedTestCase
):
dbopenflags
= db
.DB_THREAD | db
.DB_AUTO_COMMIT
envflags
= (db
.DB_THREAD |
#self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
def test03_ThreadedTransactions(self
):
print "Running %s.test03_ThreadedTransactions..." % \
for x
in range(self
.writers
):
wt
= Thread(target
= self
.writerThread
,
args
= (self
.d
, self
.records
, x
),
for x
in range(self
.readers
):
rt
= Thread(target
= self
.readerThread
,
dt
= Thread(target
= self
.deadlockThread
)
self
.doLockDetect
= False
def doWrite(self
, d
, name
, start
, stop
):
txn
= self
.env
.txn_begin(None, self
.txnFlag
)
for x
in range(start
, stop
):
d
.put(key
, self
.makeData(key
), txn
)
if verbose
and x
% 100 == 0:
print "%s: records %d - %d finished" % (name
, start
, x
)
except (db
.DBLockDeadlockError
, db
.DBLockNotGrantedError
), val
:
print "%s: Aborting transaction (%s)" % (name
, val
[1])
def writerThread(self
, d
, howMany
, writerNum
):
name
= currentThread().getName()
start
= howMany
* writerNum
stop
= howMany
* (writerNum
+ 1) - 1
print "%s: creating records %d - %d" % (name
, start
, stop
)
for x
in range(start
, stop
, step
):
self
.doWrite(d
, name
, x
, min(stop
, x
+step
))
print "%s: finished creating records" % name
print "%s: deleting a few records" % name
txn
= self
.env
.txn_begin(None, self
.txnFlag
)
key
= int(random() * howMany
) + start
data
= d
.get(key
, None, txn
, db
.DB_RMW
)
print "%s: deleted records %s" % (name
, recs
)
except (db
.DBLockDeadlockError
, db
.DBLockNotGrantedError
), val
:
print "%s: Aborting transaction (%s)" % (name
, val
[1])
print "%s: thread finished" % name
def readerThread(self
, d
, readerNum
):
time
.sleep(0.01 * readerNum
+ 0.05)
name
= currentThread().getName()
txn
= self
.env
.txn_begin(None, self
.txnFlag
)
self
.assertEqual(self
.makeData(key
), data
)
if verbose
: print "%s: found %d records" % (name
, count
)
except (db
.DBLockDeadlockError
, db
.DBLockNotGrantedError
), val
:
print "%s: Aborting transaction (%s)" % (name
, val
[1])
print "%s: thread finished" % name
def deadlockThread(self
):
aborted
= self
.env
.lock_detect(
db
.DB_LOCK_RANDOM
, db
.DB_LOCK_CONFLICT
)
print "deadlock: Aborted %d deadlocked transaction(s)" \
class BTreeThreadedTransactions(ThreadedTransactionsBase
):
class HashThreadedTransactions(ThreadedTransactionsBase
):
class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase
):
txnFlag
= db
.DB_TXN_NOWAIT
class HashThreadedNoWaitTransactions(ThreadedTransactionsBase
):
txnFlag
= db
.DB_TXN_NOWAIT
#----------------------------------------------------------------------
suite
= unittest
.TestSuite()
suite
.addTest(unittest
.makeSuite(BTreeConcurrentDataStore
))
suite
.addTest(unittest
.makeSuite(HashConcurrentDataStore
))
suite
.addTest(unittest
.makeSuite(BTreeSimpleThreaded
))
suite
.addTest(unittest
.makeSuite(HashSimpleThreaded
))
suite
.addTest(unittest
.makeSuite(BTreeThreadedTransactions
))
suite
.addTest(unittest
.makeSuite(HashThreadedTransactions
))
suite
.addTest(unittest
.makeSuite(BTreeThreadedNoWaitTransactions
))
suite
.addTest(unittest
.makeSuite(HashThreadedNoWaitTransactions
))
print "Threads not available, skipping thread tests."
if __name__
== '__main__':
unittest
.main(defaultTest
='test_suite')