Blame pagure-loadjson/pagure_loadjson_server.py

Pierre-Yves Chibon c82b71
#!/usr/bin/env python
Pierre-Yves Chibon c82b71
# -*- coding: utf-8 -*-
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
"""
Pierre-Yves Chibon c82b71
 (c) 2017 - Copyright Red Hat Inc
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
 Authors:
Pierre-Yves Chibon c82b71
   Pierre-Yves Chibon <pingou@pingoured.fr></pingou@pingoured.fr>
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
This server listens to message sent to redis via post commits hook and find
Pierre-Yves Chibon c82b71
the list of files modified by the commits listed in the message and sync
Pierre-Yves Chibon c82b71
them into the database.
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
Using this mechanism, we no longer need to block the git push until all the
Pierre-Yves Chibon c82b71
files have been uploaded (which when migrating some large projects over to
Pierre-Yves Chibon c82b71
pagure can be really time-consuming).
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
"""
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
import json
Pierre-Yves Chibon c82b71
import logging
Pierre-Yves Chibon c82b71
import os
Pierre-Yves Chibon 694378
import traceback
Matt Prahl c9a583
import inspect
Pierre-Yves Chibon c82b71
import trollius
Pierre-Yves Chibon c82b71
import trollius_redis
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon 694378
from sqlalchemy.exc import SQLAlchemyError
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
_log = logging.getLogger(__name__)
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
if 'PAGURE_CONFIG' not in os.environ \
Pierre-Yves Chibon c82b71
        and os.path.exists('/etc/pagure/pagure.cfg'):
Pierre-Yves Chibon c82b71
    print 'Using configuration file `/etc/pagure/pagure.cfg`'
Pierre-Yves Chibon c82b71
    os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
import pagure
Pierre-Yves Chibon 694378
import pagure.exceptions
Pierre-Yves Chibon c82b71
import pagure.lib
Pierre-Yves Chibon 694378
import pagure.lib.notify
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon 694378
def format_callstack():
Pierre-Yves Chibon 694378
    """ Format the callstack to find out the stack trace. """
Pierre-Yves Chibon 694378
    ind = 0
Pierre-Yves Chibon 694378
    for ind, frame in enumerate(f[0] for f in inspect.stack()):
Pierre-Yves Chibon 694378
        if '__name__' not in frame.f_globals:
Pierre-Yves Chibon 694378
            continue
Pierre-Yves Chibon 694378
        modname = frame.f_globals['__name__'].split('.')[0]
Pierre-Yves Chibon 694378
        if modname != "logging":
Pierre-Yves Chibon 694378
            break
Pierre-Yves Chibon 694378
Pierre-Yves Chibon 694378
    def _format_frame(frame):
Pierre-Yves Chibon 694378
        """ Format the frame. """
Pierre-Yves Chibon 694378
        return '  File "%s", line %i in %s\n    %s' % (frame)
Pierre-Yves Chibon 694378
Pierre-Yves Chibon 694378
    stack = traceback.extract_stack()
Pierre-Yves Chibon 694378
    stack = stack[:-ind]
Pierre-Yves Chibon 694378
    return "\n".join([_format_frame(frame) for frame in stack])
Pierre-Yves Chibon 694378
Pierre-Yves Chibon c82b71
def get_files_to_load(title, new_commits_list, abspath):
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    _log.info('%s: Retrieve the list of files changed' % title)
Pierre-Yves Chibon c82b71
    file_list = []
Pierre-Yves Chibon c82b71
    new_commits_list.reverse()
Pierre-Yves Chibon c82b71
    n = len(new_commits_list)
Pierre-Yves Chibon c82b71
    for idx, commit in enumerate(new_commits_list):
Pierre-Yves Chibon c82b71
        if (idx % 100) == 0:
Pierre-Yves Chibon c82b71
            _log.info(
Pierre-Yves Chibon c82b71
                'Loading files change in commits for %s: %s/%s',
Pierre-Yves Chibon c82b71
                title, idx, n)
Pierre-Yves Chibon c82b71
        if commit == new_commits_list[0]:
