| |
| |
| """ |
| (c) 2015 - Copyright Red Hat Inc |
| |
| Authors: |
| Pierre-Yves Chibon <pingou@pingoured.fr> |
| |
| |
| This server listens to message sent via redis and send the corresponding |
| web-hook request. |
| |
| Using this mechanism, we no longer block the main application if the |
| receiving end is offline or so. |
| |
| """ |
| |
| import datetime |
| import hashlib |
| import hmac |
| import json |
| import logging |
| import os |
| import requests |
| import time |
| import uuid |
| |
| import six |
| import trollius |
| import trollius_redis |
| |
| from kitchen.text.converters import to_bytes |
| |
| |
| log = logging.getLogger(__name__) |
| |
| |
| if 'PAGURE_CONFIG' not in os.environ \ |
| and os.path.exists('/etc/pagure/pagure.cfg'): |
| print 'Using configuration file `/etc/pagure/pagure.cfg`' |
| os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' |
| |
| |
| import pagure |
| import pagure.lib |
| from pagure.exceptions import PagureEvException |
| |
| _i = 0 |
| |
| |
| def call_web_hooks(project, topic, msg): |
| ''' Sends the web-hook notification. ''' |
| log.info( |
| "Processing project: %s - topic: %s", project.fullname, topic) |
| log.debug('msg: %s', msg) |
| |
| |
| global _i |
| _i += 1 |
| year = datetime.datetime.now().year |
| if isinstance(topic, six.text_type): |
| topic = to_bytes(topic, encoding='utf8', nonstring="passthru") |
| msg = dict( |
| topic=topic.decode('utf-8'), |
| msg=msg, |
| timestamp=int(time.time()), |
| msg_id=str(year) + '-' + str(uuid.uuid4()), |
| i=_i, |
| ) |
| |
| content = json.dumps(msg) |
| hashhex = hmac.new( |
| str(project.hook_token), content, hashlib.sha1).hexdigest() |
| headers = { |
| 'X-Pagure-Topic': topic, |
| 'X-Pagure-Signature': hashhex |
| } |
| msg = json.dumps(msg) |
| for url in project.settings.get('Web-hooks').split('\n'): |
| url = url.strip() |
| log.info('Calling url %s' % url) |
| try: |
| req = requests.post( |
| url, |
| headers=headers, |
| data={'payload': msg} |
| ) |
| if not req: |
| log.info( |
| 'An error occured while querying: %s - ' |
| 'Error code: %s' % (url, req.status_code)) |
| except (requests.exceptions.RequestException, Exception) as err: |
| log.info( |
| 'An error occured while querying: %s - Error: %s' % ( |
| url, err)) |
| |
| |
| @trollius.coroutine |
| def handle_messages(): |
| connection = yield trollius.From(trollius_redis.Connection.create( |
| host='0.0.0.0', port=6379, db=0)) |
| |
| |
| subscriber = yield trollius.From(connection.start_subscribe()) |
| |
| |
| yield trollius.From(subscriber.subscribe(['pagure.hook'])) |
| |
| |
| while True: |
| reply = yield trollius.From(subscriber.next_published()) |
| log.info( |
| 'Received: %s on channel: %s', |
| repr(reply.value), reply.channel) |
| data = json.loads(reply.value) |
| username = None |
| if '/' in data['project']: |
| username, projectname = data['project'].split('/', 1) |
| else: |
| projectname = data['project'] |
| project = pagure.lib.get_project( |
| session=pagure.SESSION, name=projectname, user=username) |
| log.info('Got the project, going to the webhooks') |
| call_web_hooks(project, data['topic'], data['msg']) |
| |
| |
| def main(): |
| server = None |
| try: |
| loop = trollius.get_event_loop() |
| tasks = [ |
| trollius.async(handle_messages()), |
| ] |
| loop.run_until_complete(trollius.wait(tasks)) |
| loop.run_forever() |
| except KeyboardInterrupt: |
| pass |
| except trollius.ConnectionResetError: |
| pass |
| |
| log.info("End Connection") |
| loop.close() |
| log.info("End") |
| |
| |
| if __name__ == '__main__': |
| log = logging.getLogger("") |
| formatter = logging.Formatter( |
| "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s") |
| |
| |
| log.setLevel(logging.DEBUG) |
| ch = logging.StreamHandler() |
| ch.setLevel(logging.DEBUG) |
| |
| aslog = logging.getLogger("asyncio") |
| aslog.setLevel(logging.DEBUG) |
| |
| ch.setFormatter(formatter) |
| log.addHandler(ch) |
| main() |