import cgi
import urllib.parse
import answer
from model.model import Model
class Request:
    """State of a single incoming HTTP request, built from a WSGI-style environ.

    The constructor parses the query string, the cookie header and the
    request path, creates the ``answer.Answer`` object for this request and
    asks the server's session manager to attach a session.  Form fields are
    read separately by :meth:`read_postvars`, and the data model is created
    by :meth:`create_model` once a database connection is available.

    Attributes set here:
        server, env: the objects passed to the constructor.
        method: value of ``REQUEST_METHOD`` as ``str``.
        protocol, domain, prefix: copied from ``server.config``
            (keys ``protocol``, ``domain``, ``urlprefix``).
        urlvars: query-string variables as returned by
            ``urllib.parse.parse_qs`` (every value is a list of strings).
        postvars: form fields that occur exactly once (see read_postvars).
        action: value of the ``action`` form field, or ``''``.
        path: list of normalised path segments below ``prefix``.
        cookie: raw ``HTTP_COOKIE`` header, or ``''``.
        connection, model, user: filled in by :meth:`create_model`.
        answer: the ``answer.Answer`` created for this request.
        session: initialised to ``None``; presumably assigned by
            ``server.sessions.attach_session`` -- confirm against that class.
    """

    def __init__(self, server, env):
        """Parse ``env`` and prepare the request.

        Args:
            server: object exposing ``config`` (a mapping) and ``sessions``.
            env: WSGI-style environ.  ``REQUEST_METHOD`` and ``REQUEST_URI``
                are required (``KeyError`` otherwise); ``QUERY_STRING`` and
                ``HTTP_COOKIE`` are optional.
        """
        self.server = server
        self.env = env
        self.method = str(self.env["REQUEST_METHOD"])
        self.protocol = self.server.config['protocol']
        self.domain = self.server.config['domain']
        self.prefix = self.server.config['urlprefix']
        self.urlvars = dict()
        self.postvars = dict()
        self.action = ''
        self.path = None
        self.cookie = ''
        self.connection = None
        self.model = None
        self.user = None
        self.answer = None
        self.session = None

        # read url vars -- parse_qs already groups repeated keys into one
        # list per key, so the result can be merged directly.
        if 'QUERY_STRING' in self.env:
            self.urlvars.update(urllib.parse.parse_qs(self.env["QUERY_STRING"]))

        # read cookies
        self.cookie = self.env.get('HTTP_COOKIE', '')
        assert isinstance(self.cookie, str)

        # read path (everything before the first '?')
        raw_path = self.env["REQUEST_URI"].partition('?')[0]
        self.path = self._split_path(raw_path, self.prefix)

        self.answer = answer.Answer(self)
        self.server.sessions.attach_session(self)

    @staticmethod
    def _split_path(raw_path, prefix):
        """Strip ``prefix`` from ``raw_path`` and return normalised segments.

        The prefix is only removed when it ends at a segment boundary, so a
        prefix of ``/app`` does not eat the start of ``/application``.
        Empty segments and ``.`` are dropped; ``..`` removes the previous
        segment but can never climb above the root.
        """
        if prefix and raw_path.startswith(prefix):
            remainder = raw_path[len(prefix):]
            if prefix.endswith('/') or not remainder or remainder.startswith('/'):
                raw_path = remainder

        segments = []
        for part in raw_path.split('/'):
            if not part or part == '.':
                continue
            if part == '..':
                if segments:
                    segments.pop()
            else:
                segments.append(part)
        return segments

    def read_postvars(self):
        """Read the form fields of a POST request into ``self.postvars``.

        Does nothing unless ``self.method`` is ``'POST'``.  Fields whose name
        occurs more than once are dropped entirely.  If an ``action`` field
        is present it is removed from ``self.postvars`` and stored (as
        ``str``) in ``self.action``.
        """
        if self.method != 'POST':
            return

        # NOTE(review): the cgi module is deprecated (PEP 594) and removed
        # in Python 3.13; this needs a replacement before upgrading.
        storage = cgi.FieldStorage(
            fp=self.env["wsgi.input"],
            environ=self.env,
        )
        # FieldStorage.list is None when the body is not form data; iterating
        # the storage would raise TypeError in that case.  Walking the item
        # list directly also avoids storage[name], which returns a *list*
        # (without a .value attribute) for repeated names.
        self._store_unique_fields(storage.list or [])

        # get action
        if 'action' in self.postvars:
            action = self.postvars.pop('action')
            if action is not None:
                self.action = str(action)

    def _store_unique_fields(self, items):
        """Copy ``item.value`` into ``self.postvars`` for unambiguous names.

        ``items`` is an iterable of objects with ``name`` and ``value``
        attributes.  Items without a name are skipped.  A name that is seen
        twice, or that is already present in ``self.postvars``, is removed
        and ignored for the rest of the call.
        """
        duplicates = set()
        for item in items:
            if item.name is None:
                continue
            name = str(item.name)
            if name in duplicates:
                continue
            if name in self.postvars:
                duplicates.add(name)
                del self.postvars[name]
            else:
                self.postvars[name] = item.value

    def create_model(self, connection):
        """Create ``self.model`` for ``connection`` and look up the user.

        The model is created with the session's ``user_id`` (or ``0`` when
        there is no session).  If a session exists but its user cannot be
        found, the session is closed through the server's session manager
        and the model is recreated with user id ``0``.
        """
        self.connection = connection
        user_id = self.session.user_id if self.session else 0
        self.model = Model(self.connection, self.answer, user_id)
        if self.session:
            self.user = self.model.users.get_by_id(self.session.user_id)
            if not self.user:
                self.server.sessions.close_session(self)
                self.model = Model(self.connection, self.answer, 0)

    def get_urlpath(self, path=None, domain=False):
        """Build the URL path for ``path`` below the configured prefix.

        Args:
            path: list of path segments; defaults to the request's own path.
            domain: when true, prepend ``'https://'`` and ``self.domain``.

        Returns:
            ``prefix + '/' + '/'.join(path)``, optionally as an absolute URL.
        """
        if path is None:
            path = self.path
        assert isinstance(path, list)
        urlpath = self.prefix + '/' + '/'.join(path)
        if domain:
            # NOTE(review): the scheme is hard-coded although self.protocol
            # is read from the config -- confirm whether it should be used.
            urlpath = 'https://' + self.domain + urlpath
        return urlpath

    def get_urlpath_escaped(self, path=None, domain=False):
        """Return :meth:`get_urlpath` passed through ``self.answer.e``.

        ``answer.e`` is presumably an escaping helper -- confirm in Answer.
        """
        return self.answer.e(self.get_urlpath(path, domain))

    def translate(self, text):
        """Delegate to ``self.answer.translate(text)``."""
        return self.answer.translate(text)

    def t(self, text):
        """Short alias for :meth:`translate`."""
        return self.translate(text)