// Blob Blame Raw  (navigation residue from a repository web viewer; not part of the program)

#include <chrono>

#include "utils.h"
#include "protocol.h"


// Creates an idle connection: not started, not switching, no socket attached,
// request id counter at zero, no close request pending and no close deadline.
// Members not listed here are default-initialized.
Connection::Connection():
	started(false),
	switching(false),
	socket(nullptr),
	lastId(0),
	closeRequested(false),
	closeTimeUs(0)
{ }


// Destroys the connection object.
// The connection is expected to be closed already: it must be neither started
// nor attached to a socket (checked by assertions). The remaining state is
// reset through clean() while the mutex is held.
Connection::~Connection() {
	Lock lock(mutex);

	assert(!started);
	assert(!socket);

	clean();
}


// Returns the protocol recorded in the attached socket.
// A socket must be attached (asserted).
Protocol& Connection::getProtocol() const {
	assert(socket);
	return socket->protocol;
}
// Returns the address stored in the attached socket.
// A socket must be attached (asserted).
const Address& Connection::getRemoteAddress() const {
	assert(socket);
	return socket->address;
}
// Returns the connection handle stored in the attached socket.
// A socket must be attached (asserted).
const Connection::Handle& Connection::getConnection() const {
	assert(socket);
	return socket->connection;
}
// Returns the server handle stored in the attached socket.
// A socket must be attached (asserted).
const Server::Handle& Connection::getServer() const {
	assert(socket);
	return socket->server;
}


// Resets the per-session state: drops all queued read/write requests, detaches
// the socket pointer (without finalizing it) and clears the started flag, the
// close request flag and the close deadline.
// `switching` and `lastId` are intentionally left untouched.
// Does not lock the mutex itself; the callers in this file (destructor,
// open(), close()) hold it while calling.
void Connection::clean() {
	// discard requests which were never completed
	readQueue.clear();
	writeQueue.clear();

	// forget the close deadline and the pending close request
	closeTimeUs = 0;
	closeRequested = false;

	// detach from the socket and mark the session as not running
	socket = nullptr;
	started = false;
}


// Removes the oldest read request from the queue and reports it through
// onReadReady(). When no read work remains, read notifications are switched
// off on the socket.
// The read queue must not be empty (asserted).
// NOTE(review): this function does not lock the mutex itself - presumably the
// caller already holds it; confirm against the call site.
void Connection::readDone() {
	assert(!readQueue.empty());

	// copy the request first: pop_front() destroys the queued element
	ReadReq req = readQueue.front();
	readQueue.pop_front();
	onReadReady(req);

	// the handler is allowed to call close(), which resets `socket` to null
	// through clean(); in that case there is nothing left to update
	if (socket && isAllReadDone()) socket->setWantRead(false);
}


// Removes the oldest write request from the queue and reports it through
// onWriteReady(). When no write work remains, write notifications are switched
// off on the socket.
// The write queue must not be empty (asserted).
// NOTE(review): this function does not lock the mutex itself - presumably the
// caller already holds it; confirm against the call site.
void Connection::writeDone() {
	assert(!writeQueue.empty());

	// copy the request first: pop_front() destroys the queued element
	WriteReq req = writeQueue.front();
	writeQueue.pop_front();
	onWriteReady(req);

	// the handler is allowed to call close(), which resets `socket` to null
	// through clean(); in that case there is nothing left to update
	if (socket && isAllWriteDone()) socket->setWantWrite(false);
}


// Attaches the connection to `socket` and starts a new session.
//
// A previously started session is closed first - this happens even when
// `socket` turns out to be null. Then onOpen() is called with `switching` set;
// on success the connection becomes started, on failure the state is reset by
// clean() (the socket is detached but not finalized here).
//
// Returns:
//   ERR_INVALID_ARGS            - `socket` is null
//   ERR_CONNECTION_IS_SWITCHING - an open/close is already in progress
//   otherwise the value returned by onOpen()
ErrorCode Connection::open(Socket *socket) {
	Lock lock(mutex);

	// close() does nothing when the connection is not started or is switching
	close();

	if (socket == nullptr)
		return ERR_INVALID_ARGS;
	if (switching)
		return ERR_CONNECTION_IS_SWITCHING;

	this->socket = socket;

	switching = true;
	const ErrorCode openResult = onOpen();
	if (openResult) {
		// the handler rejected the session: roll everything back
		clean();
	} else {
		started = true;
	}
	switching = false;

	return openResult;
}


// Closes a started connection.
//
// Does nothing when the connection is not started or an open/close is already
// in progress. When no error code is given but read or write requests are
// still queued, ERR_CONNECTION_UNFINISHED_TASKS is reported instead.
//
// Sequence: onClose(errorCode) -> protocol serverDisconnect() -> clean() ->
// wake up closeWait() waiters -> finalize the socket which was attached.
void Connection::close(ErrorCode errorCode) {
	Lock lock(mutex);

	if (switching || !started)
		return;

	const bool hasUnfinishedTasks = !(readQueue.empty() && writeQueue.empty());
	if (hasUnfinishedTasks && !errorCode)
		errorCode = ERR_CONNECTION_UNFINISHED_TASKS;

	switching = true;
	onClose(errorCode);
	getProtocol().serverDisconnect(getServer(), getConnection());
	// clean() resets `socket`, so remember it for the finalize() call below
	Socket* const detachedSocket = socket;
	clean();
	switching = false;

	closeWaitCondition.notify_all();
	detachedSocket->finalize();
}


