From 50d6b6a3e810e76c0ea3a02e97e61ba656d008c4 Mon Sep 17 00:00:00 2001 From: Pierre-Yves Chibon Date: Mar 07 2018 09:04:26 +0000 Subject: Migrate the loadjson service to be celery-based This will help migrating pagure to python3 as well as scaling it up horizontally by adding more workers. Signed-off-by: Pierre-Yves Chibon --- diff --git a/pagure-loadjson/pagure_loadjson_server.py b/pagure-loadjson/pagure_loadjson_server.py deleted file mode 100644 index 94c710a..0000000 --- a/pagure-loadjson/pagure_loadjson_server.py +++ /dev/null @@ -1,286 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" - (c) 2017 - Copyright Red Hat Inc - - Authors: - Pierre-Yves Chibon - - -This server listens to message sent to redis via post commits hook and find -the list of files modified by the commits listed in the message and sync -them into the database. - -Using this mechanism, we no longer need to block the git push until all the -files have been uploaded (which when migrating some large projects over to -pagure can be really time-consuming). - -""" - -from __future__ import print_function -import json -import logging -import os -import traceback -import inspect -import trollius -import trollius_redis - -from sqlalchemy.exc import SQLAlchemyError - -import pagure -import pagure.exceptions -import pagure.lib -import pagure.lib.notify - - -if 'PAGURE_CONFIG' not in os.environ \ - and os.path.exists('/etc/pagure/pagure.cfg'): - print('Using configuration file `/etc/pagure/pagure.cfg`') - os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' - - -_log = logging.getLogger(__name__) -_config = pagure.config.config.reload_config() - - -def format_callstack(): - """ Format the callstack to find out the stack trace. """ - ind = 0 - for ind, frame in enumerate(f[0] for f in inspect.stack()): - if '__name__' not in frame.f_globals: - continue - modname = frame.f_globals['__name__'].split('.')[0] - if modname != "logging": - break - - def _format_frame(frame): - """ Format the frame. """ - return ' File "%s", line %i in %s\n %s' % (frame) - - stack = traceback.extract_stack() - stack = stack[:-ind] - return "\n".join([_format_frame(frame) for frame in stack]) - -def get_files_to_load(title, new_commits_list, abspath): - - _log.info('%s: Retrieve the list of files changed' % title) - file_list = [] - new_commits_list.reverse() - n = len(new_commits_list) - for idx, commit in enumerate(new_commits_list): - if (idx % 100) == 0: - _log.info( - 'Loading files change in commits for %s: %s/%s', - title, idx, n) - if commit == new_commits_list[0]: - filenames = pagure.lib.git.read_git_lines( - ['diff-tree', '--no-commit-id', '--name-only', '-r', '--root', - commit], abspath) - else: - filenames = pagure.lib.git.read_git_lines( - ['diff-tree', '--no-commit-id', '--name-only', '-r', commit], - abspath) - for line in filenames: - if line.strip(): - file_list.append(line.strip()) - - return file_list - - -@trollius.coroutine -def handle_messages(): - ''' Handles connecting to redis and acting upon messages received. - In this case, it means logging into the DB the commits specified in the - message for the specified repo. - - The currently accepted message format looks like: - - :: - - { - "project": { - "name": "foo", - "namespace": null, - "parent": null, - "username": { - "name": "user" - } - }, - "abspath": "/srv/git/repositories/pagure.git", - "commits": [ - "b7b4059c44d692d7df3227ce58ce01191e5407bd", - "f8d0899bb6654590ffdef66b539fd3b8cf873b35", - "9b6fdc48d3edab82d3de28953271ea52b0a96117" - ], - "data_type": "ticket", - "agent": "pingou", - } - - ''' - - host = _config.get('REDIS_HOST', '0.0.0.0') - port = _config.get('REDIS_PORT', 6379) - dbname = _config.get('REDIS_DB', 0) - connection = yield trollius.From(trollius_redis.Connection.create( - host=host, port=port, db=dbname)) - - # Create subscriber. - subscriber = yield trollius.From(connection.start_subscribe()) - - # Subscribe to channel. - yield trollius.From(subscriber.subscribe(['pagure.loadjson'])) - - # Inside a while loop, wait for incoming events. - while True: - reply = yield trollius.From(subscriber.next_published()) - _log.info( - 'Received: %s on channel: %s', - repr(reply.value), reply.channel) - _log.info('Loading the json') - data = json.loads(reply.value) - _log.info('Done: loading the json') - - commits = data['commits'] - abspath = data['abspath'] - repo = data['project']['name'] - username = data['project']['user']['name'] \ - if data['project']['parent'] else None - namespace = data['project']['namespace'] - data_type = data['data_type'] - agent = data['agent'] - - if data_type not in ['ticket', 'pull-request']: - _log.info('Invalid data_type retrieved: %s', data_type) - continue - - session = pagure.lib.create_session(_config['DB_URL']) - - _log.info('Looking for project: %s%s of user: %s', - '%s/' % namespace if namespace else '', - repo, username) - project = pagure.lib._get_project( - session, repo, user=username, namespace=namespace, - case=_config.get('CASE_SENSITIVE', False)) - - if not project: - _log.info('No project found') - continue - - _log.info('Found project: %s', project.fullname) - - _log.info( - '%s: Processing %s commits in %s', project.fullname, - len(commits), abspath) - - file_list = set(get_files_to_load(project.fullname, commits, abspath)) - n = len(file_list) - _log.info('%s files to process' % n) - mail_body = [] - - for idx, filename in enumerate(file_list): - _log.info( - 'Loading: %s: %s -- %s/%s', project.fullname, filename, - idx+1, n) - tmp = 'Loading: %s -- %s/%s' % (filename, idx+1, n) - json_data = None - data = ''.join( - pagure.lib.git.read_git_lines( - ['show', 'HEAD:%s' % filename], abspath)) - if data and not filename.startswith('files/'): - try: - json_data = json.loads(data) - except ValueError: - pass - if json_data: - try: - if data_type == 'ticket': - pagure.lib.git.update_ticket_from_git( - session, - reponame=repo, - namespace=namespace, - username=username, - issue_uid=filename, - json_data=json_data, - agent=agent, - ) - tmp += ' ... ... Done' - except Exception as err: - _log.info('data: %s', json_data) - session.rollback() - _log.exception(err) - tmp += ' ... ... FAILED\n' - tmp += format_callstack() - break - finally: - mail_body.append(tmp) - else: - tmp += ' ... ... SKIPPED - No JSON data' - mail_body.append(tmp) - - try: - session.commit() - _log.info( - 'Emailing results for %s to %s', project.fullname, agent) - try: - if not agent: - raise pagure.exceptions.PagureException( - 'No agent found: %s' % agent) - user_obj = pagure.lib.get_user(session, agent) - pagure.lib.notify.send_email( - '\n'.join(mail_body), - 'Issue import report', - user_obj.default_email) - except pagure.exceptions.PagureException as err: - _log.exception('Could not find user %s' % agent) - except SQLAlchemyError as err: # pragma: no cover - session.rollback() - finally: - session.close() - _log.info('Ready for another') - - -def main(): - ''' Start the main async loop. ''' - - try: - loop = trollius.get_event_loop() - tasks = [ - trollius.async(handle_messages()), - ] - loop.run_until_complete(trollius.wait(tasks)) - loop.run_forever() - except KeyboardInterrupt: - pass - except trollius.ConnectionResetError: - pass - - _log.info("End Connection") - loop.close() - _log.info("End") - - -if __name__ == '__main__': - formatter = logging.Formatter( - "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s") - - logging.basicConfig(level=logging.DEBUG) - - # setup console logging - _log.setLevel(logging.DEBUG) - shellhandler = logging.StreamHandler() - shellhandler.setLevel(logging.DEBUG) - - aslog = logging.getLogger("asyncio") - aslog.setLevel(logging.DEBUG) - aslog = logging.getLogger("trollius") - aslog.setLevel(logging.DEBUG) - - # Turn down the logs coming from python-markdown - mklog = logging.getLogger("MARKDOWN") - mklog.setLevel(logging.WARN) - - shellhandler.setFormatter(formatter) - _log.addHandler(shellhandler) - main() diff --git a/pagure/hooks/files/pagure_hook_requests.py b/pagure/hooks/files/pagure_hook_requests.py index 5d90779..8364955 100755 --- a/pagure/hooks/files/pagure_hook_requests.py +++ b/pagure/hooks/files/pagure_hook_requests.py @@ -7,7 +7,6 @@ based on the information pushed in the requests git repository. from __future__ import print_function -import json import os import sys @@ -18,7 +17,8 @@ if 'PAGURE_CONFIG' not in os.environ \ os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' -import pagure.lib.git # noqa: E402 +import pagure.config # noqa: E402 +import pagure.lib.tasks_services # noqa: E402 _config = pagure.config.config @@ -43,7 +43,15 @@ def get_files_to_load(new_commits_list): def run_as_post_receive_hook(): - file_list = set() + repo = pagure.lib.git.get_repo_name(abspath) + username = pagure.lib.git.get_username(abspath) + namespace = pagure.lib.git.get_repo_namespace( + abspath, gitfolder=_config['TICKETS_FOLDER']) + if _config.get('HOOK_DEBUG', False): + print('repo:', repo) + print('user:', username) + print('namespace:', namespace) + for line in sys.stdin: if _config.get('HOOK_DEBUG', False): print(line) @@ -62,42 +70,18 @@ def run_as_post_receive_hook(): "pagure hook") return - tmp = set(get_files_to_load( - pagure.lib.git.get_revs_between(oldrev, newrev, abspath, refname))) - file_list = file_list.union(tmp) - - reponame = pagure.lib.git.get_repo_name(abspath) - username = pagure.lib.git.get_username(abspath) - namespace = pagure.lib.git.get_repo_namespace( - abspath, gitfolder=_config['REQUESTS_FOLDER']) - print('repo:', reponame, username, namespace) - - for filename in file_list: - print('To load: %s' % filename) - json_data = None - data = ''.join( - pagure.lib.git.read_git_lines( - ['show', 'HEAD:%s' % filename], abspath)) - if data: - try: - json_data = json.loads(data) - except ValueError: - pass - if json_data: - session = pagure.lib.create_session(_config['DB_URL']) - pagure.lib.git.update_request_from_git( - session, - reponame=reponame, - namespace=namespace, - username=username, - request_uid=filename, - json_data=json_data, - gitfolder=_config['GIT_FOLDER'], - docfolder=_config['DOCS_FOLDER'], - ticketfolder=_config['TICKETS_FOLDER'], - requestfolder=_config['REQUESTS_FOLDER'], - ) - session.close() + commits = pagure.lib.git.get_revs_between( + oldrev, newrev, abspath, refname) + + pagure.lib.tasks_services.load_json_commits_to_db.delay( + name=repo, + commits=commits, + abspath=abspath, + data_type='pull-request', + agent=os.environ.get('GL_USER'), + namespace=namespace, + username=username, + ) def main(args): diff --git a/pagure/hooks/files/pagure_hook_tickets.py b/pagure/hooks/files/pagure_hook_tickets.py index f38f41e..25646bb 100755 --- a/pagure/hooks/files/pagure_hook_tickets.py +++ b/pagure/hooks/files/pagure_hook_tickets.py @@ -6,7 +6,6 @@ the information pushed in the tickets git repository. """ from __future__ import print_function -import json import os import sys @@ -16,10 +15,8 @@ if 'PAGURE_CONFIG' not in os.environ \ and os.path.exists('/etc/pagure/pagure.cfg'): os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' -import pagure # noqa: E402 -import pagure.lib.git # noqa: E402 - -from pagure.lib import REDIS # noqa: E402 +import pagure.config # noqa: E402 +import pagure.lib.tasks_services # noqa: E402 _config = pagure.config.config @@ -37,11 +34,6 @@ def run_as_post_receive_hook(): print('user:', username) print('namespace:', namespace) - session = pagure.lib.create_session(_config['DB_URL']) - project = pagure.lib._get_project( - session, repo, user=username, namespace=namespace, - case=_config.get('CASE_SENSITIVE', False)) - for line in sys.stdin: if _config.get('HOOK_DEBUG', False): print(line) @@ -63,25 +55,15 @@ def run_as_post_receive_hook(): commits = pagure.lib.git.get_revs_between( oldrev, newrev, abspath, refname) - if REDIS: - print('Sending to redis to load the data') - REDIS.publish( - 'pagure.loadjson', - json.dumps({ - 'project': project.to_json(public=True), - 'abspath': abspath, - 'commits': commits, - 'data_type': 'ticket', - 'agent': os.environ.get('GL_USER'), - }) - ) - print( - 'A report will be emailed to you once the load is finished') - else: - print('Hook not configured to connect to pagure-loadjson') - print('/!\ Your data will not be loaded into the database!') - - session.close() + pagure.lib.tasks_services.load_json_commits_to_db.delay( + name=repo, + commits=commits, + abspath=abspath, + data_type='ticket', + agent=os.environ.get('GL_USER'), + namespace=namespace, + username=username, + ) def main(args): diff --git a/pagure/lib/git.py b/pagure/lib/git.py index 6441483..69bfa70 100644 --- a/pagure/lib/git.py +++ b/pagure/lib/git.py @@ -373,9 +373,7 @@ def get_user_from_json(session, jsondata, key='user'): return user -def get_project_from_json( - session, jsondata, - gitfolder, docfolder, ticketfolder, requestfolder): +def get_project_from_json(session, jsondata): """ From the given json blob, retrieve the project info and search for it in the db and create the projec if it does not already exist. """ @@ -396,8 +394,7 @@ def get_project_from_json( parent = None if jsondata.get('parent'): parent = get_project_from_json( - session, jsondata.get('parent'), - gitfolder, docfolder, ticketfolder, requestfolder) + session, jsondata.get('parent')) pagure.lib.fork_project( session=session, @@ -410,7 +407,8 @@ def get_project_from_json( else: gitfolder = os.path.join( - gitfolder, 'forks', user.username) if parent else gitfolder + pagure_config['GIT_FOLDER'], 'forks', user.username) \ + if parent else pagure_config['GIT_FOLDER'] pagure.lib.new_project( session, user=user.username, @@ -421,9 +419,9 @@ def get_project_from_json( blacklist=pagure_config.get('BLACKLISTED_PROJECTS', []), allowed_prefix=pagure_config.get('ALLOWED_PREFIX', []), gitfolder=gitfolder, - docfolder=docfolder, - ticketfolder=ticketfolder, - requestfolder=requestfolder, + docfolder=pagure_config['DOCS_FOLDER'], + ticketfolder=pagure_config['TICKETS_FOLDER'], + requestfolder=pagure_config['REQUESTS_FOLDER'], prevent_40_chars=pagure_config.get( 'OLD_VIEW_COMMIT_ENABLED', False), ) @@ -681,8 +679,7 @@ def update_ticket_from_git( def update_request_from_git( - session, reponame, namespace, username, request_uid, json_data, - gitfolder, docfolder, ticketfolder, requestfolder): + session, reponame, namespace, username, request_uid, json_data): """ Update the specified request (identified by its unique identifier) with the data present in the json blob provided. @@ -712,13 +709,11 @@ def update_request_from_git( if not request: repo_from = get_project_from_json( - session, json_data.get('repo_from'), - gitfolder, docfolder, ticketfolder, requestfolder + session, json_data.get('repo_from') ) repo_to = get_project_from_json( - session, json_data.get('project'), - gitfolder, docfolder, ticketfolder, requestfolder + session, json_data.get('project') ) status = json_data.get('status') diff --git a/pagure/lib/tasks_services.py b/pagure/lib/tasks_services.py index b79a2db..a434d56 100644 --- a/pagure/lib/tasks_services.py +++ b/pagure/lib/tasks_services.py @@ -11,11 +11,13 @@ import datetime import hashlib import hmac +import inspect import json import logging import os import os.path import time +import traceback import uuid import requests @@ -194,3 +196,159 @@ def log_commit_send_notifications( session.rollback() finally: session.close() + + +def format_callstack(): + """ Format the callstack to find out the stack trace. """ + ind = 0 + for ind, frame in enumerate(f[0] for f in inspect.stack()): + if '__name__' not in frame.f_globals: + continue + modname = frame.f_globals['__name__'].split('.')[0] + if modname != "logging": + break + + def _format_frame(frame): + """ Format the frame. """ + return ' File "%s", line %i in %s\n %s' % (frame) + + stack = traceback.extract_stack() + stack = stack[:-ind] + return "\n".join([_format_frame(frame) for frame in stack]) + + +def get_files_to_load(title, new_commits_list, abspath): + + _log.info('%s: Retrieve the list of files changed' % title) + file_list = [] + new_commits_list.reverse() + n = len(new_commits_list) + for idx, commit in enumerate(new_commits_list): + if (idx % 100) == 0: + _log.info( + 'Loading files change in commits for %s: %s/%s', + title, idx, n) + if commit == new_commits_list[0]: + filenames = pagure.lib.git.read_git_lines( + ['diff-tree', '--no-commit-id', '--name-only', '-r', '--root', + commit], abspath) + else: + filenames = pagure.lib.git.read_git_lines( + ['diff-tree', '--no-commit-id', '--name-only', '-r', commit], + abspath) + for line in filenames: + if line.strip(): + file_list.append(line.strip()) + + return file_list + + +@conn.task(queue=pagure_config.get('LOADJSON_CELERY_QUEUE', None), bind=True) +@set_status +def load_json_commits_to_db( + self, name, commits, abspath, data_type, agent, + namespace=None, username=None): + ''' Loads into the database the specified commits that have been pushed + to either the tickets or the pull-request repository. + + ''' + + if data_type not in ['ticket', 'pull-request']: + _log.info('Invalid data_type retrieved: %s', data_type) + return + + session = pagure.lib.create_session(pagure_config['DB_URL']) + + _log.info( + 'Looking for project: %s%s of user: %s', + '%s/' % namespace if namespace else '', + name, username) + + project = pagure.lib._get_project( + session, name, user=username, namespace=namespace, + case=pagure_config.get('CASE_SENSITIVE', False)) + + if not project: + _log.info('No project found') + return + + _log.info('Found project: %s', project.fullname) + + _log.info( + '%s: Processing %s commits in %s', project.fullname, + len(commits), abspath) + + file_list = set(get_files_to_load(project.fullname, commits, abspath)) + n = len(file_list) + _log.info('%s files to process' % n) + mail_body = [] + + for idx, filename in enumerate(file_list): + _log.info( + 'Loading: %s: %s -- %s/%s', + project.fullname, filename, idx + 1, n) + tmp = 'Loading: %s -- %s/%s' % (filename, idx + 1, n) + json_data = None + data = ''.join( + pagure.lib.git.read_git_lines( + ['show', 'HEAD:%s' % filename], abspath)) + if data and not filename.startswith('files/'): + try: + json_data = json.loads(data) + except ValueError: + pass + if json_data: + try: + if data_type == 'ticket': + pagure.lib.git.update_ticket_from_git( + session, + reponame=name, + namespace=namespace, + username=username, + issue_uid=filename, + json_data=json_data, + agent=agent, + ) + elif data_type == 'pull-request': + pagure.lib.git.update_request_from_git( + session, + reponame=name, + namespace=namespace, + username=username, + request_uid=filename, + json_data=json_data, + ) + tmp += ' ... ... Done' + except Exception as err: + _log.info('data: %s', json_data) + session.rollback() + _log.exception(err) + tmp += ' ... ... FAILED\n' + tmp += format_callstack() + break + finally: + mail_body.append(tmp) + else: + tmp += ' ... ... SKIPPED - No JSON data' + mail_body.append(tmp) + + try: + session.commit() + _log.info( + 'Emailing results for %s to %s', project.fullname, agent) + try: + if not agent: + raise pagure.exceptions.PagureException( + 'No agent found: %s' % agent) + user_obj = pagure.lib.get_user(session, agent) + pagure.lib.notify.send_email( + '\n'.join(mail_body), + 'Issue import report', + user_obj.default_email) + except pagure.exceptions.PagureException as err: + _log.exception('Could not find user %s' % agent) + except SQLAlchemyError as err: # pragma: no cover + session.rollback() + finally: + session.close() + _log.info('Ready for another') diff --git a/tests/test_pagure_lib_git.py b/tests/test_pagure_lib_git.py index e257ef4..2f70756 100644 --- a/tests/test_pagure_lib_git.py +++ b/tests/test_pagure_lib_git.py @@ -2478,10 +2478,6 @@ index 0000000..60f7480 username=None, request_uid='d4182a2ac2d541d884742d3037c26e56', json_data=data, - gitfolder=os.path.join(self.path, 'repos'), - docfolder=os.path.join(self.path, 'docs'), - ticketfolder=os.path.join(self.path, 'tickets'), - requestfolder=os.path.join(self.path, 'requests') ) pagure.lib.git.update_request_from_git( @@ -2491,10 +2487,6 @@ index 0000000..60f7480 username=None, request_uid='d4182a2ac2d541d884742d3037c26e56', json_data=data, - gitfolder=os.path.join(self.path, 'repos'), - docfolder=os.path.join(self.path, 'docs'), - ticketfolder=os.path.join(self.path, 'tickets'), - requestfolder=os.path.join(self.path, 'requests') ) self.session.commit() @@ -2598,10 +2590,6 @@ index 0000000..60f7480 username=None, request_uid='d4182a2ac2d541d884742d3037c26e57', json_data=data, - gitfolder=os.path.join(self.path, 'repos'), - docfolder=os.path.join(self.path, 'docs'), - ticketfolder=os.path.join(self.path, 'tickets'), - requestfolder=os.path.join(self.path, 'requests') ) self.session.commit() @@ -2711,10 +2699,6 @@ index 0000000..60f7480 username=None, request_uid='d4182a2ac2d541d884742d3037c26e58', json_data=data, - gitfolder=os.path.join(self.path, 'repos'), - docfolder=os.path.join(self.path, 'docs'), - ticketfolder=os.path.join(self.path, 'tickets'), - requestfolder=os.path.join(self.path, 'requests') ) self.session.commit()