#include <cstring>
#include "tunnel.h"
#include "connection.h"
#include "udprecvqueue.h"
// Debug logging switch for this file: DebugMsg currently expands to ShowDebugMSG.
// To route it through HideDebugMSG instead, enable the commented-out define
// below and remove the active one (both macros are declared outside this file).
//#define DebugMsg HideDebugMSG
#define DebugMsg ShowDebugMSG
/**
 * Builds an empty receive queue bound to `connection`.
 *
 * The first confirmation is scheduled one `udpConfirmDuration` after the
 * tunnel's current time. Flags, the entry ring, the ring cursor and both
 * index counters are value-initialized.
 *
 * @param connection  Connection that owns this queue; kept by reference.
 */
UdpRecvQueue::UdpRecvQueue(Connection &connection):
    connection(connection),
    nextConfirmSendTime(connection.tunnel.time + connection.tunnel.udpConfirmDuration),
    confirmRequired(),
    finalEntryAdded(),
    entries(),
    // `current` is the ring slot used for `currentIndex`; it is read by
    // recvUdp/sendUdpConfirm/canSentToTcp/sendTcp, so it needs a defined
    // starting value just like the other members.
    // NOTE(review): keep this initializer in the same position as the member
    // declaration in udprecvqueue.h to avoid a -Wreorder warning.
    current(),
    currentIndex(),
    endIndex()
{ }
/// Destructor: performs no explicit cleanup.
UdpRecvQueue::~UdpRecvQueue() {
}
bool UdpRecvQueue::recvUdp(unsigned int index, const void *data, int size) {
confirmRequired = true;
if (size < 0 || size > PACKET_SIZE)
return false;
if (cycleLess(index, currentIndex) || !cycleLess(index, currentIndex + PACKETS_COUNT))
return false;
if (finalEntryAdded && !cycleLess(index, endIndex))
return false;
Entry &e = entries[(current + (index - currentIndex))%PACKETS_COUNT];
if (e.received)
return false;
DebugMsg("%u, %u, %d", connection.id.id, index, size);
e.received = true;
e.size = size;
if (e.size) {
memcpy(e.data, data, e.size);
if (cycleLess(endIndex, index + 1)) endIndex = index + 1;
} else {
finalEntryAdded = true;
endIndex = index + 1;
}
return true;
}
bool UdpRecvQueue::sendUdpConfirm(bool force) {
Time time = connection.tunnel.time;
if (!force && (!confirmRequired || timeLess(time, nextConfirmSendTime)))
return false;
unsigned int count = endIndex - currentIndex;
unsigned int bytesCount = count ? (count - 1)/8 + 1 : 0;
Packet packet;
packet.init(connection.id.id, currentIndex, CMD_CONFIRM, bytesCount);
for(int i = 0; i < (int)count; ++i)
if (entries[(current + i)%PACKETS_COUNT].received)
packet.payload[i/8] |= (1 << (i%8));
DebugMsg("%u, %u, %s", connection.id.id, currentIndex, bitsToString(packet.payload, bytesCount));
connection.tunnel.sendUdpPacket(connection.id.address, packet);
confirmRequired = false;
nextConfirmSendTime = time + connection.tunnel.udpConfirmDuration;
return true;
}
bool UdpRecvQueue::canSentToTcp() const
{ return cycleLess(currentIndex, endIndex) && entries[current].received && entries[current].size; }
/**
 * Drains consecutive received entries, starting at `currentIndex`, into the
 * connection's TCP send queue.
 *
 * Processing stops at the first entry that has not been received yet, or when
 * `tcpSendQueue.push` refuses a payload (that entry stays queued for a later
 * call). A zero-size entry is the end-of-stream marker: it deactivates UDP for
 * the connection and inserts the connection id into the tunnel's `termQueue`.
 *
 * @return Number of entries consumed (the end marker counts as one). When it
 *         is non-zero a confirmation is requested, since the window moved.
 */
int UdpRecvQueue::sendTcp() {
    int sent = 0;
    while(cycleLess(currentIndex, endIndex)) {
        Entry &e = entries[current];
        if (!e.received)
            break; // gap in the sequence: wait for the missing packet

        if (!e.size) {
            // %u matches how the connection id is printed by the other
            // debug messages in this file.
            DebugMsg("%u, inactivate udp, all packets received", connection.id.id);
            connection.udpActive = false;
            connection.tunnel.termQueue.insert(connection.id);
        } else
        if (!connection.tcpSendQueue.push(e.data, e.size)) {
            break; // payload not accepted: keep the entry and retry later
        }

        ++sent;
        // Release the slot and advance both the ring cursor and the sequence index.
        memset(&e, 0, sizeof(e));
        current = (current + 1)%PACKETS_COUNT;
        ++currentIndex;
    }
    if (sent) confirmRequired = true;
    return sent;
}
/**
 * Pulls the caller's next-UDP-write deadline forward to the scheduled
 * confirmation time, if a confirmation is pending and due sooner.
 *
 * @param t  In/out deadline; replaced by `nextConfirmSendTime` only when a
 *           confirmation is required and that time precedes `t`.
 */
void UdpRecvQueue::whenWriteToUdp(Time &t) const {
    if (!confirmRequired)
        return;
    if (timeLess(nextConfirmSendTime, t))
        t = nextConfirmSendTime;
}