Blob Blame Raw
# -*- coding: utf-8 -*-

"""
 (c) 2018 - Copyright Red Hat Inc

 Authors:
   Pierre-Yves Chibon <pingou@pingoured.fr>

"""

from __future__ import unicode_literals, absolute_import

import datetime
import hashlib
import hmac
import json
import os
import os.path
import time
import uuid

import requests
import six

from celery import Celery
from celery.signals import after_setup_task_logger
from celery.utils.log import get_task_logger
from kitchen.text.converters import to_bytes
from sqlalchemy.exc import SQLAlchemyError

import pagure.lib.query
from pagure.config import config as pagure_config
from pagure.lib.tasks_utils import pagure_task
from pagure.mail_logging import format_callstack
from pagure.lib.lib_ci import trigger_jenkins_build
from pagure.utils import split_project_fullname, set_up_logging

# logging.config.dictConfig(pagure_config.get('LOGGING') or {'version': 1})
_log = get_task_logger(__name__)
_i = 0


if os.environ.get("PAGURE_BROKER_URL"):  # pragma: no cover
    broker_url = os.environ["PAGURE_BROKER_URL"]
elif pagure_config.get("BROKER_URL"):
    broker_url = pagure_config["BROKER_URL"]
else:
    broker_url = "redis://%s" % pagure_config["REDIS_HOST"]

conn = Celery("tasks", broker=broker_url, backend=broker_url)
conn.conf.update(pagure_config["CELERY_CONFIG"])


@after_setup_task_logger.connect
def augment_celery_log(**kwargs):
    set_up_logging(force=True)


