Blame tunnel.cpp

873e9f
873e9f
#include <cstdio>
873e9f
873e9f
#include "socket.h"
873e9f
#include "tunnel.h"
873e9f
873e9f
873e9f
873e9f
// Creates an idle tunnel: both server sockets start out closed (-1) and the
// timing parameters are set to their built-in defaults.
// NOTE(review): several initializers read members initialized earlier in this
// list (udpSendDuration, udpResendDuration, udpSilenceDuration, time,
// udpConfirmDuration); this relies on the declaration order in the class
// matching the order used here - confirm against tunnel.h.
// NOTE(review): the time unit of the durations is not visible in this file.
Tunnel::Tunnel()
	: time(monotonicTime())                               // current clock snapshot
	, udpSendDuration(1000)
	, udpPartialSendDuration(500000)
	, udpResendDuration(udpSendDuration * PACKETS_COUNT)  // scaled by PACKETS_COUNT
	, udpConfirmDuration(udpResendDuration/8)
	, udpSilenceDuration(300000000)                       // peer silence before a connection is dropped
	, udpKeepAliveDuration(udpSilenceDuration/32)         // idle time before a forced confirm is sent
	, pollDuration(5000000)                               // base offset for the poll deadline
	, udpSockId(-1)
	, tcpSockId(-1)
	, nextTermSendTime(time + udpConfirmDuration)         // first TERM flush deadline
{
}
873e9f
c47604
873e9f
// Shuts the tunnel down on destruction by delegating to close().
Tunnel::~Tunnel() {
	close();
}
873e9f
873e9f
c47604
bool Tunnel::initUdpServer(const Address &address, const Address &remoteTcpAddress, const char *key) {
873e9f
	closeUdpServer();
873e9f
	printf("Init UDP server at address: "); address.print();
873e9f
	if (remoteTcpAddress.port)
873e9f
		{ printf(", with remote TCP address: "); remoteTcpAddress.print(); }
873e9f
	printf("\n");
873e9f
	
c47604
	if (!crypt.setKey(key))
c47604
		{ printf("bad key, cancelled"); return false; }
c47604
	
873e9f
	udpSockId = Socket::udpBind(address);
873e9f
	if (udpSockId < 0)
c47604
		{ crypt.resetKey(); printf("cannot bind, cancelled"); return false; }
873e9f
	
873e9f
	printf("success");
873e9f
	this->remoteTcpAddress = remoteTcpAddress;
873e9f
	return true;
873e9f
}
873e9f
c47604
c47604
bool Tunnel::initTcpServer(const Address &address, const Address &remoteUdpAddress) {
873e9f
	closeTcpServer();
873e9f
	printf("Init TCP server at address: "); address.print(); printf("\n");
873e9f
	
873e9f
	if (udpSockId < 0)
873e9f
		{ printf("UDP server was not initialized, cancelled\n"); return false; }
873e9f
	
c47604
	tcpSockId = Socket::tcpListen(address);
c47604
	if (tcpSockId < 0)
c47604
		{ printf("cannot listen, cancelled"); return false; }
c47604
	
873e9f
	printf("success");
c47604
	this->remoteUdpAddress = remoteUdpAddress;
873e9f
	return true;
873e9f
}
873e9f
873e9f
873e9f
void Tunnel::closeUdpServer() {
873e9f
	if (udpSockId < 0) return;
873e9f
	closeTcpServer();
873e9f
	
873e9f
	printf("Close UDP server");
873e9f
	for(ConnMap::iterator i = connections.begin(); i != connections.end(); ++i)
873e9f
		Socket::close(i->second.tcpSockId);
873e9f
	connections.clear();
873e9f
	termQueue.clear();
873e9f
	Socket::close(udpSockId);
873e9f
	udpSockId = -1;
873e9f
	remoteTcpAddress = Address();
c47604
	crypt.resetKey();
873e9f
}
873e9f
873e9f
873e9f
void Tunnel::closeTcpServer() {
873e9f
	if (tcpSockId < 0) return;
873e9f
	printf("Close TCP server");
873e9f
	Socket::close(tcpSockId);
873e9f
	tcpSockId = -1;
873e9f
	remoteUdpAddress = Address();
873e9f
}
873e9f
873e9f
873e9f
void Tunnel::close()
873e9f
	{ closeUdpServer(); }
