From dd2ea4156e06e19ba487a2796fc655c2b1e06639 Mon Sep 17 00:00:00 2001 From: Ivan Mahonin Date: Oct 24 2021 07:06:33 +0000 Subject: tcpqueue --- diff --git a/common.h b/common.h new file mode 100644 index 0000000..54e2280 --- /dev/null +++ b/common.h @@ -0,0 +1,98 @@ +#ifndef COMMON_H +#define COMMON_H + + +enum { + PACKET_SIZE = 1024, + PACKETS_COUNT = 8*PACKET_SIZE, + TCP_BUFFER_SIZE = PACKET_SIZE*PACKETS_COUNT, +}; + +enum : unsigned short { + CMD_SHIFT = 12, + CMD_MASK = (unsigned short)(0xffff << CMD_SHIFT), + SIZE_MASK = (unsigned short)(~CMD_MASK), + + CMD_DATA = 0, + CMD_TERM = 1, + CMD_CONFIRM = 2, +}; + + + +typedef unsigned long long Time; + + +inline bool cycleLequal(unsigned int a, unsigned int b) + { return (b - a) <= 0x7fffffff; } +inline bool cycleLess(unsigned int a, unsigned int b) + { return !cycleLequal(b, a); } +inline bool timeLequal(Time a, Time b) + { return (b - a) <= 0x7fffffffffffffffull; } +inline bool timeLess(Time a, Time b) + { return !timeLequal(b, a); } + + + +class Address { +public: + union { + struct { unsigned int ipUInt; }; + struct { unsigned char ip[4]; }; + }; + unsigned short port; + inline Address(): ipUInt(), port() { } + inline bool operator<(const Address &other) const { + return ipUInt < other.ipUInt ? true + : other.ipUInt < ipUInt ? false + : port < other.port; + } + + void print() const; +}; + + +class ConnId { +public: + Address address; + unsigned int id; + inline ConnId(): id() { } + inline bool operator<(const ConnId &other) const { + return address < other.address ? true + : other.address < address ? false + : id < other.id; + } +}; + + +struct __attribute__((__packed__)) Packet { + unsigned int connId; + unsigned int index; + unsigned short cmdSize; + unsigned char payload[PACKET_SIZE]; + + enum { HEADER_SIZE = ((Packet*)nullptr)->payload - nullptr; }; + + inline Packet(): + connId(), index(), cmdSize(), payload() { } + + inline int getSize() const + { return cmdSize & SIZE_MASK; } + inline int getCommand() const + { return cmdSize >> CMD_SHIFT; } + + inline init( + unsigned int connId, + unsigned int index, + unsigned short cmd, + unsigned short size ) + { + this->connId = connId; + this->index = index; + this->cmd = cmd; + this->size = size; + } +}; + + +#endif diff --git a/socket.h b/socket.h new file mode 100644 index 0000000..c563eb4 --- /dev/null +++ b/socket.h @@ -0,0 +1,43 @@ +#ifndef SOCKET_H +#define SOCEKT_H + + +#include "common.h" + + +class Poll { +private: + int count; + int allocatedCount; + void *data; + +public: + Poll(); + ~Poll(); + + void add(int sockId, bool wantRead, bool wantWrite); + void clear(); + bool wait(Time time); +}; + + +class Socket { +private: + Socket(); + +public: + static int tcpConnect(const Address &address); + static int tcpListen(const Address &address); + static int tcpAccept(int soskId); + static int tcpSendAsync(int soskId, const void *data, int size); + static int tcpRecvAsync(int soskId, void *data, int size); + + static int udpBind(int soskId, const Address &address); + static int udpSend(int soskId, const Address &address, const void *data, int size); + static int udpRecvAsync(Address *address, void *data, int size); + + static void close(int soskId); +}; + + +#endif diff --git a/tcpqueue.cpp b/tcpqueue.cpp new file mode 100644 index 0000000..a7c47c6 --- /dev/null +++ b/tcpqueue.cpp @@ -0,0 +1,108 @@ + + +#include + +#include "socket.h" +#include "connection.h" +#include "tcpqueue.h" + + + +TcpQueue::TcpQueue(Connection &connection): + connection(connection), buffer(), begin(), end() { } + + +TcpQueue::~TcpQueue() + { } + + +bool TcpQueue::push(const void *data, int size) { + if (!data || size <= 0 || freeSize() < size) + return false; + + int part1 = ALLOCED_SIZE - end; + if (size > part1) { + memcpy(buffer + end, data, part1); + memcpy(buffer, (const unsigned char*)data + part1, size - part1); + } else { + memcpy(buffer + end, data, size); + } + end = (end + size)%ALLOCED_SIZE; + + return true; +} + + +bool TcpQueue::pop(void *data, int size) { + if (!data || size <= 0 || busySize() < size) + return false; + + int part1 = ALLOCED_SIZE - begin; + if (size > part1) { + memcpy(data, buffer + begin, part1); + memcpy((unsigned char*)data + part1, buffer, size - part1); + } else { + memcpy(data, buffer + begin, size); + } + begin = (begin + size)%ALLOCED_SIZE; + + return true; +} + + +int TcpQueue::readFromTcp() { + if (connection.tcpSockId < 0) + return 0; + int size = freeSize(); + if (size <= 0) + return 0; + + int doneSize = 0; + int parts[2]; + parts[0] = ALLOCED_SIZE - end; + parts[1] = parts[0] - size; + if (parts[1] < 0) + { parts[0] = size; parts[1] = 0; } + + for(int i = 0; i < 2; ++i) { + while(parts[i]) { + int res = Socket::tcpRecvAsync(connection.tcpSockId, buffer + end, parts[i]); + if (res <= 0) + return doneSize; + doneSize += res; + end += res; + parts[i] -= res; + } + if (end >= ALLOCED_SIZE) end = 0; + } + return doneSize; +} + + +int TcpQueue::writeToTcp() { + if (connection.tcpSockId < 0) + return 0; + int size = busySize(); + if (size <= 0) + return 0; + + int doneSize = 0; + int parts[2]; + parts[0] = ALLOCED_SIZE - begin; + parts[1] = parts[0] - size; + if (parts[1] < 0) + { parts[0] = size; parts[1] = 0; } + + for(int i = 0; i < 2; ++i) { + while(parts[i]) { + int res = Socket::tcpSendAsync(connection.tcpSockId, buffer + begin, parts[i]); + if (res <= 0) + return doneSize; + doneSize += res; + begin += res; + parts[i] -= res; + } + if (begin >= ALLOCED_SIZE) begin = 0; + } + return doneSize; +} diff --git a/tcpqueue.h b/tcpqueue.h new file mode 100644 index 0000000..d1d7f7d --- /dev/null +++ b/tcpqueue.h @@ -0,0 +1,39 @@ +#ifndef TCPQUEUE_H +#define TCPQUEUE_H + + +#include "common.h" + + +class Connection; + + +class TcpQueue { +public: + enum { ALLOCED_SIZE = TCP_BUFFER_SIZE + 1 }; + +public: + Connection &connection; +private: + unsigned char buffer[ALLOCED_SIZE]; + int begin; + int end; + +public: + explicit TcpQueue(Connection &connection); + ~TcpQueue(); + + inline int busySize() const + { return end < begin ? end + ALLOCED_SIZE - begin : end - begin; } + inline int freeSize() const + { return TCP_BUFFER_SIZE - busySize(); } + + bool push(const void *data, int size); + bool pop(void *data, int size); + + int readFromTcp(); + int writeToTcp(); +}; + + +#endif