| |
| |
| """ |
| (c) 2015 - Copyright Red Hat Inc |
| |
| Authors: |
| Pierre-Yves Chibon <pingou@pingoured.fr> |
| |
| |
| Streaming server for pagure's eventsource feature |
| This server takes messages sent to redis and publish them at the specified |
| endpoint |
| |
| To test, run this script and in another terminal |
| nc localhost 8080 |
| HELLO |
| |
| GET /test/issue/26?foo=bar HTTP/1.1 |
| |
| """ |
| |
| import datetime |
| import logging |
| import os |
| import urlparse |
| |
| import trollius |
| import trollius_redis |
| |
| log = logging.getLogger(__name__) |
| |
| |
| if 'PAGURE_CONFIG' not in os.environ \ |
| and os.path.exists('/etc/pagure/pagure.cfg'): |
| print 'Using configuration file `/etc/pagure/pagure.cfg`' |
| os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' |
| |
| |
| import pagure |
| import pagure.lib |
| from pagure.exceptions import PagureEvException |
| |
| SERVER = None |
| |
| |
| def _get_issue(repo, objid): |
| """Get a Ticket (issue) instance for a given repo (Project) and |
| objid (issue number). |
| """ |
| issue = None |
| if not repo.settings.get('issue_tracker', True): |
| raise PagureEvException("No issue tracker found for this project") |
| |
| issue = pagure.lib.search_issues( |
| pagure.SESSION, repo, issueid=objid) |
| |
| if issue is None or issue.project != repo: |
| raise PagureEvException("Issue '%s' not found" % objid) |
| |
| if issue.private: |
| |
| raise PagureEvException( |
| "This issue is private and you are not allowed to view it") |
| |
| return issue |
| |
| |
| def _get_pull_request(repo, objid): |
| """Get a PullRequest instance for a given repo (Project) and objid |
| (request number). |
| """ |
| if not repo.settings.get('pull_requests', True): |
| raise PagureEvException( |
| "No pull-request tracker found for this project") |
| |
| request = pagure.lib.search_pull_requests( |
| pagure.SESSION, project_id=repo.id, requestid=objid) |
| |
| if request is None or request.project != repo: |
| raise PagureEvException("Pull-Request '%s' not found" % objid) |
| |
| return request |
| |
| |
| |
| |
| |
| OBJECTS = { |
| 'issue': _get_issue, |
| 'pull-request': _get_pull_request |
| } |
| |
| |
| def _parse_path(path): |
| """Get the repo name, object type, object ID, and (if present) |
| username and/or namespace from a URL path component. Will only |
| handle the known object types from the OBJECTS dict. Assumes: |
| * Project name comes immediately before object type |
| * Object ID comes immediately after object type |
| * If a fork, path starts with /fork/(username) |
| * Namespace, if present, comes after fork username (if present) or at start |
| * No other components come before the project name |
| * None of the parsed items can contain a / |
| """ |
| username = None |
| namespace = None |
| |
| items = path.split('/')[1:] |
| |
| try: |
| objtype = [item for item in items if item in OBJECTS][-1] |
| except IndexError: |
| raise PagureEvException("No known object type found in path: %s" % path) |
| try: |
| |
| items = items[:items.index(objtype) + 2] |
| |
| (repo, objtype, objid) = items[-3:] |
| items = items[:-3] |
| except (IndexError, ValueError): |
| raise PagureEvException("No project or object ID found in path: %s" % path) |
| |
| if items and items[0] == 'fork': |
| try: |
| |
| username = items[1] |
| items = items[2:] |
| except IndexError: |
| raise PagureEvException("Path starts with /fork but no user found! Path: %s" % path) |
| |
| if items: |
| namespace = items.pop(0) |
| |
| if items: |
| raise PagureEvException("More path components than expected! Path: %s" % path) |
| |
| return (username, namespace, repo, objtype, objid) |
| |
| |
| def get_obj_from_path(path): |
| """ Return the Ticket or Request object based on the path provided. |
| """ |
| (username, namespace, reponame, objtype, objid) = _parse_path(path) |
| repo = pagure.lib.get_project(pagure.SESSION, reponame, user=username, namespace=namespace) |
| if repo is None: |
| raise PagureEvException("Project '%s' not found" % reponame) |
| |
| |
| try: |
| getfunc = OBJECTS[objtype] |
| except KeyError: |
| raise PagureEvException("Invalid object provided: '%s'" % objtype) |
| |
| return getfunc(repo, objid) |
| |
| |
| @trollius.coroutine |
| def handle_client(client_reader, client_writer): |
| data = None |
| while True: |
| |
| line = yield trollius.From(trollius.wait_for( |
| client_reader.readline(), |
| timeout=10.0)) |
| if not line.decode().strip(): |
| break |
| line = line.decode().rstrip() |
| if data is None: |
| data = line |
| |
| if data is None: |
| log.warning("Expected ticket uid, received None") |
| return |
| |
| data = data.decode().rstrip().split() |
| log.info("Received %s", data) |
| if not data: |
| log.warning("No URL provided: %s" % data) |
| return |
| |
| if not '/' in data[1]: |
| log.warning("Invalid URL provided: %s" % data[1]) |
| return |
| |
| url = urlparse.urlsplit(data[1]) |
| |
| try: |
| obj = get_obj_from_path(url.path) |
| except PagureEvException as err: |
| log.warning(err.message) |
| return |
| |
| origin = pagure.APP.config.get('APP_URL') |
| if origin.endswith('/'): |
| origin = origin[:-1] |
| |
| client_writer.write(( |
| "HTTP/1.0 200 OK\n" |
| "Content-Type: text/event-stream\n" |
| "Cache: nocache\n" |
| "Connection: keep-alive\n" |
| "Access-Control-Allow-Origin: %s\n\n" % origin |
| ).encode()) |
| |
| connection = yield trollius.From(trollius_redis.Connection.create( |
| host=pagure.APP.config['REDIS_HOST'], |
| port=pagure.APP.config['REDIS_PORT'], |
| db=pagure.APP.config['REDIS_DB'])) |
| |
| try: |
| |
| |
| subscriber = yield trollius.From(connection.start_subscribe()) |
| |
| |
| yield trollius.From(subscriber.subscribe(['pagure.%s' % obj.uid])) |
| |
| |
| while True: |
| reply = yield trollius.From(subscriber.next_published()) |
| |
| log.info(reply) |
| log.info("Sending %s", reply.value) |
| client_writer.write(('data: %s\n\n' % reply.value).encode()) |
| yield trollius.From(client_writer.drain()) |
| |
| except trollius.ConnectionResetError as err: |
| log.exception("ERROR: ConnectionResetError in handle_client") |
| except Exception as err: |
| log.exception("ERROR: Exception in handle_client") |
| finally: |
| |
| connection.close() |
| client_writer.close() |
| |
| |
| @trollius.coroutine |
| def stats(client_reader, client_writer): |
| |
| try: |
| log.info('Clients: %s', SERVER.active_count) |
| client_writer.write(( |
| "HTTP/1.0 200 OK\n" |
| "Cache: nocache\n\n" |
| ).encode()) |
| client_writer.write(('data: %s\n\n' % SERVER.active_count).encode()) |
| yield trollius.From(client_writer.drain()) |
| |
| except trollius.ConnectionResetError as err: |
| log.info(err) |
| pass |
| finally: |
| client_writer.close() |
| return |
| |
| |
| def main(): |
| global SERVER |
| |
| try: |
| loop = trollius.get_event_loop() |
| coro = trollius.start_server( |
| handle_client, |
| host=None, |
| port=pagure.APP.config['EVENTSOURCE_PORT'], |
| loop=loop) |
| SERVER = loop.run_until_complete(coro) |
| log.info('Serving server at {}'.format(SERVER.sockets[0].getsockname())) |
| if pagure.APP.config.get('EV_STATS_PORT'): |
| stats_coro = trollius.start_server( |
| stats, |
| host=None, |
| port=pagure.APP.config.get('EV_STATS_PORT'), |
| loop=loop) |
| stats_server = loop.run_until_complete(stats_coro) |
| log.info('Serving stats at {}'.format( |
| stats_server.sockets[0].getsockname())) |
| loop.run_forever() |
| except KeyboardInterrupt: |
| pass |
| except trollius.ConnectionResetError as err: |
| log.exception("ERROR: ConnectionResetError in main") |
| except Exception as err: |
| log.exception("ERROR: Exception in main") |
| finally: |
| |
| SERVER.close() |
| if pagure.APP.config.get('EV_STATS_PORT'): |
| stats_server.close() |
| log.info("End Connection") |
| loop.run_until_complete(SERVER.wait_closed()) |
| loop.close() |
| log.info("End") |
| |
| |
| if __name__ == '__main__': |
| log = logging.getLogger("") |
| formatter = logging.Formatter( |
| "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s") |
| |
| |
| log.setLevel(logging.DEBUG) |
| ch = logging.StreamHandler() |
| ch.setLevel(logging.DEBUG) |
| |
| aslog = logging.getLogger("asyncio") |
| aslog.setLevel(logging.DEBUG) |
| |
| ch.setFormatter(formatter) |
| log.addHandler(ch) |
| main() |