| |
| |
| |
| """ |
| (c) 2017 - Copyright Red Hat Inc |
| |
| Authors: |
| Pierre-Yves Chibon <pingou@pingoured.fr> |
| |
| |
| This server listens to messages sent to redis by the post-commit hook, finds |
| the list of files modified by the commits listed in each message, and syncs |
| them into the database. |
| |
| Using this mechanism, we no longer need to block the git push until all the |
| files have been uploaded (which when migrating some large projects over to |
| pagure can be really time-consuming). |
| |
| """ |
| |
| import json |
| import logging |
| import os |
| import traceback |
| import inspect |
| import trollius |
| import trollius_redis |
| |
| from sqlalchemy.exc import SQLAlchemyError |
| |
_log = logging.getLogger(__name__)

# Fall back on the system-wide configuration file when none was explicitly
# provided.  This has to happen before the ``import pagure`` below, which
# presumably reads PAGURE_CONFIG when it is imported.
if 'PAGURE_CONFIG' not in os.environ:
    if os.path.exists('/etc/pagure/pagure.cfg'):
        print('Using configuration file `/etc/pagure/pagure.cfg`')
        os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
| |
| |
| import pagure |
| import pagure.exceptions |
| import pagure.lib |
| import pagure.lib.notify |
| |
| |
def format_callstack():
    """ Format the callstack to find out the stack trace.

    Return the current call stack, outermost call first, formatted like a
    traceback.  This function's own frame, and any frame from the
    ``logging`` package directly above it (e.g. when called from a logging
    filter or handler), are left out.  Frames whose globals have no
    ``__name__`` are skipped the same way.  The innermost entry shown is
    therefore the code that actually asked for the stack.

    :return: one ``File "...", line N in func`` entry per frame, each
        followed by its source line, joined by newlines; an empty string
        if no suitable frame is found.
    """
    current = inspect.currentframe()
    # Start from our caller: our own frame must never be the one kept.
    frame = current.f_back if current is not None else None
    try:
        while frame is not None:
            modname = frame.f_globals.get('__name__')
            if modname is not None and modname.split('.')[0] != 'logging':
                break
            frame = frame.f_back
        if frame is None:
            return ''
        # extract_stack(frame) ends at ``frame`` itself, so no negative
        # slicing is needed (``stack[:-0]`` would be an empty list).
        stack = traceback.extract_stack(frame)
    finally:
        # Drop the frame references to avoid keeping reference cycles alive.
        del current, frame

    def _format_frame(entry):
        """ Format one stack entry (filename, lineno, name, line). """
        # tuple(): on Python 3 the entries are FrameSummary objects, which
        # %-formatting does not unpack on its own.
        return ' File "%s", line %i in %s\n %s' % tuple(entry)

    return "\n".join(_format_frame(entry) for entry in stack)
| |
def get_files_to_load(title, new_commits_list, abspath):
    """ Return the paths of the files touched by the given commits.

    :arg title: label used in the log messages only (the project's
        fullname when called from this server).
    :arg new_commits_list: list of commit hashes.  It is processed in
        reverse order; the last element (presumably the oldest commit) is
        diffed with ``--root``.  The list itself is not modified.
    :arg abspath: path of the git repository to run ``git diff-tree`` in.
    :return: the non-empty, stripped paths reported by ``git diff-tree``
        for each commit, in processing order.  A path touched by several
        commits appears several times.
    """
    _log.info('%s: Retrieve the list of files changed', title)
    # Reverse a copy so that the caller's list is left untouched.
    commits = list(reversed(new_commits_list))
    n = len(commits)
    file_list = []
    for idx, commit in enumerate(commits):
        if (idx % 100) == 0:
            _log.info(
                'Loading files change in commits for %s: %s/%s',
                title, idx, n)
        cmd = ['diff-tree', '--no-commit-id', '--name-only', '-r']
        if idx == 0:
            # The first commit processed may be the repository's root
            # commit, which diff-tree only lists as a creation with --root.
            cmd.append('--root')
        cmd.append(commit)
        filenames = pagure.lib.git.read_git_lines(cmd, abspath)
        file_list.extend(line.strip() for line in filenames if line.strip())

    return file_list
