Blame connection.cpp

71057f
71057f
#include <cstring></cstring>
b42f0a
#include <climits></climits>
71057f
71057f
#include <chrono></chrono>
71057f
#include <thread></thread>
71057f
#include <mutex></mutex>
71057f
b42f0a
#include <unistd.h></unistd.h>
b42f0a
#include <sys types.h=""></sys>
b42f0a
#include <sys socket.h=""></sys>
b42f0a
#include <sys epoll.h=""></sys>
b42f0a
#include <netinet in.h=""></netinet>
b42f0a
71057f
#include "protocol.h"
71057f
#include "connection.h"
71057f
71057f
b42f0a
#define DebugMSG ShowDebugMSG
6ebd3b
6ebd3b
6ebd3b
71057f
Connection::Connection():
b42f0a
	server(), srvPrev(), srvNext() { }
b42f0a
b42f0a
b42f0a
Connection::~Connection()
b42f0a
	{ destroy(); }
71057f
71057f
71057f
bool Connection::open(Protocol &protocol, void *address, size_t addressSize, int sockId, Server *server) {
b42f0a
	OpenLock lock(*this, protocol, address, addressSize);
b42f0a
	if (!lock.success)
b42f0a
		return false;
71057f
	this->sockId = sockId;
71057f
	this->server = server;
71057f
	return true;
71057f
}
71057f
71057f
b42f0a
bool Connection::open(Protocol &protocol, void *address, size_t addressSize)
b42f0a
	{ return open(protocol, address, addressSize, -1, nullptr); }