def call_web_hooks(project, topic, msg, urls):
    """ Sends the web-hook notification. """
    _log.info("Processing project: %s - topic: %s", project.fullname, topic)
    _log.debug("msg: %s", msg)

    # Send web-hooks notification
    global _i
    _i += 1
    year = datetime.datetime.utcnow().year
    if isinstance(topic, six.text_type):
        topic = to_bytes(topic, encoding="utf8", nonstring="passthru")
    msg["pagure_instance"] = pagure_config["APP_URL"]
    msg["project_fullname"] = project.fullname
    msg = dict(
        topic=topic.decode("utf-8"),
        msg=msg,
        timestamp=int(time.time()),
        msg_id="%s-%s" % (year, uuid.uuid4()),
        i=_i,
    )

    content = json.dumps(msg, sort_keys=True)
    hashhex = hmac.new(
        project.hook_token.encode("utf-8"),
        content.encode("utf-8"),
        hashlib.sha1,
    ).hexdigest()
    hashhex256 = hmac.new(
        project.hook_token.encode("utf-8"),
        content.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()
    headers = {
        "X-Pagure": pagure_config["APP_URL"],
        "X-Pagure-project": project.fullname,
        "X-Pagure-Signature": hashhex,
        "X-Pagure-Signature-256": hashhex256,
        "X-Pagure-Topic": topic,
        "Content-Type": "application/json",
    }
    for url in sorted(urls):
        url = url.strip()
        _log.info("Calling url %s" % url)
        try:
            req = requests.post(url, headers=headers, data=content, timeout=60)
            if not req:
                _log.info(
                    "An error occured while querying: %s - "
                    "Error code: %s" % (url, req.status_code)
                )
        except (requests.exceptions.RequestException, Exception) as err:
            _log.info(
                "An error occured while querying: %s - Error: %s" % (url, err)
            )


@conn.task(queue=pagure_config.get("WEBHOOK_CELERY_QUEUE", None), bind=True)
@pagure_task
def webhook_notification(
    self, session, topic, msg, namespace=None, name=None, user=None
):
    """ Send webhook notifications about an event on that project.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg topic: the topic for the notification
    :type topic: str
    :arg msg: the message to send via web-hook
    :type msg: str
    :kwarg namespace: the namespace of the project
    :type namespace: None or str
    :kwarg name: the name of the project
    :type name: None or str
    :kwarg user: the user of the project, only set if the project is a fork
    :type user: None or str

    """
    project = pagure.lib.query._get_project(
        session, namespace=namespace, name=name, user=user
    )

    if not project:
        session.close()
        raise RuntimeError(
            "Project: %s/%s from user: %s not found in the DB"
            % (namespace, name, user)
        )

    urls = project.settings.get("Web-hooks")
    if not urls:
        _log.info("No URLs set: %s" % urls)
        return

    urls = urls.split("\n")
    _log.info("Got the project and urls, going to the webhooks")
    call_web_hooks(project, topic, msg, urls)


@conn.task(queue=pagure_config.get("LOGCOM_CELERY_QUEUE", None), bind=True)
@pagure_task
def log_commit_send_notifications(
    self,
    session,
    name,
    commits,
    abspath,
    branch,
    default_branch,
    namespace=None,
    username=None,
):
    """ Send webhook notifications about an event on that project.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg topic: the topic for the notification
    :type topic: str
    :arg msg: the message to send via web-hook
    :type msg: str
    :kwarg namespace: the namespace of the project
    :type namespace: None or str
    :kwarg name: the name of the project
    :type name: None or str
    :kwarg user: the user of the project, only set if the project is a fork
    :type user: None or str

    """
    _log.info(
        "Looking for project: %s%s of %s",
        "%s/" % namespace if namespace else "",
        name,
        username,
    )
    project = pagure.lib.query._get_project(
        session, name, user=username, namespace=namespace
    )

    if not project:
        _log.info("No project found")
        return

    _log.info("Found project: %s", project.fullname)

    _log.info("Processing %s commits in %s", len(commits), abspath)

    # Only log commits when the branch is the default branch
    log_all = pagure_config.get("LOG_ALL_COMMITS", False)
    if log_all or branch == default_branch:
        pagure.lib.git.log_commits_to_db(session, project, commits, abspath)
    else:
        _log.info(
            "Not logging commits not made on the default branch: %s",
            default_branch,
        )

    # Notify subscribed users that there are new commits
    email_watchcommits = pagure_config.get("EMAIL_ON_WATCHCOMMITS", True)
    _log.info("Sending notification about the commit: %s", email_watchcommits)
    if email_watchcommits:
        pagure.lib.notify.notify_new_commits(abspath, project, branch, commits)

    try:
        session.commit()
    except SQLAlchemyError as err:  # pragma: no cover
        _log.exception(err)
        session.rollback()


def get_files_to_load(title, new_commits_list, abspath):

    _log.info("%s: Retrieve the list of files changed" % title)
    file_list = []
    new_commits_list.reverse()
    n = len(new_commits_list)
    for idx, commit in enumerate(new_commits_list):
        if (idx % 100) == 0:
            _log.info(
                "Loading files change in commits for %s: %s/%s", title, idx, n
            )
        if commit == new_commits_list[0]:
            filenames = pagure.lib.git.read_git_lines(
                [
                    "diff-tree",
                    "--no-commit-id",
                    "--name-only",
                    "-r",
                    "--root",
                    commit,
                ],
                abspath,
            )
        else:
            filenames = pagure.lib.git.read_git_lines(
                ["diff-tree", "--no-commit-id", "--name-only", "-r", commit],
                abspath,
            )
        for line in filenames:
            if line.strip():
                file_list.append(line.strip())

    return file_list


@conn.task(queue=pagure_config.get("LOADJSON_CELERY_QUEUE", None), bind=True)
@pagure_task
def load_json_commits_to_db(
    self,
    session,
    name,
    commits,
    abspath,
    data_type,
    agent,
    namespace=None,
    username=None,
):
    """ Loads into the database the specified commits that have been pushed
    to either the tickets or the pull-request repository.

    """

    if data_type not in ["ticket", "pull-request"]:
        _log.info("LOADJSON: Invalid data_type retrieved: %s", data_type)
        return

    _log.info(
        "LOADJSON: Looking for project: %s%s of user: %s",
        "%s/" % namespace if namespace else "",
        name,
        username,
    )

    project = pagure.lib.query._get_project(
        session, name, user=username, namespace=namespace
    )

    if not project:
        _log.info("LOADJSON: No project found")
        return

    _log.info("LOADJSON: Found project: %s", project.fullname)

    _log.info(
        "LOADJSON: %s: Processing %s commits in %s",
        project.fullname,
        len(commits),
        abspath,
    )

    file_list = set(get_files_to_load(project.fullname, commits, abspath))
    n = len(file_list)
    _log.info("LOADJSON: %s files to process" % n)
    mail_body = [
        "Good Morning",
        "",
        "This is the log of loading all the files pushed in the git repo into",
        "the database. It should ignore files that are not JSON files, this",
        "is fine.",
        "",
    ]

    for idx, filename in enumerate(sorted(file_list)):
        _log.info(
            "LOADJSON: Loading: %s: %s -- %s/%s",
            project.fullname,
            filename,
            idx + 1,
            n,
        )
        tmp = "Loading: %s -- %s/%s" % (filename, idx + 1, n)
        try:
            json_data = None
            data = "".join(
                pagure.lib.git.read_git_lines(
                    ["show", "HEAD:%s" % filename], abspath
                )
            )
            if data and not filename.startswith("files/"):
                try:
                    json_data = json.loads(data)
                except ValueError:
                    pass
            if json_data:
                if data_type == "ticket":
                    pagure.lib.git.update_ticket_from_git(
                        session,
                        reponame=name,
                        namespace=namespace,
                        username=username,
                        issue_uid=filename,
                        json_data=json_data,
                        agent=agent,
                    )
                elif data_type == "pull-request":
                    pagure.lib.git.update_request_from_git(
                        session,
                        reponame=name,
                        namespace=namespace,
                        username=username,
                        request_uid=filename,
                        json_data=json_data,
                    )
                tmp += " ... ... Done"
            else:
                tmp += " ... ... SKIPPED - No JSON data"
                mail_body.append(tmp)
        except Exception as err:
            _log.info("data: %s", json_data)
            session.rollback()
            _log.exception(err)
            tmp += " ... ... FAILED\n"
            tmp += format_callstack()
            break
        finally:
            mail_body.append(tmp)

    try:
        session.commit()
        _log.info(
            "LOADJSON: Emailing results for %s to %s", project.fullname, agent
        )
        try:
            if not agent:
                raise pagure.exceptions.PagureException(
                    "No agent found: %s" % agent
                )
            if agent != "pagure":
                user_obj = pagure.lib.query.get_user(session, agent)
                pagure.lib.notify.send_email(
                    "\n".join(mail_body),
                    "Issue import report",
                    user_obj.default_email,
                )
        except pagure.exceptions.PagureException:
            _log.exception("LOADJSON: Could not find user %s" % agent)
    except SQLAlchemyError:  # pragma: no cover
        session.rollback()
    _log.info("LOADJSON: Ready for another")


@conn.task(queue=pagure_config.get("CI_CELERY_QUEUE", None), bind=True)
@pagure_task
def trigger_ci_build(
    self,
    session,
    cause,
    branch,
    branch_to,
    ci_type,
    project_name=None,
    pr_uid=None,
):

    """ Triggers a new run of the CI system on the specified pull-request.

    """
    pagure.lib.plugins.get_plugin("Pagure CI")

    if not pr_uid and not project_name:
        _log.debug("No PR UID nor project name specified, can't trigger CI")
        session.close()
        return

    if pr_uid:
        pr = pagure.lib.query.get_request_by_uid(session, pr_uid)
        if pr.remote:
            project_name = pr.project.fullname
        else:
            project_name = pr.project_from.fullname

    user, namespace, project_name = split_project_fullname(project_name)

    _log.info("Pagure-CI: Looking for project: %s", project_name)
    project = pagure.lib.query.get_authorized_project(
        session=session,
        project_name=project_name,
        user=user,
        namespace=namespace,
    )

    if project is None:
        _log.warning(
            "Pagure-CI: No project could be found for the name %s",
            project_name,
        )
        session.close()
        return

    if project.is_fork:
        if (
            project.parent.ci_hook is None
            or project.parent.ci_hook.ci_url is None
        ):
            raise pagure.exceptions.PagureException(
                "Project %s not configured or incorectly configured for ci",
                project.parent.fullname,
            )
    elif project.ci_hook is None or project.ci_hook.ci_url is None:
        raise pagure.exceptions.PagureException(
            "Project %s not configured or incorectly configured for ci",
            project.fullname,
        )

    _log.info("Pagure-CI: project retrieved: %s", project.fullname)

    _log.info(
        "Pagure-CI: Trigger from %s cause (PR# or commit) %s branch: %s",
        project.fullname,
        cause,
        branch,
    )

    if ci_type == "jenkins":

        jenk_project = project
        if project.is_fork:
            jenk_project = project.parent

        trigger_jenkins_build(
            project_path=project.path,
            url=jenk_project.ci_hook.ci_url,
            job=jenk_project.ci_hook.ci_job,
            token=jenk_project.ci_hook.pagure_ci_token,
            branch=branch,
            branch_to=branch_to,
            cause=cause,
            ci_username=jenk_project.ci_hook.ci_username,
            ci_password=jenk_project.ci_hook.ci_password,
        )

    else:
        _log.warning("Pagure-CI:Un-supported CI type")

    _log.info("Pagure-CI: Ready for another")