| |
| |
def _send_report(session, agent, mail_body):
    ''' Email the import report ``mail_body`` to the user ``agent``.

    A PagureException (no agent given, or presumably an unknown user) is
    logged rather than raised.
    '''
    try:
        if not agent:
            raise pagure.exceptions.PagureException(
                'No agent found: %s' % agent)
        user_obj = pagure.lib.get_user(session, agent)
        pagure.lib.notify.send_email(
            '\n'.join(mail_body),
            'Issue import report',
            user_obj.default_email)
    except pagure.exceptions.PagureException:
        _log.exception('Could not find user %s', agent)


def _load_files(session, data_type, file_list, abspath, fullname,
                repo, namespace, username):
    ''' Load the JSON files of ``file_list`` into the database.

    Each file is read from ``HEAD`` of the git repository at ``abspath``.
    Processing stops at the first file whose import raises; the session is
    rolled back at that point.

    :return: the report lines (one per processed file) to email the user.
    '''
    n = len(file_list)
    mail_body = []
    for idx, filename in enumerate(file_list):
        _log.info(
            'Loading: %s: %s -- %s/%s', fullname, filename, idx + 1, n)
        report = 'Loading: %s -- %s/%s' % (filename, idx + 1, n)

        json_data = None
        content = ''.join(
            pagure.lib.git.read_git_lines(
                ['show', 'HEAD:%s' % filename], abspath))
        # Files under ``files/`` are not parsed as JSON (presumably they
        # are attachments).
        if content and not filename.startswith('files/'):
            try:
                json_data = json.loads(content)
            except ValueError:
                pass

        if not json_data:
            mail_body.append(report + ' ... ... SKIPPED - No JSON data')
            continue

        try:
            if data_type == 'ticket':
                pagure.lib.git.update_ticket_from_git(
                    session,
                    reponame=repo,
                    namespace=namespace,
                    username=username,
                    issue_uid=filename,
                    json_data=json_data
                )
                report += ' ... ... Done'
            else:
                # Only tickets can be loaded; give the report line a status
                # instead of leaving it blank.
                report += ' ... ... SKIPPED - Unsupported data type'
        except Exception as err:
            _log.info('data: %s', json_data)
            session.rollback()
            _log.exception(err)
            report += ' ... ... FAILED\n'
            # Traceback of the failure itself, for the emailed report.
            report += traceback.format_exc()
            break
        finally:
            mail_body.append(report)

    return mail_body


def _process_message(raw_message):
    ''' Handle one message received on the ``pagure.loadjson`` channel.

    Parse the message (format documented in :func:`handle_messages`), find
    the project, load the files touched by the commits into the database
    and email the report to the agent.  The DB session is closed on every
    path.  Exceptions other than SQLAlchemyError on commit propagate.
    '''
    _log.info('Loading the json')
    data = json.loads(raw_message)
    _log.info('Done: loading the json')

    commits = data['commits']
    abspath = data['abspath']
    repo = data['project']['name']
    # The owner's name is only used to look up forks (projects with a parent).
    username = data['project']['user']['name'] \
        if data['project']['parent'] else None
    namespace = data['project']['namespace']
    data_type = data['data_type']
    agent = data['agent']

    if data_type not in ['ticket', 'pull-request']:
        _log.info('Invalid data_type retrieved: %s', data_type)
        return

    session = pagure.lib.create_session(pagure.APP.config['DB_URL'])
    try:
        _log.info('Looking for project: %s%s of user: %s',
                  '%s/' % namespace if namespace else '',
                  repo, username)
        project = pagure.lib._get_project(
            session, repo, user=username, namespace=namespace)

        if not project:
            _log.info('No project found')
            return

        _log.info('Found project: %s', project.fullname)
        _log.info(
            '%s: Processing %s commits in %s', project.fullname,
            len(commits), abspath)

        file_list = set(get_files_to_load(project.fullname, commits, abspath))
        _log.info('%s files to process', len(file_list))
        mail_body = _load_files(
            session, data_type, file_list, abspath, project.fullname,
            repo, namespace, username)

        try:
            session.commit()
            _log.info(
                'Emailing results for %s to %s', project.fullname, agent)
            _send_report(session, agent, mail_body)
        except SQLAlchemyError:
            _log.exception(
                'Could not save the changes for %s', project.fullname)
            session.rollback()
    finally:
        session.close()


