|
|
71057f |
|
|
|
71057f |
#include <climits></climits>
|
|
|
71057f |
#include <cstring></cstring>
|
|
|
71057f |
|
|
|
71057f |
#include <fcntl.h></fcntl.h>
|
|
|
71057f |
#include <unistd.h></unistd.h>
|
|
|
71057f |
#include <sys types.h=""></sys>
|
|
|
71057f |
#include <sys socket.h=""></sys>
|
|
|
71057f |
#include <sys epoll.h=""></sys>
|
|
|
71057f |
#include <sys eventfd.h=""></sys>
|
|
|
71057f |
#include <netinet in.h=""></netinet>
|
|
|
71057f |
#include <netinet tcp.h=""></netinet>
|
|
|
71057f |
#include <netdb.h></netdb.h>
|
|
|
71057f |
|
|
|
71057f |
#include "protocol.h"
|
|
|
71057f |
|
|
|
71057f |
|
|
|
6ebd3b |
#define DebugMSG HideDebugMSG
|
|
|
6ebd3b |
#define SrvDebugMSG HideDebugMSG
|
|
|
65ef39 |
#define ConnDebugMSG ShowDebugMSG
|
|
|
51b3f0 |
|
|
|
51b3f0 |
|
|
|
51b3f0 |
|
|
|
51b3f0 |
Protocol::Protocol():
|
|
|
51b3f0 |
thread(),
|
|
|
51b3f0 |
epollFd(-1),
|
|
|
51b3f0 |
eventFd(-1),
|
|
|
51b3f0 |
state(STATE_NONE),
|
|
|
51b3f0 |
stateWanted(STATE_NONE),
|
|
|
51b3f0 |
closeWaiters(0),
|
|
|
51b3f0 |
finishWaiters(0),
|
|
|
51b3f0 |
connFirst(), connLast(),
|
|
|
51b3f0 |
srvFirst(), srvLast()
|
|
|
51b3f0 |
{ }
|
|
|
51b3f0 |
|
|
|
71057f |
|
|
|
71057f |
Protocol::~Protocol() {
|
|
|
51b3f0 |
assert(state == STATE_NONE || state == STATE_FINISHED);
|
|
|
71057f |
closeWait();
|
|
|
71057f |
while(finishWaiters > 0) std::this_thread::yield();
|
|
|
51b3f0 |
DebugMSG();
|
|
|
71057f |
if (thread) thread->join();
|
|
|
51b3f0 |
DebugMSG("deleted");
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
bool Protocol::open() {
|
|
|
51b3f0 |
DebugMSG();
|
|
|
71057f |
State s = state;
|
|
|
71057f |
do {
|
|
|
71057f |
if (s != STATE_NONE && s != STATE_FINISHED) return false;
|
|
|
71057f |
} while(!state.compare_exchange_weak(s, STATE_OPENING));
|
|
|
71057f |
if (s == STATE_FINISHED) while(finishWaiters > 0) std::this_thread::yield();
|
|
|
71057f |
if (thread) thread->join();
|
|
|
71057f |
|
|
|
71057f |
thread = new std::thread(&Protocol::threadRun, this);
|
|
|
51b3f0 |
state = STATE_INITIALIZING;
|
|
|
51b3f0 |
DebugMSG();
|
|
|
71057f |
return true;
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
void Protocol::wantState(State state) {
|
|
|
71057f |
ReadLock lock(stateProtector);
|
|
|
71057f |
State s = this->state;
|
|
|
51b3f0 |
DebugMSG("%d, %d", s, state);
|
|
|
71057f |
if (s <= STATE_OPENING || s >= STATE_CLOSING || s >= state) return;
|
|
|
51b3f0 |
DebugMSG("apply");
|
|
|
71057f |
|
|
|
71057f |
State sw = stateWanted;
|
|
|
71057f |
do {
|
|
|
51b3f0 |
if (sw >= state) return;
|
|
|
71057f |
} while(!stateWanted.compare_exchange_weak(sw, state));
|
|
|
71057f |
wakeup();
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
void Protocol::closeReq()
|
|
|
51b3f0 |
{ DebugMSG(); wantState(STATE_CLOSE_REQ); }
|
|
|
71057f |
void Protocol::close()
|
|
|
51b3f0 |
{ DebugMSG(); wantState(STATE_CLOSING); }
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
void Protocol::closeWait(unsigned long long timeoutUs, bool withReq) {
|
|
|
51b3f0 |
DebugMSG();
|
|
|
71057f |
stateProtector.lock();
|
|
|
71057f |
State s = state;
|
|
|
71057f |
if (s <= STATE_OPENING || s >= STATE_FINISHED)
|
|
|
71057f |
{ stateProtector.unlock(); return; }
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
if (s == STATE_CLOSED) {
|
|
|
71057f |
finishWaiters++;
|
|
|
71057f |
stateProtector.unlock();
|
|
|
71057f |
} else {
|
|
|
71057f |
if (withReq && s < STATE_CLOSE_REQ) closeReq();
|
|
|
71057f |
closeWaiters++;
|
|
|
71057f |
stateProtector.unlock();
|
|
|
71057f |
|
|
|
71057f |
std::mutex fakeMutex;
|
|
|
71057f |
{
|
|
|
71057f |
std::unique_lock<std::mutex> fakeLock(fakeMutex);</std::mutex>
|
|
|
71057f |
if (timeoutUs && s < STATE_CLOSING) {
|
|
|
71057f |
std::chrono::steady_clock::time_point t
|
|
|
71057f |
= std::chrono::steady_clock::now()
|
|
|
71057f |
+ std::chrono::microseconds(timeoutUs);
|
|
|
71057f |
while(s < STATE_CLOSED) {
|
|
|
71057f |
if (t <= std::chrono::steady_clock::now()) { close(); break; }
|
|
|
51b3f0 |
DebugMSG("wait_until, %d", s);
|
|
|
71057f |
closeCondition.wait_until(fakeLock, t);
|
|
|
71057f |
s = state;
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
if (s < STATE_CLOSING) close();
|
|
|
71057f |
while(s < STATE_CLOSED) {
|
|
|
51b3f0 |
DebugMSG("wait, %d", s);
|
|
|
71057f |
closeCondition.wait(fakeLock);
|
|
|
71057f |
s = state;
|
|
|
71057f |
}
|
|
|
51b3f0 |
DebugMSG("awaiten");
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
finishWaiters++;
|
|
|
71057f |
closeWaiters--;
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
while(s != STATE_FINISHED) { std::this_thread::yield(); s = state; }
|
|
|
71057f |
|
|
|
71057f |
finishWaiters--;
|
|
|
51b3f0 |
DebugMSG();
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
void Protocol::wakeup() {
|
|
|
71057f |
unsigned long long num = 0;
|
|
|
71057f |
::write(eventFd, &num, sizeof(num));
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
void Protocol::updateEvents(Connection &connection) {
|
|
|
65ef39 |
unsigned int events = EPOLLERR | EPOLLRDHUP;
|
|
|
71057f |
if (connection.readQueue.count()) events |= EPOLLIN;
|
|
|
71057f |
if (connection.writeQueue.count()) events |= EPOLLOUT;
|
|
|
6ebd3b |
|
|
|
6ebd3b |
ConnDebugMSG("events: %08x", events);
|
|
|
71057f |
if (connection.events.exchange(events) == events) return;
|
|
|
6ebd3b |
ConnDebugMSG("apply");
|
|
|
71057f |
|
|
|
71057f |
struct epoll_event event = {};
|
|
|
71057f |
event.data.ptr = &connection;
|
|
|
71057f |
event.events = events;
|
|
|
71057f |
epoll_ctl(epollFd, EPOLL_CTL_MOD, connection.sockId, &event);
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
void Protocol::initSocket(int sockId) {
|
|
|
71057f |
int tcp_nodelay = 1;
|
|
|
71057f |
setsockopt(sockId, IPPROTO_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int));
|
|
|
71057f |
fcntl(sockId, F_SETFL, fcntl(sockId, F_GETFL, 0) | O_NONBLOCK);
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
void Protocol::threadConnQueue() {
|
|
|
71057f |
while(connQueue.peek()) {
|
|
|
6ebd3b |
ConnDebugMSG();
|
|
|
71057f |
Connection &connection = connQueue.pop();
|
|
|
71057f |
connection.enqueued = false;
|
|
|
71057f |
|
|
|
71057f |
if (connection.stateLocal == Connection::STATE_NONE) {
|
|
|
6ebd3b |
ConnDebugMSG("none");
|
|
|
71057f |
int sockId = connection.sockId;
|
|
|
71057f |
bool connected = false;
|
|
|
71057f |
|
|
|
71057f |
if (sockId >= 0) {
|
|
|
6ebd3b |
ConnDebugMSG("accepting");
|
|
|
71057f |
initSocket(sockId);
|
|
|
71057f |
} else
|
|
|
71057f |
if (connection.addressSize >= sizeof(sa_family_t)) {
|
|
|
6ebd3b |
ConnDebugMSG("connecting");
|
|
|
71057f |
struct sockaddr *addr = (struct sockaddr*)connection.address;
|
|
|
71057f |
sockId = ::socket(addr->sa_family, SOCK_STREAM, 0);
|
|
|
71057f |
if (sockId >= 0) {
|
|
|
71057f |
initSocket(sockId);
|
|
|
6ebd3b |
if (0 == ::connect(sockId, addr, (int)connection.addressSize))
|
|
|
71057f |
connected = true;
|
|
|
6ebd3b |
} else {
|
|
|
6ebd3b |
ConnDebugMSG("none -> cannot create socket");
|
|
|
6ebd3b |
}
|
|
|
6ebd3b |
}
|
|
|
6ebd3b |
|
|
|
6ebd3b |
struct epoll_event event = {};
|
|
|
6ebd3b |
if (sockId >= 0) {
|
|
|
6ebd3b |
event.data.ptr = &connection;
|
|
|
65ef39 |
event.events = EPOLLERR | EPOLLRDHUP;
|
|
|
6ebd3b |
if (!connected) event.events |= EPOLLOUT;
|
|
|
6ebd3b |
if (0 != epoll_ctl(epollFd, EPOLL_CTL_ADD, sockId, &event)) {
|
|
|
6ebd3b |
ConnDebugMSG("epoll_ctl(%d, ADD, %d, %08x) errno: %d", epollFd, sockId, event.events, errno);
|
|
|
6ebd3b |
::close(sockId);
|
|
|
6ebd3b |
connection.sockId = sockId = -1;
|
|
|
6ebd3b |
assert(false);
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
if (sockId >= 0) {
|
|
|
6ebd3b |
ConnDebugMSG("opening");
|
|
|
71057f |
connection.protocol = this;
|
|
|
71057f |
connection.prev = connLast;
|
|
|
71057f |
assert(!connection.next);
|
|
|
71057f |
(connection.prev ? connection.prev->next : connFirst) = &connection;
|
|
|
71057f |
if (connection.server) {
|
|
|
71057f |
connection.srvPrev = connection.server->connLast;
|
|
|
71057f |
assert(!connection.srvNext);
|
|
|
71057f |
(connection.srvPrev ? connection.srvPrev->srvNext : connection.server->connFirst) = &connection;
|
|
|
71057f |
}
|
|
|
71057f |
connection.sockId = sockId;
|
|
|
71057f |
|
|
|
71057f |
if (connected) {
|
|
|
6ebd3b |
ConnDebugMSG("to open");
|
|
|
6ebd3b |
connection.events = event.events;
|
|
|
71057f |
connection.state = connection.stateLocal = Connection::STATE_OPEN;
|
|
|
71057f |
connection.stateProtector.wait();
|
|
|
6ebd3b |
connection.onOpen(connection.address, connection.addressSize);
|
|
|
71057f |
} else {
|
|
|
6ebd3b |
assert(!connection.server);
|
|
|
6ebd3b |
ConnDebugMSG("to connecting");
|
|
|
71057f |
connection.state = connection.stateLocal = Connection::STATE_CONNECTING;
|
|
|
71057f |
connection.stateProtector.wait();
|
|
|
71057f |
}
|
|
|
71057f |
} else {
|
|
|
6ebd3b |
ConnDebugMSG("none -> to closed");
|
|
|
6ebd3b |
Server *server = connection.server;
|
|
|
6ebd3b |
assert(connected || !server);
|
|
|
71057f |
connection.server = nullptr;
|
|
|
6ebd3b |
connection.sockId = -1;
|
|
|
71057f |
connection.onOpeningError();
|
|
|
71057f |
connection.state = Connection::STATE_CLOSED;
|
|
|
71057f |
while(connection.closeWaiters > 0)
|
|
|
71057f |
{ connection.closeCondition.notify_all(); std::this_thread::yield(); }
|
|
|
71057f |
connection.state = Connection::STATE_FINISHED;
|
|
|
6ebd3b |
ConnDebugMSG("none -> to finished");
|
|
|
6ebd3b |
if (server) threadSrvDisconnect(*server, connection, true);
|
|
|
71057f |
}
|
|
|
71057f |
} else {
|
|
|
71057f |
Connection::State stateWanted = connection.stateWanted;
|
|
|
71057f |
|
|
|
6ebd3b |
if ( connection.stateLocal == Connection::STATE_CONNECTING
|
|
|
6ebd3b |
&& stateWanted == Connection::STATE_CLOSING )
|
|
|
6ebd3b |
{
|
|
|
6ebd3b |
ConnDebugMSG("connecting -> to closing");
|
|
|
6ebd3b |
connection.onOpeningError();
|
|
|
6ebd3b |
connection.state = connection.stateLocal = Connection::STATE_CLOSING;
|
|
|
6ebd3b |
connection.stateProtector.wait();
|
|
|
6ebd3b |
}
|
|
|
6ebd3b |
|
|
|
6ebd3b |
if ( connection.stateLocal == Connection::STATE_OPEN
|
|
|
71057f |
&& stateWanted == Connection::STATE_CLOSE_REQ )
|
|
|
71057f |
{
|
|
|
6ebd3b |
ConnDebugMSG("to close req");
|
|
|
71057f |
connection.state = connection.stateLocal = Connection::STATE_CLOSE_REQ;
|
|
|
71057f |
connection.stateProtector.wait();
|
|
|
71057f |
connection.onCloseReqested();
|
|
|
71057f |
} else
|
|
|
6ebd3b |
if ( connection.stateLocal >= Connection::STATE_OPEN
|
|
|
71057f |
&& connection.stateLocal <= Connection::STATE_CLOSE_REQ
|
|
|
71057f |
&& stateWanted == Connection::STATE_CLOSING )
|
|
|
71057f |
{
|
|
|
6ebd3b |
ConnDebugMSG("to closing");
|
|
|
71057f |
connection.state = connection.stateLocal = Connection::STATE_CLOSING;
|
|
|
71057f |
connection.stateProtector.wait();
|
|
|
71057f |
while(Connection::Task *task = connection.readQueue.peek()) {
|
|
|
71057f |
connection.readQueue.pop();
|
|
|
71057f |
if (!connection.onReadReady(*task, true))
|
|
|
71057f |
assert(false);
|
|
|
71057f |
}
|
|
|
71057f |
while(Connection::Task *task = connection.writeQueue.peek()) {
|
|
|
71057f |
connection.writeQueue.pop();
|
|
|
71057f |
if (!connection.onWriteReady(*task, true))
|
|
|
71057f |
assert(false);
|
|
|
71057f |
}
|
|
|
6ebd3b |
connection.errorLocal = connection.error;
|
|
|
6ebd3b |
connection.onClose(connection.errorLocal);
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
if (connection.stateLocal == Connection::STATE_CLOSING && !connection.enqueued) {
|
|
|
6ebd3b |
ConnDebugMSG("to closed");
|
|
|
6ebd3b |
Server *server = connection.server;
|
|
|
6ebd3b |
if (server) {
|
|
|
71057f |
(connection.srvPrev ? connection.srvPrev->srvNext : connection.server->connFirst) = connection.srvNext;
|
|
|
71057f |
(connection.srvNext ? connection.srvNext->srvPrev : connection.server->connLast) = connection.srvPrev;
|
|
|
71057f |
connection.server = nullptr;
|
|
|
71057f |
}
|
|
|
6ebd3b |
bool error = connection.errorLocal;
|
|
|
6ebd3b |
connection.errorLocal = false;
|
|
|
71057f |
(connection.prev ? connection.prev->next : connFirst) = connection.next;
|
|
|
71057f |
(connection.next ? connection.next->prev : connLast) = connection.prev;
|
|
|
6ebd3b |
if (connection.sockId >= 0) {
|
|
|
6ebd3b |
struct epoll_event event = {};
|
|
|
6ebd3b |
epoll_ctl(epollFd, EPOLL_CTL_DEL, connection.sockId, &event);
|
|
|
6ebd3b |
::close(connection.sockId);
|
|
|
6ebd3b |
connection.sockId = -1;
|
|
|
6ebd3b |
}
|
|
|
71057f |
connection.protocol = nullptr;
|
|
|
71057f |
memset(connection.address, 0, sizeof(connection.address));
|
|
|
71057f |
connection.addressSize = 0;
|
|
|
71057f |
connection.stateWanted = Connection::STATE_NONE;
|
|
|
71057f |
connection.stateLocal = Connection::STATE_NONE;
|
|
|
71057f |
connection.state = Connection::STATE_CLOSED;
|
|
|
71057f |
while(connection.closeWaiters > 0)
|
|
|
71057f |
{ connection.closeCondition.notify_all(); std::this_thread::yield(); }
|
|
|
71057f |
connection.state = Connection::STATE_FINISHED;
|
|
|
6ebd3b |
ConnDebugMSG("to finished");
|
|
|
6ebd3b |
if (server) threadSrvDisconnect(*server, connection, error);
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
void Protocol::threadConnEvents(Connection &connection, unsigned int events) {
|
|
|
71057f |
bool needUpdate = false;
|
|
|
71057f |
|
|
|
71057f |
if (connection.stateLocal == Connection::STATE_CONNECTING) {
|
|
|
6ebd3b |
ConnDebugMSG("connecting");
|
|
|
6ebd3b |
if (events & (EPOLLHUP | EPOLLERR)) {
|
|
|
6ebd3b |
connection.close(true);
|
|
|
6ebd3b |
return;
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
if (events & EPOLLOUT) {
|
|
|
6ebd3b |
ConnDebugMSG("connecting -> to open");
|
|
|
71057f |
connection.state = connection.stateLocal = Connection::STATE_OPEN;
|
|
|
71057f |
connection.stateProtector.wait();
|
|
|
6ebd3b |
connection.onOpen(connection.address, connection.addressSize);
|
|
|
71057f |
needUpdate = true;
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
if ( connection.stateLocal < Connection::STATE_OPEN
|
|
|
71057f |
|| connection.stateLocal >= Connection::STATE_CLOSING )
|
|
|
71057f |
return;
|
|
|
71057f |
|
|
|
65ef39 |
bool closing = (bool)(events & (EPOLLHUP | EPOLLRDHUP | EPOLLERR));
|
|
|
65ef39 |
|
|
|
71057f |
if (events & EPOLLIN) {
|
|
|
6ebd3b |
ConnDebugMSG("input");
|
|
|
71057f |
bool ready = true;
|
|
|
71057f |
int pops = 0;
|
|
|
65ef39 |
int count = (connection.readQueue.count() + 2)*2;
|
|
|
71057f |
while(ready) {
|
|
|
71057f |
Connection::Task *task = connection.readQueue.peek();
|
|
|
71057f |
if (!task) {
|
|
|
71057f |
if (pops) needUpdate = true;
|
|
|
71057f |
break;
|
|
|
71057f |
}
|
|
|
71057f |
size_t completion = 0;
|
|
|
71057f |
while(task->size > task->completion) {
|
|
|
71057f |
size_t size = task->size - task->completion;
|
|
|
71057f |
if (size > INT_MAX) size = INT_MAX;
|
|
|
71057f |
int res = ::recv(
|
|
|
71057f |
connection.sockId,
|
|
|
71057f |
(unsigned char*)task->data + task->completion,
|
|
|
71057f |
(int)size, MSG_DONTWAIT );
|
|
|
6ebd3b |
if (res == EAGAIN || res == 0) {
|
|
|
71057f |
ready = false;
|
|
|
71057f |
break;
|
|
|
71057f |
} else
|
|
|
6ebd3b |
if (res < 0) {
|
|
|
71057f |
connection.close(true);
|
|
|
71057f |
return;
|
|
|
71057f |
} else {
|
|
|
6ebd3b |
completion += res;
|
|
|
6ebd3b |
task->completion += res;
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
if (task->size <= task->completion || (completion && task->watch)) {
|
|
|
71057f |
connection.readQueue.pop();
|
|
|
71057f |
if (!connection.onReadReady(*task, false))
|
|
|
71057f |
connection.readQueue.unpop(*task); else ++pops;
|
|
|
65ef39 |
if (!closing && --count < 0) ready = false;
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
65ef39 |
if (closing) {
|
|
|
6ebd3b |
ConnDebugMSG("hup");
|
|
|
71057f |
connection.close((bool)(events & EPOLLERR));
|
|
|
71057f |
return;
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
if (events & EPOLLOUT) {
|
|
|
6ebd3b |
ConnDebugMSG("output");
|
|
|
71057f |
bool ready = true;
|
|
|
71057f |
int pops = 0;
|
|
|
6ebd3b |
int count = connection.writeQueue.count() + 2;
|
|
|
71057f |
while(ready) {
|
|
|
71057f |
Connection::Task *task = connection.writeQueue.peek();
|
|
|
71057f |
if (!task) {
|
|
|
71057f |
if (pops) needUpdate = true;
|
|
|
71057f |
break;
|
|
|
71057f |
}
|
|
|
71057f |
size_t completion = 0;
|
|
|
71057f |
while(task->size > task->completion) {
|
|
|
71057f |
size_t size = task->size - task->completion;
|
|
|
71057f |
if (size > INT_MAX) size = INT_MAX;
|
|
|
71057f |
int res = ::send(
|
|
|
71057f |
connection.sockId,
|
|
|
71057f |
(const unsigned char*)task->data + task->completion,
|
|
|
71057f |
(int)size, MSG_DONTWAIT );
|
|
|
6ebd3b |
if (res == EAGAIN || res == 0) {
|
|
|
71057f |
ready = false;
|
|
|
71057f |
break;
|
|
|
71057f |
} else
|
|
|
6ebd3b |
if (res < 0) {
|
|
|
71057f |
connection.close(true);
|
|
|
71057f |
return;
|
|
|
71057f |
} else {
|
|
|
6ebd3b |
completion += res;
|
|
|
6ebd3b |
task->completion += res;
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
if (task->size <= task->completion || (completion && task->watch)) {
|
|
|
71057f |
connection.writeQueue.pop();
|
|
|
71057f |
if (!connection.onWriteReady(*task, false))
|
|
|
71057f |
connection.writeQueue.unpop(*task); else ++pops;
|
|
|
6ebd3b |
if (--count <= 0) ready = false;
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
if (needUpdate) updateEvents(connection);
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
6ebd3b |
void Protocol::threadSrvDisconnect(Server& server, Connection& connection, bool error) {
|
|
|
6ebd3b |
server.onDisconnect(&connection, error);
|
|
|
6ebd3b |
if (!server.connFirst && server.stateLocal == Server::STATE_CLOSING_CONNECTIONS) {
|
|
|
6ebd3b |
SrvDebugMSG("to closing");
|
|
|
6ebd3b |
server.state = server.stateLocal = Server::STATE_CLOSING;
|
|
|
6ebd3b |
server.stateProtector.wait();
|
|
|
6ebd3b |
server.onClose(server.error);
|
|
|
6ebd3b |
if (!server.enqueued.exchange(true)) srvQueue.push(server);
|
|
|
6ebd3b |
}
|
|
|
6ebd3b |
}
|
|
|
6ebd3b |
|
|
|
6ebd3b |
|
|
|
71057f |
void Protocol::threadSrvQueue() {
|
|
|
71057f |
while(srvQueue.peek()) {
|
|
|
61c265 |
SrvDebugMSG();
|
|
|
71057f |
Server &server = srvQueue.pop();
|
|
|
71057f |
server.enqueued = false;
|
|
|
71057f |
|
|
|
71057f |
if (server.stateLocal == Server::STATE_NONE) {
|
|
|
61c265 |
SrvDebugMSG("none");
|
|
|
71057f |
int sockId = -1;
|
|
|
71057f |
if (server.addressSize >= sizeof(sa_family_t)) {
|
|
|
71057f |
struct sockaddr *addr = (struct sockaddr*)server.address;
|
|
|
71057f |
sockId = ::socket(addr->sa_family, SOCK_STREAM, 0);
|
|
|
71057f |
if (sockId >= 0) {
|
|
|
71057f |
initSocket(sockId);
|
|
|
71057f |
if ( 0 != ::bind(sockId, addr, (int)server.addressSize)
|
|
|
71057f |
|| 0 != ::listen(sockId, 128) )
|
|
|
71057f |
{
|
|
|
61c265 |
SrvDebugMSG("cannot bind/listen");
|
|
|
71057f |
::close(sockId);
|
|
|
71057f |
sockId = -1;
|
|
|
71057f |
}
|
|
|
61c265 |
} else {
|
|
|
61c265 |
SrvDebugMSG("cannot create socket");
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
if (sockId >= 0) {
|
|
|
61c265 |
SrvDebugMSG("to open");
|
|
|
71057f |
server.protocol = this;
|
|
|
71057f |
server.prev = srvLast;
|
|
|
71057f |
assert(!server.next);
|
|
|
71057f |
(server.prev ? server.prev->next : srvFirst) = &server;
|
|
|
71057f |
server.sockId = sockId;
|
|
|
71057f |
|
|
|
71057f |
struct epoll_event event = {};
|
|
|
71057f |
event.data.ptr = &server;
|
|
|
6ebd3b |
event.events = EPOLLIN | EPOLLERR;
|
|
|
71057f |
epoll_ctl(epollFd, EPOLL_CTL_ADD, sockId, &event);
|
|
|
71057f |
|
|
|
71057f |
server.state = server.stateLocal = Server::STATE_OPEN;
|
|
|
71057f |
server.stateProtector.wait();
|
|
|
6ebd3b |
server.onOpen();
|
|
|
71057f |
} else {
|
|
|
61c265 |
SrvDebugMSG("opening error -> to closed");
|
|
|
71057f |
server.onOpeningError();
|
|
|
71057f |
server.state = Server::STATE_CLOSED;
|
|
|
71057f |
while(server.closeWaiters > 0)
|
|
|
71057f |
{ server.closeCondition.notify_all(); std::this_thread::yield(); }
|
|
|
71057f |
server.state = Server::STATE_FINISHED;
|
|
|
61c265 |
SrvDebugMSG("opening error -> finished");
|
|
|
71057f |
}
|
|
|
71057f |
} else {
|
|
|
71057f |
Server::State stateWanted = server.stateWanted;
|
|
|
71057f |
if ( server.stateLocal == Server::STATE_OPEN
|
|
|
71057f |
&& stateWanted == Server::STATE_CLOSE_REQ )
|
|
|
71057f |
{
|
|
|
61c265 |
SrvDebugMSG("to close req");
|
|
|
71057f |
for(Connection *conn = server.connFirst; conn; conn = conn->srvNext)
|
|
|
71057f |
conn->closeReq();
|
|
|
71057f |
struct epoll_event event = {};
|
|
|
71057f |
epoll_ctl(epollFd, EPOLL_CTL_DEL, server.sockId, &event);
|
|
|
71057f |
server.state = server.stateLocal = Server::STATE_CLOSE_REQ;
|
|
|
71057f |
server.stateProtector.wait();
|
|
|
71057f |
server.onCloseReqested();
|
|
|
71057f |
} else
|
|
|
71057f |
if ( ( server.stateLocal == Server::STATE_OPEN
|
|
|
71057f |
|| server.stateLocal == Server::STATE_CLOSE_REQ )
|
|
|
71057f |
&& stateWanted == Server::STATE_CLOSING_CONNECTIONS )
|
|
|
71057f |
{
|
|
|
61c265 |
SrvDebugMSG("to closing connections");
|
|
|
71057f |
if (server.stateLocal != Server::STATE_CLOSE_REQ) {
|
|
|
71057f |
struct epoll_event event = {};
|
|
|
71057f |
epoll_ctl(epollFd, EPOLL_CTL_DEL, server.sockId, &event);
|
|
|
71057f |
}
|
|
|
71057f |
if (server.connFirst) {
|
|
|
71057f |
for(Connection *conn = server.connFirst; conn; conn = conn->srvNext)
|
|
|
71057f |
conn->close(true);
|
|
|
71057f |
server.state = server.stateLocal = Server::STATE_CLOSING_CONNECTIONS;
|
|
|
71057f |
} else {
|
|
|
61c265 |
SrvDebugMSG("to closing");
|
|
|
71057f |
server.state = server.stateLocal = Server::STATE_CLOSING;
|
|
|
71057f |
server.stateProtector.wait();
|
|
|
6ebd3b |
server.onClose(server.error);
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
if (server.stateLocal == Server::STATE_CLOSING && !server.enqueued) {
|
|
|
61c265 |
SrvDebugMSG("to closed");
|
|
|
6ebd3b |
if (server.sockId >= 0) {
|
|
|
6ebd3b |
struct epoll_event event = {};
|
|
|
6ebd3b |
epoll_ctl(epollFd, EPOLL_CTL_DEL, server.sockId, &event);
|
|
|
6ebd3b |
::close(server.sockId);
|
|
|
6ebd3b |
server.sockId = -1;
|
|
|
6ebd3b |
}
|
|
|
71057f |
(server.prev ? server.prev->next : srvFirst) = server.next;
|
|
|
71057f |
(server.next ? server.next->prev : srvLast) = server.prev;
|
|
|
71057f |
server.protocol = nullptr;
|
|
|
71057f |
memset(server.address, 0, sizeof(server.address));
|
|
|
71057f |
server.addressSize = 0;
|
|
|
71057f |
server.stateWanted = Server::STATE_NONE;
|
|
|
71057f |
server.stateLocal = Server::STATE_NONE;
|
|
|
61c265 |
SrvDebugMSG("closeWaiters");
|
|
|
6ebd3b |
server.state = Server::STATE_CLOSED;
|
|
|
71057f |
while(server.closeWaiters > 0)
|
|
|
71057f |
{ server.closeCondition.notify_all(); std::this_thread::yield(); }
|
|
|
71057f |
server.state = Server::STATE_FINISHED;
|
|
|
61c265 |
SrvDebugMSG("finished");
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
void Protocol::threadSrvEvents(Server &server, unsigned int events) {
|
|
|
6ebd3b |
if (events & (EPOLLHUP | EPOLLERR)) {
|
|
|
71057f |
if ( server.stateLocal >= Server::STATE_OPEN
|
|
|
71057f |
&& server.stateLocal < Server::STATE_CLOSING )
|
|
|
71057f |
server.close((bool)(events & EPOLLERR));
|
|
|
71057f |
return;
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
if (events & EPOLLIN) {
|
|
|
71057f |
if ( server.stateLocal >= Server::STATE_OPEN
|
|
|
71057f |
&& server.stateLocal < Server::STATE_CLOSE_REQ)
|
|
|
71057f |
{
|
|
|
71057f |
unsigned char address[MAX_ADDR_SIZE] = {};
|
|
|
71057f |
socklen_t len = sizeof(address);
|
|
|
71057f |
int res = ::accept(server.sockId, (struct sockaddr*)address, &len);
|
|
|
71057f |
assert(len < sizeof(address));
|
|
|
71057f |
if (res >= 0) {
|
|
|
71057f |
if (Connection *conn = server.onConnect(address, len))
|
|
|
71057f |
if (!conn->open(*this, address, len, res))
|
|
|
71057f |
server.onDisconnect(conn, true);
|
|
|
71057f |
} else
|
|
|
71057f |
if (res != EAGAIN) {
|
|
|
71057f |
server.close(true);
|
|
|
71057f |
return;
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
void Protocol::threadRun() {
|
|
|
51b3f0 |
DebugMSG();
|
|
|
71057f |
epollFd = ::epoll_create(32);
|
|
|
71057f |
eventFd = ::eventfd(0, EFD_NONBLOCK);
|
|
|
6ebd3b |
assert(epollFd >= 0);
|
|
|
6ebd3b |
assert(eventFd >= 0);
|
|
|
71057f |
|
|
|
71057f |
struct epoll_event event = {};
|
|
|
71057f |
event.events = EPOLLIN;
|
|
|
71057f |
epoll_ctl(epollFd, EPOLL_CTL_ADD, eventFd, &event);
|
|
|
71057f |
|
|
|
71057f |
const int maxCount = 16;
|
|
|
71057f |
struct epoll_event events[maxCount] = {};
|
|
|
71057f |
int count = 0;
|
|
|
71057f |
|
|
|
71057f |
State stateLocal = STATE_OPEN;
|
|
|
71057f |
state = stateLocal;
|
|
|
71057f |
|
|
|
71057f |
while(true) {
|
|
|
71057f |
for(int i = 0; i < count; ++i) {
|
|
|
71057f |
Root *pointer = (Root*)events[i].data.ptr;
|
|
|
71057f |
if (!pointer) {
|
|
|
71057f |
unsigned long long num = 0;
|
|
|
71057f |
while(::read(eventFd, &num, sizeof(num)) > 0 && !num);
|
|
|
71057f |
} else
|
|
|
71057f |
if (Connection *connection = dynamic_cast<connection*>(pointer)) {</connection*>
|
|
|
71057f |
threadConnEvents(*connection, events[i].events);
|
|
|
71057f |
} else
|
|
|
71057f |
if (Server *server = dynamic_cast<server*>(pointer)) {</server*>
|
|
|
71057f |
threadSrvEvents(*server, events[i].events);
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
State sw = stateWanted;
|
|
|
71057f |
if (stateLocal == STATE_OPEN && sw == STATE_CLOSE_REQ) {
|
|
|
51b3f0 |
DebugMSG("to closse req");
|
|
|
71057f |
state = stateLocal = STATE_CLOSE_REQ;
|
|
|
71057f |
stateProtector.wait();
|
|
|
71057f |
for(Server *server = srvFirst; server; server = server->next)
|
|
|
71057f |
server->closeReq();
|
|
|
71057f |
for(Connection *conn = connFirst; conn; conn = conn->next)
|
|
|
71057f |
if (!conn->server)
|
|
|
71057f |
conn->closeReq();
|
|
|
71057f |
} else
|
|
|
71057f |
if (stateLocal >= STATE_OPEN && stateLocal <= STATE_CLOSE_REQ && sw == STATE_CLOSING) {
|
|
|
51b3f0 |
DebugMSG("to closing");
|
|
|
51b3f0 |
state = stateLocal = STATE_CLOSING;
|
|
|
71057f |
stateProtector.wait();
|
|
|
71057f |
for(Server *server = srvFirst; server; server = server->next)
|
|
|
71057f |
server->close(true);
|
|
|
71057f |
for(Connection *conn = connFirst; conn; conn = conn->next)
|
|
|
71057f |
if (!conn->server)
|
|
|
71057f |
conn->close(true);
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
while(connQueue.peek() || srvQueue.peek()) {
|
|
|
71057f |
threadConnQueue();
|
|
|
71057f |
threadSrvQueue();
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
if (stateLocal == STATE_CLOSING) {
|
|
|
51b3f0 |
assert(!connFirst && !srvFirst);
|
|
|
71057f |
break;
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
51b3f0 |
count = epoll_wait(epollFd, events, maxCount, 1000);
|
|
|
6ebd3b |
DebugMSG("epoll count: %d", count);
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
51b3f0 |
DebugMSG("closeWaiters");
|
|
|
6ebd3b |
state = STATE_CLOSED;
|
|
|
71057f |
while(closeWaiters > 0)
|
|
|
71057f |
{ closeCondition.notify_all(); std::this_thread::yield(); }
|
|
|
71057f |
|
|
|
71057f |
::close(epollFd);
|
|
|
71057f |
::close(eventFd);
|
|
|
51b3f0 |
epollFd = -1;
|
|
|
51b3f0 |
eventFd = -1;
|
|
|
51b3f0 |
assert(!connFirst && !srvFirst);
|
|
|
51b3f0 |
assert(!connQueue.peek() && !srvQueue.peek());
|
|
|
71057f |
|
|
|
51b3f0 |
stateWanted = STATE_NONE;
|
|
|
51b3f0 |
stateLocal = STATE_NONE;
|
|
|
71057f |
state = STATE_FINISHED;
|
|
|
51b3f0 |
DebugMSG("finished");
|
|
|
71057f |
}
|
|
|
71057f |
|