Pierre-Yves Chibon c82b71
            filenames = pagure.lib.git.read_git_lines(
Pierre-Yves Chibon c82b71
                ['diff-tree', '--no-commit-id', '--name-only', '-r', '--root',
Pierre-Yves Chibon c82b71
                    commit], abspath)
Pierre-Yves Chibon c82b71
        else:
Pierre-Yves Chibon c82b71
            filenames = pagure.lib.git.read_git_lines(
Pierre-Yves Chibon c82b71
                ['diff-tree', '--no-commit-id', '--name-only', '-r', commit],
Pierre-Yves Chibon c82b71
                abspath)
Pierre-Yves Chibon c82b71
        for line in filenames:
Pierre-Yves Chibon c82b71
            if line.strip():
Pierre-Yves Chibon c82b71
                file_list.append(line.strip())
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    return file_list
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
@trollius.coroutine
Pierre-Yves Chibon c82b71
def handle_messages():
Pierre-Yves Chibon c82b71
    ''' Handles connecting to redis and acting upon messages received.
Pierre-Yves Chibon c82b71
    In this case, it means logging into the DB the commits specified in the
Pierre-Yves Chibon c82b71
    message for the specified repo.
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    The currently accepted message format looks like:
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    ::
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
        {
Pierre-Yves Chibon c82b71
          "project": {
Pierre-Yves Chibon c82b71
            "name": "foo",
Pierre-Yves Chibon c82b71
            "namespace": null,
Pierre-Yves Chibon c82b71
            "parent": null,
Pierre-Yves Chibon c82b71
            "username": {
Pierre-Yves Chibon c82b71
              "name": "user"
Pierre-Yves Chibon c82b71
            }
Pierre-Yves Chibon c82b71
          },
Pierre-Yves Chibon c82b71
          "abspath": "/srv/git/repositories/pagure.git",
Pierre-Yves Chibon c82b71
          "commits": [
Pierre-Yves Chibon c82b71
            "b7b4059c44d692d7df3227ce58ce01191e5407bd",
Pierre-Yves Chibon c82b71
            "f8d0899bb6654590ffdef66b539fd3b8cf873b35",
Pierre-Yves Chibon c82b71
            "9b6fdc48d3edab82d3de28953271ea52b0a96117"
Pierre-Yves Chibon 694378
          ],
Pierre-Yves Chibon 694378
          "data_type": "ticket",
Pierre-Yves Chibon 694378
          "agent": "pingou",
Pierre-Yves Chibon c82b71
        }
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    '''
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    host = pagure.APP.config.get('REDIS_HOST', '0.0.0.0')
Pierre-Yves Chibon c82b71
    port = pagure.APP.config.get('REDIS_PORT', 6379)
Pierre-Yves Chibon c82b71
    dbname = pagure.APP.config.get('REDIS_DB', 0)
Pierre-Yves Chibon c82b71
    connection = yield trollius.From(trollius_redis.Connection.create(
Pierre-Yves Chibon c82b71
        host=host, port=port, db=dbname))
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    # Create subscriber.
Pierre-Yves Chibon c82b71
    subscriber = yield trollius.From(connection.start_subscribe())
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    # Subscribe to channel.
Pierre-Yves Chibon c82b71
    yield trollius.From(subscriber.subscribe(['pagure.loadjson']))
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    # Inside a while loop, wait for incoming events.
Pierre-Yves Chibon c82b71
    while True:
Pierre-Yves Chibon c82b71
        reply = yield trollius.From(subscriber.next_published())
Pierre-Yves Chibon c82b71
        _log.info(
Pierre-Yves Chibon c82b71
            'Received: %s on channel: %s',
Pierre-Yves Chibon c82b71
            repr(reply.value), reply.channel)
Pierre-Yves Chibon 63092e
        _log.info('Loading the json')
Pierre-Yves Chibon c82b71
        data = json.loads(reply.value)
Pierre-Yves Chibon 63092e
        _log.info('Done: loading the json')
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
        commits = data['commits']
Pierre-Yves Chibon c82b71
        abspath = data['abspath']
Pierre-Yves Chibon c82b71
        repo = data['project']['name']
