Blame connection.cpp

71057f
71057f
#include <cstring></cstring>
71057f
71057f
#include <chrono></chrono>
71057f
#include <thread></thread>
71057f
#include <mutex></mutex>
71057f
71057f
#include "protocol.h"
71057f
#include "connection.h"
71057f
71057f
6ebd3b
#define DebugMSG HideDebugMSG
6ebd3b
6ebd3b
6ebd3b
71057f
Connection::Connection():
71057f
	state(STATE_NONE),
71057f
	stateWanted(STATE_NONE),
71057f
	enqueued(false),
71057f
	closeWaiters(0),
71057f
	finishWaiters(0),
71057f
	address(),
71057f
	addressSize(),
71057f
	protocol(),
71057f
	server(),
71057f
	prev(), next(),
71057f
	srvPrev(), srvNext(),
71057f
	queueNext(),
71057f
	sockId(-1),
6ebd3b
	stateLocal(),
6ebd3b
	errorLocal()
71057f
	{ }
71057f
71057f
71057f
Connection::~Connection() {
61c265
	assert(state == STATE_NONE || state == STATE_FINISHED);
71057f
	closeWait();
71057f
	while(finishWaiters > 0) std::this_thread::yield();
71057f
}
71057f
71057f
71057f
bool Connection::open(Protocol &protocol, void *address, size_t addressSize, int sockId, Server *server) {
71057f
	if (!address || !addressSize || addressSize > MAX_ADDR_SIZE) return false;
71057f
	
71057f
	State s = state;
71057f
	do {
71057f
		if (s != STATE_NONE && s != STATE_FINISHED) return false;
71057f
	} while(!state.compare_exchange_weak(s, STATE_OPENING));
71057f
	if (s == STATE_FINISHED) while(finishWaiters > 0) std::this_thread::yield();
71057f
	
71057f
	ReadLock lock(protocol.stateProtector);
71057f
	Protocol::State ps = protocol.state;
61c265
	if (ps < Protocol::STATE_INITIALIZING || ps >= Protocol::STATE_CLOSE_REQ)
71057f
		{ state = STATE_NONE; return false; }
71057f
	
71057f
	memcpy(this->address, address, addressSize);
71057f
	this->addressSize = addressSize;
71057f
	this->sockId = sockId;
71057f
	this->server = server;
71057f
71057f
	this->protocol = &protocol;
71057f
	state = STATE_INITIALIZING;
71057f
	if (!enqueued.exchange(true)) protocol.connQueue.push(*this);
71057f
	protocol.wakeup();
71057f
	return true;
71057f
}
71057f
71057f
71057f
void Connection::wantState(State state, bool error) {
71057f
	ReadLock lock(stateProtector);
71057f
	State s = this->state;
6ebd3b
	DebugMSG("%d, %d", s, state);
71057f
	if (s <= STATE_OPENING || s >= STATE_CLOSING || s >= state) return;
6ebd3b
	DebugMSG("apply");
71057f
	
71057f
	State sw = stateWanted;
71057f
	do {
6ebd3b
		if (sw >= state) return;
71057f
	} while(!stateWanted.compare_exchange_weak(sw, state));
6ebd3b
	if (error) { DebugMSG("error"); error = false; this->error.compare_exchange_strong(error, true); }
71057f
	if (!enqueued.exchange(true)) protocol->connQueue.push(*this);
71057f
	protocol->wakeup();
71057f
}
71057f
71057f
71057f
void Connection::closeReq()
71057f
	{ wantState(STATE_CLOSE_REQ, false); }
71057f
void Connection::close(bool error)
71057f
	{ wantState(STATE_CLOSING, error); }
71057f
71057f
71057f
void Connection::closeWait(unsigned long long timeoutUs, bool withReq, bool error) {
71057f
	stateProtector.lock();
71057f
	State s = state;
71057f
	if (s <= STATE_OPENING || s >= STATE_FINISHED)
71057f
		{ stateProtector.unlock(); return; }
71057f
	
71057f
	if (s == STATE_CLOSED) {
71057f
		finishWaiters++;
71057f
		stateProtector.unlock();
71057f
	} else {
71057f
		if (withReq && s < STATE_CLOSE_REQ) closeReq();
71057f
		closeWaiters++;
71057f
		stateProtector.unlock();
71057f
		
71057f
		std::mutex fakeMutex;
71057f
		{
71057f
			std::unique_lock<std::mutex> fakeLock(fakeMutex);</std::mutex>
71057f
			if (timeoutUs && s < STATE_CLOSING) {
71057f
				std::chrono::steady_clock::time_point t
71057f
					= std::chrono::steady_clock::now()
71057f
						+ std::chrono::microseconds(timeoutUs);
71057f
				while(s < STATE_CLOSED) {
71057f
					if (t <= std::chrono::steady_clock::now()) { close(error); break; }
71057f
					closeCondition.wait_until(fakeLock, t);
71057f
					s = state;
71057f
				}
71057f
			}
71057f
			if (s < STATE_CLOSING) close(error);
71057f
			while(s < STATE_CLOSED) {
71057f
				closeCondition.wait(fakeLock);
71057f
				s = state;
71057f
			}
71057f
		}
71057f
		
71057f
		finishWaiters++;
71057f
		closeWaiters--;
71057f
	}
71057f
	
71057f
	
71057f
	while(s != STATE_FINISHED) { std::this_thread::yield(); s = state; }
71057f
	
71057f
	finishWaiters--;
71057f
}
71057f
71057f
71057f
bool Connection::read(Task &task) {
6ebd3b
	DebugMSG();
71057f
	ReadLock lock(stateProtector);
71057f
	State s = state;
71057f
	if (s < STATE_OPEN || s >= STATE_CLOSING) return false;
6ebd3b
	DebugMSG();
6ebd3b
	readQueue.push(task);
71057f
	protocol->updateEvents(*this);
71057f
	return true;
71057f
}
71057f
71057f
71057f
bool Connection::write(Task &task) {
6ebd3b
	DebugMSG();
71057f
	ReadLock lock(stateProtector);
71057f
	State s = state;
71057f
	if (s < STATE_OPEN || s >= STATE_CLOSING) return false;
6ebd3b
	DebugMSG();
71057f
	writeQueue.push(task);
71057f
	protocol->updateEvents(*this);
71057f
	return true;
71057f
}
71057f
71057f
71057f
void Connection::onOpeningError() { }
71057f
void Connection::onOpen(const void*, size_t) { }
71057f
bool Connection::onReadReady(Task&, bool) { return true; }
71057f
bool Connection::onWriteReady(Task&, bool) { return true; }
71057f
void Connection::onCloseReqested() { }
71057f
void Connection::onClose(bool) { }
71057f