Blame connection.cpp

f07ad6
541903
#include <chrono></chrono>
541903
541903
#include "utils.h"
f07ad6
#include "protocol.h"
f07ad6
f07ad6
f07ad6
Connection::Connection():
646228
	started(), switching(), socket(), lastId(), closeRequested(), closeTimeUs() { }
f07ad6
646228
646228
Connection::~Connection() {
646228
	Lock lock(mutex);
646228
	assert(!started && !socket);
646228
	clean();
646228
}
f07ad6
f07ad6
f07ad6
Protocol& Connection::getProtocol() const
f07ad6
	{ assert(socket); return socket->protocol; }
f07ad6
const Address& Connection::getRemoteAddress() const
f07ad6
	{ assert(socket); return socket->address; }
9bbba5
const Connection::Handle& Connection::getConnection() const
9bbba5
	{ assert(socket); return socket->connection; }
98bb38
const Server::Handle& Connection::getServer() const
f07ad6
	{ assert(socket); return socket->server; }
f07ad6
f07ad6
646228
void Connection::clean() {
646228
	readQueue.clear();
646228
	writeQueue.clear();
646228
	started = false;
646228
	socket = nullptr;
646228
	closeRequested = false;
646228
	closeTimeUs = 0;
646228
}
646228
646228
2499ad
void Connection::readDone() {
2499ad
	ReadReq req = readQueue.front();
2499ad
	readQueue.pop_front();
2499ad
	onReadReady(req);
2499ad
	if (isAllReadDone()) socket->setWantRead(false);
2499ad
}
2499ad
2499ad
2499ad
void Connection::writeDone() {
2499ad
	WriteReq req = writeQueue.front();
2499ad
	writeQueue.pop_front();
2499ad
	onWriteReady(req);
2499ad
	if (isAllWriteDone()) socket->setWantWrite(false);
2499ad
}
2499ad
2499ad
646228
ErrorCode Connection::open(Socket *socket) {
f07ad6
	Lock lock(mutex);
f07ad6
	close();
646228
	
646228
	if (!socket) return ERR_INVALID_ARGS;
646228
	if (switching) return ERR_CONNECTION_IS_SWITCHING;
f07ad6
	this->socket = socket;
646228
	
646228
	switching = true;
646228
	ErrorCode errorCode = onOpen();
646228
	if (errorCode) clean(); else started = true;
646228
	switching = false;
646228
646228
	return errorCode;
f07ad6
}
f07ad6
f07ad6
f07ad6
void Connection::close(ErrorCode errorCode) {
646228
	Lock lock(mutex);
646228
	
646228
	if (!started || switching) return;
646228
	if (!errorCode && (!readQueue.empty() || !writeQueue.empty()))
646228
		errorCode = ERR_CONNECTION_UNFINISHED_TASKS;
646228
	
646228
	switching = true;
646228
	onClose(errorCode);
5a39be
	getProtocol().serverDisconnect(getServer(), getConnection());
646228
	Socket *socketCopy = socket;
646228
	clean();
646228
	switching = false;
646228
	
646228
	closeWaitCondition.notify_all();
541903
	socketCopy->finalize();
f07ad6
}
f07ad6
f07ad6
541903
void Connection::closeReq() {
541903
	Lock lock(mutex);
2499ad
	if (!started || switching || closeRequested) return;
541903
	closeRequested = true;
541903
	onCloseRequested();
541903
}
541903
541903
646228
void Connection::closeWait(unsigned long long timeoutUs, bool withRequest) {
541903
	std::unique_lock<mutex> uniqlock(mutex);</mutex>
541903
2499ad
	if (!started || switching) return;
541903
	if (withRequest) closeReq();
541903
	unsigned long long timeUs = monotonicTimeUs() + timeoutUs;
541903
	if (closeTimeUs > timeUs)
646228
		{ closeTimeUs = timeUs; closeWaitCondition.notify_all(); }
541903
	
646228
	while(started) {
541903
		unsigned long long timeUs = monotonicTimeUs();
541903
		if (timeUs >= closeTimeUs)
646228
			{ close(ERR_CONNECTION_LOST); break; }
646228
		closeWaitCondition.wait_for(uniqlock, std::chrono::microseconds(closeTimeUs - timeUs));
541903
	}
541903
}
541903
541903
5a39be
Connection::ReqId Connection::writeReq(const void *data, size_t size, void *userData) {
5a39be
	if (!data || !size) return 0;
5a39be
	Lock lock(mutex);
2499ad
	if (started == switching) return 0; // already closed or closing now
5a39be
	writeQueue.emplace_back(++lastId, userData, data, size);
2499ad
	socket->setWantWrite(true);
5a39be
	return lastId;
5a39be
}
5a39be
5a39be
5a39be
Connection::ReqId Connection::readReq(void *data, size_t size, void *userData) {
5a39be
	if (!data || !size) return 0;
5a39be
	Lock lock(mutex);
2499ad
	if (started == switching) return 0; // already closed or closing now
5a39be
	readQueue.emplace_back(++lastId, userData, data, size);
2499ad
	socket->setWantRead(true);
5a39be
	return lastId;
5a39be
}
5a39be
5a39be
646228
ErrorCode Connection::onOpen() { return ERR_NONE; }
541903
void Connection::onCloseRequested() { }
f07ad6
void Connection::onClose(ErrorCode) { }
f07ad6
void Connection::onReadReady(const ReadReq&) { }
f07ad6
void Connection::onWriteReady(const WriteReq&) { }
f07ad6