Blob Blame Raw

#include <cstring>

#include <chrono>
#include <thread>
#include <mutex>

#include "protocol.h"
#include "server.h"


#define DebugMSG ShowDebugMSG


Server::Server():
	state(STATE_NONE),
	stateWanted(STATE_NONE),
	enqueued(false),
	error(false),
	closeWaiters(0),
	finishWaiters(0),
	address(),
	addressSize(),
	protocol(),
	prev(), next(),
	queueNext(),
	connFirst(), connLast(),
	sockId(-1),
	stateLocal()
	{ }


Server::~Server() {
	assert(state == STATE_NONE || state == STATE_FINISHED);
	closeWait();
	while(finishWaiters > 0) std::this_thread::yield();
}


bool Server::open(Protocol &protocol, void *address, size_t addressSize) {
	if (!address || !addressSize || addressSize > MAX_ADDR_SIZE)
		{ DebugMSG("bad args"); return false; }
	
	State s = state;
	do {
		if (s != STATE_NONE && s != STATE_FINISHED)
			{ DebugMSG("already open or in closing state"); return false; }
	} while(!state.compare_exchange_weak(s, STATE_OPENING));
	if (s == STATE_FINISHED) while(finishWaiters > 0) std::this_thread::yield();
	
	ReadLock lock(protocol.stateProtector);
	Protocol::State ps = protocol.state;
	if (ps < Protocol::STATE_INITIALIZING || ps >= Protocol::STATE_CLOSE_REQ)
		{ DebugMSG("protocol is closed"); state = STATE_NONE; return false; }
	
	memcpy(this->address, address, addressSize);
	this->addressSize = addressSize;

	this->protocol = &protocol;
	state = STATE_INITIALIZING;
	if (!enqueued.exchange(true)) protocol.srvQueue.push(*this);
	protocol.wakeup();
	return true;
}


void Server::wantState(State state, bool error) {
	ReadLock lock(stateProtector);
	State s = this->state;
	if (s <= STATE_OPENING || s >= STATE_CLOSING_CONNECTIONS || s >= state) return;
	
	State sw = stateWanted;
	do {
		if (sw >= state) return;
	} while(!stateWanted.compare_exchange_weak(sw, state));
	if (error) { error = false; this->error.compare_exchange_strong(error, true); }
	if (!enqueued.exchange(true)) protocol->srvQueue.push(*this);
	protocol->wakeup();
}


void Server::closeReq()
	{ wantState(STATE_CLOSE_REQ, false); }
void Server::close(bool error)
	{ wantState(STATE_CLOSING_CONNECTIONS, error); }


void Server::closeWait(unsigned long long timeoutUs, bool withReq) {
	stateProtector.lock();
	State s = state;
	if (s <= STATE_OPENING || s >= STATE_FINISHED)
		{ stateProtector.unlock(); return; }
	
	if (s == STATE_CLOSED) {
		finishWaiters++;
		stateProtector.unlock();
	} else {
		if (withReq && s < STATE_CLOSE_REQ) closeReq();
		closeWaiters++;
		stateProtector.unlock();
		
		std::mutex fakeMutex;
		{
			std::unique_lock<std::mutex> fakeLock(fakeMutex);
			if (timeoutUs && s < STATE_CLOSING) {
				std::chrono::steady_clock::time_point t
					= std::chrono::steady_clock::now()
						+ std::chrono::microseconds(timeoutUs);
				while(s < STATE_CLOSED) {
					if (t <= std::chrono::steady_clock::now()) { close(true); break; }
					closeCondition.wait_until(fakeLock, t);
					s = state;
				}
			}
			if (s < STATE_CLOSING) close(true);
			while(s < STATE_CLOSED) {
				closeCondition.wait(fakeLock);
				s = state;
			}
		}
		
		finishWaiters++;
		closeWaiters--;
	}
	
	
	while(s != STATE_FINISHED) { std::this_thread::yield(); s = state; }
	
	finishWaiters--;
}


void Server::onOpeningError() { }
void Server::onOpen() { }
Connection* Server::onConnect(const void*, size_t) { return nullptr; }
void Server::onDisconnect(Connection*, bool) { }
void Server::onCloseReqested() { }
void Server::onClose(bool) { }