# Types that parse() treats as a file name to open, as opposed to an
# already-open stream.
try:
    _StringTypes = [types.StringType, types.UnicodeType]
except AttributeError:
    # UnicodeType is absent on interpreters without unicode support, and
    # newer interpreters drop StringType as well; fall back to StringType
    # alone, or to the builtin str when even that alias is gone.
    _StringTypes = [getattr(types, "StringType", str)]
# Event type tags: the first item of each payload tuple that the SAX
# callbacks place on the event queue.
START_ELEMENT = "START_ELEMENT"
END_ELEMENT = "END_ELEMENT"
# COMMENT is referenced by the comment-queuing code in this file but had no
# definition alongside the other tags.
COMMENT = "COMMENT"
START_DOCUMENT = "START_DOCUMENT"
END_DOCUMENT = "END_DOCUMENT"
PROCESSING_INSTRUCTION = "PROCESSING_INSTRUCTION"
IGNORABLE_WHITESPACE = "IGNORABLE_WHITESPACE"
CHARACTERS = "CHARACTERS"
class PullDOM(xml
.sax
.ContentHandler
):
def __init__(self, documentFactory=None):
    """Create an empty event queue, element stack and namespace table.

    documentFactory -- object whose createDocument() is later used to
        build the DOM document; stored unchanged (None is allowed).
    """
    from xml.dom import XML_NAMESPACE
    self.documentFactory = documentFactory
    # Events are chained as [payload, next] cells.  firstEvent is a
    # payload-less head cell; lastEvent always refers to the tail cell.
    self.firstEvent = [None, None]
    self.lastEvent = self.firstEvent
    # The stack has to exist before its bound methods are aliased below.
    self.elementStack = []
    self.push = self.elementStack.append
    self.pop = self.elementStack.pop
    self._ns_contexts = [{XML_NAMESPACE: 'xml'}]  # contains uri -> prefix dicts
    self._current_context = self._ns_contexts[-1]
    # Events that arrive before a document exists are parked here.
    self.pending_events = []
result
= self
.elementStack
[-1]
del self
.elementStack
[-1]
def setDocumentLocator(self, locator):
    """Remember the locator object supplied by the parser.

    NOTE(review): this method had no body in the source.  Storing the
    object on self._locator mirrors what the xml.sax ContentHandler base
    class does -- confirm nothing else was intended.
    """
    self._locator = locator
def startPrefixMapping(self, prefix, uri):
    """Record a namespace declaration and open a new namespace scope.

    prefix -- declared prefix; a false value means the default namespace
    uri    -- namespace URI bound to the prefix
    """
    # Create the list on first use; previously the append below ran only
    # when the attribute was missing and therefore always failed.
    if not hasattr(self, '_xmlns_attrs'):
        self._xmlns_attrs = []
    # The default namespace is remembered under the name 'xmlns'.
    self._xmlns_attrs.append((prefix or 'xmlns', uri))
    # Save a snapshot of the scope as it was before this declaration;
    # endPrefixMapping() pops it back into place.
    self._ns_contexts.append(self._current_context.copy())
    self._current_context[uri] = prefix or None
def endPrefixMapping(self, prefix):
    """Close a namespace scope.

    The most recently saved context is popped off self._ns_contexts and
    becomes the current one again; `prefix` itself is not consulted.
    """
    self._current_context = self._ns_contexts.pop()
