// Tunnel implementation: relays TCP connections over an encrypted UDP socket.

#include <cstdio>

#include "socket.h"
#include "tunnel.h"



// Debug-message switch: every DebugMsg(...) call in this file expands to the
// macro selected here. Swap which line is commented out to select HideDebugMSG
// instead (presumably it discards the message; it is defined outside this
// file - confirm there).
//#define DebugMsg HideDebugMSG
#define DebugMsg ShowDebugMSG



// Builds a tunnel with no open sockets (both socket ids are -1) and default
// timing parameters, then calls initSpeed() for 1024*1024 bytes per second to
// compute the speed-dependent durations and the initial time values.
// NOTE(review): udpKeepAliveDuration is initialized from udpSilenceDuration,
// which is only well-defined if udpSilenceDuration is declared before it in
// tunnel.h (members are initialized in declaration order) - confirm there.
// NOTE(review): the durations look like microseconds (initSpeed() scales
// bytes / bytesPerSecond by 1000000) - confirm against the Time helpers.
Tunnel::Tunnel():
	time(),                   // value-initialized; assigned by initTime(), which initSpeed() calls
	udpSendDuration(),        // value-initialized; computed in initSpeed()
	udpPartialSendDuration(), // value-initialized; computed in initSpeed()
	udpResendDuration(),      // value-initialized; computed in initSpeed()
	udpConfirmDuration(),     // value-initialized; computed in initSpeed()
	udpSilenceDuration(300000000),               // iteration() deactivates the UDP side of a connection after this long without UDP input
	udpKeepAliveDuration(udpSilenceDuration/32), // iteration() forces a confirm packet after this long without UDP output
	timeQwant(1200000000),                       // passed to crypt.setTime() together with globalTime()
	pollDuration(5000000),                       // longest wait of a single poll in iteration()
	floatConnections(),                          // value-initialized; when set, connections are looked up by id without the peer address
	udpSockId(-1),                               // -1 means "UDP server not open"
	tcpSockId(-1),                               // -1 means "TCP server not open"
	nextTermSendTime()        // value-initialized; assigned by initTime(), which initSpeed() calls
{
	initSpeed(1024*1024);
}


// Shuts the tunnel down through close() before the object goes away.
Tunnel::~Tunnel() {
	close();
}


// Recomputes the speed-dependent UDP durations for the requested transfer
// rate and then restarts the time bookkeeping through initTime().
//
// bytesPerSecond: target rate; 0 is treated as 1 (the slowest possible rate),
//                 because the formula below divides by it.
void Tunnel::initSpeed(unsigned long long bytesPerSecond) {
	// integer division by zero is undefined behaviour, so never let a zero
	// rate reach the formula
	if (bytesPerSecond == 0)
		bytesPerSecond = 1;
	
	// time to send one packet of PACKET_SIZE bytes, rounded to nearest
	// (the 1000000 factor scales seconds to the unit used by the durations)
	udpSendDuration        = (1000000ull*PACKET_SIZE + bytesPerSecond/2)/bytesPerSecond;
	udpPartialSendDuration = 10*udpSendDuration;
	// time to send PACKETS_COUNT packets
	udpResendDuration      = udpSendDuration * PACKETS_COUNT;
	udpConfirmDuration     = udpResendDuration/8;
	
	// initTime() derives nextTermSendTime from udpConfirmDuration, so it has
	// to run after the durations are updated
	initTime();
}


// (Re)opens the UDP side of the tunnel.
//
// address:          local address to bind the UDP socket to
// remoteTcpAddress: stored on success; recvUdpPacket() opens TCP connections
//                   to it for new incoming connections when its port is set
// key:              handed to crypt.setKey()
//
// Returns true on success; false when the key is rejected or the bind fails
// (in the latter case the key is reset again).
bool Tunnel::initUdpServer(const Address &address, const Address &remoteTcpAddress, const char *key) {
	// start from a clean state: drops any previously opened servers
	closeUdpServer();
	
	logf("Init UDP server at address: ");
	address.print();
	if (remoteTcpAddress.port) {
		logf(", with remote TCP address: ");
		remoteTcpAddress.print();
	}
	logf("\n");
	
	if (!crypt.setKey(key)) {
		logf("bad key, cancelled\n");
		return false;
	}
	
	udpSockId = Socket::udpBind(address);
	if (udpSockId >= 0) {
		logf("success\n");
		this->remoteTcpAddress = remoteTcpAddress;
		return true;
	}
	
	// bind failed: undo the key setup done above
	crypt.resetKey();
	logf("cannot bind, cancelled\n");
	return false;
}


