"""Request object wrapping the WSGI environment of a single HTTP request."""


import cgi
import urllib.parse

import answer
from model.model import Model


class Request:
  """State of a single HTTP request, built from a WSGI-style environment.

  Attributes set by the constructor:
    server     -- the owning server object (provides ``config`` and ``sessions``)
    env        -- the environment dict of the request
    method     -- value of ``REQUEST_METHOD`` as a string
    protocol, domain, prefix
               -- copied from ``server.config`` keys 'protocol', 'domain'
                  and 'urlprefix'
    urlvars    -- query-string variables; every value is a list of strings
                  (result of ``urllib.parse.parse_qs``)
    postvars   -- form variables, filled by ``read_postvars()``
    action     -- value of the 'action' form variable, filled by
                  ``read_postvars()``; '' when absent
    path       -- list of path segments below ``prefix``, or None when the
                  request URI does not live under ``prefix``
    cookie     -- raw ``HTTP_COOKIE`` header ('' when absent)
    answer     -- ``answer.Answer`` object created for this request
    connection, model, user
               -- filled by ``create_model()``
    session    -- initialised to None here; presumably assigned by
                  ``server.sessions.attach_session`` (not visible here)
  """

  def __init__(self, server, env):
    """Parse the request environment and attach a session.

    Raises KeyError when ``env`` lacks 'REQUEST_METHOD' or 'REQUEST_URI',
    or when ``server.config`` lacks one of the keys read below.
    """
    self.server = server
    self.env = env
    self.method = str(self.env["REQUEST_METHOD"])
    self.protocol = self.server.config['protocol']
    self.domain = self.server.config['domain']
    self.prefix = self.server.config['urlprefix']

    self.urlvars = dict()
    self.postvars = dict()
    self.action = ''
    self.path = None
    self.cookie = ''
    self.connection = None
    self.model = None
    self.user = None
    self.answer = None
    self.session = None

    # read url vars (parse_qs already returns one entry per distinct name)
    if 'QUERY_STRING' in self.env:
      self.urlvars.update(urllib.parse.parse_qs(self.env["QUERY_STRING"]))

    # read cookies
    self.cookie = self.env.get('HTTP_COOKIE', '')
    assert isinstance(self.cookie, str)

    # read path
    self.path = self._parse_path(self.env["REQUEST_URI"])

    # the answer must exist before the session is attached (original order)
    self.answer = answer.Answer(self)
    self.server.sessions.attach_session(self)

  def _parse_path(self, uri):
    """Return the normalised path segments of ``uri`` below ``self.prefix``.

    The query string is cut off, empty and '.' segments are dropped and
    '..' removes the preceding segment (it can never climb above the
    prefix). Returns None when ``uri`` is not located under the prefix.
    """
    path = uri.split('?')[0]
    prefix = self.prefix
    if not path.startswith(prefix):
      return None
    rest = path[len(prefix):]
    # The prefix has to end at a segment boundary, otherwise the prefix
    # '/app' would also claim '/application/...'.
    if prefix and not prefix.endswith('/') and rest and not rest.startswith('/'):
      return None

    segments = list()
    for segment in rest.split('/'):
      if segment == '..':
        if segments:
          segments.pop()
      elif segment and segment != '.':
        segments.append(segment)
    return segments

  def read_postvars(self):
    """Read the form fields of a POST request into ``self.postvars``.

    Does nothing for other request methods. Fields whose name occurs more
    than once (in the body, or in both the body and the query string) are
    ambiguous and therefore dropped. The 'action' field is removed from
    ``postvars`` and stored as a string in ``self.action``.
    """
    if self.method != 'POST':
      return

    # NOTE: the cgi module is deprecated since Python 3.11 and removed in
    # Python 3.13 (PEP 594); this method needs a replacement parser there.
    form = cgi.FieldStorage(
      fp = self.env["wsgi.input"],
      environ = self.env )

    # For bodies that are not form data FieldStorage keeps no field list
    # and iterating it raises TypeError; treat that as "no post vars".
    if form.list is not None:
      for key in form:
        field = form[key]
        # FieldStorage returns a list when a name occurs several times.
        if isinstance(field, list):
          continue
        self.postvars[str(key)] = field.value

    # get action
    action = self.postvars.pop('action', None)
    if action is not None:
      self.action = str(action)

  def create_model(self, connection):
    """Create ``self.model`` on ``connection`` and load the session's user.

    The model is created with the session's ``user_id``, or 0 when there
    is no session. When a session exists but its user cannot be found,
    the session is closed through ``server.sessions.close_session`` and
    the model is recreated with user id 0.
    """
    self.connection = connection
    self.model = Model(self.connection, self.answer, self.session.user_id if self.session else 0)
    if self.session:
      self.user = self.model.users.get_by_id(self.session.user_id)
      if not self.user:
        self.server.sessions.close_session(self)
        self.model = Model(self.connection, self.answer, 0)

  def get_urlpath(self, path = None, domain = False):
    """Build the URL path for a list of path segments.

    path   -- list of segments; defaults to the path of this request
    domain -- when true, prepend 'https://' and the configured domain

    Raises AssertionError when the path is not a list, which includes the
    case of ``self.path`` being None.
    """
    if path is None:
      path = self.path
    assert isinstance(path, list)
    urlpath = self.prefix + '/' + '/'.join(path)
    if domain:
      # NOTE(review): the scheme is hard-coded although self.protocol is
      # read from the config - confirm whether that is intended.
      urlpath = 'https://' + self.domain + urlpath
    return urlpath

  def get_urlpath_escaped(self, path = None, domain = False):
    """Return ``get_urlpath(path, domain)`` passed through ``self.answer.e``
    (presumably an escaping helper; defined outside this file)."""
    return self.answer.e( self.get_urlpath(path, domain) )

  def translate(self, text):
    """Delegate to ``self.answer.translate``."""
    return self.answer.translate(text)

  def t(self, text):
    """Short alias for ``translate``."""
    return self.translate(text)