def startElementNS(self, name, tagName, attrs):
    """Queue a START_ELEMENT event for a namespace-aware element.

    name    -- (uri, localname) pair identifying the element
    tagName -- qualified name supplied by the reader, or None
    attrs   -- attributes keyed by (uri, localname); must provide items()
               and a mutable ``_attrs`` mapping

    NOTE(review): the branching in this method was lost in the source and
    has been rebuilt from the surviving statements and comments -- confirm
    against the intended behaviour.
    """
    # Retrieve xml namespace declaration attributes.
    xmlns_uri = 'http://www.w3.org/2000/xmlns/'
    xmlns_attrs = getattr(self, '_xmlns_attrs', None)
    if xmlns_attrs is not None:
        for aname, value in xmlns_attrs:
            attrs._attrs[(xmlns_uri, aname)] = value
        # The collected declarations belong to this element only.
        self._xmlns_attrs = []

    uri, localname = name
    document = getattr(self, 'document', None)
    if uri:
        # When using namespaces, the reader may or may not
        # provide us with the original name. If not, create
        # *a* valid tagName from the current context.
        if tagName is None:
            prefix = self._current_context[uri]
            # startPrefixMapping() stores None for the default namespace.
            if prefix:
                tagName = prefix + ":" + localname
            else:
                tagName = localname
        if document:
            node = document.createElementNS(uri, tagName)
        else:
            node = self.buildDocument(uri, tagName)
    else:
        # When the tagname is not prefixed, it just appears as
        # localname
        if document:
            node = document.createElement(localname)
        else:
            node = self.buildDocument(None, localname)

    # presumably buildDocument() has stored self.document by now -- verify
    for aname, value in attrs.items():
        a_uri, a_localname = aname
        if a_uri == xmlns_uri:
            # A default-namespace declaration is stored under the local
            # name 'xmlns' and must not become 'xmlns:xmlns'.
            if a_localname == 'xmlns':
                qname = a_localname
            else:
                qname = 'xmlns:' + a_localname
            attr = self.document.createAttributeNS(a_uri, qname)
            node.setAttributeNodeNS(attr)
        elif a_uri:
            prefix = self._current_context[a_uri]
            if prefix:
                qname = prefix + ":" + a_localname
            else:
                qname = a_localname
            attr = self.document.createAttributeNS(a_uri, qname)
            node.setAttributeNodeNS(attr)
        else:
            attr = self.document.createAttribute(a_localname)
            node.setAttributeNode(attr)
        attr.value = value

    self.lastEvent[1] = [(START_ELEMENT, node), None]
    self.lastEvent = self.lastEvent[1]
    # endElementNS() pops this node again.
    self.push(node)
def endElementNS(self, name, tagName):
    """Queue an END_ELEMENT event for the element on top of the stack.

    The node is taken from self.pop(); `name` and `tagName` are not used.
    """
    # Link a new [payload, next] cell after the tail, then advance the tail.
    self.lastEvent[1] = [(END_ELEMENT, self.pop()), None]
    self.lastEvent = self.lastEvent[1]
def startElement(self, name, attrs):
    """Queue a START_ELEMENT event for a non-namespace element.

    name  -- tag name
    attrs -- attribute collection providing items() -> (name, value)

    NOTE(review): the document/no-document branch, the attribute value
    assignment and the final push were missing from the source and have
    been rebuilt from the surviving statements -- confirm.
    """
    document = getattr(self, 'document', None)
    if document:
        node = document.createElement(name)
    else:
        # No document yet: this element becomes its root.
        node = self.buildDocument(None, name)

    # presumably buildDocument() has stored self.document by now -- verify
    for aname, value in attrs.items():
        attr = self.document.createAttribute(aname)
        attr.value = value
        node.setAttributeNode(attr)

    self.lastEvent[1] = [(START_ELEMENT, node), None]
    self.lastEvent = self.lastEvent[1]
    # endElement() pops this node again.
    self.push(node)
def endElement(self, name):
    """Queue an END_ELEMENT event for the element on top of the stack.

    The node is taken from self.pop(); `name` is not used.
    """
    # Link a new [payload, next] cell after the tail, then advance the tail.
    self.lastEvent[1] = [(END_ELEMENT, self.pop()), None]
    self.lastEvent = self.lastEvent[1]
node
= self
.document
.createComment(s
)
self
.lastEvent
[1] = [(COMMENT
, node
), None]
self
.lastEvent
= self
.lastEvent
[1]
event
= [(COMMENT
, s
), None]
self
.pending_events
.append(event
)
def processingInstruction(self, target, data):
    """Queue a PROCESSING_INSTRUCTION event.

    With a document available the PI node is created and queued right
    away; otherwise the raw (target, data) pair is parked in
    self.pending_events.

    NOTE(review): the condition separating the two paths was missing in
    the source (both ran unconditionally); testing for an existing
    document is the reconstruction -- confirm.
    """
    document = getattr(self, 'document', None)
    if document:
        node = document.createProcessingInstruction(target, data)
        self.lastEvent[1] = [(PROCESSING_INSTRUCTION, node), None]
        self.lastEvent = self.lastEvent[1]
    else:
        event = [(PROCESSING_INSTRUCTION, target, data), None]
        self.pending_events.append(event)
def ignorableWhitespace(self, chars):
    """Queue an IGNORABLE_WHITESPACE event holding a text node for `chars`.

    The text node is created through self.document.createTextNode().
    """
    node = self.document.createTextNode(chars)
    # Link a new [payload, next] cell after the tail, then advance the tail.
    self.lastEvent[1] = [(IGNORABLE_WHITESPACE, node), None]
    self.lastEvent = self.lastEvent[1]