// (Re)opens the TCP listening side of the tunnel. The UDP server must already
// be open.
//
// address:          local address to listen on
// remoteUdpAddress: stored on success; iteration() uses it as the peer
//                   address of connections created from accepted TCP clients
//
// Returns true on success; false when the UDP server is not open or the
// listen call fails.
bool Tunnel::initTcpServer(const Address &address, const Address &remoteUdpAddress) {
	closeTcpServer();
	
	logf("Init TCP server at address: ");
	address.print();
	logf("\n");
	
	if (udpSockId < 0) {
		logf("UDP server was not initialized, cancelled\n");
		return false;
	}
	
	tcpSockId = Socket::tcpListen(address);
	if (tcpSockId >= 0) {
		logf("success\n");
		this->remoteUdpAddress = remoteUdpAddress;
		return true;
	}
	
	logf("cannot listen, cancelled\n");
	return false;
}


// Closes the UDP server together with everything that depends on it: the TCP
// server, the TCP sockets of all connections, the connection table, the
// pending term queue and the crypt key. Does nothing when the UDP server is
// not open.
void Tunnel::closeUdpServer() {
	if (udpSockId >= 0) {
		// the TCP server cannot work without the UDP side, so it goes first
		closeTcpServer();
		
		logf("Close UDP server\n");
		
		// sockets are closed explicitly before the entries are dropped
		for(auto &entry : connections)
			Socket::close(entry.second.tcpSockId);
		connections.clear();
		termQueue.clear();
		
		Socket::close(udpSockId);
		udpSockId = -1;
		
		remoteTcpAddress = Address();
		crypt.resetKey();
	}
}


// Closes the TCP listening socket and forgets the remote UDP address.
// Does nothing when the TCP server is not open.
void Tunnel::closeTcpServer() {
	if (tcpSockId >= 0) {
		logf("Close TCP server\n");
		Socket::close(tcpSockId);
		tcpSockId = -1;
		remoteUdpAddress = Address();
	}
}


// Closes the whole tunnel; closing the UDP server also closes the TCP server
// and all connections.
void Tunnel::close() {
	closeUdpServer();
}


bool Tunnel::recvUdpPacket() {
	ConnId id, floatId;
	unsigned char raw[CRYPT_PACKET_SIZE] = {};
	int rawSize = Socket::udpRecvAsync(udpSockId, id.address, raw, sizeof(raw));
	if (rawSize <= 0)
		return false;
	
	IpMap::const_iterator ipi = ipMap.find(id.address.ipUInt);
	if (ipi != ipMap.end()) id.address.ipUInt = ipi->second;
	
	unsigned char data[CRYPT_PACKET_SIZE] = {};
	int dataSize = CRYPT_PACKET_SIZE;
	if (!crypt.decrypt(raw, rawSize, data, dataSize)) {
		DebugMsg("bad packet received%s", (id.address.print(), ""));
		return true;
	}
	
	void *ptr = data;
	Packet &packet = *(Packet*)ptr;
	
	int cmd = packet.getCommand();
	if (cmd < 0 || cmd > CMDMAX) {
		DebugMsg("bad command %d", cmd);
		return true;
	}
	
	int size = packet.getSize();
	if (size + HEADER_SIZE > dataSize) {
		DebugMsg("bad size %d", size);
		return true;
	}
	
	floatId.id = id.id = packet.connId;
	ConnId &searchId = floatConnections ? floatId : id;
	
	ConnMap::iterator i = connections.find(searchId);
	if (i == connections.end() && cmd == CMD_DATA && remoteTcpAddress.port && !closedConnections.count(searchId)) {
		logf("Open incoming connection from UDP: ");
		id.address.print();
		logf(", id: %u\n", id.id);
		int sockId = Socket::tcpConnect(remoteTcpAddress);
		if (sockId < 0) {
			logf("cannot init TCP connection, cancelled\n");
		} else {
			i = connections.emplace(searchId, Connection::Args(*this, id, sockId)).first;
		}
	}
	
	if (i == connections.end()) {
		DebugMsg("unknown connection: %u", (id.address.print(), id.id));
		if (cmd != CMD_TERM) termQueue.insert(id);
		return true;
	}
	
	Connection &conn = i->second;
	if (!conn.udpActive) {
		DebugMsg("inactive connection: %u", id.id);
		if (cmd != CMD_TERM) termQueue.insert(id);
		return true;
	}
	
	if (cmd == CMD_DATA) {
		conn.lastUdpInputTime = time;
		conn.udpRecvQueue.recvUdp(packet.index, packet.payload, size);
	} else
	if (cmd == CMD_CONFIRM) {
		conn.lastUdpInputTime = time;
		conn.udpSendQueue.confirm(packet.index, packet.payload, size*8);
	} else
	if (cmd == CMD_TERM) {
		DebugMsg("%u, inactivate udp by incoming term packet", conn.id.id);
		conn.udpActive = false;
	}
	
	return true;
}


