|
Pierre-Yves Chibon |
1dd40f |
#!/usr/bin/env python
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
"""
|
|
Abhijeet Kasurde |
a8d2ec |
(c) 2015-2017 - Copyright Red Hat Inc
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
Authors:
|
|
Pierre-Yves Chibon |
1dd40f |
Pierre-Yves Chibon <pingou@pingoured.fr>
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
Streaming server for pagure's eventsource feature
|
|
Pierre-Yves Chibon |
1dd40f |
This server takes messages sent to redis and publishes them at the specified
|
|
Pierre-Yves Chibon |
1dd40f |
endpoint
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
To test, run this script and in another terminal
|
|
Pierre-Yves Chibon |
1dd40f |
nc localhost 8080
|
|
Pierre-Yves Chibon |
1dd40f |
HELLO
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
GET /test/issue/26?foo=bar HTTP/1.1
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
"""
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
67d1cc |
from __future__ import unicode_literals, absolute_import
|
|
Aurélien Bompard |
dcf6f6 |
|
|
Pierre-Yves Chibon |
1dd40f |
import logging
|
|
Pierre-Yves Chibon |
1dd40f |
import os
|
|
Aurélien Bompard |
831553 |
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Patrick Uiterwijk |
c998ef |
import redis
|
|
Julen Landa Alustiza |
47a9ab |
import trololio
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Aurélien Bompard |
831553 |
from six.moves.urllib.parse import urlparse
|
|
Aurélien Bompard |
831553 |
|
|
Pierre-Yves Chibon |
1dd40f |
log = logging.getLogger(__name__)
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
# When the environment does not name a configuration file, fall back on the
# system-wide one, provided it exists. This runs before `pagure` is imported
# below.
if 'PAGURE_CONFIG' not in os.environ:
    if os.path.exists('/etc/pagure/pagure.cfg'):
        print('Using configuration file `/etc/pagure/pagure.cfg`')
        os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
de6f12 |
import pagure # noqa: E402
|
|
Pierre-Yves Chibon |
cf98be |
import pagure.lib.model_base # noqa: E402
|
|
Pierre-Yves Chibon |
930073 |
import pagure.lib.query # noqa: E402
|
|
Pierre-Yves Chibon |
0c5199 |
from pagure.exceptions import PagureException, PagureEvException # noqa: E402
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
39bf6b |
# Server object of the eventsource endpoint; assigned by main() and read by
# the stats() handler.
SERVER = None

# Database session, created on first use by _get_session().
SESSION = None

# Redis connection pool from which handle_client() builds its connections.
POOL = redis.ConnectionPool(
    host=pagure.config.config['REDIS_HOST'],
    port=pagure.config.config['REDIS_PORT'],
    db=pagure.config.config['REDIS_DB'],
)
|
|
Pierre-Yves Chibon |
b130e5 |
|
|
Pierre-Yves Chibon |
b130e5 |
|
|
Pierre-Yves Chibon |
b130e5 |
def _get_session():
    """ Return the module-wide database session, creating it on first call.

    The session is stored in the ``SESSION`` global so that later calls
    reuse the same object.
    """
    global SESSION
    if SESSION is not None:
        return SESSION

    db_url = pagure.config.config['DB_URL']
    # NOTE(review): the URL is printed verbatim; if it embeds credentials
    # they end up on stdout -- confirm this is intended.
    print(db_url)
    SESSION = pagure.lib.model_base.create_session(db_url)
    return SESSION
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Adam Williamson |
01c1a4 |
|
|
Adam Williamson |
01c1a4 |
def _get_issue(repo, objid):
    """ Return the Ticket (issue) numbered ``objid`` in the Project ``repo``.

    :arg repo: the Project in which the issue is looked up.
    :arg objid: the issue number.
    :raises PagureEvException: if the project has its issue tracker
        disabled, if no matching issue exists in this project or if the
        issue is private.
    """
    tracker_enabled = repo.settings.get('issue_tracker', True)
    if not tracker_enabled:
        raise PagureEvException("No issue tracker found for this project")

    ticket = pagure.lib.query.search_issues(
        _get_session(), repo, issueid=objid)

    if ticket is None or ticket.project != repo:
        raise PagureEvException("Issue '%s' not found" % objid)

    if ticket.private:
        # TODO: find a way to do auth
        raise PagureEvException(
            "This issue is private and you are not allowed to view it")

    return ticket
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Adam Williamson |
01c1a4 |
def _get_pull_request(repo, objid):
    """ Return the PullRequest numbered ``objid`` in the Project ``repo``.

    :arg repo: the Project in which the pull-request is looked up.
    :arg objid: the pull-request number.
    :raises PagureEvException: if the project has pull-requests disabled or
        if no matching pull-request exists in this project.
    """
    pr_enabled = repo.settings.get('pull_requests', True)
    if not pr_enabled:
        raise PagureEvException(
            "No pull-request tracker found for this project")

    pull_request = pagure.lib.query.search_pull_requests(
        _get_session(), project_id=repo.id, requestid=objid)

    if pull_request is None or pull_request.project != repo:
        raise PagureEvException("Pull-Request '%s' not found" % objid)

    return pull_request
