# Blob Blame Raw
# -*- coding: utf-8 -*-

"""
 (c) 2018 - Copyright Red Hat Inc

 Authors:
   Pierre-Yves Chibon <pingou@pingoured.fr>

"""

from __future__ import unicode_literals

import datetime
import hashlib
import hmac
import json
import os
import os.path
import time
import uuid

import requests
import six

from celery import Celery
from celery.signals import after_setup_task_logger
from celery.utils.log import get_task_logger
from kitchen.text.converters import to_bytes
from sqlalchemy.exc import SQLAlchemyError

import pagure.lib
from pagure.config import config as pagure_config
from pagure.lib.tasks import pagure_task
from pagure.mail_logging import format_callstack
from pagure.lib.lib_ci import trigger_jenkins_build
from pagure.utils import split_project_fullname, set_up_logging

# logging.config.dictConfig(pagure_config.get('LOGGING') or {'version': 1})
# Task-aware logger shared by every function of this module.
_log = get_task_logger(__name__)
# Running count of the web-hook notifications sent by this process; it is
# incremented by call_web_hooks() and shipped in each payload under ``i``.
_i = 0


# Pick the broker URL: the PAGURE_BROKER_URL environment variable wins, then
# the BROKER_URL configuration key and finally a redis URL built from the
# REDIS_HOST configuration key (only read when nothing else is set).
broker_url = os.environ.get("PAGURE_BROKER_URL")
if not broker_url:
    broker_url = pagure_config.get("BROKER_URL")
if not broker_url:
    broker_url = "redis://%s" % pagure_config["REDIS_HOST"]

# The same URL is used as message broker and as result backend.
conn = Celery("tasks", broker=broker_url, backend=broker_url)
conn.conf.update(pagure_config["CELERY_CONFIG"])


@after_setup_task_logger.connect
def augment_celery_log(**kwargs):
    """ Run pagure's logging set-up once celery has set up its task logger.

    Connected to celery's ``after_setup_task_logger`` signal. The keyword
    arguments sent along with the signal are accepted but not used.

    """
    set_up_logging(force=True)


def call_web_hooks(project, topic, msg, urls):
    """ Sends the web-hook notification.

    The payload is serialized to JSON once, signed with the ``hook_token``
    of the project (HMAC-SHA1 and HMAC-SHA256, sent as headers) and POSTed
    to each of the URLs. Delivery is best-effort: a URL that fails is logged
    and the remaining URLs are still called.

    :arg project: the project the notification is about, its ``fullname``
        and ``hook_token`` attributes are used
    :arg topic: the topic of the notification
    :type topic: str or bytes
    :arg msg: the body of the notification. Note that the
        ``pagure_instance`` and ``project_fullname`` keys are added to the
        object given before it is wrapped into the payload
    :type msg: dict
    :arg urls: the URLs to notify, surrounding white-spaces are stripped
        and blank entries are ignored
    :type urls: iterable of str

    """
    _log.info("Processing project: %s - topic: %s", project.fullname, topic)
    _log.debug("msg: %s", msg)

    # Send web-hooks notification
    global _i
    _i += 1
    year = datetime.datetime.utcnow().year
    if isinstance(topic, six.text_type):
        topic = to_bytes(topic, encoding="utf8", nonstring="passthru")
    msg["pagure_instance"] = pagure_config["APP_URL"]
    msg["project_fullname"] = project.fullname
    msg = dict(
        topic=topic.decode("utf-8"),
        msg=msg,
        timestamp=int(time.time()),
        msg_id="%s-%s" % (year, uuid.uuid4()),
        i=_i,
    )

    # sort_keys keeps the serialization, and thus the signatures, stable for
    # a given payload.
    content = json.dumps(msg, sort_keys=True)
    key = project.hook_token.encode("utf-8")
    payload = content.encode("utf-8")
    hashhex = hmac.new(key, payload, hashlib.sha1).hexdigest()
    hashhex256 = hmac.new(key, payload, hashlib.sha256).hexdigest()
    headers = {
        "X-Pagure": pagure_config["APP_URL"],
        "X-Pagure-project": project.fullname,
        "X-Pagure-Signature": hashhex,
        "X-Pagure-Signature-256": hashhex256,
        "X-Pagure-Topic": topic,
        "Content-Type": "application/json",
    }
    for url in sorted(urls):
        url = url.strip()
        if not url:
            # Blank entries (for example empty lines in the list of URLs)
            # cannot be queried: skip them instead of logging a failure.
            continue
        _log.info("Calling url %s", url)
        try:
            req = requests.post(
                url, headers=headers, data=content, timeout=60
            )
            # A response is falsy when its status code reports an error.
            if not req:
                _log.info(
                    "An error occured while querying: %s - Error code: %s",
                    url,
                    req.status_code,
                )
        except Exception as err:
            # Best effort: one broken end-point must not prevent the others
            # from being notified.
            _log.info(
                "An error occured while querying: %s - Error: %s", url, err
            )


