85e1dea064f12fa4b8bdeb5406ceaf1428654552
from test
.test_support
import TestFailed
, have_unicode
, TESTFN
# Tests that try a number of pickle protocols should have a
#     for proto in protocols:
# kind of outer loop.
assert pickle
.HIGHEST_PROTOCOL
== cPickle
.HIGHEST_PROTOCOL
== 2
protocols
= range(pickle
.HIGHEST_PROTOCOL
+ 1)
# Return True if opcode code appears in the pickle, else False.
def opcode_in_pickle(code, pickle):
    """Report whether opcode `code` occurs anywhere in a pickle.

    code   -- the one-character opcode to look for.
    pickle -- the pickle to scan, in any form pickletools.genops() accepts.
              (The parameter shadows the pickle module inside this function.)

    Returns True if at least one opcode yielded by pickletools.genops() has
    a .code equal to `code`, else False.
    """
    # genops() yields (opcode, arg, pos) triples; only the opcode matters
    # here, and the scan stops at the first match.
    for op, dummy, dummy in pickletools.genops(pickle):
        if op.code == code:
            return True
    return False
# Return the number of times opcode code appears in pickle.
def count_opcode(code, pickle):
    """Count how many times opcode `code` occurs in a pickle.

    code   -- the one-character opcode to count.
    pickle -- the pickle to scan, in any form pickletools.genops() accepts.
              (The parameter shadows the pickle module inside this function.)

    Returns the number of opcodes yielded by pickletools.genops() whose
    .code equals `code`; 0 if there are none.
    """
    n = 0
    # genops() yields (opcode, arg, pos) triples; the argument and the
    # position are irrelevant for counting.
    for op, dummy, dummy in pickletools.genops(pickle):
        if op.code == code:
            n += 1
    return n
# We can't very well test the extension registry without putting known stuff
# in it, but we have to be careful to restore its original state. Code
# e = ExtensionSaver(extension_code)
# fiddle w/ the extension registry's stuff for extension_code
# Remember current registration for code (if any), and remove it (if
# there is one).
def __init__(self
, code
):
if code
in copy_reg
._inverted
_registry
:
self
.pair
= copy_reg
._inverted
_registry
[code
]
copy_reg
.remove_extension(self
.pair
[0], self
.pair
[1], code
)
# Restore previous registration for code.
curpair
= copy_reg
._inverted
_registry
.get(code
)
copy_reg
.remove_extension(curpair
[0], curpair
[1], code
)
copy_reg
.add_extension(pair
[0], pair
[1], code
)
def __cmp__(self, other):
    """Order two instances by comparing their attribute dictionaries.

    Returns whatever the builtin cmp() returns for the two __dict__
    mappings (Python 2 three-way comparison).
    """
    mine = self.__dict__
    theirs = other.__dict__
    return cmp(mine, theirs)
C
.__module
__ = "__main__"
def __init__(self
, a
, b
):
def __getinitargs__(self
):
# New-style class whose metaclass is chosen through the Python 2
# `__metaclass__` class attribute.  The name `metaclass` must already be
# bound at module level when this class statement executes.
class use_metaclass(object):
    __metaclass__ = metaclass
