"""\
A library of useful helper classes to the SAX classes, for the
convenience of application and driver writers.
"""
import os
import types
import urllib
import urlparse
# String types that prepare_input_source() treats as a bare system
# identifier.  The lookups are guarded because not every interpreter
# provides types.StringType / types.UnicodeType.
_StringTypes = [getattr(types, "StringType", str)]
if hasattr(types, "UnicodeType"):
    _StringTypes.append(types.UnicodeType)

# See whether the xmlcharrefreplace error handler is supported; fall back
# to "strict" encoding errors when it is not.
try:
    from codecs import xmlcharrefreplace_errors
except ImportError:
    _error_handling = "strict"
else:
    _error_handling = "xmlcharrefreplace"
    # Only the availability check was needed; keep the namespace clean.
    del xmlcharrefreplace_errors
def __dict_replace(s, d):
    """Replace substrings of a string using a dictionary.

    Every key of *d* found in *s* is replaced by its value.  The
    replacements are applied one after another in the dictionary's
    iteration order, so a later replacement sees the result of the
    earlier ones.  Returns the resulting string.
    """
    for key, value in d.items():
        s = s.replace(key, value)
    return s
def escape(data, entities={}):
    """Escape &, <, and > in a string of data.

    You can escape other strings of data by passing a dictionary as
    the optional entities parameter.  The keys and values must all be
    strings; each key will be replaced with its corresponding value.

    Returns the escaped string.  The entities dictionary is only read,
    never modified.
    """
    # must do ampersand first, otherwise the ampersands introduced by the
    # other replacements would be escaped a second time
    data = data.replace("&", "&amp;")
    data = data.replace(">", "&gt;")
    data = data.replace("<", "&lt;")
    if entities:
        data = __dict_replace(data, entities)
    return data
def unescape(data, entities={}):
    """Unescape &amp;, &lt;, and &gt; in a string of data.

    You can unescape other strings of data by passing a dictionary as
    the optional entities parameter.  The keys and values must all be
    strings; each key will be replaced with its corresponding value.

    Returns the unescaped string.  The entities dictionary is only read,
    never modified.
    """
    data = data.replace("&lt;", "<")
    data = data.replace("&gt;", ">")
    if entities:
        data = __dict_replace(data, entities)
    # must do ampersand last, otherwise "&amp;lt;" would wrongly collapse
    # all the way down to "<"
    return data.replace("&amp;", "&")
def quoteattr(data, entities={}):
    """Escape and quote an attribute value.

    Escape &, <, and > in a string of data, then quote it for use as
    an attribute value.  The double-quote character will be escaped as
    well, if necessary.

    You can escape other strings of data by passing a dictionary as
    the optional entities parameter.  The keys and values must all be
    strings; each key will be replaced with its corresponding value.

    Returns the value including its surrounding quote characters.
    """
    data = escape(data, entities)
    if '"' in data:
        if "'" in data:
            # Both quote styles occur: delimit with double quotes and
            # escape the embedded double quotes.
            data = '"%s"' % data.replace('"', "&quot;")
        else:
            # Only double quotes occur: single-quote delimiters need no
            # escaping at all.
            data = "'%s'" % data
    else:
        data = '"%s"' % data
    return data
class XMLGenerator(handler.ContentHandler):
    """Content handler that writes the SAX events it receives as XML text.

    Output goes to the file-like object *out* (sys.stdout when omitted).
    *encoding* is announced in the XML declaration and is used to encode
    any text that is not already a str before it is written.
    """

    def __init__(self, out=None, encoding="iso-8859-1"):
        if out is None:
            # Imported lazily; only needed for the default output stream.
            import sys
            out = sys.stdout
        handler.ContentHandler.__init__(self)
        self._out = out
        self._ns_contexts = [{}]  # contains uri -> prefix dicts
        self._current_context = self._ns_contexts[-1]
        # (prefix, uri) pairs announced by startPrefixMapping() that still
        # have to be declared on the next start tag.
        self._undeclared_ns_maps = []
        self._encoding = encoding

    def _write(self, text):
        """Write *text* to the output, encoding it unless it is a str."""
        if isinstance(text, str):
            self._out.write(text)
        else:
            self._out.write(text.encode(self._encoding, _error_handling))

    def _qname(self, name):
        """Return the serialized name for a (uri, localname) pair."""
        if name[0] is None:
            # if the name was not namespace-scoped, use the unqualified part
            return name[1]
        # else try to restore the original prefix from the namespace
        prefix = self._current_context[name[0]]
        if not prefix:
            # The namespace is bound as the default namespace: no prefix.
            return name[1]
        return prefix + ":" + name[1]

    # ContentHandler methods

    def startDocument(self):
        self._write('<?xml version="1.0" encoding="%s"?>\n' %
                    self._encoding)

    def startPrefixMapping(self, prefix, uri):
        # Save a snapshot of the current mappings so that
        # endPrefixMapping() can restore it.
        self._ns_contexts.append(self._current_context.copy())
        self._current_context[uri] = prefix
        self._undeclared_ns_maps.append((prefix, uri))

    def endPrefixMapping(self, prefix):
        self._current_context = self._ns_contexts[-1]
        del self._ns_contexts[-1]

    def startElement(self, name, attrs):
        self._write('<' + name)
        for (attr_name, value) in attrs.items():
            self._write(' %s=%s' % (attr_name, quoteattr(value)))
        self._write('>')

    def endElement(self, name):
        self._write('</%s>' % name)

    def startElementNS(self, name, qname, attrs):
        self._write('<' + self._qname(name))

        # Declare every mapping announced since the previous start tag.
        for (prefix, uri) in self._undeclared_ns_maps:
            if prefix:
                self._write(' xmlns:%s="%s"' % (prefix, uri))
            else:
                self._write(' xmlns="%s"' % uri)
        self._undeclared_ns_maps = []

        for (attr_name, value) in attrs.items():
            self._write(' %s=%s' % (self._qname(attr_name),
                                    quoteattr(value)))
        self._write('>')

    def endElementNS(self, name, qname):
        self._write('</%s>' % self._qname(name))

    def characters(self, content):
        self._write(escape(content))

    def ignorableWhitespace(self, content):
        self._write(content)

    def processingInstruction(self, target, data):
        self._write('<?%s %s?>' % (target, data))
