Blame protocol.cpp

71057f
71057f
#include <climits>
71057f
#include <cstring>
71057f
71057f
#include <fcntl.h>
71057f
#include <unistd.h>
71057f
#include <sys/types.h>
71057f
#include <sys/socket.h>
71057f
#include <sys/epoll.h>
71057f
#include <sys/eventfd.h>
71057f
#include <netinet/in.h>
71057f
#include <netinet/tcp.h>
71057f
#include <netdb.h>
71057f
71057f
#include "protocol.h"
71057f
71057f
6ebd3b
// Route every DebugMSG(...) call in this file to HideDebugMSG, which is
// declared outside this file (presumably it suppresses the output - confirm).
#define DebugMSG     HideDebugMSG
51b3f0
51b3f0
51b3f0
// Builds an idle protocol object: value-initialized worker thread handle,
// both descriptors set to -1, both state fields at STATE_NONE, zero waiters
// and value-initialized socket list ends.
Protocol::Protocol()
	: thread()
	, epollFd(-1)
	, eventFd(-1)
	, state(STATE_NONE)
	, stateWanted(STATE_NONE)
	, closeWaiters(0)
	, finishWaiters(0)
	, sockFirst()
	, sockLast()
{
}
51b3f0
71057f
71057f
// Tears the protocol down.
//
// The object is expected to be idle already (STATE_NONE or STATE_FINISHED);
// closeWait() is still called so that a build without assertions waits for
// the worker instead of destroying a running object. After every
// closeWait() caller has left (finishWaiters == 0) the worker thread is
// joined and the thread object allocated in open() is released.
Protocol::~Protocol() {
	assert(state == STATE_NONE || state == STATE_FINISHED);
	closeWait();
	// Other threads may still be inside closeWait(); let them leave first.
	while(finishWaiters > 0) std::this_thread::yield();
	DebugMSG();
	if (thread) {
		thread->join();
		// open() allocates the thread with new; joining alone leaked it.
		delete thread;
		thread = nullptr;
	}
	DebugMSG("deleted");
}
71057f
71057f
71057f
bool Protocol::open() {
51b3f0
	DebugMSG();
71057f
	State s = state;
71057f
	do {
71057f
		if (s != STATE_NONE && s != STATE_FINISHED) return false;
71057f
	} while(!state.compare_exchange_weak(s, STATE_OPENING));
71057f
	if (s == STATE_FINISHED) while(finishWaiters > 0) std::this_thread::yield();
71057f
	if (thread) thread->join();
71057f
	
71057f
	thread = new std::thread(&Protocol::threadRun, this);
51b3f0
	state = STATE_INITIALIZING;
51b3f0
	DebugMSG();
71057f
	return true;
71057f
}
71057f
71057f
71057f
// Requests that the worker advance to `state`.
//
// The request is dropped unless the current state lies strictly between
// STATE_OPENING and STATE_CLOSING and is below the requested one.
// stateWanted is only ever raised, never lowered; when it is raised the
// worker is poked through wakeup().
void Protocol::wantState(State state) {
	ReadLock lock(stateProtector);

	State current = this->state;
	DebugMSG("%d, %d", current, state);

	const bool applicable =
		   current > STATE_OPENING
		&& current < STATE_CLOSING
		&& current < state;
	if (!applicable) return;
	DebugMSG("apply");

	// Lock-free "raise to at least `state`": retry while another thread
	// changes stateWanted under us, give up once it is already high enough.
	State wanted = stateWanted;
	while(wanted < state) {
		if (stateWanted.compare_exchange_weak(wanted, state)) {
			wakeup();
			return;
		}
	}
}
71057f
71057f
71057f
void Protocol::closeReq()
51b3f0
	{ DebugMSG(); wantState(STATE_CLOSE_REQ); }
71057f
void Protocol::close()
51b3f0
	{ DebugMSG(); wantState(STATE_CLOSING); }
