Blame tunnel.cpp

873e9f
873e9f
#include <cstdio>
873e9f
873e9f
#include "socket.h"
873e9f
#include "tunnel.h"
873e9f
873e9f
873e9f
460064
//#define DebugMsg HideDebugMSG
460064
// Active debug-output selection: every DebugMsg(...) call in this file expands to ShowDebugMSG(...).
// NOTE(review): ShowDebugMSG / HideDebugMSG are defined elsewhere; presumably the former prints
// and the latter discards the message - confirm against their definitions.
#define DebugMsg ShowDebugMSG
460064
460064
460064
873e9f
// Creates a tunnel that owns no sockets yet and uses the default timing
// parameters. Members that depend on the clock or on the configured speed are
// value-initialized in the list and receive their real values from the
// initSpeed() call in the body (initSpeed() itself calls initTime()).
Tunnel::Tunnel()
	: time()                                      // assigned by initTime(), reached through initSpeed()
	, udpSendDuration()                           // assigned by initSpeed()
	, udpPartialSendDuration()                    // assigned by initSpeed()
	, udpResendDuration()                         // assigned by initSpeed()
	, udpConfirmDuration()                        // assigned by initSpeed()
	, udpSilenceDuration(300000000)
	, udpKeepAliveDuration(udpSilenceDuration/32) // NOTE(review): relies on udpSilenceDuration being declared earlier in the class - confirm in tunnel.h
	, timeQwant(1200000000)
	, pollDuration(5000000)
	, floatConnections()
	, udpSockId(-1)                               // negative id == "no socket", see the `< 0` checks in this file
	, tcpSockId(-1)
	, nextTermSendTime()                          // assigned by initTime(), reached through initSpeed()
{
	// default throughput: 1024*1024 bytes per second
	initSpeed(1024*1024);
}
873e9f
c47604
873e9f
// Shuts the tunnel down on destruction by delegating to close().
Tunnel::~Tunnel() {
	close();
}
873e9f
873e9f
1d16df
void Tunnel::initSpeed(unsigned long long bytesPerSecond) {
1d16df
	udpSendDuration        = (1000000ull*PACKET_SIZE + bytesPerSecond/2)/bytesPerSecond;
1d16df
	udpPartialSendDuration = 10*udpSendDuration;
1d16df
	udpResendDuration      = udpSendDuration * PACKETS_COUNT;
1d16df
	udpConfirmDuration     = udpResendDuration/8;
1d16df
	initTime();
1d16df
}
1d16df
1d16df
c47604
bool Tunnel::initUdpServer(const Address &address, const Address &remoteTcpAddress, const char *key) {
873e9f
	closeUdpServer();
ab5ba6
	logf("Init UDP server at address: "); address.print();
873e9f
	if (remoteTcpAddress.port)
ab5ba6
		{ logf(", with remote TCP address: "); remoteTcpAddress.print(); }
ab5ba6
	logf("\n");
873e9f
	
c47604
	if (!crypt.setKey(key))
ab5ba6
		{ logf("bad key, cancelled\n"); return false; }
c47604
	
873e9f
	udpSockId = Socket::udpBind(address);
873e9f
	if (udpSockId < 0)
ab5ba6
		{ crypt.resetKey(); logf("cannot bind, cancelled\n"); return false; }
873e9f
	
ab5ba6
	logf("success\n");
873e9f
	this->remoteTcpAddress = remoteTcpAddress;
873e9f
	return true;
873e9f
}
873e9f
c47604
c47604
bool Tunnel::initTcpServer(const Address &address, const Address &remoteUdpAddress) {
873e9f
	closeTcpServer();
ab5ba6
	logf("Init TCP server at address: "); address.print(); logf("\n");
873e9f
	
873e9f
	if (udpSockId < 0)
ab5ba6
		{ logf("UDP server was not initialized, cancelled\n"); return false; }
873e9f
	
c47604
	tcpSockId = Socket::tcpListen(address);
c47604
	if (tcpSockId < 0)
ab5ba6
		{ logf("cannot listen, cancelled\n"); return false; }
c47604
	
ab5ba6
	logf("success\n");
c47604
	this->remoteUdpAddress = remoteUdpAddress;
873e9f
	return true;
873e9f
}
873e9f
873e9f
873e9f
void Tunnel::closeUdpServer() {
873e9f
	if (udpSockId < 0) return;
873e9f
	closeTcpServer();
873e9f
	
ab5ba6
	logf("Close UDP server\n");
873e9f
	for(ConnMap::iterator i = connections.begin(); i != connections.end(); ++i)
873e9f
		Socket::close(i->second.tcpSockId);
873e9f
	connections.clear();
873e9f
	termQueue.clear();
873e9f
	Socket::close(udpSockId);
873e9f
	udpSockId = -1;
873e9f
	remoteTcpAddress = Address();
c47604
	crypt.resetKey();
873e9f
}
873e9f
873e9f
873e9f
void Tunnel::closeTcpServer() {
873e9f
	if (tcpSockId < 0) return;
ab5ba6
	logf("Close TCP server\n");
873e9f
	Socket::close(tcpSockId);
873e9f
	tcpSockId = -1;
873e9f
	remoteUdpAddress = Address();
873e9f
}
873e9f
873e9f
873e9f
void Tunnel::close()
873e9f
	{ closeUdpServer(); }