@conn.task(queue=pagure_config.get("WEBHOOK_CELERY_QUEUE", None), bind=True)
@pagure_task
def webhook_notification(
    self, session, topic, msg, namespace=None, name=None, user=None
):
    """ Deliver an event to the web-hooks configured on a project.

    The project is looked up in the database and the content of its
    ``Web-hooks`` setting, one URL per line, is handed over to
    ``call_web_hooks``. Nothing is sent when that setting is empty.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg topic: the topic for the notification
    :type topic: str
    :arg msg: the message to send via web-hook
    :type msg: dict
    :kwarg namespace: the namespace of the project
    :type namespace: None or str
    :kwarg name: the name of the project
    :type name: None or str
    :kwarg user: the user of the project, only set if the project is a fork
    :type user: None or str
    :raises RuntimeError: when the project could not be found, the session
        is closed beforehand

    """
    project = pagure.lib._get_project(
        session, namespace=namespace, name=name, user=user
    )

    if not project:
        session.close()
        identifiers = (namespace, name, user)
        raise RuntimeError(
            "Project: %s/%s from user: %s not found in the DB" % identifiers
        )

    hooks_setting = project.settings.get("Web-hooks")
    if hooks_setting:
        # The setting stores one URL per line.
        hook_urls = hooks_setting.split("\n")
        _log.info("Got the project and urls, going to the webhooks")
        call_web_hooks(project, topic, msg, hook_urls)
    else:
        _log.info("No URLs set: %s" % hooks_setting)


@conn.task(queue=pagure_config.get("LOGCOM_CELERY_QUEUE", None), bind=True)
@pagure_task
def log_commit_send_notifications(
    self,
    session,
    name,
    commits,
    abspath,
    branch,
    default_branch,
    namespace=None,
    username=None,
):
    """ Record the commits pushed to a project and notify about them.

    The commits are logged in the database only when they were pushed to the
    default branch. The notification about the new commits is sent unless
    the ``EMAIL_ON_WATCHCOMMITS`` configuration key is turned off. Nothing
    is done if the project cannot be found.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg name: the name of the project
    :type name: str
    :arg commits: the commits that were pushed
    :arg abspath: the path handed, together with the commits, to the
        helpers logging and notifying (presumably the location of the git
        repository on disk, to be confirmed against the caller)
    :arg branch: the branch the commits were pushed to
    :arg default_branch: the branch ``branch`` is compared with to decide
        whether the commits are logged in the database
    :kwarg namespace: the namespace of the project
    :type namespace: None or str
    :kwarg username: the user of the project, passed as ``user`` when
        looking for the project
    :type username: None or str

    """
    namespace_prefix = "%s/" % namespace if namespace else ""
    _log.info(
        "Looking for project: %s%s of %s", namespace_prefix, name, username
    )
    project = pagure.lib._get_project(
        session, name, user=username, namespace=namespace
    )

    if not project:
        _log.info("No project found")
        return

    _log.info("Found project: %s", project.fullname)

    nb_commits = len(commits)
    _log.info("Processing %s commits in %s", nb_commits, abspath)

    # Only log commits when the branch is the default branch
    on_default_branch = branch == default_branch
    if on_default_branch:
        pagure.lib.git.log_commits_to_db(session, project, commits, abspath)

    # Notify subscribed users that there are new commits
    notify_watchers = pagure_config.get("EMAIL_ON_WATCHCOMMITS", True)
    if notify_watchers:
        pagure.lib.notify.notify_new_commits(abspath, project, branch, commits)

    try:
        session.commit()
    except SQLAlchemyError as err:  # pragma: no cover
        _log.exception(err)
        session.rollback()