def characters(self, chars):
    """Queue a CHARACTERS event holding a text node for `chars`.

    The text node is created through self.document.createTextNode().
    """
    node = self.document.createTextNode(chars)
    # Link a new [payload, next] cell after the tail, then advance the tail.
    self.lastEvent[1] = [(CHARACTERS, node), None]
    self.lastEvent = self.lastEvent[1]
if self
.documentFactory
is None:
self
.documentFactory
= xml
.dom
.minidom
.Document
.implementation
def buildDocument(self, uri, tagname):
    """Create the DOM document and queue its START_DOCUMENT event.

    uri, tagname -- namespace URI and name passed to
                    self.documentFactory.createDocument()

    Events parked in self.pending_events are converted to nodes and
    linked into the event queue.  Returns the document's root element so
    the calling start-element handler can continue with it.

    NOTE(review): storing self.document, pushing the document, relinking
    the pending events and the return value were missing from the source
    and are reconstructed -- confirm.
    """
    # Can't do that in startDocument, since we need the tagname
    # XXX: obtain DocumentType
    node = self.documentFactory.createDocument(uri, tagname, None)
    self.document = node
    self.lastEvent[1] = [(START_DOCUMENT, node), None]
    self.lastEvent = self.lastEvent[1]
    # The document sits at the bottom of the element stack.
    self.push(node)
    # Put everything we have seen so far into the document
    for e in self.pending_events:
        kind = e[0][0]
        if kind == PROCESSING_INSTRUCTION:
            # Parked payload is (PROCESSING_INSTRUCTION, target, data).
            _, target, data = e[0]
            n = self.document.createProcessingInstruction(target, data)
            e[0] = (PROCESSING_INSTRUCTION, n)
        elif kind == COMMENT:
            n = self.document.createComment(e[0][1])
            e[0] = (COMMENT, n)
        else:
            raise AssertionError("Unknown pending event ", kind)
        # Append the converted cell to the queue.
        self.lastEvent[1] = e
        self.lastEvent = e
    self.pending_events = None
    # assumes createDocument() produced a root element named `tagname`
    return node.documentElement
self
.lastEvent
[1] = [(END_DOCUMENT
, self
.document
), None]
"clear(): Explicitly release parsing structures"
def warning(self, exception):
    """Report a parser warning without interrupting parsing.

    NOTE(review): this method had no body in the source.  Printing the
    exception follows the default SAX ErrorHandler behaviour -- confirm.
    """
    print(exception)
def error(self, exception):
    """Handle a recoverable parser error by raising it.

    NOTE(review): this method had no body in the source.  Re-raising the
    exception follows the default SAX ErrorHandler behaviour -- confirm.
    """
    raise exception
def fatalError(self, exception):
    """Handle a non-recoverable parser error by raising it.

    NOTE(review): this method had no body in the source.  Re-raising the
    exception follows the default SAX ErrorHandler behaviour -- confirm.
    """
    raise exception
def __init__(self, stream, parser, bufsize):
    """Bind a parser to an input stream and install the event handler.

    stream  -- object the document is read from
    parser  -- SAX parser; without a feed() method the slurping fallback
               is used for getEvent
    bufsize -- number of bytes/characters requested per read

    NOTE(review): the assignments of stream/parser/bufsize and the
    creation of self.pulldom were missing from the source and are
    reconstructed -- confirm, in particular that a default PullDOM()
    handler is what is wanted here.
    """
    self.stream = stream
    self.parser = parser
    self.bufsize = bufsize
    if not hasattr(self.parser, 'feed'):
        # Not an incremental parser: fall back to parsing everything at once.
        self.getEvent = self._slurp
    self.pulldom = PullDOM()
    # This content handler relies on namespace support
    self.parser.setFeature(xml.sax.handler.feature_namespaces, 1)
    self.parser.setContentHandler(self.pulldom)