@trollius.coroutine
def handle_messages():
    ''' Handles connecting to redis and acting upon messages received.
    In this case, it means loading into the DB the JSON files touched by the
    commits specified in the message for the specified repo.

    The currently accepted message format looks like:

    ::

        {
            "project": {
                "name": "foo",
                "namespace": null,
                "parent": null,
                "user": {
                    "name": "user"
                }
            },
            "abspath": "/srv/git/repositories/pagure.git",
            "commits": [
                "b7b4059c44d692d7df3227ce58ce01191e5407bd",
                "f8d0899bb6654590ffdef66b539fd3b8cf873b35",
                "9b6fdc48d3edab82d3de28953271ea52b0a96117"
            ],
            "data_type": "ticket",
            "agent": "pingou"
        }

    ``data_type`` must be ``ticket`` or ``pull-request`` (only tickets are
    actually loaded).  ``agent`` is the user the import report is emailed
    to.  ``project.user.name`` is only used when ``project.parent`` is set.

    A message that fails to be processed is logged and skipped.
    '''

    host = pagure.APP.config.get('REDIS_HOST', '0.0.0.0')
    port = pagure.APP.config.get('REDIS_PORT', 6379)
    dbname = pagure.APP.config.get('REDIS_DB', 0)
    connection = yield trollius.From(trollius_redis.Connection.create(
        host=host, port=port, db=dbname))

    # Create a subscriber and listen on the pagure.loadjson channel.
    subscriber = yield trollius.From(connection.start_subscribe())
    yield trollius.From(subscriber.subscribe(['pagure.loadjson']))

    while True:
        reply = yield trollius.From(subscriber.next_published())
        _log.info(
            'Received: %s on channel: %s',
            repr(reply.value), reply.channel)
        try:
            _process_message(reply.value)
        except Exception:
            # One bad message (invalid JSON, missing key, git or DB error)
            # must not end this coroutine, and with it all further
            # message processing.
            _log.exception('Failed to process: %s', repr(reply.value))
        _log.info('Ready for another')
| |
| |
def main():
    ''' Start the main async loop and run ``handle_messages`` on it.

    Returns when interrupted (Ctrl-C) or when the connection is reset.  Any
    other error ending ``handle_messages`` is re-raised once the event loop
    has been closed, so the process does not linger without working.
    '''
    loop = trollius.get_event_loop()
    try:
        # run_until_complete() wraps the coroutine in a task and re-raises
        # the exception the task ends with.  Waiting on it through
        # trollius.wait() swallowed that exception, and the run_forever()
        # call that followed left a dead worker idling forever.
        loop.run_until_complete(handle_messages())
    except KeyboardInterrupt:
        pass
    except trollius.ConnectionResetError:
        _log.exception('Connection reset')
    finally:
        _log.info("End Connection")
        loop.close()
        _log.info("End")
| |
| |
if __name__ == '__main__':
    # Configure the root logger with our format.  Messages from ``_log``
    # propagate to the root logger, so its handler must be the only one
    # they reach.  Attaching a second StreamHandler to ``_log`` printed
    # every message of this module twice.
    log_format = (
        "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")
    logging.basicConfig(level=logging.DEBUG, format=log_format)

    _log.setLevel(logging.DEBUG)

    # Debug output from the event loop, under either logger name.
    for loop_logger in ("asyncio", "trollius"):
        logging.getLogger(loop_logger).setLevel(logging.DEBUG)

    # Only warnings and above from the ``MARKDOWN`` logger.
    logging.getLogger("MARKDOWN").setLevel(logging.WARNING)

    main()