Blame connection.cpp

f07ad6
541903
#include <chrono>
541903
541903
#include "utils.h"
f07ad6
#include "protocol.h"
f07ad6
#include "connection.h"
f07ad6
f07ad6
f07ad6
// Creates a connection with no socket attached. The listed members are
// value-initialised; all remaining members use their default constructors.
Connection::Connection():
	socket(),
	lastId(),
	closeRequested(),
	closeTimeUs()
{
}
f07ad6
f07ad6
// Runs close() so that a still-attached socket is finalized before the
// object goes away.
Connection::~Connection() {
	close();
}
f07ad6
f07ad6
f07ad6
// Returns the protocol stored on the attached socket.
// Precondition (asserted): a socket is currently attached.
Protocol& Connection::getProtocol() const {
	assert(socket);
	return socket->protocol;
}
f07ad6
// Returns the address stored on the attached socket.
// Precondition (asserted): a socket is currently attached.
const Address& Connection::getRemoteAddress() const {
	assert(socket);
	return socket->address;
}
541903
// Returns the server handle stored on the attached socket.
// Precondition (asserted): a socket is currently attached.
// NOTE(review): the template argument is written here as `Server`; confirm
// the exact spelling against the declaration in connection.h.
const ::Handle<Server>& Connection::getServer() const {
	assert(socket);
	return socket->server;
}
f07ad6
f07ad6
f07ad6
// Attaches `socket` to this connection.
//
// Any previously attached socket is closed first. Passing a null pointer
// therefore just closes the connection. onOpen() is called, with the mutex
// held, once the new socket has been stored.
void Connection::open(Socket *socket) {
	Lock guard(mutex);

	// Drop whatever was attached before; close() returns early when idle.
	close();

	if (socket) {
		this->socket = socket;
		onOpen();
	}
}
f07ad6
f07ad6
f07ad6
// Detaches and finalizes the current socket; does nothing when none is attached.
//
// errorCode: reason passed on to onClose(). When it is zero but read or write
//            requests are still queued, ERR_CONNECTION_UNFINISHED_TASKS is
//            reported instead.
//
// Under the mutex: onClose() is called, both request queues are emptied, the
// close-request flag and close deadline are reset, and every thread waiting
// on closeAwaitCondition is woken. finalize() runs on the detached socket
// after the scoped lock taken here has been released.
void Connection::close(ErrorCode errorCode) {
	Socket *detached = nullptr;

	{
		Lock guard(mutex);
		if (!socket)
			return;

		// A clean close with work still queued is reported as an error.
		if (!errorCode && !(readQueue.empty() && writeQueue.empty()))
			errorCode = ERR_CONNECTION_UNFINISHED_TASKS;

		onClose(errorCode);

		readQueue.clear();
		writeQueue.clear();

		detached = socket;
		socket = nullptr;

		closeRequested = false;
		closeTimeUs = 0;
		closeAwaitCondition.notify_all();
	}

	detached->finalize();
}
f07ad6
f07ad6
f07ad6
// Queues a write request for `size` bytes starting at `data`.
//
// userData is stored with the request unchanged.
// Returns the id assigned to the request, or 0 when `data` is null, `size`
// is zero, or no socket is attached.
Connection::ReqId Connection::writeReq(const void *data, size_t size, void *userData) {
	const bool hasPayload = data && size;
	if (!hasPayload)
		return 0;

	Lock guard(mutex);
	if (!socket)
		return 0;

	// Ids come from a counter shared with read requests.
	writeQueue.emplace_back(++lastId, userData, data, size);
	return lastId;
}
f07ad6
f07ad6
f07ad6
// Queues a read request for `size` bytes into the buffer at `data`.
//
// userData is stored with the request unchanged.
// Returns the id assigned to the request, or 0 when `data` is null, `size`
// is zero, or no socket is attached.
Connection::ReqId Connection::readReq(void *data, size_t size, void *userData) {
	const bool hasBuffer = data && size;
	if (!hasBuffer)
		return 0;

	Lock guard(mutex);
	if (!socket)
		return 0;

	// Ids come from a counter shared with write requests.
	readQueue.emplace_back(++lastId, userData, data, size);
	return lastId;
}
f07ad6
f07ad6
541903
void Connection::closeReq() {
541903
	Lock lock(mutex);
541903
	if (!socket || closeRequested) return;
541903
	closeRequested = true;
541903
	onCloseRequested();
541903
}
541903
541903
541903
void Connection::closeAwait(unsigned long long timeoutUs, bool withRequest) {
541903
	std::unique_lock<mutex> uniqlock(mutex);</mutex>
541903
541903
	if (!socket) return;
541903
	if (withRequest) closeReq();
541903
	unsigned long long timeUs = monotonicTimeUs() + timeoutUs;
541903
	if (closeTimeUs > timeUs)
541903
		{ closeTimeUs = timeUs; closeAwaitCondition.notify_all(); }
541903
	
541903
	while(!socket) {
541903
		unsigned long long timeUs = monotonicTimeUs();
541903
		if (timeUs >= closeTimeUs)
541903
			{ close(); break; }
541903
		closeAwaitCondition.wait_for(uniqlock, std::chrono::microseconds(closeTimeUs - timeUs));
541903
	}
541903
}
541903
541903
f07ad6
// Hook called by open() after a socket has been attached; the default does nothing.
void Connection::onOpen() {
}
541903
// Hook called by closeReq() when a close is first requested; the default does nothing.
void Connection::onCloseRequested() {
}
f07ad6
// Hook called by close() with the final error code, before the socket is
// detached; the default ignores the code and does nothing.
void Connection::onClose(ErrorCode) {
}
f07ad6
// Default no-op hook taking a read request. The caller is not in this file;
// presumably it fires when a queued read request is ready — TODO confirm.
void Connection::onReadReady(const ReadReq&) {
}
f07ad6
// Default no-op hook taking a write request. The caller is not in this file;
// presumably it fires when a queued write request is ready — TODO confirm.
void Connection::onWriteReady(const WriteReq&) {
}
f07ad6