void Tunnel::sendUdpPacket(const Address &address, const Packet &packet) {
	unsigned char raw[CRYPT_PACKET_SIZE] = {};
	int rawSize = CRYPT_PACKET_SIZE;
	if (crypt.encrypt(&packet, HEADER_SIZE + packet.getSize(), raw, rawSize))
		Socket::udpSend(udpSockId, address, raw, rawSize);
}


void Tunnel::initTime() {
	time = monotonicTime();
	nextTermSendTime = time + udpConfirmDuration;
	crypt.setTime(globalTime(), timeQwant);
}


bool Tunnel::iteration() {
	if (udpSockId < 0)
		return false;
	
	// prepare poll
	
	poll.list.clear();
	
	poll.list.emplace_back();
	Poll::Entry &e = poll.list.back();
	e.sockId = udpSockId;
	e.wantRead = true;
	
	if (tcpSockId >= 0) {
		poll.list.emplace_back();
		Poll::Entry &e = poll.list.back();
		e.sockId = tcpSockId;
		e.wantRead = true;
	}
	
	time = monotonicTime();
	Time pollTime = time + pollDuration;
	for(ConnMap::iterator i = connections.begin(); i != connections.end(); ++i) {
		Connection &conn = i->second;
		if (conn.tcpSockId >= 0) {
			poll.list.emplace_back();
			Poll::Entry &e = poll.list.back();
			e.connection = &conn;
			e.sockId = conn.tcpSockId;
			e.wantRead = conn.udpActive && conn.wantReadFromTcp();
			e.wantWrite = conn.wantWriteToTcp();
		}
		if (conn.udpActive)
			conn.whenWriteToUdp(pollTime);
	}
	if (timeLess(pollTime, time)) pollTime = time;
	
	// wait for events
	DebugMsg("poll duration: %llu", pollTime - time);
	poll.wait(pollTime - time);
	time = monotonicTime();
	crypt.setTime(globalTime(), timeQwant);
	
	if (poll.list[0].closed || (tcpSockId >= 0 && poll.list[1].closed)) {
		if (poll.list[0].closed)
			logf("UDP socket closed\n");
		if (tcpSockId >= 0 && poll.list[1].closed)
			logf("TCP server socket closed\n");
		poll.list.clear();
		close();
		return false;
	}
	
	// read and parse udp
	if (poll.list[0].canRead)
		while(recvUdpPacket());
	
	if (tcpSockId >= 0 && poll.list[1].canRead) {
		// accept new tcp connection
		Address address;
		int sockId = Socket::tcpAccept(tcpSockId, address);
		if (sockId >= 0) {
			ConnId id, floatId;
			floatId.id = id.id = crypt.random();
			id.address = remoteUdpAddress;
			IpMap::const_iterator ipi = ipMap.find(id.address.ipUInt);
			if (ipi != ipMap.end()) id.address.ipUInt = ipi->second;
			
			logf("Open incoming connection from TCP: ");
			address.print();
			logf(", id: %u\n", id.id);
			Connection &conn = connections.emplace(
				floatConnections ? floatId : id,
				Connection::Args(*this, id, sockId) ).first->second;
			conn.tcpRecvQueue.readFromTcp();
		}
	}
	
	// tcp read and write
	int first = tcpSockId < 0 ? 1 : 2;
	for(Poll::List::const_iterator i = poll.list.begin() + first; i != poll.list.end(); ++i) {
		Connection &conn = *i->connection;
		conn.udpRecvQueue.sendTcp();
		if (i->canWrite) while(true) {
			int size = conn.tcpSendQueue.writeToTcp();
			if (size <= 0) break;
			conn.udpRecvQueue.sendTcp();
		}
		if (i->canRead && conn.udpActive)
			conn.tcpRecvQueue.readFromTcp();
		if (i->closed || (!conn.udpActive && !conn.tcpSendQueue.busySize())) {
			DebugMsg("%u, inactivate tcp", conn.id.id);
			Socket::close(conn.tcpSockId);
			conn.tcpSockId = -1;
		}
	}
	
	// write to udp
	for(ConnMap::iterator i = connections.begin(); i != connections.end(); ++i) {
		Connection &conn = i->second;
		if (conn.udpActive) {
			bool sent = false;
			if (conn.udpSendQueue.sendUdp()) sent = true;
			if (conn.udpRecvQueue.sendUdpConfirm()) sent = true;
			if (sent) conn.lastUdpOutputTime = time;
			if (timeLequal(conn.lastUdpOutputTime + udpKeepAliveDuration, time)) {
				DebugMsg("%u, udp keep alive", conn.id.id);
				conn.udpRecvQueue.sendUdpConfirm(true);
				conn.lastUdpOutputTime = time;
			}
		}
	}
	
	// remove closed connections
	for(ClosedConnMap::iterator i = closedConnections.begin(); i != closedConnections.end();)
		if (timeLess(i->second, time))
			closedConnections.erase(i++); else ++i;
	
	// close connections
	for(ConnMap::iterator i = connections.begin(); i != connections.end();) {
		ConnMap::iterator j = i++;
		Connection &conn = j->second;
		
		if (conn.udpActive && timeLess(conn.lastUdpInputTime + udpSilenceDuration, time)) {
			DebugMsg("%u, inactivate udp by silence timeout", conn.id.id);
			conn.udpActive = false;
			termQueue.insert(conn.id);
		}
		
		if (!conn.udpActive && !conn.wantWriteToTcp()) {
			DebugMsg("%u, inactivate tcp", conn.id.id);
			Socket::close(conn.tcpSockId);
			conn.tcpSockId = -1;
			logf("Close connection, id: %u\n", conn.id.id);
			closedConnections[j->first] = time + 2*udpSilenceDuration;
			connections.erase(j);
			continue;
		}
		
		if (conn.tcpSockId < 0 && !conn.udpSendQueue.canSentToUdp()) {
			DebugMsg("%u, inactivate udp, all data sent", conn.id.id);
			conn.udpActive = false;
			termQueue.insert(conn.id);
			logf("Close connection, id: %u\n", conn.id.id);
			closedConnections[j->first] = time + 2*udpSilenceDuration;
			connections.erase(j);
			continue;
		}
		
		if (!conn.udpActive && conn.tcpSockId < 0) {
			logf("Close connection, id: %u\n", conn.id.id);
			closedConnections[j->first] = time + 2*udpSilenceDuration;
			connections.erase(j);
			continue;
		}
	}
	
	// send udp term packets
	if (timeLequal(nextTermSendTime, time) && !termQueue.empty()) {
		for(ConnSet::const_iterator i = termQueue.begin(); i != termQueue.end(); ++i) {
			DebugMsg("%u, send term packet", i->id);
			Packet packet;
			packet.init(i->id, 0, CMD_TERM, 0);
			sendUdpPacket(i->address, packet);
		}
		termQueue.clear();
		nextTermSendTime = time + udpConfirmDuration;
	}
	
	poll.list.clear();
	return true;
}


// Runs the event loop: restarts the time bookkeeping and keeps calling
// iteration() until it returns false.
void Tunnel::run() {
	initTime();
	for(;;) {
		if (!iteration())
			break;
	}
}