Blob Blame Raw

#include <cstring>

#include "tunnel.h"
#include "connection.h"
#include "udprecvqueue.h"



//#define DebugMsg HideDebugMSG
#define DebugMsg ShowDebugMSG



UdpRecvQueue::UdpRecvQueue(Connection &connection):
	connection(connection),
	nextConfirmSendTime(connection.tunnel.time + connection.tunnel.udpConfirmDuration),
	confirmRequired(),
	finalEntryAdded(),
	entries(),
	currentIndex(),
	endIndex()
	{ }


UdpRecvQueue::~UdpRecvQueue()
	{ }


bool UdpRecvQueue::recvUdp(unsigned int index, const void *data, int size) {
	confirmRequired = true;
	
	if (size < 0 || size > PACKET_SIZE)
		return false;
	if (cycleLess(index, currentIndex) || !cycleLess(index, currentIndex + PACKETS_COUNT))
		return false;
	if (finalEntryAdded && !cycleLess(index, endIndex))
		return false;
	Entry &e = entries[(current + (index - currentIndex))%PACKETS_COUNT];
	if (e.received)
		return false;
	
	DebugMsg("%u, %u, %d", connection.id.id, index, size);
	e.received = true;
	e.size = size;
	if (e.size) {
		memcpy(e.data, data, e.size);
		if (cycleLess(endIndex, index + 1)) endIndex = index + 1;
	} else {
		finalEntryAdded = true;
		endIndex = index + 1;
	}
	
	return true;
}


bool UdpRecvQueue::sendUdpConfirm(bool force) {
	Time time = connection.tunnel.time;
	if (!force && (!confirmRequired || timeLess(time, nextConfirmSendTime)))
		return false;
	
	unsigned int count = endIndex - currentIndex;
	unsigned int bytesCount = count ? (count - 1)/8 + 1 : 0;
	
	Packet packet;
	packet.init(connection.id.id, currentIndex, CMD_CONFIRM, bytesCount);
	for(int i = 0; i < (int)count; ++i)
		if (entries[(current + i)%PACKETS_COUNT].received)
			packet.payload[i/8] |= (1 << (i%8));
	DebugMsg("%u, %u, %s", connection.id.id, currentIndex, bitsToString(packet.payload, bytesCount));
	connection.tunnel.sendUdpPacket(connection.id.address, packet);
	
	confirmRequired = false;
	nextConfirmSendTime = time + connection.tunnel.udpConfirmDuration;
	return true;
}


bool UdpRecvQueue::canSentToTcp() const
	{ return cycleLess(currentIndex, endIndex) && entries[current].received && entries[current].size; }


int UdpRecvQueue::sendTcp() {
	int sent = 0;
	while(cycleLess(currentIndex, endIndex)) {
		Entry &e = entries[current];
		if (!e.received)
			break;

		if (!e.size) { 
			DebugMsg("%d, inactivate udp, all packets received", connection.id.id);
			connection.udpActive = false;
			connection.tunnel.termQueue.insert(connection.id);
		} else
		if (!connection.tcpSendQueue.push(e.data, e.size)) {
			break;
		}
		
		++sent;
		memset(&e, 0, sizeof(e));
		current = (current + 1)%PACKETS_COUNT;
		++currentIndex;
	}
	if (sent) confirmRequired = true;
	return sent;
}


void UdpRecvQueue::whenWriteToUdp(Time &t) const {
	if (confirmRequired && timeLess(nextConfirmSendTime, t))
		t = nextConfirmSendTime;
}