def get_files_to_load(title, new_commits_list, abspath):
    """ Return the names of the files changed by the specified commits.

    The commits are processed in the reverse of the order they are given in
    and ``git diff-tree`` is asked for the files each of them changed. The
    list given by the caller is left untouched.

    :arg title: label identifying this run in the logs
    :type title: str
    :arg new_commits_list: the commits to inspect
    :type new_commits_list: iterable of str
    :arg abspath: the location handed to ``read_git_lines`` together with
        the git command (presumably the path of the git repository, to be
        confirmed against that helper)
    :return: the stripped, non-empty file names in the order git returned
        them; a file changed by several commits is listed several times
    :rtype: list of str

    """

    _log.info("%s: Retrieve the list of files changed" % title)
    file_list = []
    # Work on a reversed copy rather than reversing the caller's list.
    commits = list(new_commits_list)[::-1]
    n = len(commits)
    for idx, commit in enumerate(commits):
        if (idx % 100) == 0:
            _log.info(
                "Loading files change in commits for %s: %s/%s", title, idx, n
            )
        cmd = ["diff-tree", "--no-commit-id", "--name-only", "-r"]
        if idx == 0:
            # The first commit processed may have no parent: --root makes
            # diff-tree list the files of such a commit as well.
            cmd.append("--root")
        cmd.append(commit)
        filenames = pagure.lib.git.read_git_lines(cmd, abspath)
        for line in filenames:
            stripped = line.strip()
            if stripped:
                file_list.append(stripped)

    return file_list


@conn.task(queue=pagure_config.get("LOADJSON_CELERY_QUEUE", None), bind=True)
@pagure_task
def load_json_commits_to_db(
    self,
    session,
    name,
    commits,
    abspath,
    data_type,
    agent,
    namespace=None,
    username=None,
):
    """ Loads into the database the specified commits that have been pushed
    to either the tickets or the pull-request repository.

    Every file changed by the commits is read at ``HEAD`` and, when it holds
    JSON data, used to update the corresponding ticket or pull-request.
    Files located under ``files/`` and files without JSON data are reported
    as skipped. The first file that fails to load rolls back the session
    and stops the processing. A report with one entry per file processed is
    then emailed to ``agent``.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg name: the name of the project
    :type name: str
    :arg commits: the commits that were pushed
    :arg abspath: the location handed to the git helpers (presumably the
        path of the git repository that was pushed to, to be confirmed)
    :arg data_type: either ``ticket`` or ``pull-request``, anything else
        makes the task return without doing anything
    :type data_type: str
    :arg agent: the username of the person the report is emailed to, also
        passed on when updating a ticket
    :type agent: str
    :kwarg namespace: the namespace of the project
    :type namespace: None or str
    :kwarg username: the user of the project, passed as ``user`` when
        looking for the project
    :type username: None or str

    """

    if data_type not in ["ticket", "pull-request"]:
        _log.info("LOADJSON: Invalid data_type retrieved: %s", data_type)
        return

    _log.info(
        "LOADJSON: Looking for project: %s%s of user: %s",
        "%s/" % namespace if namespace else "",
        name,
        username,
    )

    project = pagure.lib._get_project(
        session, name, user=username, namespace=namespace
    )

    if not project:
        _log.info("LOADJSON: No project found")
        return

    _log.info("LOADJSON: Found project: %s", project.fullname)

    _log.info(
        "LOADJSON: %s: Processing %s commits in %s",
        project.fullname,
        len(commits),
        abspath,
    )

    # A file changed by several commits only needs to be loaded once.
    file_list = set(get_files_to_load(project.fullname, commits, abspath))
    n = len(file_list)
    _log.info("LOADJSON: %s files to process", n)
    mail_body = []

    for idx, filename in enumerate(sorted(file_list)):
        _log.info(
            "LOADJSON: Loading: %s: %s -- %s/%s",
            project.fullname,
            filename,
            idx + 1,
            n,
        )
        tmp = "Loading: %s -- %s/%s" % (filename, idx + 1, n)
        try:
            json_data = None
            data = "".join(
                pagure.lib.git.read_git_lines(
                    ["show", "HEAD:%s" % filename], abspath
                )
            )
            if data and not filename.startswith("files/"):
                try:
                    json_data = json.loads(data)
                except ValueError:
                    # Not JSON: the file is reported as skipped below.
                    pass
            if json_data:
                if data_type == "ticket":
                    pagure.lib.git.update_ticket_from_git(
                        session,
                        reponame=name,
                        namespace=namespace,
                        username=username,
                        issue_uid=filename,
                        json_data=json_data,
                        agent=agent,
                    )
                elif data_type == "pull-request":
                    pagure.lib.git.update_request_from_git(
                        session,
                        reponame=name,
                        namespace=namespace,
                        username=username,
                        request_uid=filename,
                        json_data=json_data,
                    )
                tmp += " ... ... Done"
            else:
                tmp += " ... ... SKIPPED - No JSON data"
        except Exception as err:
            _log.info("data: %s", json_data)
            session.rollback()
            _log.exception(err)
            tmp += " ... ... FAILED\n"
            tmp += format_callstack()
            break
        finally:
            # Runs whatever the outcome (done, skipped or failed), so this is
            # the only place adding the entry of the file to the report.
            mail_body.append(tmp)

    try:
        session.commit()
        _log.info(
            "LOADJSON: Emailing results for %s to %s", project.fullname, agent
        )
        try:
            if not agent:
                raise pagure.exceptions.PagureException(
                    "No agent found: %s" % agent
                )
            user_obj = pagure.lib.get_user(session, agent)
            pagure.lib.notify.send_email(
                "\n".join(mail_body),
                "Issue import report",
                user_obj.default_email,
            )
        except pagure.exceptions.PagureException:
            _log.exception("LOADJSON: Could not find user %s", agent)
    except SQLAlchemyError as err:  # pragma: no cover
        _log.exception(err)
        session.rollback()
    _log.info("LOADJSON: Ready for another")