873e9f
873e9f
c47604
bool Tunnel::recvUdpPacket() {
b95e8b
	ConnId id, floatId;
c47604
	unsigned char raw[CRYPT_PACKET_SIZE] = {};
c47604
	int rawSize = Socket::udpRecvAsync(udpSockId, id.address, raw, sizeof(raw));
c47604
	if (rawSize <= 0)
c47604
		return false;
c47604
	
4b7e37
	IpMap::const_iterator ipi = ipMap.find(id.address.ipUInt);
4b7e37
	if (ipi != ipMap.end()) id.address.ipUInt = ipi->second;
4b7e37
	
c47604
	unsigned char data[CRYPT_PACKET_SIZE] = {};
c47604
	int dataSize = CRYPT_PACKET_SIZE;
460064
	if (!crypt.decrypt(raw, rawSize, data, dataSize)) {
460064
		DebugMsg("bad packet received%s", (id.address.print(), ""));
c47604
		return true;
460064
	}
c47604
	
c47604
	void *ptr = data;
c47604
	Packet &packet = *(Packet*)ptr;
c47604
	
c47604
	int cmd = packet.getCommand();
460064
	if (cmd < 0 || cmd > CMDMAX) {
460064
		DebugMsg("bad command %d", cmd);
c47604
		return true;
460064
	}
460064
	
c47604
	int size = packet.getSize();
460064
	if (size + HEADER_SIZE > dataSize) {
460064
		DebugMsg("bad size %d", size);
c47604
		return true;
460064
	}
c47604
	
b95e8b
	floatId.id = id.id = packet.connId;
b95e8b
	ConnId &searchId = floatConnections ? floatId : id;
b95e8b
	
b95e8b
	ConnMap::iterator i = connections.find(searchId);
b95e8b
	if (i == connections.end() && cmd == CMD_DATA && remoteTcpAddress.port && !closedConnections.count(searchId)) {
ab5ba6
		logf("Open incoming connection from UDP: ");
c47604
		id.address.print();
ab5ba6
		logf(", id: %u\n", id.id);
c47604
		int sockId = Socket::tcpConnect(remoteTcpAddress);
c47604
		if (sockId < 0) {
ab5ba6
			logf("cannot init TCP connection, cancelled\n");
c47604
		} else {
b95e8b
			i = connections.emplace(searchId, Connection::Args(*this, id, sockId)).first;
873e9f
		}
873e9f
	}
c47604
	
c47604
	if (i == connections.end()) {
5314e4
		DebugMsg("unknown connection: %u", (id.address.print(), id.id));
460064
		if (cmd != CMD_TERM) termQueue.insert(id);
c47604
		return true;
c47604
	}
c47604
	
c47604
	Connection &conn = i->second;
c47604
	if (!conn.udpActive) {
460064
		DebugMsg("inactive connection: %u", id.id);
c47604
		if (cmd != CMD_TERM) termQueue.insert(id);
c47604
		return true;
c47604
	}
c47604
	
c47604
	if (cmd == CMD_DATA) {
c47604
		conn.lastUdpInputTime = time;
c47604
		conn.udpRecvQueue.recvUdp(packet.index, packet.payload, size);
c47604
	} else
c47604
	if (cmd == CMD_CONFIRM) {
c47604
		conn.lastUdpInputTime = time;
c47604
		conn.udpSendQueue.confirm(packet.index, packet.payload, size*8);
c47604
	} else
c47604
	if (cmd == CMD_TERM) {
460064
		DebugMsg("%u, inactivate udp by incoming term packet", conn.id.id);
c47604
		conn.udpActive = false;
c47604
	}
c47604
	
c47604
	return true;
c47604
}
c47604
c47604
c47604
void Tunnel::sendUdpPacket(const Address &address, const Packet &packet) {
c47604
	unsigned char raw[CRYPT_PACKET_SIZE] = {};
c47604
	int rawSize = CRYPT_PACKET_SIZE;
c47604
	if (crypt.encrypt(&packet, HEADER_SIZE + packet.getSize(), raw, rawSize))
c47604
		Socket::udpSend(udpSockId, address, raw, rawSize);
873e9f
}
873e9f
873e9f
1d16df
void Tunnel::initTime() {
1d16df
	time = monotonicTime();
1d16df
	nextTermSendTime = time + udpConfirmDuration;
1d16df
	crypt.setTime(globalTime(), timeQwant);
1d16df
}
1d16df
1d16df
873e9f
bool Tunnel::iteration() {
873e9f
	if (udpSockId < 0)
873e9f
		return false;
873e9f
	
873e9f
	// prepare poll
873e9f
	
873e9f
	poll.list.clear();
873e9f
	
873e9f
	poll.list.emplace_back();
873e9f
	Poll::Entry &e = poll.list.back();
873e9f
	e.sockId = udpSockId;
873e9f
	e.wantRead = true;
873e9f
	
873e9f
	if (tcpSockId >= 0) {
460064
		poll.list.emplace_back();
873e9f
		Poll::Entry &e = poll.list.back();
873e9f
		e.sockId = tcpSockId;
873e9f
		e.wantRead = true;
873e9f
	}
873e9f
	
1d16df
	time = monotonicTime();
873e9f
	Time pollTime = time + pollDuration;
873e9f
	for(ConnMap::iterator i = connections.begin(); i != connections.end(); ++i) {
873e9f
		Connection &conn = i->second;
873e9f
		if (conn.tcpSockId >= 0) {
873e9f
			poll.list.emplace_back();
873e9f
			Poll::Entry &e = poll.list.back();
873e9f
			e.connection = &conn;
873e9f
			e.sockId = conn.tcpSockId;
873e9f
			e.wantRead = conn.udpActive && conn.wantReadFromTcp();
873e9f
			e.wantWrite = conn.wantWriteToTcp();
873e9f
		}
1d16df
		if (conn.udpActive)
1d16df
			conn.whenWriteToUdp(pollTime);
873e9f
	}
1d16df
	if (timeLess(pollTime, time)) pollTime = time;
873e9f
	
873e9f
	// wait for events
1d16df
	DebugMsg("poll duration: %llu", pollTime - time);
460064
	poll.wait(pollTime - time);
873e9f
	time = monotonicTime();
460064
	crypt.setTime(globalTime(), timeQwant);
873e9f
	
873e9f
	if (poll.list[0].closed || (tcpSockId >= 0 && poll.list[1].closed)) {
460064
		if (poll.list[0].closed)
ab5ba6
			logf("UDP socket closed\n");
460064
		if (tcpSockId >= 0 && poll.list[1].closed)
ab5ba6
			logf("TCP server socket closed\n");
873e9f
		poll.list.clear();
873e9f
		close();
873e9f
		return false;
873e9f
	}
873e9f
	
873e9f
	// read and parse udp
873e9f
	if (poll.list[0].canRead)
873e9f
		while(recvUdpPacket());
873e9f
	
873e9f
	if (tcpSockId >= 0 && poll.list[1].canRead) {
873e9f
		// accept new tcp connection
460064
		Address address;
460064
		int sockId = Socket::tcpAccept(tcpSockId, address);
873e9f
		if (sockId >= 0) {
b95e8b
			ConnId id, floatId;
b95e8b
			floatId.id = id.id = crypt.random();
4b7e37
			id.address = remoteUdpAddress;
4b7e37
			IpMap::const_iterator ipi = ipMap.find(id.address.ipUInt);
4b7e37
			if (ipi != ipMap.end()) id.address.ipUInt = ipi->second;
4b7e37
			
ab5ba6
			logf("Open incoming connection from TCP: ");
460064
			address.print();
ab5ba6
			logf(", id: %u\n", id.id);
b95e8b
			Connection &conn = connections.emplace(
b95e8b
				floatConnections ? floatId : id,
b95e8b
				Connection::Args(*this, id, sockId) ).first->second;
873e9f
			conn.tcpRecvQueue.readFromTcp();
873e9f
		}
873e9f
	}
873e9f
	
873e9f
	// tcp read and write
873e9f
	int first = tcpSockId < 0 ? 1 : 2;
873e9f
	for(Poll::List::const_iterator i = poll.list.begin() + first; i != poll.list.end(); ++i) {
873e9f
		Connection &conn = *i->connection;
873e9f
		conn.udpRecvQueue.sendTcp();
c47604
		if (i->canWrite) while(true) {
c47604
			int size = conn.tcpSendQueue.writeToTcp();
c47604
			if (size <= 0) break;
c47604
			conn.udpRecvQueue.sendTcp();
c47604
		}
873e9f
		if (i->canRead && conn.udpActive)
873e9f
			conn.tcpRecvQueue.readFromTcp();
873e9f
		if (i->closed || (!conn.udpActive && !conn.tcpSendQueue.busySize())) {
460064
			DebugMsg("%u, inactivate tcp", conn.id.id);
873e9f
			Socket::close(conn.tcpSockId);
873e9f
			conn.tcpSockId = -1;
873e9f
		}
873e9f
	}
873e9f
	
873e9f
	// write to udp
c47604
	for(ConnMap::iterator i = connections.begin(); i != connections.end(); ++i) {
c47604
		Connection &conn = i->second;
c47604
		if (conn.udpActive) {
c47604
			bool sent = false;
c47604
			if (conn.udpSendQueue.sendUdp()) sent = true;
c47604
			if (conn.udpRecvQueue.sendUdpConfirm()) sent = true;
c47604
			if (sent) conn.lastUdpOutputTime = time;
c47604
			if (timeLequal(conn.lastUdpOutputTime + udpKeepAliveDuration, time)) {
460064
				DebugMsg("%u, udp keep alive", conn.id.id);
c47604
				conn.udpRecvQueue.sendUdpConfirm(true);
c47604
				conn.lastUdpOutputTime = time;
c47604
			}
c47604
		}
c47604
	}
c47604
	
460064
	// remove closed connections
460064
	for(ClosedConnMap::iterator i = closedConnections.begin(); i != closedConnections.end();)
460064
		if (timeLess(i->second, time))
460064
			closedConnections.erase(i++); else ++i;
460064
	
460064
	// close connections
873e9f
	for(ConnMap::iterator i = connections.begin(); i != connections.end();) {
873e9f
		ConnMap::iterator j = i++;
873e9f
		Connection &conn = j->second;
c47604
		
460064
		if (conn.udpActive && timeLess(conn.lastUdpInputTime + udpSilenceDuration, time)) {
460064
			DebugMsg("%u, inactivate udp by silence timeout", conn.id.id);
c47604
			conn.udpActive = false;
460064
			termQueue.insert(conn.id);
460064
		}
c47604
		
c47604
		if (!conn.udpActive && !conn.wantWriteToTcp()) {
460064
			DebugMsg("%u, inactivate tcp", conn.id.id);
c47604
			Socket::close(conn.tcpSockId);
c47604
			conn.tcpSockId = -1;
ab5ba6
			logf("Close connection, id: %u\n", conn.id.id);
b95e8b
			closedConnections[j->first] = time + 2*udpSilenceDuration;
460064
			connections.erase(j);
c47604
			continue;
c47604
		}
c47604
		
c47604
		if (conn.tcpSockId < 0 && !conn.udpSendQueue.canSentToUdp()) {
460064
			DebugMsg("%u, inactivate udp, all data sent", conn.id.id);
c47604
			conn.udpActive = false;
460064
			termQueue.insert(conn.id);
ab5ba6
			logf("Close connection, id: %u\n", conn.id.id);
b95e8b
			closedConnections[j->first] = time + 2*udpSilenceDuration;
460064
			connections.erase(j);
c47604
			continue;
c47604
		}
c47604
		
c47604
		if (!conn.udpActive && conn.tcpSockId < 0) {
ab5ba6
			logf("Close connection, id: %u\n", conn.id.id);
b95e8b
			closedConnections[j->first] = time + 2*udpSilenceDuration;
460064
			connections.erase(j);
c47604
			continue;
873e9f
		}
873e9f
	}
873e9f
	
873e9f
	// send udp term packets
460064
	if (timeLequal(nextTermSendTime, time) && !termQueue.empty()) {
873e9f
		for(ConnSet::const_iterator i = termQueue.begin(); i != termQueue.end(); ++i) {
460064
			DebugMsg("%u, send term packet", i->id);
873e9f
			Packet packet;
873e9f
			packet.init(i->id, 0, CMD_TERM, 0);
873e9f
			sendUdpPacket(i->address, packet);
873e9f
		}
873e9f
		termQueue.clear();
460064
		nextTermSendTime = time + udpConfirmDuration;
873e9f
	}
873e9f
	
873e9f
	poll.list.clear();
873e9f
	return true;
873e9f
}
873e9f
873e9f
873e9f
void Tunnel::run() {
1d16df
	initTime();
873e9f
	while(iteration());
873e9f
}
873e9f