# DATA0 .. DATA2 are the pickles we expect under the various protocols, for
# the object returned by create_data().
# break into multiple strings to avoid confusing font-lock-mode
18: c GLOBAL '__builtin__ complex'
166: i INST '__main__ C' (MARK at 165)
182: d DICT (MARK at 181)
218: t TUPLE (MARK at 151)
highest protocol among opcodes = 0
DATA1
= (']q\x01(K\x00L1L\nG@\x00\x00\x00\x00\x00\x00\x00'
'c__builtin__\ncomplex\nq\x02(G@\x08\x00\x00\x00\x00\x00'
'\x00G\x00\x00\x00\x00\x00\x00\x00\x00tRq\x03K\x01J\xff\xff'
'\xff\xffK\xffJ\x01\xff\xff\xffJ\x00\xff\xff\xffM\xff\xff'
'J\x01\x00\xff\xffJ\x00\x00\xff\xffJ\xff\xff\xff\x7fJ\x01\x00'
'\x00\x80J\x00\x00\x00\x80(U\x03abcq\x04h\x04(c__main__\n'
'C\nq\x05oq\x06}q\x07(U\x03fooq\x08K\x01U\x03barq\tK\x02ubh'
19: c GLOBAL '__builtin__ complex'
102: J BININT -2147483647
107: J BININT -2147483648
113: U SHORT_BINSTRING 'abc'
123: c GLOBAL '__main__ C'
144: U SHORT_BINSTRING 'foo'
153: U SHORT_BINSTRING 'bar'
162: u SETITEMS (MARK at 143)
166: t TUPLE (MARK at 112)
173: e APPENDS (MARK at 3)
highest protocol among opcodes = 1
DATA2
= ('\x80\x02]q\x01(K\x00\x8a\x01\x01G@\x00\x00\x00\x00\x00\x00\x00'
'c__builtin__\ncomplex\nq\x02G@\x08\x00\x00\x00\x00\x00\x00G\x00'
'\x00\x00\x00\x00\x00\x00\x00\x86Rq\x03K\x01J\xff\xff\xff\xffK'
'\xffJ\x01\xff\xff\xffJ\x00\xff\xff\xffM\xff\xffJ\x01\x00\xff\xff'
'J\x00\x00\xff\xffJ\xff\xff\xff\x7fJ\x01\x00\x00\x80J\x00\x00\x00'
'\x80(U\x03abcq\x04h\x04(c__main__\nC\nq\x05oq\x06}q\x07(U\x03foo'
'q\x08K\x01U\x03barq\tK\x02ubh\x06tq\nh\nK\x05e.')
20: c GLOBAL '__builtin__ complex'
102: J BININT -2147483647
107: J BININT -2147483648
113: U SHORT_BINSTRING 'abc'
123: c GLOBAL '__main__ C'
144: U SHORT_BINSTRING 'foo'
153: U SHORT_BINSTRING 'bar'
162: u SETITEMS (MARK at 143)
166: t TUPLE (MARK at 112)
173: e APPENDS (MARK at 5)
highest protocol among opcodes = 2
# Append some integer test cases at cPickle.c's internal size
# cutoffs.
uint1max
, -uint1max
, -uint1max
-1,
uint2max
, -uint2max
, -uint2max
-1,
int4max
, -int4max
, -int4max
-1])
class AbstractPickleTests(unittest
.TestCase
):
# Subclass must define self.dumps, self.loads, self.error.
_testdata
= create_data()
# test various datatypes not tested by testdata
# XXX test __reduce__ protocol?
def test_roundtrip_equality(self
):
expected
= self
._testdata
s
= self
.dumps(expected
, proto
)
self
.assertEqual(expected
, got
)
def test_load_from_canned_string(self
):
expected
= self
._testdata
for canned
in DATA0
, DATA1
, DATA2
:
self
.assertEqual(expected
, got
)
# There are gratuitous differences between pickles produced by
# pickle and cPickle, largely because cPickle starts PUT indices at
# 1 and pickle starts them at 0.  See XXX comment in cPickle's put2() --
# there's a comment with an exclamation point there whose meaning
# is a mystery.  cPickle also suppresses PUT for objects with a refcount
# of 1.
def dont_test_disassembly(self
):
from cStringIO
import StringIO
from pickletools
import dis
for proto
, expected
in (0, DATA0_DIS
), (1, DATA1_DIS
):
s
= self
.dumps(self
._testdata
, proto
)
got
= filelike
.getvalue()
self
.assertEqual(expected
, got
)
def test_recursive_list(self
):
self
.assertEqual(len(x
), 1)
def test_recursive_dict(self
):
self
.assertEqual(x
.keys(), [1])
def test_recursive_inst(self
):
self
.assertEqual(dir(x
), dir(i
))
self
.assert_(x
.attr
is x
)
def test_recursive_multi(self
):
self
.assertEqual(len(x
), 1)
self
.assertEqual(dir(x
[0]), dir(i
))
self
.assertEqual(x
[0].attr
.keys(), [1])
self
.assert_(x
[0].attr
[1] is x
)
self
.assertRaises(self
.error
, self
.loads
, 'garyp')
def test_insecure_strings(self
):
insecure
= ["abc", "2 + 2", # not quoted
#"'abc' + 'def'", # not a single quoted string
"'abc", # quote is not closed
"'abc\"", # open quote and close quote don't match
"'abc' ?", # junk after close quote
"'\\'", # trailing backslash
# some tests of the quoting rules
#"'\\\\a\'\'\'\\\'\\\\\''",
buf
= "S" + s
+ "\012p0\012."
self
.assertRaises(ValueError, self
.loads
, buf
)
endcases
= [unicode(''), unicode('<\\u>'), unicode('<\\\u1234>'),
unicode('<\n>'), unicode('<\\>')]
s
= self
.dumps(expected
, proto
)
self
.assertEqual(expected
, n2
)
maxint64
= (1L << 63) - 1
data
= 'I' + str(maxint64
) + '\n.'
self
.assertEqual(got
, maxint64
)
# Try too with a bogus literal.
data
= 'I' + str(maxint64
) + 'JUNK\n.'
self
.assertRaises(ValueError, self
.loads
, data
)
# 256 bytes is where LONG4 begins.
for nbits
in 1, 8, 8*254, 8*255, 8*256, 8*257:
for npos
in nbase
-1, nbase
, nbase
+1:
pickle
= self
.dumps(n
, proto
)
# Try a monster.  This is quadratic-time in protos 0 & 1, so don't
# bother with those.
nbase
= long("deadbeeffeedface", 16)
nbase
+= nbase
<< 1000000
def test_getinitargs(self
):
def test_metaclass(self
):
self
.assertEqual(a
.__class
__, b
.__class
__)
def test_structseq(self
):
if hasattr(os
, "statvfs"):
t
= os
.statvfs(os
.curdir
)
build_none
= pickle
.NONE
+ pickle
.STOP
expected
= pickle
.PROTO
+ chr(proto
) + expected
p
= self
.dumps(None, proto
)
self
.assertEqual(p
, expected
)
oob
= protocols
[-1] + 1 # a future protocol
badpickle
= pickle
.PROTO
+ chr(oob
) + build_none
except ValueError, detail
:
self
.failUnless(str(detail
).startswith(
"unsupported pickle protocol"))
self
.fail("expected bad protocol number to raise ValueError")
x
= 12345678910111213141516178920L
self
.assertEqual(opcode_in_pickle(pickle
.LONG1
, s
), proto
>= 2)
x
= 12345678910111213141516178920L << (256*8)
self
.assertEqual(opcode_in_pickle(pickle
.LONG4
, s
), proto
>= 2)
def test_short_tuples(self
):
# Map (proto, len(tuple)) to expected opcode.
expected_opcode
= {(0, 0): pickle
.TUPLE
,
(1, 0): pickle
.EMPTY_TUPLE
,
(2, 0): pickle
.EMPTY_TUPLE
,
self
.assertEqual(x
, y
, (proto
, x
, s
, y
))
expected
= expected_opcode
[proto
, len(x
)]
self
.assertEqual(opcode_in_pickle(expected
, s
), True)
def test_singletons(self
):
# Map (proto, singleton) to expected opcode.
expected_opcode
= {(0, None): pickle
.NONE
,
(2, True): pickle
.NEWTRUE
,
(2, False): pickle
.NEWFALSE
,
for x
in None, False, True:
self
.assert_(x
is y
, (proto
, x
, s
, y
))
expected
= expected_opcode
[proto
, x
]
self
.assertEqual(opcode_in_pickle(expected
, s
), True)
def test_newobj_tuple(self
):
self
.assertEqual(tuple(x
), tuple(y
))
self
.assertEqual(x
.__dict
__, y
.__dict
__)
def test_newobj_list(self
):
self
.assertEqual(list(x
), list(y
))
self
.assertEqual(x
.__dict
__, y
.__dict
__)
def test_newobj_generic(self
):
detail
= (proto
, C
, B
, x
, y
, type(y
))
self
.assertEqual(B(x
), B(y
), detail
)
self
.assertEqual(x
.__dict
__, y
.__dict
__, detail
)
# Register a type with copy_reg, with extension code extcode. Pickle
# an object of that type. Check that the resulting pickle uses opcode
# (EXT[124]) under proto 2, and not in proto 1.
def produce_global_ext(self
, extcode
, opcode
):
e
= ExtensionSaver(extcode
)
copy_reg
.add_extension(__name__
, "MyList", extcode
)
# Dump using protocol 1 for comparison.
self
.assert_(__name__
in s1
)
self
.assert_("MyList" in s1
)
self
.assertEqual(opcode_in_pickle(opcode
, s1
), False)
self
.assertEqual(list(x
), list(y
))
self
.assertEqual(x
.__dict
__, y
.__dict
__)
# Dump using protocol 2 for test.
self
.assert_(__name__
not in s2
)
self
.assert_("MyList" not in s2
)
self
.assertEqual(opcode_in_pickle(opcode
, s2
), True)
self
.assertEqual(list(x
), list(y
))
self
.assertEqual(x
.__dict
__, y
.__dict
__)
def test_global_ext1(self
):
self
.produce_global_ext(0x00000001, pickle
.EXT1
) # smallest EXT1 code
self
.produce_global_ext(0x000000ff, pickle
.EXT1
) # largest EXT1 code
def test_global_ext2(self
):
self
.produce_global_ext(0x00000100, pickle
.EXT2
) # smallest EXT2 code
self
.produce_global_ext(0x0000ffff, pickle
.EXT2
) # largest EXT2 code
self
.produce_global_ext(0x0000abcd, pickle
.EXT2
) # check endianness
def test_global_ext4(self
):
self
.produce_global_ext(0x00010000, pickle
.EXT4
) # smallest EXT4 code
self
.produce_global_ext(0x7fffffff, pickle
.EXT4
) # largest EXT4 code
self
.produce_global_ext(0x12abcdef, pickle
.EXT4
) # check endianness
def test_list_chunking(self
):
n
= 10 # too small to chunk
num_appends
= count_opcode(pickle
.APPENDS
, s
)
self
.assertEqual(num_appends
, proto
> 0)
n
= 2500 # expect at least two chunks when proto > 0
num_appends
= count_opcode(pickle
.APPENDS
, s
)
self
.assertEqual(num_appends
, 0)
self
.failUnless(num_appends
>= 2)
def test_dict_chunking(self
):
n
= 10 # too small to chunk
x
= dict.fromkeys(range(n
))
num_setitems
= count_opcode(pickle
.SETITEMS
, s
)
self
.assertEqual(num_setitems
, proto
> 0)
n
= 2500 # expect at least two chunks when proto > 0
x
= dict.fromkeys(range(n
))
num_setitems
= count_opcode(pickle
.SETITEMS
, s
)
self
.assertEqual(num_setitems
, 0)
self
.failUnless(num_setitems
>= 2)
def test_simple_newobj(self
):
x
= object.__new
__(SimpleNewObj
) # avoid __init__
self
.assertEqual(opcode_in_pickle(pickle
.NEWOBJ
, s
), proto
>= 2)
y
= self
.loads(s
) # will raise TypeError if __init__ called
self
.assertEqual(y
.abc
, 666)
self
.assertEqual(x
.__dict
__, y
.__dict
__)
def test_newobj_list_slots(self
):
self
.assertEqual(list(x
), list(y
))
self
.assertEqual(x
.__dict
__, y
.__dict
__)
self
.assertEqual(x
.foo
, y
.foo
)
self
.assertEqual(x
.bar
, y
.bar
)
def test_reduce_overrides_default_reduce_ex(self
):
self
.assertEqual(x
._reduce
_called
, 0)
self
.assertEqual(x
._reduce
_called
, 1)
self
.assertEqual(y
._reduce
_called
, 0)
def test_reduce_ex_called(self
):
self
.assertEqual(x
._proto
, None)
self
.assertEqual(x
._proto
, proto
)
self
.assertEqual(y
._proto
, None)
def test_reduce_ex_overrides_reduce(self
):
self
.assertEqual(x
._proto
, None)
self
.assertEqual(x
._proto
, proto
)
self
.assertEqual(y
._proto
, None)
# Test classes for reduce_ex
# No __reduce_ex__ here, but inheriting it from object
def __reduce_ex__(self
, proto
):
# No __reduce__ here, but inheriting it from object
def __reduce_ex__(self
, proto
):
raise TestFailed
, "This __reduce__ shouldn't be called"
# Test classes for newobj
class MyComplex(complex):
class MyUnicode(unicode):
sample
= {"a": 1, "b": 2}
myclasses
= [MyInt
, MyLong
, MyFloat
,
class SimpleNewObj(object):
    """Plain object whose __init__ always fails.

    Instances are meant to be created without going through __init__
    (e.g. via object.__new__); any call to __init__ raises TypeError.
    """

    def __init__(self, a, b, c):
        # Reaching this point means __init__ was invoked, which is the
        # error condition this class exists to expose.
        message = "SimpleNewObj.__init__() didn't expect to get called"
        raise TypeError(message)
class AbstractPickleModuleTests(unittest
.TestCase
):
def test_dump_closed_file(self
):
self
.assertRaises(ValueError, self
.module
.dump
, 123, f
)
def test_load_closed_file(self):
    """load() on a closed file must raise ValueError."""
    # NOTE(review): `f` is expected to be an already-closed file object; the
    # statements that create and close it are not visible in this excerpt.
    # The assertion must target load(), not dump(): calling dump() here only
    # repeated the closed-file dump check and left load() untested.
    self.assertRaises(ValueError, self.module.load, f)
def test_highest_protocol(self
):
# Of course this needs to be changed when HIGHEST_PROTOCOL changes.
self
.assertEqual(self
.module
.HIGHEST_PROTOCOL
, 2)
from cStringIO
import StringIO
# With and without keyword arguments
self
.module
.dump(123, f
, -1)
self
.module
.dump(123, file=f
, protocol
=-1)
self
.module
.dumps(123, -1)
self
.module
.dumps(123, protocol
=-1)
self
.module
.Pickler(f
, -1)
self
.module
.Pickler(f
, protocol
=-1)
class AbstractPersistentPicklerTests(unittest
.TestCase
):
# This class defines persistent_id() and persistent_load()
# functions that should be used by the pickler. All even integers
# are pickled using persistent ids.
def persistent_id(self
, object):
if isinstance(object, int) and object % 2 == 0:
def persistent_load(self
, oid
):
def test_persistence(self
):
self
.assertEqual(self
.loads(self
.dumps(L
)), L
)
self
.assertEqual(self
.id_count
, 5)
self
.assertEqual(self
.load_count
, 5)
def test_bin_persistence(self
):
self
.assertEqual(self
.loads(self
.dumps(L
, 1)), L
)
self
.assertEqual(self
.id_count
, 5)
self
.assertEqual(self
.load_count
, 5)