#include <cstdio>
#include "socket.h"
#include "tunnel.h"
//#define DebugMsg HideDebugMSG
#define DebugMsg ShowDebugMSG
// Constructs an idle tunnel: no sockets are open (both ids are -1) and the
// speed-dependent timings are derived by initSpeed() for a default rate of
// 1024*1024 bytes per second.
// Duration constants are plain tick counts; initSpeed() scales by 1000000 per
// second, so they are presumably microseconds — TODO confirm against Time.
Tunnel::Tunnel():
time(), // value-initialized here; real value is assigned by initTime(), which initSpeed() calls
udpSendDuration(), // assigned by initSpeed()
udpPartialSendDuration(), // assigned by initSpeed()
udpResendDuration(), // assigned by initSpeed()
udpConfirmDuration(), // assigned by initSpeed()
udpSilenceDuration(300000000),
// NOTE(review): reads udpSilenceDuration, so it relies on that member being
// declared earlier in the class — confirm against the header.
udpKeepAliveDuration(udpSilenceDuration/32),
timeQwant(1200000000),
pollDuration(5000000),
udpSockId(-1),
tcpSockId(-1),
nextTermSendTime() // value-initialized here; real value is assigned by initTime(), which initSpeed() calls
{
initSpeed(1024*1024);
}
// Shuts the tunnel down on destruction by delegating to close().
Tunnel::~Tunnel() {
    this->close();
}
// Derives all speed-dependent UDP timings from the target throughput and
// then re-initializes the clocks via initTime().
//
// bytesPerSecond - desired UDP send rate. A value of 0 would divide by zero,
//                  so it is clamped to 1 (the slowest rate this formula can
//                  express).
//
// udpSendDuration is the rounded number of ticks needed to emit one packet of
// PACKET_SIZE bytes at that rate (1000000 ticks per second); the remaining
// durations are fixed multiples/fractions of it.
void Tunnel::initSpeed(unsigned long long bytesPerSecond) {
    // Guard the divisions below against a zero rate.
    if (bytesPerSecond == 0)
        bytesPerSecond = 1;

    // Adding bytesPerSecond/2 before dividing rounds to nearest instead of truncating.
    udpSendDuration = (1000000ull*PACKET_SIZE + bytesPerSecond/2)/bytesPerSecond;
    udpPartialSendDuration = 10*udpSendDuration;
    udpResendDuration = udpSendDuration * PACKETS_COUNT;
    udpConfirmDuration = udpResendDuration/8;

    // initTime() uses udpConfirmDuration, so it must run after the assignments above.
    initTime();
}
// (Re)starts the UDP side of the tunnel.
//
// Any previously running UDP server is closed first. The encryption key is
// installed before binding; if binding fails the key is reset again.
//
// address          - local address to bind the UDP socket to.
// remoteTcpAddress - stored for later use; it is only mentioned in the log
//                    when its port is non-zero.
// key              - key handed to crypt.setKey().
//
// Returns true when the socket was bound, false on a bad key or bind failure.
bool Tunnel::initUdpServer(const Address &address, const Address &remoteTcpAddress, const char *key) {
    closeUdpServer();

    printf("Init UDP server at address: ");
    address.print();
    if (remoteTcpAddress.port) {
        printf(", with remote TCP address: ");
        remoteTcpAddress.print();
    }
    printf("\n");

    if (!crypt.setKey(key)) {
        printf("bad key, cancelled\n");
        return false;
    }

    udpSockId = Socket::udpBind(address);
    if (udpSockId < 0) {
        // undo the key installation so the tunnel is left fully idle
        crypt.resetKey();
        printf("cannot bind, cancelled\n");
        return false;
    }

    printf("success\n");
    this->remoteTcpAddress = remoteTcpAddress;
    return true;
}
// (Re)starts the listening TCP side of the tunnel.
//
// Any previously running TCP server is closed first. The UDP server must
// already be initialized, otherwise the call is rejected.
//
// address          - local address to listen on.
// remoteUdpAddress - stored on success; iteration() uses it as the address
//                    part of ids for connections accepted from TCP.
//
// Returns true when the listening socket was created, false otherwise.
bool Tunnel::initTcpServer(const Address &address, const Address &remoteUdpAddress) {
    closeTcpServer();

    printf("Init TCP server at address: ");
    address.print();
    printf("\n");

    if (udpSockId < 0) {
        printf("UDP server was not initialized, cancelled\n");
        return false;
    }

    tcpSockId = Socket::tcpListen(address);
    if (tcpSockId < 0) {
        printf("cannot listen, cancelled\n");
        return false;
    }

    printf("success\n");
    this->remoteUdpAddress = remoteUdpAddress;
    return true;
}
// Stops the UDP server and everything that depends on it.
//
// Does nothing when no UDP socket is open. Otherwise the TCP server is closed
// first, then every connection's TCP socket, the connection table and the
// pending term queue are dropped, the UDP socket is closed, and the stored
// remote TCP address and encryption key are reset.
void Tunnel::closeUdpServer() {
    if (udpSockId >= 0) {
        closeTcpServer();
        printf("Close UDP server\n");

        for (auto &entry : connections)
            Socket::close(entry.second.tcpSockId);
        connections.clear();
        termQueue.clear();

        Socket::close(udpSockId);
        udpSockId = -1;

        remoteTcpAddress = Address();
        crypt.resetKey();
    }
}
// Stops the listening TCP server, if one is running, and forgets the remote
// UDP address associated with it. Existing connections are left untouched.
void Tunnel::closeTcpServer() {
    if (tcpSockId >= 0) {
        printf("Close TCP server\n");
        Socket::close(tcpSockId);
        tcpSockId = -1;
        remoteUdpAddress = Address();
    }
}
// Closes the whole tunnel. closeUdpServer() also closes the TCP server, so
// delegating to it is sufficient.
void Tunnel::close() {
    closeUdpServer();
}
bool Tunnel::recvUdpPacket() {
ConnId id;
unsigned char raw[CRYPT_PACKET_SIZE] = {};
int rawSize = Socket::udpRecvAsync(udpSockId, id.address, raw, sizeof(raw));
if (rawSize <= 0)
return false;
unsigned char data[CRYPT_PACKET_SIZE] = {};
int dataSize = CRYPT_PACKET_SIZE;
if (!crypt.decrypt(raw, rawSize, data, dataSize)) {
DebugMsg("bad packet received%s", (id.address.print(), ""));
return true;
}
void *ptr = data;
Packet &packet = *(Packet*)ptr;
int cmd = packet.getCommand();
if (cmd < 0 || cmd > CMDMAX) {
DebugMsg("bad command %d", cmd);
return true;
}
int size = packet.getSize();
if (size + HEADER_SIZE > dataSize) {
DebugMsg("bad size %d", size);
return true;
}
id.id = packet.connId;
ConnMap::iterator i = connections.find(id);
if (i == connections.end() && cmd == CMD_DATA && remoteTcpAddress.port && !closedConnections.count(id)) {
printf("Open incoming connection from UDP: ");
id.address.print();
printf(", id: %u\n", id.id);
int sockId = Socket::tcpConnect(remoteTcpAddress);
if (sockId < 0) {
printf("cannot init TCP connection, cancelled\n");
} else {
i = connections.emplace(id, Connection::Args(*this, id, sockId)).first;
}
}
if (i == connections.end()) {
DebugMsg("unknown connection: %u", (id.address.print(), id.id));
if (cmd != CMD_TERM) termQueue.insert(id);
return true;
}
Connection &conn = i->second;
if (!conn.udpActive) {
DebugMsg("inactive connection: %u", id.id);
if (cmd != CMD_TERM) termQueue.insert(id);
return true;
}
if (cmd == CMD_DATA) {
conn.lastUdpInputTime = time;
conn.udpRecvQueue.recvUdp(packet.index, packet.payload, size);
} else
if (cmd == CMD_CONFIRM) {
conn.lastUdpInputTime = time;
conn.udpSendQueue.confirm(packet.index, packet.payload, size*8);
} else
if (cmd == CMD_TERM) {
DebugMsg("%u, inactivate udp by incoming term packet", conn.id.id);
conn.udpActive = false;
}
return true;
}
void Tunnel::sendUdpPacket(const Address &address, const Packet &packet) {
unsigned char raw[CRYPT_PACKET_SIZE] = {};
int rawSize = CRYPT_PACKET_SIZE;
if (crypt.encrypt(&packet, HEADER_SIZE + packet.getSize(), raw, rawSize))
Socket::udpSend(udpSockId, address, raw, rawSize);
}
void Tunnel::initTime() {
time = monotonicTime();
nextTermSendTime = time + udpConfirmDuration;
crypt.setTime(globalTime(), timeQwant);
}
bool Tunnel::iteration() {
if (udpSockId < 0)
return false;
// prepare poll
poll.list.clear();
poll.list.emplace_back();
Poll::Entry &e = poll.list.back();
e.sockId = udpSockId;
e.wantRead = true;
if (tcpSockId >= 0) {
poll.list.emplace_back();
Poll::Entry &e = poll.list.back();
e.sockId = tcpSockId;
e.wantRead = true;
}
time = monotonicTime();
Time pollTime = time + pollDuration;
for(ConnMap::iterator i = connections.begin(); i != connections.end(); ++i) {
Connection &conn = i->second;
if (conn.tcpSockId >= 0) {
poll.list.emplace_back();
Poll::Entry &e = poll.list.back();
e.connection = &conn;
e.sockId = conn.tcpSockId;
e.wantRead = conn.udpActive && conn.wantReadFromTcp();
e.wantWrite = conn.wantWriteToTcp();
}
if (conn.udpActive)
conn.whenWriteToUdp(pollTime);
}
if (timeLess(pollTime, time)) pollTime = time;
// wait for events
DebugMsg("poll duration: %llu", pollTime - time);
poll.wait(pollTime - time);
time = monotonicTime();
crypt.setTime(globalTime(), timeQwant);
if (poll.list[0].closed || (tcpSockId >= 0 && poll.list[1].closed)) {
if (poll.list[0].closed)
printf("UDP socket closed\n");
if (tcpSockId >= 0 && poll.list[1].closed)
printf("TCP server socket closed\n");
poll.list.clear();
close();
return false;
}
// read and parse udp
if (poll.list[0].canRead)
while(recvUdpPacket());
if (tcpSockId >= 0 && poll.list[1].canRead) {
// accept new tcp connection
Address address;
int sockId = Socket::tcpAccept(tcpSockId, address);
if (sockId >= 0) {
ConnId id;
id.address = remoteUdpAddress;
id.id = crypt.random();
printf("Open incoming connection from TCP: ");
address.print();
printf(", id: %u\n", id.id);
Connection &conn = connections.emplace(id, Connection::Args(*this, id, sockId)).first->second;
conn.tcpRecvQueue.readFromTcp();
}
}
// tcp read and write
int first = tcpSockId < 0 ? 1 : 2;
for(Poll::List::const_iterator i = poll.list.begin() + first; i != poll.list.end(); ++i) {
Connection &conn = *i->connection;
conn.udpRecvQueue.sendTcp();
if (i->canWrite) while(true) {
int size = conn.tcpSendQueue.writeToTcp();
if (size <= 0) break;
conn.udpRecvQueue.sendTcp();
}
if (i->canRead && conn.udpActive)
conn.tcpRecvQueue.readFromTcp();
if (i->closed || (!conn.udpActive && !conn.tcpSendQueue.busySize())) {
DebugMsg("%u, inactivate tcp", conn.id.id);
Socket::close(conn.tcpSockId);
conn.tcpSockId = -1;
}
}
// write to udp
for(ConnMap::iterator i = connections.begin(); i != connections.end(); ++i) {
Connection &conn = i->second;
if (conn.udpActive) {
bool sent = false;
if (conn.udpSendQueue.sendUdp()) sent = true;
if (conn.udpRecvQueue.sendUdpConfirm()) sent = true;
if (sent) conn.lastUdpOutputTime = time;
if (timeLequal(conn.lastUdpOutputTime + udpKeepAliveDuration, time)) {
DebugMsg("%u, udp keep alive", conn.id.id);
conn.udpRecvQueue.sendUdpConfirm(true);
conn.lastUdpOutputTime = time;
}
}
}
// remove closed connections
for(ClosedConnMap::iterator i = closedConnections.begin(); i != closedConnections.end();)
if (timeLess(i->second, time))
closedConnections.erase(i++); else ++i;
// close connections
for(ConnMap::iterator i = connections.begin(); i != connections.end();) {
ConnMap::iterator j = i++;
Connection &conn = j->second;
if (conn.udpActive && timeLess(conn.lastUdpInputTime + udpSilenceDuration, time)) {
DebugMsg("%u, inactivate udp by silence timeout", conn.id.id);
conn.udpActive = false;
termQueue.insert(conn.id);
}
if (!conn.udpActive && !conn.wantWriteToTcp()) {
DebugMsg("%u, inactivate tcp", conn.id.id);
Socket::close(conn.tcpSockId);
conn.tcpSockId = -1;
printf("Close connection, id: %u\n", conn.id.id);
closedConnections[conn.id] = time + 2*udpSilenceDuration;
connections.erase(j);
continue;
}
if (conn.tcpSockId < 0 && !conn.udpSendQueue.canSentToUdp()) {
DebugMsg("%u, inactivate udp, all data sent", conn.id.id);
conn.udpActive = false;
termQueue.insert(conn.id);
printf("Close connection, id: %u\n", conn.id.id);
closedConnections[conn.id] = time + 2*udpSilenceDuration;
connections.erase(j);
continue;
}
if (!conn.udpActive && conn.tcpSockId < 0) {
printf("Close connection, id: %u\n", conn.id.id);
closedConnections[conn.id] = time + 2*udpSilenceDuration;
connections.erase(j);
continue;
}
}
// send udp term packets
if (timeLequal(nextTermSendTime, time) && !termQueue.empty()) {
for(ConnSet::const_iterator i = termQueue.begin(); i != termQueue.end(); ++i) {
DebugMsg("%u, send term packet", i->id);
Packet packet;
packet.init(i->id, 0, CMD_TERM, 0);
sendUdpPacket(i->address, packet);
}
termQueue.clear();
nextTermSendTime = time + udpConfirmDuration;
}
poll.list.clear();
return true;
}
// Runs the tunnel main loop: resets the clocks, then keeps calling
// iteration() until it reports that the tunnel has stopped.
void Tunnel::run() {
    initTime();

    bool running;
    do {
        running = iteration();
    } while (running);
}