873e9f
873e9f
c47604
bool Tunnel::recvUdpPacket() {
c47604
	ConnId id;
c47604
	unsigned char raw[CRYPT_PACKET_SIZE] = {};
c47604
	int rawSize = Socket::udpRecvAsync(udpSockId, id.address, raw, sizeof(raw));
c47604
	if (rawSize <= 0)
c47604
		return false;
c47604
	
c47604
	unsigned char data[CRYPT_PACKET_SIZE] = {};
c47604
	int dataSize = CRYPT_PACKET_SIZE;
c47604
	if (!crypt.decrypt(raw, rawSize, data, dataSize))
c47604
		return true;
c47604
	
c47604
	void *ptr = data;
c47604
	Packet &packet = *(Packet*)ptr;
c47604
	
c47604
	int cmd = packet.getCommand();
c47604
	if (cmd < 0 || cmd > CMDMAX)
c47604
		return true;
c47604
c47604
	int size = packet.getSize();
c47604
	if (size + HEADER_SIZE > dataSize)
c47604
		return true;
c47604
	
c47604
	id.id = packet.connId;
c47604
	ConnMap::iterator i = connections.find(id);
c47604
	if (i == connections.end() && cmd == CMD_DATA && remoteTcpAddress.port && !closedConnections.count(id)) {
c47604
		printf("Open incoming connection from UDP: ");
c47604
		id.address.print();
c47604
		printf(", id: %u\n", id.id);
c47604
		int sockId = Socket::tcpConnect(remoteTcpAddress);
c47604
		if (sockId < 0) {
c47604
			printf("cannot init TCP connection, cancelled\n");
c47604
		} else {
c47604
			i = connections.emplace(id, Connection::Args(*this, id, sockId)).first;
873e9f
		}
873e9f
	}
c47604
	
c47604
	if (i == connections.end()) {
c47604
		termQueue.insert(id);
c47604
		return true;
c47604
	}
c47604
	
c47604
	Connection &conn = i->second;
c47604
	if (!conn.udpActive) {
c47604
		if (cmd != CMD_TERM) termQueue.insert(id);
c47604
		return true;
c47604
	}
c47604
	
c47604
	if (cmd == CMD_DATA) {
c47604
		conn.lastUdpInputTime = time;
c47604
		conn.udpRecvQueue.recvUdp(packet.index, packet.payload, size);
c47604
	} else
c47604
	if (cmd == CMD_CONFIRM) {
c47604
		conn.lastUdpInputTime = time;
c47604
		conn.udpSendQueue.confirm(packet.index, packet.payload, size*8);
c47604
	} else
c47604
	if (cmd == CMD_TERM) {
c47604
		conn.udpActive = false;
c47604
	}
c47604
	
c47604
	return true;
c47604
}
c47604
c47604
c47604
void Tunnel::sendUdpPacket(const Address &address, const Packet &packet) {
c47604
	unsigned char raw[CRYPT_PACKET_SIZE] = {};
c47604
	int rawSize = CRYPT_PACKET_SIZE;
c47604
	if (crypt.encrypt(&packet, HEADER_SIZE + packet.getSize(), raw, rawSize))
c47604
		Socket::udpSend(udpSockId, address, raw, rawSize);
873e9f
}
873e9f
873e9f
873e9f
bool Tunnel::iteration() {
873e9f
	if (udpSockId < 0)
873e9f
		return false;
873e9f
	
873e9f
	// prepare poll
873e9f
	
873e9f
	poll.list.clear();
873e9f
	
873e9f
	poll.list.emplace_back();
873e9f
	Poll::Entry &e = poll.list.back();
873e9f
	e.sockId = udpSockId;
873e9f
	e.wantRead = true;
873e9f
	
873e9f
	if (tcpSockId >= 0) {
873e9f
		Poll::Entry &e = poll.list.back();
873e9f
		e.sockId = tcpSockId;
873e9f
		e.wantRead = true;
873e9f
	}
873e9f
	
873e9f
	Time pollTime = time + pollDuration;
873e9f
	for(ConnMap::iterator i = connections.begin(); i != connections.end(); ++i) {
873e9f
		Connection &conn = i->second;
873e9f
		if (conn.tcpSockId >= 0) {
873e9f
			poll.list.emplace_back();
873e9f
			Poll::Entry &e = poll.list.back();
873e9f
			e.connection = &conn;
873e9f
			e.sockId = conn.tcpSockId;
873e9f
			e.wantRead = conn.udpActive && conn.wantReadFromTcp();
873e9f
			e.wantWrite = conn.wantWriteToTcp();
873e9f
		}
873e9f
		if (conn.udpActive) {
873e9f
			Time t = conn.whenWriteToUdp();
873e9f
			if (timeLess(pollTime, t)) pollTime = t;
873e9f
		}
873e9f
	}
873e9f
	
873e9f
	// wait for events
873e9f
	poll.wait(pollTime);
873e9f
	time = monotonicTime();
c47604
	crypt.setTime(time);
873e9f
	
873e9f
	if (poll.list[0].closed || (tcpSockId >= 0 && poll.list[1].closed)) {
873e9f
		poll.list.clear();
873e9f
		close();
873e9f
		return false;
873e9f
	}
873e9f
	
873e9f
	// read and parse udp
873e9f
	if (poll.list[0].canRead)
873e9f
		while(recvUdpPacket());
873e9f
	
873e9f
	if (tcpSockId >= 0 && poll.list[1].canRead) {
873e9f
		// accept new tcp connection
873e9f
		int sockId = Socket::tcpAccept(tcpSockId);
873e9f
		if (sockId >= 0) {
873e9f
			ConnId id;
873e9f
			id.address = remoteUdpAddress;
873e9f
			id.id = ConnId::generateId();
c47604
			Connection &conn = connections.emplace(id, Connection::Args(*this, id, tcpSockId)).first->second;
873e9f
			conn.tcpRecvQueue.readFromTcp();
873e9f
		}
873e9f
	}
873e9f
	
873e9f
	// tcp read and write
873e9f
	int first = tcpSockId < 0 ? 1 : 2;
873e9f
	for(Poll::List::const_iterator i = poll.list.begin() + first; i != poll.list.end(); ++i) {
873e9f
		Connection &conn = *i->connection;
873e9f
		conn.udpRecvQueue.sendTcp();
c47604
		if (i->canWrite) while(true) {
c47604
			int size = conn.tcpSendQueue.writeToTcp();
c47604
			if (size <= 0) break;
c47604
			conn.udpRecvQueue.sendTcp();
c47604
		}
873e9f
		if (i->canRead && conn.udpActive)
873e9f
			conn.tcpRecvQueue.readFromTcp();
873e9f
		if (i->closed || (!conn.udpActive && !conn.tcpSendQueue.busySize())) {
873e9f
			Socket::close(conn.tcpSockId);
873e9f
			conn.tcpSockId = -1;
873e9f
		}
873e9f
	}
873e9f
	
873e9f
	// write to udp
c47604
	for(ConnMap::iterator i = connections.begin(); i != connections.end(); ++i) {
c47604
		Connection &conn = i->second;
c47604
		if (conn.udpActive) {
c47604
			bool sent = false;
c47604
			if (conn.udpSendQueue.sendUdp()) sent = true;
c47604
			if (conn.udpRecvQueue.sendUdpConfirm()) sent = true;
c47604
			if (sent) conn.lastUdpOutputTime = time;
c47604
			if (timeLequal(conn.lastUdpOutputTime + udpKeepAliveDuration, time)) {
c47604
				conn.udpRecvQueue.sendUdpConfirm(true);
c47604
				conn.lastUdpOutputTime = time;
c47604
			}
c47604
		}
c47604
	}
c47604
	
c47604
	// close connections	
873e9f
	for(ConnMap::iterator i = connections.begin(); i != connections.end();) {
873e9f
		ConnMap::iterator j = i++;
873e9f
		Connection &conn = j->second;
c47604
		
c47604
		if (conn.udpActive && timeLess(conn.lastUdpInputTime + udpSilenceDuration, time))
c47604
			conn.udpActive = false;
c47604
		
c47604
		if (!conn.udpActive && !conn.wantWriteToTcp()) {
c47604
			Socket::close(conn.tcpSockId);
c47604
			conn.tcpSockId = -1;
c47604
			closedConnections[conn.id] = time + 2*udpSilenceDuration;
c47604
			connections.erase(i);
c47604
			continue;
c47604
		}
c47604
		
c47604
		if (conn.tcpSockId < 0 && !conn.udpSendQueue.canSentToUdp()) {
c47604
			conn.udpActive = false;
c47604
			closedConnections[conn.id] = time + 2*udpSilenceDuration;
c47604
			connections.erase(i);
c47604
			continue;
c47604
		}
c47604
		
c47604
		if (!conn.udpActive && conn.tcpSockId < 0) {
c47604
			closedConnections[conn.id] = time + 2*udpSilenceDuration;
c47604
			connections.erase(i);
c47604
			continue;
873e9f
		}
873e9f
	}
873e9f
	
873e9f
	// send udp term packets
873e9f
	if (timeLequal(nextTermSendTime, time)) {
873e9f
		for(ConnSet::const_iterator i = termQueue.begin(); i != termQueue.end(); ++i) {
873e9f
			Packet packet;
873e9f
			packet.init(i->id, 0, CMD_TERM, 0);
873e9f
			sendUdpPacket(i->address, packet);
873e9f
		}
873e9f
		termQueue.clear();
873e9f
		nextTermSendTime = time + nextTermSendTime;
873e9f
	}
873e9f
	
873e9f
	poll.list.clear();
873e9f
	return true;
873e9f
}
873e9f
873e9f
873e9f
void Tunnel::run() {
873e9f
	time = monotonicTime();
873e9f
	nextTermSendTime = time + udpConfirmDuration;
873e9f
	while(iteration());
873e9f
}
873e9f