// Marks the connection as "close requested" and notifies onCloseRequested().
// Has an effect only once per session, and only while the connection is
// started and no open/close is in progress.
void Connection::closeReq() {
	Lock lock(mutex);

	const bool running = started && !switching;
	if (running && !closeRequested) {
		closeRequested = true;
		onCloseRequested();
	}
}


// Blocks until the connection is closed; force-closes it with
// ERR_CONNECTION_LOST once the deadline has passed.
//
//   timeoutUs   - maximum time to wait, in microseconds, counted from this call
//   withRequest - when true, closeReq() is issued before waiting
//
// Returns immediately when the connection is not started or an open/close is
// in progress.
//
// `closeTimeUs` is the deadline shared by all concurrent waiters. Zero means
// "no deadline set yet" (the constructor and clean() store zero there), so the
// first waiter always installs its deadline and later waiters may only make
// it earlier.
void Connection::closeWait(unsigned long long timeoutUs, bool withRequest) {
	std::unique_lock<Mutex> uniqlock(mutex);

	if (!started || switching) return;
	if (withRequest) {
		closeReq();
		// onCloseRequested() may already have closed the connection; storing
		// a deadline now would leave a stale value for the next session
		if (!started) return;
	}

	const unsigned long long nowUs = monotonicTimeUs();
	unsigned long long deadlineUs = nowUs + timeoutUs;
	if (deadlineUs < nowUs) deadlineUs = ~0ull; // unsigned overflow: saturate

	if (!closeTimeUs || closeTimeUs > deadlineUs)
		{ closeTimeUs = deadlineUs; closeWaitCondition.notify_all(); }

	// Upper bound for a single sleep (one hour). Keeps the value well inside
	// the signed range of std::chrono::microseconds; the loop re-checks the
	// deadline after every wake-up anyway.
	const unsigned long long maxSleepUs = 3600ull*1000000ull;

	while(started) {
		const unsigned long long currentUs = monotonicTimeUs();
		if (currentUs >= closeTimeUs)
			{ close(ERR_CONNECTION_LOST); break; }
		unsigned long long sleepUs = closeTimeUs - currentUs;
		if (sleepUs > maxSleepUs) sleepUs = maxSleepUs;
		closeWaitCondition.wait_for(uniqlock, std::chrono::microseconds(sleepUs));
	}
}


// Queues a request to write `size` bytes from `data` and enables write
// notifications on the socket.
//
//   data     - source buffer, must not be null
//   size     - number of bytes, must not be zero
//   userData - opaque pointer stored together with the request
//
// Returns the id assigned to the request, or 0 when the arguments are invalid
// or the connection does not accept requests. Requests are accepted while the
// connection is running (started, not switching) and while onOpen() is being
// executed by open() (switching, not started yet).
Connection::ReqId Connection::writeReq(const void *data, size_t size, void *userData) {
	if (data == nullptr || size == 0)
		return 0;

	Lock lock(mutex);

	const bool accepting = (started != switching);
	if (!accepting)
		return 0; // already closed or closing now

	const ReqId id = ++lastId;
	writeQueue.emplace_back(id, userData, data, size);
	socket->setWantWrite(true);
	return id;
}


// Queues a request to read `size` bytes into `data` and enables read
// notifications on the socket.
//
//   data     - destination buffer, must not be null
//   size     - number of bytes, must not be zero
//   userData - opaque pointer stored together with the request
//
// Returns the id assigned to the request, or 0 when the arguments are invalid
// or the connection does not accept requests. Requests are accepted while the
// connection is running (started, not switching) and while onOpen() is being
// executed by open() (switching, not started yet).
Connection::ReqId Connection::readReq(void *data, size_t size, void *userData) {
	if (data == nullptr || size == 0)
		return 0;

	Lock lock(mutex);

	const bool accepting = (started != switching);
	if (!accepting)
		return 0; // already closed or closing now

	const ReqId id = ++lastId;
	readQueue.emplace_back(id, userData, data, size);
	socket->setWantRead(true);
	return id;
}


// Default open handler: accepts the session by returning ERR_NONE.
// open() treats any non-zero result as a failure and rolls the state back.
// NOTE(review): presumably a virtual hook for subclasses - the declaration is not visible here.
ErrorCode Connection::onOpen() {
	return ERR_NONE;
}
// Default close-request handler: does nothing.
// Called by closeReq() right after the close request flag has been set.
// NOTE(review): presumably a virtual hook for subclasses - the declaration is not visible here.
void Connection::onCloseRequested() {
}
// Default close handler: does nothing and ignores the error code.
// Called by close() before the protocol is notified and the state is reset.
// NOTE(review): presumably a virtual hook for subclasses - the declaration is not visible here.
void Connection::onClose(ErrorCode /*errorCode*/) {
}
// Default read-completion handler: does nothing and ignores the request.
// Called by readDone() with the request which was just removed from the read queue.
// NOTE(review): presumably a virtual hook for subclasses - the declaration is not visible here.
void Connection::onReadReady(const ReadReq& /*req*/) {
}
// Default write-completion handler: does nothing and ignores the request.
// Called by writeDone() with the request which was just removed from the write queue.
// NOTE(review): presumably a virtual hook for subclasses - the declaration is not visible here.
void Connection::onWriteReady(const WriteReq& /*req*/) {
}