"""
SAX driver for the pyexpat C module.  This driver works with
pyexpat.__version__ == '2.22'.
"""
from xml
.sax
._exceptions
import *
from xml
.sax
.handler
import feature_validation
, feature_namespaces
from xml
.sax
.handler
import feature_namespace_prefixes
from xml
.sax
.handler
import feature_external_ges
, feature_external_pes
from xml
.sax
.handler
import feature_string_interning
from xml
.sax
.handler
import property_xml_string
, property_interning_dict
# xml.parsers.expat does not raise ImportError in Jython
if sys
.platform
[:4] == "java":
raise SAXReaderNotAvailable("expat not available in Java", None)
from xml
.parsers
import expat
raise SAXReaderNotAvailable("expat not supported", None)
if not hasattr(expat
, "ParserCreate"):
raise SAXReaderNotAvailable("expat not supported", None)
from xml
.sax
import xmlreader
, saxutils
, handler
AttributesImpl
= xmlreader
.AttributesImpl
AttributesNSImpl
= xmlreader
.AttributesNSImpl
# If we're using a sufficiently recent version of Python, we can use
# weak references to avoid cycles between the parser and content
# handler, otherwise we'll just have to pretend.
class ExpatLocator(xmlreader
.Locator
):
"""Locator for use with the ExpatParser class.
This uses a weak reference to the parser object to avoid creating
a circular reference between the parser and the content handler.
def __init__(self, parser):
    """Store a reference to *parser* as produced by _mkproxy().

    NOTE(review): _mkproxy is defined outside this view; presumably it
    returns a weak proxy so the locator does not keep the parser alive --
    confirm against its definition.
    """
    parser_ref = _mkproxy(parser)
    self._ref = parser_ref
def getColumnNumber(self
):
if parser
._parser
is None:
return parser
._parser
.ErrorColumnNumber
if parser
._parser
is None:
return parser
._parser
.ErrorLineNumber
return parser
._source
.getPublicId()
return parser
._source
.getSystemId()
class ExpatParser(xmlreader
.IncrementalParser
, xmlreader
.Locator
):
"""SAX driver for the pyexpat C module."""
def __init__(self, namespaceHandling=0, bufsize=2**16-20):
    """Set up the reader's initial state.

    namespaceHandling -- stored as self._namespaces (presumably selects
                         namespace-aware parsing; confirm against reset()).
    bufsize           -- passed straight to IncrementalParser.__init__.
    """
    xmlreader.IncrementalParser.__init__(self, bufsize)
    # No lexical handler is registered until setProperty() supplies one.
    self._lex_handler_prop = None
    self._namespaces = namespaceHandling
    # Placeholder input source.
    self._source = xmlreader.InputSource()
"Parse an XML document from a URL or an InputSource."
source
= saxutils
.prepare_input_source(source
)
self
._cont
_handler
.setDocumentLocator(ExpatLocator(self
))
xmlreader
.IncrementalParser
.parse(self
, source
)
def prepareParser(self, source):
    """Tell expat the base URI of *source*, when it has one.

    source -- an input source exposing getSystemId().

    The system id is passed to the parser's SetBase(); nothing happens
    when the source reports no system id (None).
    """
    system_id = source.getSystemId()
    # Identity test: only a missing id (None) is skipped, an empty string
    # is still forwarded exactly as before.
    if system_id is not None:
        self._parser.SetBase(system_id)
# Redefined setContentHandler to allow changing handlers during parsing
def setContentHandler(self, handler):
    """Install *handler* and re-point the expat callbacks at it.

    The base-class setter records the handler; _reset_cont_handler() then
    rebinds the parser hooks that call straight into the content handler.

    NOTE(review): the rebinding touches self._parser, so this assumes a
    parser object already exists -- confirm for calls made before parsing.
    """
    base = xmlreader.IncrementalParser
    base.setContentHandler(self, handler)
    self._reset_cont_handler()
def getFeature(self
, name
):
if name
== feature_namespaces
:
elif name
== feature_string_interning
:
return self
._interning
is not None
elif name
in (feature_validation
, feature_external_pes
,
feature_namespace_prefixes
):
elif name
== feature_external_ges
:
return self
._external
_ges
raise SAXNotRecognizedException("Feature '%s' not recognized" % name
)
def setFeature(self
, name
, state
):
raise SAXNotSupportedException("Cannot set features while parsing")
if name
== feature_namespaces
:
elif name
== feature_external_ges
:
self
._external
_ges
= state
elif name
== feature_string_interning
:
if self
._interning
is None:
elif name
== feature_validation
:
raise SAXNotSupportedException(
"expat does not support validation")
elif name
== feature_external_pes
:
raise SAXNotSupportedException(
"expat does not read external parameter entities")
elif name
== feature_namespace_prefixes
:
raise SAXNotSupportedException(
"expat does not report namespace prefixes")
raise SAXNotRecognizedException(
"Feature '%s' not recognized" % name
)
def getProperty(self
, name
):
if name
== handler
.property_lexical_handler
:
return self
._lex
_handler
_prop
elif name
== property_interning_dict
:
elif name
== property_xml_string
:
if hasattr(self
._parser
, "GetInputContext"):
return self
._parser
.GetInputContext()
raise SAXNotRecognizedException(
"This version of expat does not support getting"
raise SAXNotSupportedException(
"XML string cannot be returned when not parsing")
raise SAXNotRecognizedException("Property '%s' not recognized" % name
)
def setProperty(self
, name
, value
):
if name
== handler
.property_lexical_handler
:
self
._lex
_handler
_prop
= value
self
._reset
_lex
_handler
_prop
()
elif name
== property_interning_dict
:
elif name
== property_xml_string
:
raise SAXNotSupportedException("Property '%s' cannot be set" %
raise SAXNotRecognizedException("Property '%s' not recognized" %
# IncrementalParser methods
def feed(self
, data
, isFinal
= 0):
self
._cont
_handler
.startDocument()
# The isFinal parameter is internal to the expat reader.
# If it is set to true, expat will check the well-formedness of the
# entire document. When feeding chunks, they are not normally final -
# except when invoked from close.
self
._parser
.Parse(data
, isFinal
)
exc
= SAXParseException(expat
.ErrorString(e
.code
), e
, self
)
# FIXME: when to invoke error()?
self
._err
_handler
.fatalError(exc
)
# If we are completing an external entity, do nothing here
self
.feed("", isFinal
= 1)
self
._cont
_handler
.endDocument()
# break cycle created by expat handlers pointing to our methods
def _reset_cont_handler(self):
    """Bind expat's PI and character-data hooks to the content handler.

    Both hooks are set to the content handler's own bound methods, so the
    events go to the handler without passing through this class.
    """
    expat_parser = self._parser
    content = self._cont_handler
    expat_parser.ProcessingInstructionHandler = content.processingInstruction
    expat_parser.CharacterDataHandler = content.characters
def _reset_lex_handler_prop(self
):
lex
= self
._lex
_handler
_prop
parser
.CommentHandler
= None
parser
.StartCdataSectionHandler
= None
parser
.EndCdataSectionHandler
= None
parser
.StartDoctypeDeclHandler
= None
parser
.EndDoctypeDeclHandler
= None
parser
.CommentHandler
= lex
.comment
parser
.StartCdataSectionHandler
= lex
.startCDATA
parser
.EndCdataSectionHandler
= lex
.endCDATA
parser
.StartDoctypeDeclHandler
= self
.start_doctype_decl
parser
.EndDoctypeDeclHandler
= lex
.endDTD
self
._parser
= expat
.ParserCreate(None, " ",
self
._parser
.namespace_prefixes
= 1
self
._parser
.StartElementHandler
= self
.start_element_ns
self
._parser
.EndElementHandler
= self
.end_element_ns
self
._parser
= expat
.ParserCreate(intern = self
._interning
)
self
._parser
.StartElementHandler
= self
.start_element
self
._parser
.EndElementHandler
= self
.end_element
self
._reset
_cont
_handler
()
self
._parser
.UnparsedEntityDeclHandler
= self
.unparsed_entity_decl
self
._parser
.NotationDeclHandler
= self
.notation_decl
self
._parser
.StartNamespaceDeclHandler
= self
.start_namespace_decl
self
._parser
.EndNamespaceDeclHandler
= self
.end_namespace_decl
self
._decl
_handler
_prop
= None
if self
._lex
_handler
_prop
:
self
._reset
_lex
_handler
_prop
()
# self._parser.DefaultHandler =
# self._parser.DefaultHandlerExpand =
# self._parser.NotStandaloneHandler =
self
._parser
.ExternalEntityRefHandler
= self
.external_entity_ref
self
._parser
.SkippedEntityHandler
= self
.skipped_entity_handler
# This pyexpat does not support SkippedEntity
self
._parser
.SetParamEntityParsing(
expat
.XML_PARAM_ENTITY_PARSING_UNLESS_STANDALONE
)
def getColumnNumber(self
):
return self
._parser
.ErrorColumnNumber
return self
._parser
.ErrorLineNumber
return self
._source
.getPublicId()
return self
._source
.getSystemId()
def start_element(self, name, attrs):
    """Report an element start to the content handler.

    name  -- element name as delivered by expat.
    attrs -- raw attribute mapping; wrapped in AttributesImpl before it
             is passed to the handler's startElement().
    """
    notify = self._cont_handler.startElement
    notify(name, AttributesImpl(attrs))
def end_element(self, name):
    """Report the end of element *name* to the content handler."""
    notify = self._cont_handler.endElement
    notify(name)
def start_element_ns(self
, name
, attrs
):
for (aname
, value
) in attrs
.items():
qname
= "%s:%s" % (parts
[2], parts
[1])
apair
= parts
[0], parts
[1]
self
._cont
_handler
.startElementNS(pair
, None,
AttributesNSImpl(newattrs
, qnames
))
def end_element_ns(self
, name
):
self
._cont
_handler
.endElementNS(pair
, None)
# this is not used (call directly to ContentHandler)
def processing_instruction(self, target, data):
    """Pass a processing instruction on to the content handler.

    Thin adapter around the handler's processingInstruction(target, data).
    """
    notify = self._cont_handler.processingInstruction
    notify(target, data)
# this is not used (call directly to ContentHandler)
def character_data(self, data):
    """Pass a run of character data on to the content handler.

    Thin adapter around the handler's characters(data).
    """
    notify = self._cont_handler.characters
    notify(data)
def start_namespace_decl(self, prefix, uri):
    """Report a namespace declaration coming into scope.

    Forwards (prefix, uri) to the content handler's startPrefixMapping().
    """
    notify = self._cont_handler.startPrefixMapping
    notify(prefix, uri)
def end_namespace_decl(self, prefix):
    """Report a namespace declaration going out of scope.

    Forwards *prefix* to the content handler's endPrefixMapping().
    """
    notify = self._cont_handler.endPrefixMapping
    notify(prefix)
def start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
    """Report the start of the DOCTYPE declaration to the lexical handler.

    The identifiers arrive as (sysid, pubid) but are passed to startDTD()
    in the order (name, pubid, sysid).  has_internal_subset is not
    forwarded.
    """
    lexical = self._lex_handler_prop
    lexical.startDTD(name, pubid, sysid)
def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
    """Report an unparsed entity declaration to the DTD handler.

    Calls unparsedEntityDecl(name, pubid, sysid, notation_name); the
    *base* argument is accepted but not forwarded.
    """
    dtd = self._dtd_handler
    dtd.unparsedEntityDecl(name, pubid, sysid, notation_name)
def notation_decl(self, name, base, sysid, pubid):
    """Report a notation declaration to the DTD handler.

    Calls notationDecl(name, pubid, sysid); the *base* argument is
    accepted but not forwarded.
    """
    dtd = self._dtd_handler
    dtd.notationDecl(name, pubid, sysid)
def external_entity_ref(self
, context
, base
, sysid
, pubid
):
if not self
._external
_ges
:
source
= self
._ent
_handler
.resolveEntity(pubid
, sysid
)
source
= saxutils
.prepare_input_source(source
,
self
._source
.getSystemId() or
self
._entity
_stack
.append((self
._parser
, self
._source
))
self
._parser
= self
._parser
.ExternalEntityParserCreate(context
)
xmlreader
.IncrementalParser
.parse(self
, source
)
return 0 # FIXME: save error info here?
(self
._parser
, self
._source
) = self
._entity
_stack
[-1]
del self
._entity
_stack
[-1]
def skipped_entity_handler(self, name, is_pe):
    """Report an entity that the parser skipped to the content handler.

    name  -- entity name as reported by expat.
    is_pe -- true when the skipped entity is a parameter entity.

    The name is forwarded to the content handler's skippedEntity().
    """
    if is_pe:
        # The SAX spec requires to report skipped PEs with a '%'
        name = '%' + name
    self._cont_handler.skippedEntity(name)
def create_parser(*args, **kwargs):
    """Build and return an ExpatParser.

    All positional and keyword arguments are handed to the ExpatParser
    constructor unchanged.
    """
    parser = ExpatParser(*args, **kwargs)
    return parser
if __name__ == "__main__":
    # Manual smoke test: parse a sample document with an XMLGenerator as
    # the content handler.
    import xml.sax
    import xml.sax.saxutils

    # The parser instance has to be created before handlers can be set.
    p = create_parser()
    # XMLGenerator lives in xml.sax.saxutils, not in the xml.sax package.
    p.setContentHandler(xml.sax.saxutils.XMLGenerator())
    p.setErrorHandler(xml.sax.ErrorHandler())
    p.parse("../../../hamlet.xml")