# NOTE: stray "Blob Blame Raw" text (apparently web file-viewer residue) was here.
# -*- coding: utf-8 -*-

"""
 (c) 2014-2017 - Copyright Red Hat Inc

 Authors:
   Pierre-Yves Chibon <pingou@pingoured.fr>

"""

from __future__ import unicode_literals, absolute_import

import logging

import flask
from flask import Markup
import munch
from sqlalchemy.exc import SQLAlchemyError

import pagure.lib.query
from pagure.config import config as pagure_config
from pagure.flask_app import logout
from pagure.utils import is_admin

from flask_oidc import OpenIDConnect

# OpenID Connect extension object used by the functions below to query the
# login state and the user information. It is created here without an
# application; presumably it is bound to the flask app elsewhere (to confirm).
oidc = OpenIDConnect()


# Module-level logger, named after this module.
_log = logging.getLogger(__name__)


def fas_user_from_oidc():
    """Fill ``flask.g.fas_user`` for the current request, when possible.

    Two sources are tried, in this order:

    * the ``oidc_cached_userdata`` entry of the flask session, which is
      turned back into a ``Munch`` as-is;
    * the OIDC provider, when ``oidc.user_loggedin`` is true and the
      session carries ``oidc_logintime``.  The claims to request are
      named by the ``OIDC_PAGURE_*`` configuration keys, and the
      resulting user data is stored in the session under
      ``oidc_cached_userdata``.

    When neither applies the function returns without touching
    ``flask.g.fas_user``.
    """
    if "oidc_cached_userdata" in flask.session:
        flask.g.fas_user = munch.Munch(**flask.session["oidc_cached_userdata"])
        return

    if not (oidc.user_loggedin and "oidc_logintime" in flask.session):
        return

    # Names of the claims to request, as configured for this instance.
    claims = [
        pagure_config[option]
        for option in (
            "OIDC_PAGURE_EMAIL",
            "OIDC_PAGURE_FULLNAME",
            "OIDC_PAGURE_USERNAME",
            "OIDC_PAGURE_SSH_KEY",
            "OIDC_PAGURE_GROUPS",
        )
    ]
    mail_claim, name_claim, login_claim, sshkey_claim, groups_claim = claims
    userinfo = oidc.user_getinfo(list(claims))

    login = userinfo.get(login_claim)
    if not login:
        # No usable username claim: derive one according to the configured
        # fallback.  Any other fallback value leaves the username empty.
        fallback = pagure_config["OIDC_PAGURE_USERNAME_FALLBACK"]
        if fallback == "email":
            login = userinfo[mail_claim].partition("@")[0]
        elif fallback == "sub":
            login = flask.g.oidc_id_token["sub"]

    user = munch.Munch(
        username=login,
        fullname=userinfo.get(name_claim, ""),
        email=userinfo[mail_claim],
        ssh_key=userinfo.get(sshkey_claim),
        groups=userinfo.get(groups_claim, []),
        login_time=flask.session["oidc_logintime"],
    )
    flask.g.fas_user = user
    # Keep a plain-dict copy in the session so that later calls can take
    # the cached branch above.
    flask.session["oidc_cached_userdata"] = dict(user)


def set_user():
    """Create or refresh the pagure account of ``flask.g.fas_user``.

    If the user has no username, an error is flashed, ``logout()`` is
    called and nothing else happens.

    Otherwise the session flag ``_new_user`` records whether the username
    was unknown to the database before this call, the user is set up via
    ``pagure.lib.query.set_up_user`` and, unless ``ENABLE_GROUP_MNGT`` is
    enabled in the configuration, the user's group membership is aligned
    with the groups listed on ``flask.g.fas_user``.  The database session
    is then committed.

    On ``SQLAlchemyError`` the database session is rolled back, the error
    is logged, an error is flashed and ``logout()`` is called.
    """
    fas_user = flask.g.fas_user
    username = fas_user.username
    if username is None:
        flask.flash(
            "It looks like your Identity Provider did not provide an "
            "username we could retrieve, username being needed we cannot "
            "go further.",
            "error",
        )
        logout()
        return

    flask.session["_new_user"] = False
    db = flask.g.session
    already_known = pagure.lib.query.search_user(db, username=username)
    if not already_known:
        flask.session["_new_user"] = True

    try:
        pagure.lib.query.set_up_user(
            session=db,
            username=username,
            fullname=fas_user.fullname,
            default_email=fas_user.email,
            ssh_key=fas_user.get("ssh_key"),
            keydir=pagure_config.get("GITOLITE_KEYDIR", None),
        )

        # With group management disabled in pagure, membership follows
        # the groups carried by the logged-in user at every login.
        if not pagure_config.get("ENABLE_GROUP_MNGT", False):
            account = pagure.lib.query.search_user(db, username=username)
            current_groups = set(account.groups)
            external_groups = set(fas_user.groups)

            # Join the groups listed for the user that they are not in yet;
            # empty names and groups unknown to pagure are skipped.
            for name in external_groups - current_groups:
                if not name:
                    continue
                group_obj = pagure.lib.query.search_groups(
                    db, group_name=name
                )
                if not group_obj:
                    continue
                try:
                    pagure.lib.query.add_user_to_group(
                        session=db,
                        username=username,
                        group=group_obj,
                        user=username,
                        is_admin=is_admin(),
                        from_external=True,
                    )
                except pagure.exceptions.PagureException as err:
                    _log.error(err)

            # Leave the groups that are no longer listed for the user.
            for name in current_groups - external_groups:
                if not name:
                    continue
                try:
                    pagure.lib.query.delete_user_of_group(
                        session=db,
                        username=username,
                        groupname=name,
                        user=username,
                        is_admin=is_admin(),
                        force=True,
                        from_external=True,
                    )
                except pagure.exceptions.PagureException as err:
                    _log.error(err)

        db.commit()
    except SQLAlchemyError as err:
        db.rollback()
        _log.exception(err)
        flask.flash(
            Markup(
                "Could not set up you as a user properly,"
                ' please <a href="/about">contact an administrator</a>'
            ),
            "error",
        )
        # A user we could not set up correctly must not stay logged in.
        logout()


def oidc_logout():
    """Log the current user out of the OIDC-backed session.

    Resets ``flask.g.fas_user`` to ``None``, removes the
    ``oidc_logintime`` and ``oidc_cached_userdata`` entries from the flask
    session and finally calls ``oidc.logout()``.

    The session entries are removed with ``pop`` and a default, so a
    session that holds only one of them (or neither) is handled without
    raising ``KeyError`` and ``oidc.logout()`` is always reached.
    """
    flask.g.fas_user = None
    for key in ("oidc_logintime", "oidc_cached_userdata"):
        # pop() with a default: a missing key must not abort the logout.
        flask.session.pop(key, None)
    oidc.logout()