diff --git a/pagure-stream-server.py b/pagure-stream-server.py new file mode 100644 index 0000000..d513988 --- /dev/null +++ b/pagure-stream-server.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python + +""" + (c) 2014-2015 - Copyright Red Hat Inc + + Authors: + Pierre-Yves Chibon + + +Streaming server for pagure's eventsource feature +This server takes messages sent to redis and publish them at the specified +endpoint + +To test, run this script and in another terminal +nc localhost 8080 + HELLO + + GET /test/issue/26?foo=bar HTTP/1.1 + +""" + +import datetime +import logging +import os +import urlparse + +import trollius +import trollius_redis + +log = logging.getLogger(__name__) + + +if 'PAGURE_CONFIG' not in os.environ \ + and os.path.exists('/etc/pagure/pagure.cfg'): + print 'Using configuration file `/etc/pagure/pagure.cfg`' + os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' + + +import pagure +import pagure.lib + + +clients = {} + + +@trollius.coroutine +def handle_client(client_reader, client_writer): + # give client a chance to respond, timeout after 10 seconds + data = yield trollius.From(trollius.wait_for( + client_reader.readline(), + timeout=10.0)) + + if data is None: + log.warning("Expected ticket uid, received None") + return + + data = data.decode().rstrip().split() + log.info("Received %s", data) + if not data: + log.warning("No URL provided: %s" % data) + return + + if not '/' in data[1]: + log.warning("Invalid URL provided: %s" % data[1]) + return + + url = urlparse.urlsplit(data[1]) + + client_writer.write(( + "HTTP/1.0 200 OK\n" + "Content-Type: text/event-stream\n" + "Cache: nocache\n" + "Connection: keep-alive\n" + "Access-Control-Allow-Origin: *\n\n" + ).encode()) + + username = None + if url.path.startswith('/fork'): + username, repo, issue, issueid = url.path.split('/')[2:6] + else: + repo, issue, issueid = url.path.split('/')[1:4] + + repo = pagure.lib.get_project(pagure.SESSION, repo, user=username) + + if repo is None: + log.warning("Project '%s' not found" % repo) + return + + if not repo.settings.get('issue_tracker', True): + log.warning("No issue tracker found for this project") + return + + issue = pagure.lib.search_issues(pagure.SESSION, repo, issueid=issueid) + + if issue is None or issue.project != repo: + log.warning("Issue '%s' not found" % issueid) + return + + if issue.private: + # TODO: find a way to do auth + log.warning( + "This issue is private and you are not allowed to view it") + return + + try: + connection = yield trollius.From(trollius_redis.Connection.create( + host=pagure.APP.config['REDIS_HOST'], + port=pagure.APP.config['REDIS_PORT'], + db=pagure.APP.config['REDIS_DB'])) + + # Create subscriber. + subscriber = yield trollius.From(connection.start_subscribe()) + + # Subscribe to channel. + yield trollius.From(subscriber.subscribe([issue.uid])) + + # Inside a while loop, wait for incoming events. + while True: + reply = yield trollius.From(subscriber.next_published()) + #print(u'Received: ', repr(reply.value), u'on channel', reply.channel) + log.info(reply) + log.info("Sending %s", reply.value) + client_writer.write(('data: %s\n\n' % reply.value).encode()) + yield trollius.From(client_writer.drain()) + + except trollius.ConnectionResetError: + pass + finally: + # Wathever happens, close the connection. + connection.close() + client_writer.close() + + +def main(): + + try: + loop = trollius.get_event_loop() + coro = trollius.start_server( + handle_client, host=None, port=8080, loop=loop) + server = loop.run_until_complete(coro) + print('Serving on {}'.format(server.sockets[0].getsockname())) + loop.run_forever() + except KeyboardInterrupt: + pass + except trollius.ConnectionResetError: + pass + + # Close the server + server.close() + log.info("End Connection") + loop.run_until_complete(server.wait_closed()) + loop.close() + log.info("End") + + +if __name__ == '__main__': + log = logging.getLogger("") + formatter = logging.Formatter( + "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s") + + # setup console logging + log.setLevel(logging.DEBUG) + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + + aslog = logging.getLogger("asyncio") + aslog.setLevel(logging.DEBUG) + + ch.setFormatter(formatter) + log.addHandler(ch) + main()