71057f
71057f
71057f
bool Connection::read(Task &task) {
6ebd3b
	DebugMSG();
71057f
	ReadLock lock(stateProtector);
71057f
	State s = state;
b42f0a
	if (s < STATE_OPEN || s >= STATE_CLOSING)
b42f0a
		return false;
6ebd3b
	DebugMSG();
6ebd3b
	readQueue.push(task);
b42f0a
	updateEvents(readQueue.count(), writeQueue.count());
71057f
	return true;
71057f
}
71057f
71057f
71057f
bool Connection::write(Task &task) {
6ebd3b
	DebugMSG();
71057f
	ReadLock lock(stateProtector);
71057f
	State s = state;
b42f0a
	if (s < STATE_OPEN || s >= STATE_CLOSING)
b42f0a
		return false;
6ebd3b
	DebugMSG();
71057f
	writeQueue.push(task);
b42f0a
	updateEvents(readQueue.count(), writeQueue.count());
71057f
	return true;
71057f
}
71057f
71057f
b42f0a
void Connection::handleState() {
b42f0a
	if (stateLocal == STATE_NONE) {
b42f0a
		DebugMSG("none");
b42f0a
		bool connected = false;
b42f0a
		
b42f0a
		if (sockId >= 0) {
b42f0a
			DebugMSG("accepting");
b42f0a
			initSocket(sockId);
b42f0a
			connected = true;
b42f0a
		} else
b42f0a
		if (addressSize >= sizeof(sa_family_t)) {
b42f0a
			DebugMSG("connecting");
b42f0a
			struct sockaddr *addr = (struct sockaddr*)address;
b42f0a
			sockId = ::socket(addr->sa_family, SOCK_STREAM, 0);
b42f0a
			if (sockId >= 0) {
b42f0a
				initSocket(sockId);
b42f0a
				if (0 == ::connect(sockId, addr, (int)addressSize))
b42f0a
					connected = true;
b42f0a
			} else {
b42f0a
				DebugMSG("none -> cannot create socket");
b42f0a
			}
b42f0a
		}
b42f0a
b42f0a
		if (sockId >= 0) {
b42f0a
			DebugMSG("opening");
b42f0a
			prev = protocol->sockLast;
b42f0a
			(prev ? prev->next : protocol->sockFirst) = this;
b42f0a
			if (server) {
b42f0a
				srvPrev = server->connLast;
b42f0a
				(srvPrev ? srvPrev->srvNext : server->connFirst) = this;
b42f0a
			}
b42f0a
			updateEvents(false, !connected);
b42f0a
			
b42f0a
			if (connected) {
b42f0a
				DebugMSG("to open");
b42f0a
				state = stateLocal = STATE_OPEN;
b42f0a
				stateProtector.wait();
b42f0a
				onOpen(address, addressSize);
b42f0a
			} else {
b42f0a
				assert(!server);
b42f0a
				DebugMSG("to connecting");
b42f0a
				state = stateLocal = STATE_CONNECTING;
b42f0a
				stateProtector.wait();
b42f0a
			}
b42f0a
		} else {
b42f0a
			DebugMSG("none -> to closed");
b42f0a
			Server *srv = server;
b42f0a
			assert(connected || !server);
b42f0a
			server = nullptr;
b42f0a
			sockId = -1;
b42f0a
			onOpeningError();
b42f0a
			state = STATE_CLOSED;
b42f0a
			stateProtector.wait();
b42f0a
			while(closeWaiters > 0)
b42f0a
				{ closeCondition.notify_all(); std::this_thread::yield(); }
b42f0a
			state = STATE_FINISHED;
b42f0a
			DebugMSG("none -> to finished");
b42f0a
			if (srv) server->handleDisconnect(*this, true);
b42f0a
		}
b42f0a
	} else {
b42f0a
		State sw = stateWanted;
b42f0a
		
b42f0a
		if ( stateLocal == STATE_CONNECTING
b42f0a
		  && sw == STATE_CLOSING )
b42f0a
		{
b42f0a
			DebugMSG("connecting -> to closing");
b42f0a
			onOpeningError();
b42f0a
			state = stateLocal = STATE_CLOSING;
b42f0a
			stateProtector.wait();
b42f0a
		}
b42f0a
		
b42f0a
		if ( stateLocal == STATE_OPEN
b42f0a
		  && sw == STATE_CLOSE_REQ )
b42f0a
		{
b42f0a
			DebugMSG("to close req");
b42f0a
			state = stateLocal = STATE_CLOSE_REQ;
b42f0a
			stateProtector.wait();
b42f0a
			onCloseReqested();
b42f0a
		}
b42f0a
		
b42f0a
		if ( stateLocal >= STATE_OPEN
b42f0a
		  && stateLocal <= STATE_CLOSE_REQ
b42f0a
		  && sw == STATE_CLOSING )
b42f0a
		{
b42f0a
			DebugMSG("to closing");
b42f0a
			state = stateLocal = STATE_CLOSING;
b42f0a
			stateProtector.wait();
b42f0a
			while(Task *task = readQueue.peek())
b42f0a
				{ readQueue.pop(); onReadReady(*task, true); }
b42f0a
			while(Task *task = writeQueue.peek())
b42f0a
				{ writeQueue.pop(); onWriteReady(*task, true); }
b42f0a
			onClose(error);
b42f0a
		}
b42f0a
		
b42f0a
		if ( stateLocal == STATE_CLOSING && !enqueued ) {
b42f0a
			DebugMSG("to closed");
b42f0a
			Server *srv = server;
b42f0a
			bool err = error;
b42f0a
			
b42f0a
			if (server) {
b42f0a
				(srvPrev ? srvPrev->srvNext : server->connFirst) = srvNext;
b42f0a
				(srvNext ? srvNext->srvPrev : server->connLast ) = srvPrev;
b42f0a
				server = nullptr;
b42f0a
			}
b42f0a
			error = false;
b42f0a
			
b42f0a
			(prev ? prev->next : protocol->sockFirst) = next;
b42f0a
			(next ? next->prev : protocol->sockLast ) = prev;
b42f0a
			if (sockId >= 0) {
b42f0a
				struct epoll_event event = {};
b42f0a
				epoll_ctl(protocol->epollFd, EPOLL_CTL_DEL, sockId, &event);
b42f0a
				::close(sockId);
b42f0a
				sockId = -1;
b42f0a
			}
b42f0a
			events = 0xffffffff;
b42f0a
			protocol = nullptr;
b42f0a
			
b42f0a
			memset(address, 0, sizeof(address));
b42f0a
			addressSize = 0;
b42f0a
			stateLocal = STATE_NONE;
b42f0a
			state = STATE_CLOSED;
b42f0a
			stateWanted = STATE_NONE;
b42f0a
			while(closeWaiters > 0)
b42f0a
				{ closeCondition.notify_all(); std::this_thread::yield(); }
b42f0a
			state = STATE_FINISHED;
b42f0a
			DebugMSG("to finished");
b42f0a
			if (srv) srv->handleDisconnect(*this, err);
b42f0a
		}
b42f0a
	}
b42f0a
}
b42f0a
b42f0a
b42f0a
void Connection::handleEvents(unsigned int events) {
b42f0a
	bool needUpdate = false;
b42f0a
	DebugMSG("%08x", events);
b42f0a
	
b42f0a
	if (stateLocal == STATE_CONNECTING) {
b42f0a
		DebugMSG("connecting");
b42f0a
		if (events & (EPOLLHUP | EPOLLERR)) {
b42f0a
			close(true);
b42f0a
			return;
b42f0a
		}
b42f0a
		
b42f0a
		if (events & EPOLLOUT) {
b42f0a
			DebugMSG("connecting -> to open");
b42f0a
			state = stateLocal = STATE_OPEN;
b42f0a
			stateProtector.wait();
b42f0a
			onOpen(address, addressSize);
b42f0a
			needUpdate = true;
b42f0a
		}
b42f0a
	}
b42f0a
b42f0a
	if ( stateLocal < STATE_OPEN
b42f0a
	  || stateLocal >= STATE_CLOSING )
b42f0a
		return;
b42f0a
	
b42f0a
	bool closing = (bool)(events & (EPOLLHUP | EPOLLRDHUP | EPOLLERR));
b42f0a
	
b42f0a
	if (events & EPOLLIN) {
b42f0a
		DebugMSG("input");
b42f0a
		bool ready = true;
b42f0a
		int pops = 0;
b42f0a
		int count = (readQueue.count() + 2)*2;
b42f0a
		while(ready) {
b42f0a
			Task *task = readQueue.peek();
b42f0a
			if (!task) {
b42f0a
				if (pops) needUpdate = true;
b42f0a
				break;
b42f0a
			}
b42f0a
			
b42f0a
			while(task->size > task->completion) {
b42f0a
				size_t size = task->size - task->completion;
b42f0a
				if (size > INT_MAX) size = INT_MAX;
b42f0a
				int res = ::recv(
b42f0a
					sockId,
b42f0a
					(unsigned char*)task->data + task->completion,
b42f0a
					(int)size, MSG_DONTWAIT );
b42f0a
				if (res <= 0) {
b42f0a
					ready = false;
b42f0a
					break;
b42f0a
				} else {
b42f0a
					task->completion += res;
b42f0a
				}
b42f0a
			}
b42f0a
			
b42f0a
			if (task->size <= task->completion) {
b42f0a
				readQueue.pop();
b42f0a
				onReadReady(*task, false);
b42f0a
				++pops;
b42f0a
				if (!closing && --count < 0) ready = false;
b42f0a
			}
b42f0a
		}
b42f0a
	}
b42f0a
	
b42f0a
	if (closing) {
b42f0a
		DebugMSG("hup");
b42f0a
		close((bool)(events & EPOLLERR));
b42f0a
		return;
b42f0a
	}
b42f0a
	
b42f0a
	if (events & EPOLLOUT) {
b42f0a
		DebugMSG("output");
b42f0a
		bool ready = true;
b42f0a
		int pops = 0;
b42f0a
		int count = writeQueue.count() + 2;
b42f0a
		while(ready) {
b42f0a
			Task *task = writeQueue.peek();
b42f0a
			if (!task) {
b42f0a
				if (pops) needUpdate = true;
b42f0a
				break;
b42f0a
			}
b42f0a
			while(task->size > task->completion) {
b42f0a
				size_t size = task->size - task->completion;
b42f0a
				if (size > INT_MAX) size = INT_MAX;
b42f0a
				int res = ::send(
b42f0a
					sockId,
b42f0a
					(const unsigned char*)task->data + task->completion,
b42f0a
					(int)size, MSG_DONTWAIT );
b42f0a
				if (res <= 0) {
b42f0a
					ready = false;
b42f0a
					break;
b42f0a
				} else {
b42f0a
					task->completion += res;
b42f0a
				}
b42f0a
			}
b42f0a
			
b42f0a
			if (task->size <= task->completion) {
b42f0a
				writeQueue.pop();
b42f0a
				onWriteReady(*task, false);
b42f0a
				++pops;
b42f0a
				if (--count <= 0) ready = false;
b42f0a
			}
b42f0a
		}
b42f0a
	}
b42f0a
	
b42f0a
	if (needUpdate)
b42f0a
		updateEvents(readQueue.count(), writeQueue.count());
b42f0a
}
b42f0a
b42f0a
b42f0a
void Connection::onReadReady(Task&, bool) { }
b42f0a
void Connection::onWriteReady(Task&, bool) { }
71057f