class XMLFilterBase(xmlreader.XMLReader):
    """This class is designed to sit between an XMLReader and the
    client application's event handlers.  By default, it does nothing
    but pass requests up to the reader and events on to the handlers
    unmodified, but subclasses can override specific methods to modify
    the event stream or the configuration requests as they pass
    through."""

    def __init__(self, parent = None):
        xmlreader.XMLReader.__init__(self)
        # The reader this filter forwards configuration requests to.
        self._parent = parent

    # ErrorHandler methods

    def error(self, exception):
        self._err_handler.error(exception)

    def fatalError(self, exception):
        self._err_handler.fatalError(exception)

    def warning(self, exception):
        self._err_handler.warning(exception)

    # ContentHandler methods

    def setDocumentLocator(self, locator):
        self._cont_handler.setDocumentLocator(locator)

    def startDocument(self):
        self._cont_handler.startDocument()

    def endDocument(self):
        self._cont_handler.endDocument()

    def startPrefixMapping(self, prefix, uri):
        self._cont_handler.startPrefixMapping(prefix, uri)

    def endPrefixMapping(self, prefix):
        self._cont_handler.endPrefixMapping(prefix)

    def startElement(self, name, attrs):
        self._cont_handler.startElement(name, attrs)

    def endElement(self, name):
        self._cont_handler.endElement(name)

    def startElementNS(self, name, qname, attrs):
        self._cont_handler.startElementNS(name, qname, attrs)

    def endElementNS(self, name, qname):
        self._cont_handler.endElementNS(name, qname)

    def characters(self, content):
        self._cont_handler.characters(content)

    def ignorableWhitespace(self, chars):
        self._cont_handler.ignorableWhitespace(chars)

    def processingInstruction(self, target, data):
        self._cont_handler.processingInstruction(target, data)

    def skippedEntity(self, name):
        self._cont_handler.skippedEntity(name)

    # DTDHandler methods

    def notationDecl(self, name, publicId, systemId):
        self._dtd_handler.notationDecl(name, publicId, systemId)

    def unparsedEntityDecl(self, name, publicId, systemId, ndata):
        self._dtd_handler.unparsedEntityDecl(name, publicId, systemId, ndata)

    # EntityResolver methods

    def resolveEntity(self, publicId, systemId):
        return self._ent_handler.resolveEntity(publicId, systemId)

    # XMLReader methods

    def parse(self, source):
        # Register this filter as every handler of the parent reader, so
        # that all of its events pass through the methods above.
        self._parent.setContentHandler(self)
        self._parent.setErrorHandler(self)
        self._parent.setEntityResolver(self)
        self._parent.setDTDHandler(self)
        self._parent.parse(source)

    def setLocale(self, locale):
        self._parent.setLocale(locale)

    def getFeature(self, name):
        return self._parent.getFeature(name)

    def setFeature(self, name, state):
        self._parent.setFeature(name, state)

    def getProperty(self, name):
        return self._parent.getProperty(name)

    def setProperty(self, name, value):
        self._parent.setProperty(name, value)

    # XMLFilter methods

    def getParent(self):
        return self._parent

    def setParent(self, parent):
        self._parent = parent
def prepare_input_source(source
, base
= ""):
"""This function takes an InputSource and an optional base URL and
returns a fully resolved InputSource object ready for reading."""
if type(source
) in _StringTypes
:
source
= xmlreader
.InputSource(source
)
elif hasattr(source
, "read"):
source
= xmlreader
.InputSource()
source
.setSystemId(f
.name
)
if source
.getByteStream() is None:
sysid
= source
.getSystemId()
basehead
= os
.path
.dirname(os
.path
.normpath(base
))
sysidfilename
= os
.path
.join(basehead
, sysid
)
if os
.path
.isfile(sysidfilename
):
source
.setSystemId(sysidfilename
)
f
= open(sysidfilename
, "rb")
source
.setSystemId(urlparse
.urljoin(base
, sysid
))
f
= urllib
.urlopen(source
.getSystemId())