Blob Blame Raw

#include <chrono>

#include "utils.h"
#include "protocol.h"


Connection::Connection():
	socket(), lastId(), closeRequested(), closeTimeUs() { }

Connection::~Connection()
	{ close(); }


Protocol& Connection::getProtocol() const
	{ assert(socket); return socket->protocol; }
const Address& Connection::getRemoteAddress() const
	{ assert(socket); return socket->address; }
const Server::Handle& Connection::getServer() const
	{ assert(socket); return socket->server; }


void Connection::open(Socket *socket) {
	Lock lock(mutex);
	close();
	if (!socket) return;
	this->socket = socket;
	onOpen();
}


void Connection::close(ErrorCode errorCode) {
	Socket *socketCopy;
	{
		Lock lock(mutex);
		if (!socket)
			return;
		if (!errorCode && (!readQueue.empty() || !writeQueue.empty()))
			errorCode = ERR_CONNECTION_UNFINISHED_TASKS;
		
		onClose(errorCode);
		
		readQueue.clear();
		writeQueue.clear();
		socketCopy = socket;
		socket = nullptr;
		closeRequested = false;
		closeTimeUs = 0;
		closeAwaitCondition.notify_all();
	}
	socketCopy->finalize();
}


Connection::ReqId Connection::writeReq(const void *data, size_t size, void *userData) {
	if (!data || !size) return 0;
	Lock lock(mutex);
	if (!socket) return 0;
	writeQueue.emplace_back(++lastId, userData, data, size);
	return lastId;
}


Connection::ReqId Connection::readReq(void *data, size_t size, void *userData) {
	if (!data || !size) return 0;
	Lock lock(mutex);
	if (!socket) return 0;
	readQueue.emplace_back(++lastId, userData, data, size);
	return lastId;
}


void Connection::closeReq() {
	Lock lock(mutex);
	if (!socket || closeRequested) return;
	closeRequested = true;
	onCloseRequested();
}


void Connection::closeAwait(unsigned long long timeoutUs, bool withRequest) {
	std::unique_lock<Mutex> uniqlock(mutex);

	if (!socket) return;
	if (withRequest) closeReq();
	unsigned long long timeUs = monotonicTimeUs() + timeoutUs;
	if (closeTimeUs > timeUs)
		{ closeTimeUs = timeUs; closeAwaitCondition.notify_all(); }
	
	while(!socket) {
		unsigned long long timeUs = monotonicTimeUs();
		if (timeUs >= closeTimeUs)
			{ close(); break; }
		closeAwaitCondition.wait_for(uniqlock, std::chrono::microseconds(closeTimeUs - timeUs));
	}
}


void Connection::onOpen() { }
void Connection::onCloseRequested() { }
void Connection::onClose(ErrorCode) { }
void Connection::onReadReady(const ReadReq&) { }
void Connection::onWriteReady(const WriteReq&) { }