mirror of
https://github.com/classilla/tenfourfox.git
synced 2024-10-05 19:56:58 +00:00
103 lines
4.1 KiB
Python
103 lines
4.1 KiB
Python
import json, os, urllib, urlparse
|
|
|
|
def redirect(url, response):
|
|
response.add_required_headers = False
|
|
response.writer.write_status(301)
|
|
response.writer.write_header("access-control-allow-origin", "*")
|
|
response.writer.write_header("location", url)
|
|
response.writer.end_headers()
|
|
response.writer.write("")
|
|
|
|
def create_redirect_url(request, swap_scheme = False):
|
|
parsed = urlparse.urlsplit(request.url)
|
|
destination_netloc = parsed.netloc
|
|
scheme = parsed.scheme
|
|
|
|
if swap_scheme:
|
|
scheme = "http" if parsed.scheme == "https" else "https"
|
|
hostname = parsed.netloc.split(':')[0]
|
|
port = request.server.config["ports"][scheme][0]
|
|
destination_netloc = ":".join([hostname, str(port)])
|
|
|
|
# Remove "redirection" from query to avoid redirect loops.
|
|
parsed_query = dict(urlparse.parse_qsl(parsed.query))
|
|
assert "redirection" in parsed_query
|
|
del parsed_query["redirection"]
|
|
|
|
destination_url = urlparse.urlunsplit(urlparse.SplitResult(
|
|
scheme = scheme,
|
|
netloc = destination_netloc,
|
|
path = parsed.path,
|
|
query = urllib.urlencode(parsed_query),
|
|
fragment = None))
|
|
|
|
return destination_url
|
|
|
|
def main(request, response):
|
|
if "redirection" in request.GET:
|
|
redirection = request.GET["redirection"]
|
|
if redirection == "no-redirect":
|
|
pass
|
|
elif redirection == "keep-scheme-redirect":
|
|
redirect(create_redirect_url(request, swap_scheme=False), response)
|
|
return
|
|
elif redirection == "swap-scheme-redirect":
|
|
redirect(create_redirect_url(request, swap_scheme=True), response)
|
|
return
|
|
else:
|
|
raise ValueError ("Invalid redirect type: %s" % redirection)
|
|
|
|
content_type = "text/plain"
|
|
response_data = ""
|
|
|
|
if "action" in request.GET:
|
|
action = request.GET["action"]
|
|
|
|
if "content_type" in request.GET:
|
|
content_type = request.GET["content_type"]
|
|
|
|
key = request.GET["key"]
|
|
stash = request.server.stash
|
|
path = request.GET.get("path", request.url.split('?'))[0]
|
|
|
|
if action == "put":
|
|
value = request.GET["value"]
|
|
stash.take(key=key, path=path)
|
|
stash.put(key=key, value=value, path=path)
|
|
response_data = json.dumps({"status": "success", "result": key})
|
|
elif action == "purge":
|
|
value = stash.take(key=key, path=path)
|
|
if content_type == "image/png":
|
|
response_data = open(os.path.join(request.doc_root,
|
|
"images",
|
|
"smiley.png")).read()
|
|
elif content_type == "audio/mpeg":
|
|
response_data = open(os.path.join(request.doc_root,
|
|
"media",
|
|
"sound_5.oga")).read()
|
|
elif content_type == "video/mp4":
|
|
response_data = open(os.path.join(request.doc_root,
|
|
"media",
|
|
"movie_5.mp4")).read()
|
|
elif content_type == "application/javascript":
|
|
response_data = open(os.path.join(request.doc_root,
|
|
"mixed-content",
|
|
"generic",
|
|
"worker.js")).read()
|
|
else:
|
|
response_data = "/* purged */"
|
|
elif action == "take":
|
|
value = stash.take(key=key, path=path)
|
|
if value is None:
|
|
status = "allowed"
|
|
else:
|
|
status = "blocked"
|
|
response_data = json.dumps({"status": status, "result": value})
|
|
|
|
response.add_required_headers = False
|
|
response.writer.write_status(200)
|
|
response.writer.write_header("content-type", content_type)
|
|
response.writer.write_header("cache-control", "no-cache; must-revalidate")
|
|
response.writer.end_headers()
|
|
response.writer.write(response_data)
|