def __getitem__(self
, pos
):
def expandNode(self
, node
):
parents
[-1].appendChild(cur_node
)
if token
== START_ELEMENT
:
elif token
== END_ELEMENT
:
# use IncrementalParser interface, so we get the desired
if not self
.pulldom
.firstEvent
[1]:
self
.pulldom
.lastEvent
= self
.pulldom
.firstEvent
while not self
.pulldom
.firstEvent
[1]:
buf
= self
.stream
.read(self
.bufsize
)
rc
= self
.pulldom
.firstEvent
[1][0]
self
.pulldom
.firstEvent
[1] = self
.pulldom
.firstEvent
[1][1]
""" Fallback replacement for getEvent() using the
standard SAX2 interface, which means we slurp the
SAX events into memory (no performance gain, but
we are compatible to all SAX parsers).
self
.parser
.parse(self
.stream
)
self
.getEvent
= self
._emit
""" Fallback replacement for getEvent() that emits
the events that _slurp() read previously.
rc
= self
.pulldom
.firstEvent
[1][0]
self
.pulldom
.firstEvent
[1] = self
.pulldom
.firstEvent
[1][1]
"""clear(): Explicitly release parsing objects"""
def startElementNS(self, name, tagName, attrs):
    """Queue the element via PullDOM, then attach it to its parent node.

    After the base handler has run, the new node is the top of
    self.elementStack and its parent is the entry directly below it.
    """
    PullDOM.startElementNS(self, name, tagName, attrs)
    curNode = self.elementStack[-1]
    parentNode = self.elementStack[-2]
    parentNode.appendChild(curNode)
def startElement(self, name, attrs):
    """Queue the element via PullDOM, then attach it to its parent node.

    After the base handler has run, the new node is the top of
    self.elementStack and its parent is the entry directly below it.
    """
    PullDOM.startElement(self, name, attrs)
    curNode = self.elementStack[-1]
    parentNode = self.elementStack[-2]
    parentNode.appendChild(curNode)
def processingInstruction(self, target, data):
    """Queue the PI via PullDOM, then append its node to the open element.

    The node is read back from the payload of the queue's tail cell and
    appended to the node on top of self.elementStack.
    """
    PullDOM.processingInstruction(self, target, data)
    # Tail cell is [(event_type, node), next]; take the node.
    node = self.lastEvent[0][1]
    parentNode = self.elementStack[-1]
    parentNode.appendChild(node)
def ignorableWhitespace(self, chars):
    """Queue the whitespace via PullDOM, then append its text node.

    The node is read back from the payload of the queue's tail cell and
    appended to the node on top of self.elementStack.
    """
    PullDOM.ignorableWhitespace(self, chars)
    # Tail cell is [(event_type, node), next]; take the node.
    node = self.lastEvent[0][1]
    parentNode = self.elementStack[-1]
    parentNode.appendChild(node)
def characters(self, chars):
    """Queue the character data via PullDOM, then append its text node.

    The node is read back from the payload of the queue's tail cell and
    appended to the node on top of self.elementStack.
    """
    PullDOM.characters(self, chars)
    # Tail cell is [(event_type, node), next]; take the node.
    node = self.lastEvent[0][1]
    parentNode = self.elementStack[-1]
    parentNode.appendChild(node)
# Read size parse() uses when the caller does not supply one.
default_bufsize = (2 ** 14) - 20
def parse(stream_or_string, parser=None, bufsize=None):
    """Return a DOMEventStream over a file name or an open stream.

    stream_or_string -- a string is opened as a file name; anything else
                        is used as the stream directly
    parser           -- SAX parser to use; a default one is created when
                        omitted
    bufsize          -- read size; default_bufsize when omitted
    """
    # Each default applies only when the caller left the argument out;
    # previously all three values were overwritten unconditionally.
    if bufsize is None:
        bufsize = default_bufsize
    if isinstance(stream_or_string, tuple(_StringTypes)):
        stream = open(stream_or_string)
    else:
        stream = stream_or_string
    if parser is None:
        parser = xml.sax.make_parser()
    return DOMEventStream(stream, parser, bufsize)
def parseString(string, parser=None):
    """Return a DOMEventStream over an in-memory document.

    string -- the document text
    parser -- SAX parser to use; a default one is created when omitted

    NOTE(review): `buf` and `bufsize` were undefined in the source.
    Wrapping the text in a StringIO and sizing the read to the whole
    string is the reconstruction -- confirm.
    """
    # Prefer the C implementation, then the pure-Python one; io is the
    # last resort on interpreters that ship neither module.
    try:
        from cStringIO import StringIO
    except ImportError:
        try:
            from StringIO import StringIO
        except ImportError:
            from io import StringIO

    bufsize = len(string)
    buf = StringIO(string)
    if parser is None:
        parser = xml.sax.make_parser()
    return DOMEventStream(buf, parser, bufsize)