71057f
71057f
71057f
void Protocol::closeWait(unsigned long long timeoutUs, bool withReq) {
51b3f0
	DebugMSG();
71057f
	stateProtector.lock();
71057f
	State s = state;
71057f
	if (s <= STATE_OPENING || s >= STATE_FINISHED)
71057f
		{ stateProtector.unlock(); return; }
71057f
	
71057f
	
71057f
	if (s == STATE_CLOSED) {
71057f
		finishWaiters++;
71057f
		stateProtector.unlock();
71057f
	} else {
71057f
		if (withReq && s < STATE_CLOSE_REQ) closeReq();
71057f
		closeWaiters++;
71057f
		stateProtector.unlock();
71057f
		
71057f
		std::mutex fakeMutex;
71057f
		{
71057f
			std::unique_lock<std::mutex> fakeLock(fakeMutex);</std::mutex>
71057f
			if (timeoutUs && s < STATE_CLOSING) {
71057f
				std::chrono::steady_clock::time_point t
71057f
					= std::chrono::steady_clock::now()
71057f
						+ std::chrono::microseconds(timeoutUs);
71057f
				while(s < STATE_CLOSED) {
71057f
					if (t <= std::chrono::steady_clock::now()) { close(); break; }
51b3f0
					DebugMSG("wait_until, %d", s);
71057f
					closeCondition.wait_until(fakeLock, t);
71057f
					s = state;
71057f
				}
71057f
			}
71057f
			if (s < STATE_CLOSING) close();
71057f
			while(s < STATE_CLOSED) {
51b3f0
				DebugMSG("wait, %d", s);
71057f
				closeCondition.wait(fakeLock);
71057f
				s = state;
71057f
			}
51b3f0
			DebugMSG("awaiten");
71057f
		}
71057f
		
71057f
		finishWaiters++;
71057f
		closeWaiters--;
71057f
	}
71057f
	
71057f
	
71057f
	while(s != STATE_FINISHED) { std::this_thread::yield(); s = state; }
71057f
	
71057f
	finishWaiters--;
51b3f0
	DebugMSG();
71057f
}
71057f
71057f
71057f
void Protocol::wakeup() {
71057f
	unsigned long long num = 0;
71057f
	::write(eventFd, &num, sizeof(num));
71057f
}
71057f
71057f
71057f
void Protocol::threadRun() {
51b3f0
	DebugMSG();
71057f
	epollFd = ::epoll_create(32);
71057f
	eventFd = ::eventfd(0, EFD_NONBLOCK);
6ebd3b
	assert(epollFd >= 0);
6ebd3b
	assert(eventFd >= 0);
71057f
	
71057f
	struct epoll_event event = {};
71057f
	event.events = EPOLLIN;
71057f
	epoll_ctl(epollFd, EPOLL_CTL_ADD, eventFd, &event);
71057f
	
71057f
	const int maxCount = 16;
71057f
	struct epoll_event events[maxCount] = {};
71057f
	int count = 0;
71057f
	
71057f
	State stateLocal = STATE_OPEN;
71057f
	state = stateLocal;
71057f
	
71057f
	while(true) {
71057f
		for(int i = 0; i < count; ++i) {
b42f0a
			if (Socket *socket = (Socket*)events[i].data.ptr) {
b42f0a
				socket->handleEvents(events[i].events);
b42f0a
			} else {
71057f
				unsigned long long num = 0;
71057f
				while(::read(eventFd, &num, sizeof(num)) > 0 && !num);
71057f
			}
71057f
		}
71057f
		
71057f
		State sw = stateWanted;
71057f
		if (stateLocal == STATE_OPEN && sw == STATE_CLOSE_REQ) {
51b3f0
			DebugMSG("to closse req");
71057f
			state = stateLocal = STATE_CLOSE_REQ;
71057f
			stateProtector.wait();
b42f0a
			for(Socket *socket = sockFirst; socket; socket = socket->next)
b42f0a
				socket->closeReq();
71057f
		} else
71057f
		if (stateLocal >= STATE_OPEN && stateLocal <= STATE_CLOSE_REQ && sw == STATE_CLOSING) {
51b3f0
			DebugMSG("to closing");
51b3f0
			state = stateLocal = STATE_CLOSING;
71057f
			stateProtector.wait();
b42f0a
			for(Socket *socket = sockFirst; socket; socket = socket->next)
b42f0a
				socket->close(true);
71057f
		}
71057f
		
b42f0a
		while(sockQueue.peek()) {
b42f0a
			Socket &socket = sockQueue.pop();
b42f0a
			socket.enqueued = false;
b42f0a
			socket.handleState();
71057f
		}
71057f
		
b42f0a
		if (stateLocal == STATE_CLOSING)
71057f
			break;
71057f
		
51b3f0
		count = epoll_wait(epollFd, events, maxCount, 1000);
6ebd3b
		DebugMSG("epoll count: %d", count);
71057f
	}
71057f
	
b42f0a
	assert(!sockFirst);
b42f0a
	
51b3f0
	DebugMSG("closeWaiters");
6ebd3b
	state = STATE_CLOSED;
71057f
	while(closeWaiters > 0)
71057f
		{ closeCondition.notify_all(); std::this_thread::yield(); }
71057f
	
b42f0a
	epoll_ctl(epollFd, EPOLL_CTL_DEL, eventFd, &event);
b42f0a
71057f
	::close(epollFd);
71057f
	::close(eventFd);
51b3f0
	epollFd = -1;
51b3f0
	eventFd = -1;
b42f0a
	assert(!sockFirst);
b42f0a
	assert(!sockQueue.peek());
71057f
	
51b3f0
	stateWanted = STATE_NONE;
51b3f0
	stateLocal = STATE_NONE;
71057f
	state = STATE_FINISHED;
51b3f0
	DebugMSG("finished");
71057f
}
71057f