|
|
Adam Williamson |
01c1a4 |
|
|
Adam Williamson |
01c1a4 |
|
|
Adam Williamson |
01c1a4 |
# Dispatch table: maps each object type that may appear in a requested path
# to the function that fetches the matching object for a (repo, objid) pair.
# It must stay below the getter functions since it references them directly.
OBJECTS = {
    'issue': _get_issue,
    'pull-request': _get_pull_request,
}
|
|
Adam Williamson |
01c1a4 |
|
|
Adam Williamson |
01c1a4 |
|
|
Adam Williamson |
01c1a4 |
def get_obj_from_path(path):
    """ Return the Ticket or PullRequest object designated by ``path``.

    :arg path: the path component of the requested URL.
    :raises PagureEvException: if the project cannot be found or if the
        object type in the path is not one of the keys of ``OBJECTS``.
        The getter called for the object may raise it as well.
    """
    parsed = pagure.utils.parse_path(path)
    username, namespace, reponame, objtype, objid = parsed

    project = pagure.lib.query.get_authorized_project(
        _get_session(), reponame, user=username, namespace=namespace)
    if project is None:
        raise PagureEvException("Project '%s' not found" % reponame)

    # Pick the getter matching the requested object type.
    if objtype not in OBJECTS:
        raise PagureEvException("Invalid object provided: '%s'" % objtype)

    getter = OBJECTS[objtype]
    return getter(project, objid)
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Neal Gompa |
4b2e7f |
@trololio.coroutine
def handle_client(client_reader, client_writer):
    """ Serve one eventsource client.

    Reads the request head from ``client_reader`` (only its first line, the
    HTTP request line, is used), resolves the requested path to an object
    with ``get_obj_from_path`` and answers with ``text/event-stream``
    headers. Every message then received on the redis channel
    ``pagure.<obj.uid>`` is forwarded as a ``data:`` event; while the channel
    is idle an ``event: ping`` is written about every 5 seconds.

    Whatever the outcome (invalid request, timeout, client gone, error), the
    redis subscription and the client connection are closed before the
    coroutine finishes.

    :arg client_reader: stream reader of the client connection.
    :arg client_writer: stream writer of the client connection.
    """
    subscriber = None
    try:
        # Consume the request head up to the first blank line (or EOF) and
        # remember its first line.
        request_line = None
        while True:
            # give client a chance to respond, timeout after 10 seconds
            raw_line = yield trololio.From(trololio.asyncio.wait_for(
                client_reader.readline(),
                timeout=10.0))
            line = raw_line.decode().strip()
            if not line:
                break
            if request_line is None:
                request_line = line

        if request_line is None:
            log.warning("Expected ticket uid, received None")
            return

        # The line has already been decoded above: decoding it a second time
        # fails on python 3, where ``str`` has no ``decode`` method.
        data = request_line.split()
        log.info("Received %s", data)

        # A request line looks like ``<METHOD> <URL> [<VERSION>]``; make sure
        # the URL token is present before indexing it.
        if len(data) < 2:
            log.warning("No URL provided: %s" % data)
            return

        if '/' not in data[1]:
            log.warning("Invalid URL provided: %s" % data[1])
            return

        url = urlparse(data[1])

        try:
            obj = get_obj_from_path(url.path)
        except PagureException as err:
            # ``err.message`` is not available on python 3 exceptions; let
            # logging render the exception itself.
            log.warning("%s", err)
            return

        origin = pagure.config.config.get('APP_URL')
        if origin.endswith('/'):
            origin = origin[:-1]

        client_writer.write((
            "HTTP/1.0 200 OK\n"
            "Content-Type: text/event-stream\n"
            "Cache: nocache\n"
            "Connection: keep-alive\n"
            "Access-Control-Allow-Origin: %s\n\n" % origin
        ).encode())

        conn = redis.Redis(connection_pool=POOL)
        subscriber = conn.pubsub(ignore_subscribe_messages=True)
        subscriber.subscribe('pagure.%s' % obj.uid)

        # Inside a while loop, wait for incoming events.
        oncall = 0
        while True:
            msg = subscriber.get_message()
            if msg is None:
                # Send a ping to see if the client is still alive
                if oncall >= 5:
                    # Only send a ping once every 5 seconds
                    client_writer.write(('event: ping\n\n').encode())
                    oncall = 0
                oncall += 1
                yield trololio.From(client_writer.drain())
                yield trololio.From(trololio.asyncio.sleep(1))
            else:
                payload = msg['data']
                # The pool is not created with ``decode_responses``, so the
                # payload is expected to be bytes; decode it explicitly so
                # the client does not receive ``b'...'`` on python 3.
                if isinstance(payload, bytes):
                    payload = payload.decode('utf-8')
                log.info("Sending %s", payload)
                client_writer.write(
                    ('data: %s\n\n' % payload).encode('utf-8'))
                yield trololio.From(client_writer.drain())

    except trololio.asyncio.TimeoutError:
        # Listed first: on recent python 3 this is an OSError subclass.
        log.info("Client timed out while sending its request")
    except OSError:
        log.info("Client closed connection")
    except trololio.ConnectionResetError:
        # NOTE(review): presumably only reachable where ConnectionResetError
        # is not an OSError subclass (python 2) -- confirm.
        log.exception("ERROR: ConnectionResetError in handle_client")
    except Exception as err:
        log.exception("ERROR: Exception in handle_client")
        log.info(type(err))
    finally:
        # Whatever happens, close the connection.
        log.info("Client left. Goodbye!")
        if subscriber is not None:
            subscriber.close()
        client_writer.close()
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Neal Gompa |
4b2e7f |
@trololio.coroutine
def stats(client_reader, client_writer):
    """ Report the client count of the eventsource server.

    Writes a minimal HTTP response followed by a single ``data:`` line
    holding ``SERVER.active_count``, then closes the connection.

    :arg client_reader: stream reader of the client connection (unused, the
        request itself is never read).
    :arg client_writer: stream writer of the client connection.
    """
    try:
        # NOTE(review): relies on the server object exposing a public
        # ``active_count`` attribute -- confirm that the asyncio/trollius
        # version in use provides it.
        active_count = SERVER.active_count
        log.info('Clients: %s', active_count)
        client_writer.write((
            "HTTP/1.0 200 OK\n"
            "Cache: nocache\n\n"
        ).encode())
        client_writer.write(('data: %s\n\n' % active_count).encode())
        yield trololio.From(client_writer.drain())

    except trololio.ConnectionResetError as err:
        log.info(err)
    except Exception:
        # Log unexpected failures instead of discarding them silently, which
        # is what the former ``return`` in the ``finally`` clause did.
        log.exception("ERROR: Exception in stats")
    finally:
        client_writer.close()
