|
Pierre-Yves Chibon |
1dd40f |
#!/usr/bin/env python
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
"""
|
|
Pierre-Yves Chibon |
0d7c61 |
(c) 2015 - Copyright Red Hat Inc
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
Authors:
|
|
Pierre-Yves Chibon |
1dd40f |
Pierre-Yves Chibon <pingou@pingoured.fr></pingou@pingoured.fr>
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
Streaming server for pagure's eventsource feature
|
|
Pierre-Yves Chibon |
1dd40f |
This server takes messages sent to redis and publish them at the specified
|
|
Pierre-Yves Chibon |
1dd40f |
endpoint
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
To test, run this script and in another terminal
|
|
Pierre-Yves Chibon |
1dd40f |
nc localhost 8080
|
|
Pierre-Yves Chibon |
1dd40f |
HELLO
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
GET /test/issue/26?foo=bar HTTP/1.1
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
"""
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
import datetime
|
|
Pierre-Yves Chibon |
1dd40f |
import logging
|
|
Pierre-Yves Chibon |
1dd40f |
import os
|
|
Pierre-Yves Chibon |
1dd40f |
import urlparse
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
import trollius
|
|
Pierre-Yves Chibon |
1dd40f |
import trollius_redis
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
log = logging.getLogger(__name__)
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
if 'PAGURE_CONFIG' not in os.environ \
|
|
Pierre-Yves Chibon |
1dd40f |
and os.path.exists('/etc/pagure/pagure.cfg'):
|
|
Pierre-Yves Chibon |
1dd40f |
print 'Using configuration file `/etc/pagure/pagure.cfg`'
|
|
Pierre-Yves Chibon |
1dd40f |
os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
import pagure
|
|
Pierre-Yves Chibon |
1dd40f |
import pagure.lib
|
|
Pierre-Yves Chibon |
116ab2 |
from pagure.exceptions import PagureEvException
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
39bf6b |
SERVER = None
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
116ab2 |
def get_obj_from_path(path):
|
|
Pierre-Yves Chibon |
116ab2 |
""" Return the Ticket or Request object based on the path provided.
|
|
Pierre-Yves Chibon |
116ab2 |
"""
|
|
Pierre-Yves Chibon |
116ab2 |
username = None
|
|
Pierre-Yves Chibon |
048e58 |
try:
|
|
Pierre-Yves Chibon |
048e58 |
if path.startswith('/fork'):
|
|
Pierre-Yves Chibon |
048e58 |
username, repo, obj, objid = path.split('/')[2:6]
|
|
Pierre-Yves Chibon |
048e58 |
else:
|
|
Pierre-Yves Chibon |
048e58 |
repo, obj, objid = path.split('/')[1:4]
|
|
Pierre-Yves Chibon |
048e58 |
except:
|
|
Pierre-Yves Chibon |
048e58 |
raise PagureEvException("Invalid URL: %s" % path)
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Pierre-Yves Chibon |
116ab2 |
repo = pagure.lib.get_project(pagure.SESSION, repo, user=username)
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Pierre-Yves Chibon |
116ab2 |
if repo is None:
|
|
Pierre-Yves Chibon |
116ab2 |
raise PagureEvException("Project '%s' not found" % repo)
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Pierre-Yves Chibon |
116ab2 |
output = None
|
|
Pierre-Yves Chibon |
116ab2 |
if obj == 'issue':
|
|
Pierre-Yves Chibon |
116ab2 |
if not repo.settings.get('issue_tracker', True):
|
|
Pierre-Yves Chibon |
116ab2 |
raise PagureEvException("No issue tracker found for this project")
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Pierre-Yves Chibon |
116ab2 |
output = pagure.lib.search_issues(
|
|
Pierre-Yves Chibon |
116ab2 |
pagure.SESSION, repo, issueid=objid)
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Pierre-Yves Chibon |
116ab2 |
if output is None or output.project != repo:
|
|
Pierre-Yves Chibon |
116ab2 |
raise PagureEvException("Issue '%s' not found" % objid)
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Pierre-Yves Chibon |
116ab2 |
if output.private:
|
|
Pierre-Yves Chibon |
116ab2 |
# TODO: find a way to do auth
|
|
Pierre-Yves Chibon |
116ab2 |
raise PagureEvException(
|
|
Pierre-Yves Chibon |
116ab2 |
"This issue is private and you are not allowed to view it")
|
|
Pierre-Yves Chibon |
e07ca8 |
elif obj == 'pull-request':
|
|
Pierre-Yves Chibon |
116ab2 |
if not repo.settings.get('pull_requests', True):
|
|
Pierre-Yves Chibon |
116ab2 |
raise PagureEvException(
|
|
Pierre-Yves Chibon |
116ab2 |
"No pull-request tracker found for this project")
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Pierre-Yves Chibon |
116ab2 |
output = pagure.lib.search_pull_requests(
|
|
Pierre-Yves Chibon |
116ab2 |
pagure.SESSION, project_id=repo.id, requestid=objid)
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Pierre-Yves Chibon |
116ab2 |
if output is None or output.project != repo:
|
|
Pierre-Yves Chibon |
116ab2 |
raise PagureEvException("Pull-Request '%s' not found" % objid)
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Pierre-Yves Chibon |
e07ca8 |
else:
|
|
Pierre-Yves Chibon |
e07ca8 |
raise PagureEvException("Invalid object provided: '%s'" % obj)
|
|
Pierre-Yves Chibon |
e07ca8 |
|
|
Pierre-Yves Chibon |
116ab2 |
return output
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Pierre-Yves Chibon |
116ab2 |
|
|
Pierre-Yves Chibon |
1dd40f |
@trollius.coroutine
|
|
Pierre-Yves Chibon |
1dd40f |
def handle_client(client_reader, client_writer):
|
|
Pierre-Yves Chibon |
ac3518 |
data = None
|
|
Pierre-Yves Chibon |
ac3518 |
while True:
|
|
Pierre-Yves Chibon |
ac3518 |
# give client a chance to respond, timeout after 10 seconds
|
|
Pierre-Yves Chibon |
ac3518 |
line = yield trollius.From(trollius.wait_for(
|
|
Pierre-Yves Chibon |
ac3518 |
client_reader.readline(),
|
|
Pierre-Yves Chibon |
ac3518 |
timeout=10.0))
|
|
Pierre-Yves Chibon |
ac3518 |
if not line.decode().strip():
|
|
Pierre-Yves Chibon |
ac3518 |
break
|
|
Pierre-Yves Chibon |
ac3518 |
line = line.decode().rstrip()
|
|
Pierre-Yves Chibon |
ac3518 |
if data is None:
|
|
Pierre-Yves Chibon |
ac3518 |
data = line
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
if data is None:
|
|
Pierre-Yves Chibon |
1dd40f |
log.warning("Expected ticket uid, received None")
|
|
Pierre-Yves Chibon |
1dd40f |
return
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
data = data.decode().rstrip().split()
|
|
Pierre-Yves Chibon |
1dd40f |
log.info("Received %s", data)
|
|
Pierre-Yves Chibon |
1dd40f |
if not data:
|
|
Pierre-Yves Chibon |
1dd40f |
log.warning("No URL provided: %s" % data)
|
|
Pierre-Yves Chibon |
1dd40f |
return
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
if not '/' in data[1]:
|
|
Pierre-Yves Chibon |
1dd40f |
log.warning("Invalid URL provided: %s" % data[1])
|
|
Pierre-Yves Chibon |
1dd40f |
return
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
url = urlparse.urlsplit(data[1])
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
3d3084 |
try:
|
|
Pierre-Yves Chibon |
3d3084 |
obj = get_obj_from_path(url.path)
|
|
Pierre-Yves Chibon |
3d3084 |
except PagureEvException as err:
|
|
Pierre-Yves Chibon |
3d3084 |
log.warning(err.message)
|
|
Pierre-Yves Chibon |
3d3084 |
return
|
|
Pierre-Yves Chibon |
3d3084 |
|
|
Pierre-Yves Chibon |
734609 |
origin = pagure.APP.config.get('APP_URL')
|
|
Pierre-Yves Chibon |
734609 |
if origin.endswith('/'):
|
|
Pierre-Yves Chibon |
734609 |
origin = origin[:-1]
|
|
Pierre-Yves Chibon |
734609 |
|
|
Pierre-Yves Chibon |
1dd40f |
client_writer.write((
|
|
Pierre-Yves Chibon |
1dd40f |
"HTTP/1.0 200 OK\n"
|
|
Pierre-Yves Chibon |
1dd40f |
"Content-Type: text/event-stream\n"
|
|
Pierre-Yves Chibon |
1dd40f |
"Cache: nocache\n"
|
|
Pierre-Yves Chibon |
1dd40f |
"Connection: keep-alive\n"
|
|
Pierre-Yves Chibon |
734609 |
"Access-Control-Allow-Origin: %s\n\n" % origin
|
|
Pierre-Yves Chibon |
1dd40f |
).encode())
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
116ab2 |
try:
|
|
Pierre-Yves Chibon |
1dd40f |
connection = yield trollius.From(trollius_redis.Connection.create(
|
|
Pierre-Yves Chibon |
1dd40f |
host=pagure.APP.config['REDIS_HOST'],
|
|
Pierre-Yves Chibon |
1dd40f |
port=pagure.APP.config['REDIS_PORT'],
|
|
Pierre-Yves Chibon |
1dd40f |
db=pagure.APP.config['REDIS_DB']))
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
# Create subscriber.
|
|
Pierre-Yves Chibon |
1dd40f |
subscriber = yield trollius.From(connection.start_subscribe())
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
# Subscribe to channel.
|
|
Pierre-Yves Chibon |
116ab2 |
yield trollius.From(subscriber.subscribe([obj.uid]))
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
# Inside a while loop, wait for incoming events.
|
|
Pierre-Yves Chibon |
1dd40f |
while True:
|
|
Pierre-Yves Chibon |
1dd40f |
reply = yield trollius.From(subscriber.next_published())
|
|
Pierre-Yves Chibon |
1dd40f |
#print(u'Received: ', repr(reply.value), u'on channel', reply.channel)
|
|
Pierre-Yves Chibon |
1dd40f |
log.info(reply)
|
|
Pierre-Yves Chibon |
1dd40f |
log.info("Sending %s", reply.value)
|
|
Pierre-Yves Chibon |
1dd40f |
client_writer.write(('data: %s\n\n' % reply.value).encode())
|
|
Pierre-Yves Chibon |
1dd40f |
yield trollius.From(client_writer.drain())
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
except trollius.ConnectionResetError:
|
|
Pierre-Yves Chibon |
1dd40f |
pass
|
|
Pierre-Yves Chibon |
1dd40f |
finally:
|
|
Pierre-Yves Chibon |
1dd40f |
# Wathever happens, close the connection.
|
|
Pierre-Yves Chibon |
1dd40f |
connection.close()
|
|
Pierre-Yves Chibon |
1dd40f |
client_writer.close()
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
39bf6b |
@trollius.coroutine
|
|
Pierre-Yves Chibon |
39bf6b |
def stats(client_reader, client_writer):
|
|
Pierre-Yves Chibon |
39bf6b |
|
|
Pierre-Yves Chibon |
39bf6b |
try:
|
|
Pierre-Yves Chibon |
39bf6b |
log.info('Clients: %s', SERVER.active_count)
|
|
Pierre-Yves Chibon |
39bf6b |
client_writer.write((
|
|
Pierre-Yves Chibon |
39bf6b |
"HTTP/1.0 200 OK\n"
|
|
Pierre-Yves Chibon |
39bf6b |
"Cache: nocache\n\n"
|
|
Pierre-Yves Chibon |
39bf6b |
).encode())
|
|
Pierre-Yves Chibon |
39bf6b |
client_writer.write(('data: %s\n\n' % SERVER.active_count).encode())
|
|
Pierre-Yves Chibon |
39bf6b |
yield trollius.From(client_writer.drain())
|
|
Pierre-Yves Chibon |
39bf6b |
|
|
Pierre-Yves Chibon |
39bf6b |
except trollius.ConnectionResetError, err:
|
|
Pierre-Yves Chibon |
af9aab |
log.info(err)
|
|
Pierre-Yves Chibon |
39bf6b |
pass
|
|
Pierre-Yves Chibon |
39bf6b |
finally:
|
|
Pierre-Yves Chibon |
39bf6b |
client_writer.close()
|
|
Pierre-Yves Chibon |
39bf6b |
return
|
|
Pierre-Yves Chibon |
39bf6b |
|
|
Pierre-Yves Chibon |
39bf6b |
|
|
Pierre-Yves Chibon |
1dd40f |
def main():
|
|
Pierre-Yves Chibon |
39bf6b |
global SERVER
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
try:
|
|
Pierre-Yves Chibon |
1dd40f |
loop = trollius.get_event_loop()
|
|
Pierre-Yves Chibon |
1dd40f |
coro = trollius.start_server(
|
|
Pierre-Yves Chibon |
8d3302 |
handle_client,
|
|
Pierre-Yves Chibon |
8d3302 |
host=None,
|
|
Pierre-Yves Chibon |
8d3302 |
port=pagure.APP.config['EVENTSOURCE_PORT'],
|
|
Pierre-Yves Chibon |
8d3302 |
loop=loop)
|
|
Pierre-Yves Chibon |
39bf6b |
SERVER = loop.run_until_complete(coro)
|
|
Pierre-Yves Chibon |
d8a38d |
log.info('Serving server at {}'.format(SERVER.sockets[0].getsockname()))
|
|
Pierre-Yves Chibon |
39bf6b |
if pagure.APP.config.get('EV_STATS_PORT'):
|
|
Pierre-Yves Chibon |
39bf6b |
stats_coro = trollius.start_server(
|
|
Pierre-Yves Chibon |
39bf6b |
stats,
|
|
Pierre-Yves Chibon |
39bf6b |
host=None,
|
|
Pierre-Yves Chibon |
39bf6b |
port=pagure.APP.config.get('EV_STATS_PORT'),
|
|
Pierre-Yves Chibon |
39bf6b |
loop=loop)
|
|
Pierre-Yves Chibon |
39bf6b |
stats_server = loop.run_until_complete(stats_coro)
|
|
Pierre-Yves Chibon |
d8a38d |
log.info('Serving stats at {}'.format(
|
|
Pierre-Yves Chibon |
d8a38d |
stats_server.sockets[0].getsockname()))
|
|
Pierre-Yves Chibon |
1dd40f |
loop.run_forever()
|
|
Pierre-Yves Chibon |
1dd40f |
except KeyboardInterrupt:
|
|
Pierre-Yves Chibon |
1dd40f |
pass
|
|
Pierre-Yves Chibon |
1dd40f |
except trollius.ConnectionResetError:
|
|
Pierre-Yves Chibon |
1dd40f |
pass
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
# Close the server
|
|
Pierre-Yves Chibon |
c86f00 |
SERVER.close()
|
|
Pierre-Yves Chibon |
c86f00 |
if pagure.APP.config.get('EV_STATS_PORT'):
|
|
Pierre-Yves Chibon |
c86f00 |
stats_server.close()
|
|
Pierre-Yves Chibon |
1dd40f |
log.info("End Connection")
|
|
Pierre-Yves Chibon |
65732d |
loop.run_until_complete(SERVER.wait_closed())
|
|
Pierre-Yves Chibon |
1dd40f |
loop.close()
|
|
Pierre-Yves Chibon |
1dd40f |
log.info("End")
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
if __name__ == '__main__':
|
|
Pierre-Yves Chibon |
1dd40f |
log = logging.getLogger("")
|
|
Pierre-Yves Chibon |
1dd40f |
formatter = logging.Formatter(
|
|
Pierre-Yves Chibon |
1dd40f |
"%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
# setup console logging
|
|
Pierre-Yves Chibon |
1dd40f |
log.setLevel(logging.DEBUG)
|
|
Pierre-Yves Chibon |
1dd40f |
ch = logging.StreamHandler()
|
|
Pierre-Yves Chibon |
1dd40f |
ch.setLevel(logging.DEBUG)
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
aslog = logging.getLogger("asyncio")
|
|
Pierre-Yves Chibon |
1dd40f |
aslog.setLevel(logging.DEBUG)
|
|
Pierre-Yves Chibon |
1dd40f |
|
|
Pierre-Yves Chibon |
1dd40f |
ch.setFormatter(formatter)
|
|
Pierre-Yves Chibon |
1dd40f |
log.addHandler(ch)
|
|
Pierre-Yves Chibon |
1dd40f |
main()
|