@conn.task(queue=pagure_config.get("CI_CELERY_QUEUE", None), bind=True)
@pagure_task
def trigger_ci_build(self, session, project_name, cause, branch, ci_type):

    """ Triggers a new run of the CI system on the specified pull-request.

    A fork is built using the CI configuration of the project it was forked
    from, while the path given to the CI system remains the one of the
    project specified. Only the ``jenkins`` CI type is supported, any other
    type is logged and ignored.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg project_name: the full name of the project, it is split into its
        user, namespace and name using ``split_project_fullname``
    :type project_name: str
    :arg cause: what triggered the build (PR# or commit)
    :arg branch: the branch to build
    :arg ci_type: the type of CI system to trigger
    :type ci_type: str
    :raises pagure.exceptions.PagureException: when the project providing
        the CI configuration has no CI hook or no CI url set

    """
    # NOTE(review): the returned plugin is not used, this is presumably
    # called for its loading side effects -- to be confirmed.
    pagure.lib.plugins.get_plugin("Pagure CI")

    user, namespace, project_name = split_project_fullname(project_name)

    _log.info("Pagure-CI: Looking for project: %s", project_name)
    project = pagure.lib.get_authorized_project(
        session=session,
        project_name=project_name,
        user=user,
        namespace=namespace,
    )

    if project is None:
        _log.warning(
            "Pagure-CI: No project could be found for the name %s",
            project_name,
        )
        session.close()
        return

    # Forks rely on the CI configuration of their parent project.
    ci_project = project.parent if project.is_fork else project
    ci_hook = ci_project.ci_hook
    if ci_hook is None or ci_hook.ci_url is None:
        # The name is interpolated here: exceptions, unlike logging calls,
        # do not format their arguments.
        raise pagure.exceptions.PagureException(
            "Project %s not configured or incorectly configured for ci"
            % ci_project.fullname
        )

    _log.info("Pagure-CI: project retrieved: %s", project.fullname)

    _log.info(
        "Pagure-CI: Trigger from %s cause (PR# or commit) %s branch: %s",
        project.fullname,
        cause,
        branch,
    )

    if ci_type == "jenkins":
        trigger_jenkins_build(
            project_path=project.path,
            url=ci_hook.ci_url,
            job=ci_hook.ci_job,
            token=ci_hook.pagure_ci_token,
            branch=branch,
            cause=cause,
        )

    else:
        _log.warning("Pagure-CI:Un-supported CI type")

    _log.info("Pagure-CI: Ready for another")