|
|
Pierre-Yves Chibon |
39bf6b |
|
|
Pierre-Yves Chibon |
39bf6b |
|
|
Pierre-Yves Chibon |
1dd40f |
def main():
    """ Start the eventsource server and run it until interrupted.

    Creates the database session, starts the eventsource server on the
    ``EVENTSOURCE_PORT`` configuration key and, when ``EV_STATS_PORT`` is
    set, a second server answering with ``stats``. The event loop then runs
    until it is stopped, a ``KeyboardInterrupt`` is received or an error
    occurs; the servers and the loop are closed on the way out.
    """
    global SERVER
    _get_session()

    # Initialised up-front so that the clean-up below only touches what has
    # actually been created, even when the start-up fails half-way.
    loop = None
    stats_server = None

    try:
        loop = trololio.asyncio.get_event_loop()
        coro = trololio.asyncio.start_server(
            handle_client,
            host=None,
            port=pagure.config.config['EVENTSOURCE_PORT'],
            loop=loop)
        SERVER = loop.run_until_complete(coro)
        log.info(
            'Serving server at {}'.format(SERVER.sockets[0].getsockname()))

        stats_port = pagure.config.config.get('EV_STATS_PORT')
        if stats_port:
            stats_coro = trololio.asyncio.start_server(
                stats,
                host=None,
                port=stats_port,
                loop=loop)
            stats_server = loop.run_until_complete(stats_coro)
            log.info('Serving stats at {}'.format(
                stats_server.sockets[0].getsockname()))

        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except trololio.ConnectionResetError:
        log.exception("ERROR: ConnectionResetError in main")
    except Exception:
        log.exception("ERROR: Exception in main")
    finally:
        # Close the server(s)
        if SERVER is not None:
            SERVER.close()
        if stats_server is not None:
            stats_server.close()
        log.info("End Connection")
        if loop is not None:
            if SERVER is not None:
                loop.run_until_complete(SERVER.wait_closed())
            if stats_server is not None:
                loop.run_until_complete(stats_server.wait_closed())
            loop.close()
        log.info("End")
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
if __name__ == '__main__':
    # When run as a script, log through the root logger instead of the
    # module logger.
    log = logging.getLogger("")
    log.setLevel(logging.DEBUG)

    # setup console logging
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s"))
    log.addHandler(console)

    # Make the asyncio logger verbose as well.
    logging.getLogger("asyncio").setLevel(logging.DEBUG)

    main()
|