Pierre-Yves Chibon c82b71
        username = data['project']['username']['name'] \
Pierre-Yves Chibon c82b71
            if data['project']['parent'] else None
Pierre-Yves Chibon c82b71
        namespace = data['project']['namespace']
Pierre-Yves Chibon c82b71
        data_type = data['data_type']
Pierre-Yves Chibon 694378
        agent = data['agent']
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
        if data_type not in ['ticket', 'pull-request']:
Pierre-Yves Chibon c82b71
            _log.info('Invalid data_type retrieved: %s', data_type)
Pierre-Yves Chibon c82b71
            continue
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
        session = pagure.lib.create_session(pagure.APP.config['DB_URL'])
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
        _log.info('Looking for project: %s%s of user: %s',
Pierre-Yves Chibon 63092e
                 '%s/' % namespace if namespace else '',
Pierre-Yves Chibon c82b71
                 repo, username)
Farhaan Bukhsh e498a1
        project = pagure.lib._get_project(
Pierre-Yves Chibon c82b71
            session, repo, user=username, namespace=namespace)
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
        if not project:
Pierre-Yves Chibon c82b71
            _log.info('No project found')
Pierre-Yves Chibon c82b71
            continue
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
        _log.info('Found project: %s', project.fullname)
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
        _log.info(
Pierre-Yves Chibon c82b71
            '%s: Processing %s commits in %s', project.fullname,
Pierre-Yves Chibon c82b71
            len(commits), abspath)
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
        file_list = set(get_files_to_load(project.fullname, commits, abspath))
Pierre-Yves Chibon c82b71
        n = len(file_list)
Pierre-Yves Chibon c82b71
        _log.info('%s files to process' % n)
Pierre-Yves Chibon 694378
        mail_body = []
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
        for idx, filename in enumerate(file_list):
Pierre-Yves Chibon 79093f
            _log.info(
Pierre-Yves Chibon 79093f
                'Loading: %s: %s -- %s/%s', project.fullname, filename,
Pierre-Yves Chibon 79093f
                idx+1, n)
Pierre-Yves Chibon 694378
            tmp = 'Loading: %s -- %s/%s' % (filename, idx+1, n)
Pierre-Yves Chibon c82b71
            json_data = None
Pierre-Yves Chibon c82b71
            data = ''.join(
Pierre-Yves Chibon c82b71
                pagure.lib.git.read_git_lines(
Pierre-Yves Chibon c82b71
                    ['show', 'HEAD:%s' % filename], abspath))
Pierre-Yves Chibon c82b71
            if data and not filename.startswith('files/'):
Pierre-Yves Chibon c82b71
                try:
Pierre-Yves Chibon c82b71
                    json_data = json.loads(data)
Abhijeet Kasurde f4bf50
                except ValueError:
Pierre-Yves Chibon c82b71
                    pass
Pierre-Yves Chibon c82b71
            if json_data:
Pierre-Yves Chibon c82b71
                try:
Pierre-Yves Chibon c82b71
                    if data_type == 'ticket':
Pierre-Yves Chibon c82b71
                        pagure.lib.git.update_ticket_from_git(
Pierre-Yves Chibon c82b71
                            session,
Pierre-Yves Chibon c82b71
                            reponame=repo,
Pierre-Yves Chibon c82b71
                            namespace=namespace,
Pierre-Yves Chibon c82b71
                            username=username,
Pierre-Yves Chibon c82b71
                            issue_uid=filename,
Pierre-Yves Chibon c82b71
                            json_data=json_data
Pierre-Yves Chibon c82b71
                        )
Pierre-Yves Chibon 694378
                        tmp += ' ... ... Done'
Pierre-Yves Chibon c82b71
                except Exception as err:
Pierre-Yves Chibon c82b71
                    _log.info('data: %s', json_data)
Pierre-Yves Chibon c82b71
                    session.rollback()
Pierre-Yves Chibon c82b71
                    _log.exception(err)
Pierre-Yves Chibon 694378
                    tmp += ' ... ... FAILED\n'
Pierre-Yves Chibon 694378
                    tmp += format_callstack()
Pierre-Yves Chibon c82b71
                    break
