From 0ea18dbad77d62cfd84bbdf0f748369b0b2d9f7e Mon Sep 17 00:00:00 2001 From: Ivan Mahonin Date: Oct 07 2021 18:29:25 +0000 Subject: delete finalized connections --- diff --git a/tcp.cpp b/tcp.cpp index 10952a8..4bfa8d7 100644 --- a/tcp.cpp +++ b/tcp.cpp @@ -147,7 +147,7 @@ bool Connection::read(void *data, size_t size) { { close(ERR_TCP_CONNECTION_LOST); return false; } if (pr > 0) { int s = size > INT_MAX ? INT_MAX : (int)size; - int r = ::recv(sockId, data, s, MSG_DONTWAIT); + int r = ::recv(sockId, data, s, MSG_DONTWAIT | MSG_NOSIGNAL); if (r < 0) { if (errno != EAGAIN) { close(ERR_TCP_CONNECTION_LOST); return false; } @@ -177,7 +177,7 @@ bool Connection::write(const void *data, size_t size) { { close(ERR_TCP_CONNECTION_LOST); return false; } if (pr > 0) { int s = size > INT_MAX ? INT_MAX : (int)size; - int r = ::send(sockId, data, s, MSG_DONTWAIT); + int r = ::send(sockId, data, s, MSG_DONTWAIT | MSG_NOSIGNAL); if (r < 0) { if (errno != EAGAIN) { close(ERR_TCP_CONNECTION_LOST); return false; } @@ -209,6 +209,24 @@ Server::Server(): Server::~Server() { stop(); } +void Server::handleConnection(ConnList::iterator iter) { + handler->handleTcpConnection(*iter->connection); + { + std::lock_guard lock(connectionsMutex); + connectionsFinished.push_back(*iter); + connections.erase(iter); + } +} + +void Server::cleanConnections() { + while(!connectionsFinished.empty()) { + ConnDesc &desc = connectionsFinished.back(); + desc.thread->join(); + delete desc.thread; + delete desc.connection; + connectionsFinished.pop_back(); + } +} void Server::listen() { fcntl(sockId, F_SETFL, fcntl(sockId, F_GETFL, 0) | O_NONBLOCK); @@ -225,6 +243,7 @@ void Server::listen() { int pr = poll(&pfd, 1, pollTimeoutMs); if (pr < 0) { lastError = ERR_TCP_LISTEN_LOST; break; } + if (pr > 0 && (pr & POLLIN)) { socklen_t addrlen = sizeof(addr); int sid = ::accept(sockId, (sockaddr*)&addr, &addrlen); @@ -237,22 +256,32 @@ void Server::listen() { memcpy(remoteAddr.ip, &addr.sin_addr, sizeof(remoteAddr.ip)); remoteAddr.port = ntohs(addr.sin_port); - ConnDesc desc = {}; - desc.connection = new Connection(sid, remoteAddr); - desc.thread = new std::thread( - &Handler::handleTcpConnection, handler, std::ref(*desc.connection)); - connections.push_back(desc); + ConnList::iterator iter; + { + std::lock_guard lock(connectionsMutex); + cleanConnections(); + iter = connections.emplace(connections.end()); + } + iter->connection = new Connection(sid, remoteAddr); + iter->thread = new std::thread(&Server::handleConnection, this, iter); + } else { + std::lock_guard lock(connectionsMutex); + cleanConnections(); } } status = lastError ? STATUS_LOST : STATUS_CLOSED; + connectionsMutex.lock(); for(ConnList::iterator i = connections.begin(); i != connections.end(); ++i) i->connection->stop(); - for(ConnList::iterator i = connections.begin(); i != connections.end(); ++i) { - i->thread->join(); - delete i->thread; - delete i->connection; + while(!connections.empty()) { + std::thread *thread = connections.front().thread; + connectionsMutex.unlock(); + thread->join(); + connectionsMutex.lock(); } + cleanConnections(); + connectionsMutex.unlock(); stopping = false; } diff --git a/tcp.h b/tcp.h index 6bf2ed1..d7daa3a 100644 --- a/tcp.h +++ b/tcp.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include "error.h" @@ -110,7 +112,8 @@ public: Connection *connection; std::thread *thread; }; - typedef std::vector ConnList; + typedef std::list ConnList; + typedef std::vector ConnVector; private: Status status; @@ -119,12 +122,17 @@ private: Address localAddr; Handler *handler; std::thread *thread; - ConnList connections; std::atomic stopping; + std::mutex connectionsMutex; + ConnList connections; + ConnVector connectionsFinished; + Server(const Server&) = delete; Server& operator=(const Server&) = delete; + void cleanConnections(); + void handleConnection(ConnList::iterator iter); void listen(); public: