"""CGI-savvy HTTP Server.
This module builds on SimpleHTTPServer by implementing GET and POST
requests to cgi-bin scripts.
If the os.fork() function is not present (e.g. on Windows),
os.popen2() is used as a fallback, with slightly altered semantics; if
that function is not present either (e.g. on Macintosh), only Python
scripts are supported, and they are executed by the current process.
In all cases, the implementation is intentionally naive -- all
requests are executed synchronously.
SECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL
-- it may execute arbitrary Python code or external programs.
__all__
= ["CGIHTTPRequestHandler"]
class CGIHTTPRequestHandler(SimpleHTTPServer
.SimpleHTTPRequestHandler
):
"""Complete HTTP server with GET, HEAD and POST commands.
GET and HEAD also support running CGI scripts.
The POST command is *only* implemented for CGI scripts.
# Determine platform specifics
have_fork
= hasattr(os
, 'fork')
have_popen2
= hasattr(os
, 'popen2')
have_popen3
= hasattr(os
, 'popen3')
# Make rfile unbuffered -- we need to read one line and then pass
# the rest to a subprocess, so we can't use buffered input.
This is only implemented for CGI scripts.
self
.send_error(501, "Can only POST to CGI scripts")
"""Version of send_head that support CGI scripts"""
return SimpleHTTPServer
.SimpleHTTPRequestHandler
.send_head(self
)
"""Test whether self.path corresponds to a CGI script.
Return a tuple (dir, rest) if self.path requires running a
CGI script, None if not. Note that rest begins with a
slash if it is not empty.
The default implementation tests whether the path
begins with one of the strings in the list
self.cgi_directories (and the next character is a '/'
or the end of the string).
for x
in self
.cgi_directories
:
if path
[:i
] == x
and (not path
[i
:] or path
[i
] == '/'):
self
.cgi_info
= path
[:i
], path
[i
+1:]
cgi_directories
= ['/cgi-bin', '/htbin']
def is_executable(self, path):
    """Test whether argument path is an executable file.

    path -- the file to check.  The CGI runner in this class passes the
    translated script file path and answers 403 ("CGI script is not
    executable") when the result is false.
    """
def is_python(self, path):
    """Report whether *path* names a Python script.

    The decision is based only on the file extension, compared without
    regard to case: ".py" and ".pyw" count as Python, anything else
    (including no extension at all) does not.
    """
    extension = os.path.splitext(path)[1].lower()
    return extension == ".py" or extension == ".pyw"
"""Execute a CGI script."""
dir, rest
= self
.cgi_info
rest
, query
= rest
[:i
], rest
[i
+1:]
script
, rest
= rest
[:i
], rest
[i
:]
scriptname
= dir + '/' + script
scriptfile
= self
.translate_path(scriptname
)
if not os
.path
.exists(scriptfile
):
self
.send_error(404, "No such CGI script (%r)" % scriptname
)
if not os
.path
.isfile(scriptfile
):
self
.send_error(403, "CGI script is not a plain file (%r)" %
ispy
= self
.is_python(scriptname
)
if not (self
.have_fork
or self
.have_popen2
or self
.have_popen3
):
self
.send_error(403, "CGI script is not a Python script (%r)" %
if not self
.is_executable(scriptfile
):
self
.send_error(403, "CGI script is not executable (%r)" %
# Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
# XXX Much of the following could be prepared ahead of time!
env
['SERVER_SOFTWARE'] = self
.version_string()
env
['SERVER_NAME'] = self
.server
.server_name
env
['GATEWAY_INTERFACE'] = 'CGI/1.1'
env
['SERVER_PROTOCOL'] = self
.protocol_version
env
['SERVER_PORT'] = str(self
.server
.server_port
)
env
['REQUEST_METHOD'] = self
.command
uqrest
= urllib
.unquote(rest
)
env
['PATH_INFO'] = uqrest
env
['PATH_TRANSLATED'] = self
.translate_path(uqrest
)
env
['SCRIPT_NAME'] = scriptname
env
['QUERY_STRING'] = query
host
= self
.address_string()
if host
!= self
.client_address
[0]:
env
['REMOTE_HOST'] = host
env
['REMOTE_ADDR'] = self
.client_address
[0]
authorization
= self
.headers
.getheader("authorization")
authorization
= authorization
.split()
if len(authorization
) == 2:
env
['AUTH_TYPE'] = authorization
[0]
if authorization
[0].lower() == "basic":
authorization
= base64
.decodestring(authorization
[1])
authorization
= authorization
.split(':')
if len(authorization
) == 2:
env
['REMOTE_USER'] = authorization
[0]
if self
.headers
.typeheader
is None:
env
['CONTENT_TYPE'] = self
.headers
.type
env
['CONTENT_TYPE'] = self
.headers
.typeheader
length
= self
.headers
.getheader('content-length')
env
['CONTENT_LENGTH'] = length
for line
in self
.headers
.getallmatchingheaders('accept'):
if line
[:1] in "\t\n\r ":
accept
.append(line
.strip())
accept
= accept
+ line
[7:].split(',')
env
['HTTP_ACCEPT'] = ','.join(accept
)
ua
= self
.headers
.getheader('user-agent')
env
['HTTP_USER_AGENT'] = ua
co
= filter(None, self
.headers
.getheaders('cookie'))
env
['HTTP_COOKIE'] = ', '.join(co
)
# XXX Other HTTP_* headers
# Since we're setting the env in the parent, provide empty
# values to override previously set values
for k
in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
'HTTP_USER_AGENT', 'HTTP_COOKIE'):
self
.send_response(200, "Script output follows")
decoded_query
= query
.replace('+', ' ')
# Unix -- fork as we should
if '=' not in decoded_query
:
args
.append(decoded_query
)
self
.wfile
.flush() # Always flush before forking
pid
, sts
= os
.waitpid(pid
, 0)
# throw away additional data [see bug #427345]
while select
.select([self
.rfile
], [], [], 0)[0]:
if not self
.rfile
.read(1):
self
.log_error("CGI script exit status %#x", sts
)
os
.dup2(self
.rfile
.fileno(), 0)
os
.dup2(self
.wfile
.fileno(), 1)
os
.execve(scriptfile
, args
, os
.environ
)
self
.server
.handle_error(self
.request
, self
.client_address
)
elif self
.have_popen2
or self
.have_popen3
:
# Windows -- use popen2 or popen3 to create a subprocess
if self
.is_python(scriptfile
):
if interp
.lower().endswith("w.exe"):
# On Windows, use python.exe, not pythonw.exe
interp
= interp
[:-5] + interp
[-4:]
cmdline
= "%s -u %s" % (interp
, cmdline
)
if '=' not in query
and '"' not in query
:
cmdline
= '%s "%s"' % (cmdline
, query
)
self
.log_message("command: %s", cmdline
)
except (TypeError, ValueError):
files
= popenx(cmdline
, 'b')
if self
.command
.lower() == "post" and nbytes
> 0:
data
= self
.rfile
.read(nbytes
)
# throw away additional data [see bug #427345]
while select
.select([self
.rfile
._sock
], [], [], 0)[0]:
if not self
.rfile
._sock
.recv(1):
shutil
.copyfileobj(fo
, self
.wfile
)
self
.log_error('%s', errors
)
self
.log_error("CGI script exit status %#x", sts
)
self
.log_message("CGI script exited OK")
# Other O.S. -- execute script in this process
if '=' not in decoded_query
:
sys
.argv
.append(decoded_query
)
execfile(scriptfile
, {"__name__": "__main__"})
self
.log_error("CGI script exit status %s", str(sts
))
self
.log_message("CGI script exited OK")
"""Internal routine to get nobody's uid"""
nobody
= pwd
.getpwnam('nobody')[2]
nobody
= 1 + max(map(lambda x
: x
[2], pwd
.getpwall()))
"""Test for executable file."""
return st
.st_mode
& 0111 != 0
def test(HandlerClass = CGIHTTPRequestHandler,
         ServerClass = BaseHTTPServer.HTTPServer):
    """Invoke SimpleHTTPServer.test with the given handler and server classes.

    HandlerClass -- request handler class to pass through; defaults to
                    CGIHTTPRequestHandler.
    ServerClass  -- server class to pass through; defaults to
                    BaseHTTPServer.HTTPServer.
    """
    SimpleHTTPServer.test(HandlerClass, ServerClass)
if __name__
== '__main__':