Pierre-Yves Chibon 694378
                finally:
Pierre-Yves Chibon 694378
                    mail_body.append(tmp)
Pierre-Yves Chibon bb9ead
            else:
Pierre-Yves Chibon bb9ead
                tmp += ' ... ... SKIPPED - No JSON data'
Pierre-Yves Chibon bb9ead
                mail_body.append(tmp)
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
        try:
Pierre-Yves Chibon c82b71
            session.commit()
Pierre-Yves Chibon 694378
            _log.info(
Pierre-Yves Chibon 694378
                'Emailing results for %s to %s', project.fullname, agent)
Pierre-Yves Chibon 694378
            try:
Pierre-Yves Chibon 694378
                if not agent:
Pierre-Yves Chibon 694378
                    raise pagure.exceptions.PagureException(
Pierre-Yves Chibon 694378
                        'No agent found: %s' % agent)
Pierre-Yves Chibon 694378
                user_obj = pagure.lib.get_user(session, agent)
Pierre-Yves Chibon 694378
                pagure.lib.notify.send_email(
Pierre-Yves Chibon 694378
                    '\n'.join(mail_body),
Pierre-Yves Chibon 694378
                    'Issue import report',
Pierre-Yves Chibon 694378
                    user_obj.default_email)
Pierre-Yves Chibon 694378
            except pagure.exceptions.PagureException as err:
Pierre-Yves Chibon 694378
                _log.exception('Could not find user %s' % agent)
Pierre-Yves Chibon c82b71
        except SQLAlchemyError as err:  # pragma: no cover
Pierre-Yves Chibon c82b71
            session.rollback()
Pierre-Yves Chibon c82b71
        finally:
Pierre-Yves Chibon c82b71
            session.close()
Pierre-Yves Chibon c82b71
        _log.info('Ready for another')
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
def main():
Pierre-Yves Chibon c82b71
    ''' Start the main async loop. '''
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    try:
Pierre-Yves Chibon c82b71
        loop = trollius.get_event_loop()
Pierre-Yves Chibon c82b71
        tasks = [
Pierre-Yves Chibon c82b71
            trollius.async(handle_messages()),
Pierre-Yves Chibon c82b71
        ]
Pierre-Yves Chibon c82b71
        loop.run_until_complete(trollius.wait(tasks))
Pierre-Yves Chibon c82b71
        loop.run_forever()
Pierre-Yves Chibon c82b71
    except KeyboardInterrupt:
Pierre-Yves Chibon c82b71
        pass
Pierre-Yves Chibon c82b71
    except trollius.ConnectionResetError:
Pierre-Yves Chibon c82b71
        pass
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    _log.info("End Connection")
Pierre-Yves Chibon c82b71
    loop.close()
Pierre-Yves Chibon c82b71
    _log.info("End")
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
if __name__ == '__main__':
Pierre-Yves Chibon c82b71
    formatter = logging.Formatter(
Pierre-Yves Chibon c82b71
        "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    logging.basicConfig(level=logging.DEBUG)
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    # setup console logging
Pierre-Yves Chibon c82b71
    _log.setLevel(logging.DEBUG)
Pierre-Yves Chibon c82b71
    shellhandler = logging.StreamHandler()
Pierre-Yves Chibon c82b71
    shellhandler.setLevel(logging.DEBUG)
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    aslog = logging.getLogger("asyncio")
Pierre-Yves Chibon c82b71
    aslog.setLevel(logging.DEBUG)
Pierre-Yves Chibon c82b71
    aslog = logging.getLogger("trollius")
Pierre-Yves Chibon c82b71
    aslog.setLevel(logging.DEBUG)
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    # Turn down the logs coming from python-markdown
Pierre-Yves Chibon c82b71
    mklog = logging.getLogger("MARKDOWN")
Pierre-Yves Chibon c82b71
    mklog.setLevel(logging.WARN)
Pierre-Yves Chibon c82b71
Pierre-Yves Chibon c82b71
    shellhandler.setFormatter(formatter)
Pierre-Yves Chibon c82b71
    _log.addHandler(shellhandler)
Pierre-Yves Chibon c82b71
    main()