|
|
71057f |
|
|
|
71057f |
#include <climits></climits>
|
|
|
71057f |
#include <cstring></cstring>
|
|
|
71057f |
|
|
|
71057f |
#include <fcntl.h></fcntl.h>
|
|
|
71057f |
#include <unistd.h></unistd.h>
|
|
|
71057f |
#include <sys types.h=""></sys>
|
|
|
71057f |
#include <sys socket.h=""></sys>
|
|
|
71057f |
#include <sys epoll.h=""></sys>
|
|
|
71057f |
#include <sys eventfd.h=""></sys>
|
|
|
71057f |
#include <netinet in.h=""></netinet>
|
|
|
71057f |
#include <netinet tcp.h=""></netinet>
|
|
|
71057f |
#include <netdb.h></netdb.h>
|
|
|
71057f |
|
|
|
71057f |
#include "protocol.h"
|
|
|
71057f |
|
|
|
71057f |
|
|
|
6ebd3b |
// Redirects every DebugMSG(...) call in this file to HideDebugMSG.
// NOTE(review): HideDebugMSG is presumably the "silent" logging macro declared
// in a project header - confirm before relying on these calls being no-ops.
#define DebugMSG HideDebugMSG
|
|
|
51b3f0 |
|
|
|
51b3f0 |
|
|
|
51b3f0 |
// Constructs an idle protocol object: no worker thread, invalid (-1) epoll and
// event descriptors, both state fields at STATE_NONE, no registered waiters
// and value-initialised socket list ends.
Protocol::Protocol()
    : thread()
    , epollFd(-1)
    , eventFd(-1)
    , state(STATE_NONE)
    , stateWanted(STATE_NONE)
    , closeWaiters(0)
    , finishWaiters(0)
    , sockFirst()
    , sockLast()
{
}
|
|
|
51b3f0 |
|
|
|
71057f |
|
|
|
71057f |
// Destroys the protocol object.
//
// The object is expected to be idle already (STATE_NONE or STATE_FINISHED).
// The destructor still runs closeWait(), waits until no other thread is
// parked in the "finish" phase of closeWait(), and then joins and releases
// the worker thread object.
Protocol::~Protocol() {
    assert(state == STATE_NONE || state == STATE_FINISHED);
    closeWait();

    // Other threads may still be leaving closeWait(); they touch members of
    // this object until they decrement finishWaiters.
    while(finishWaiters > 0) std::this_thread::yield();
    DebugMSG();

    if (thread) {
        thread->join();
        // The thread object is allocated with `new` in open(); joining alone
        // leaves it leaked, so release it here.
        delete thread;
        thread = nullptr;
    }
    DebugMSG("deleted");
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
bool Protocol::open() {
|
|
|
51b3f0 |
DebugMSG();
|
|
|
71057f |
State s = state;
|
|
|
71057f |
do {
|
|
|
71057f |
if (s != STATE_NONE && s != STATE_FINISHED) return false;
|
|
|
71057f |
} while(!state.compare_exchange_weak(s, STATE_OPENING));
|
|
|
71057f |
if (s == STATE_FINISHED) while(finishWaiters > 0) std::this_thread::yield();
|
|
|
71057f |
if (thread) thread->join();
|
|
|
71057f |
|
|
|
71057f |
thread = new std::thread(&Protocol::threadRun, this);
|
|
|
51b3f0 |
state = STATE_INITIALIZING;
|
|
|
51b3f0 |
DebugMSG();
|
|
|
71057f |
return true;
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
// Asks the worker thread to move towards the given state.
//
// The request is ignored when the current state is STATE_OPENING or lower,
// STATE_CLOSING or higher, or already at/after the requested state. Otherwise
// stateWanted is raised to the requested value (it is never lowered) and the
// worker is woken up.
void Protocol::wantState(State state) {
    ReadLock lock(stateProtector);

    const State current = this->state;
    DebugMSG("%d, %d", current, state);

    if (current <= STATE_OPENING) return;
    if (current >= STATE_CLOSING) return;
    if (current >= state) return;
    DebugMSG("apply");

    // Raise stateWanted with a CAS loop; bail out as soon as somebody else
    // has already requested this state or a later one.
    State pending = stateWanted;
    for (;;) {
        if (pending >= state) return;
        if (stateWanted.compare_exchange_weak(pending, state)) break;
    }

    wakeup();
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
void Protocol::closeReq()
|
|
|
51b3f0 |
{ DebugMSG(); wantState(STATE_CLOSE_REQ); }
|
|
|
71057f |
void Protocol::close()
|
|
|
51b3f0 |
{ DebugMSG(); wantState(STATE_CLOSING); }
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
void Protocol::closeWait(unsigned long long timeoutUs, bool withReq) {
|
|
|
51b3f0 |
DebugMSG();
|
|
|
71057f |
stateProtector.lock();
|
|
|
71057f |
State s = state;
|
|
|
71057f |
if (s <= STATE_OPENING || s >= STATE_FINISHED)
|
|
|
71057f |
{ stateProtector.unlock(); return; }
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
if (s == STATE_CLOSED) {
|
|
|
71057f |
finishWaiters++;
|
|
|
71057f |
stateProtector.unlock();
|
|
|
71057f |
} else {
|
|
|
71057f |
if (withReq && s < STATE_CLOSE_REQ) closeReq();
|
|
|
71057f |
closeWaiters++;
|
|
|
71057f |
stateProtector.unlock();
|
|
|
71057f |
|
|
|
71057f |
std::mutex fakeMutex;
|
|
|
71057f |
{
|
|
|
71057f |
std::unique_lock<std::mutex> fakeLock(fakeMutex);</std::mutex>
|
|
|
71057f |
if (timeoutUs && s < STATE_CLOSING) {
|
|
|
71057f |
std::chrono::steady_clock::time_point t
|
|
|
71057f |
= std::chrono::steady_clock::now()
|
|
|
71057f |
+ std::chrono::microseconds(timeoutUs);
|
|
|
71057f |
while(s < STATE_CLOSED) {
|
|
|
71057f |
if (t <= std::chrono::steady_clock::now()) { close(); break; }
|
|
|
51b3f0 |
DebugMSG("wait_until, %d", s);
|
|
|
71057f |
closeCondition.wait_until(fakeLock, t);
|
|
|
71057f |
s = state;
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
if (s < STATE_CLOSING) close();
|
|
|
71057f |
while(s < STATE_CLOSED) {
|
|
|
51b3f0 |
DebugMSG("wait, %d", s);
|
|
|
71057f |
closeCondition.wait(fakeLock);
|
|
|
71057f |
s = state;
|
|
|
71057f |
}
|
|
|
51b3f0 |
DebugMSG("awaiten");
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
finishWaiters++;
|
|
|
71057f |
closeWaiters--;
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
while(s != STATE_FINISHED) { std::this_thread::yield(); s = state; }
|
|
|
71057f |
|
|
|
71057f |
finishWaiters--;
|
|
|
51b3f0 |
DebugMSG();
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
// Wakes the worker thread out of epoll_wait() by signalling eventFd.
//
// The worker drains the eventfd counter in threadRun() when it sees the
// corresponding epoll event, so any number of wakeups collapse into one.
void Protocol::wakeup() {
    // write() on an eventfd *adds* the supplied value to its counter, and the
    // descriptor is only reported readable while the counter is non-zero.
    // Writing 0 therefore never produces EPOLLIN; the increment must be >= 1.
    const unsigned long long num = 1;

    // Best effort: a failed write (e.g. counter saturated, or the descriptor
    // not open yet) is deliberately ignored, as the worker also polls with a
    // timeout.
    const ssize_t written = ::write(eventFd, &num, sizeof(num));
    (void)written;
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
void Protocol::threadRun() {
|
|
|
51b3f0 |
DebugMSG();
|
|
|
71057f |
epollFd = ::epoll_create(32);
|
|
|
71057f |
eventFd = ::eventfd(0, EFD_NONBLOCK);
|
|
|
6ebd3b |
assert(epollFd >= 0);
|
|
|
6ebd3b |
assert(eventFd >= 0);
|
|
|
71057f |
|
|
|
71057f |
struct epoll_event event = {};
|
|
|
71057f |
event.events = EPOLLIN;
|
|
|
71057f |
epoll_ctl(epollFd, EPOLL_CTL_ADD, eventFd, &event);
|
|
|
71057f |
|
|
|
71057f |
const int maxCount = 16;
|
|
|
71057f |
struct epoll_event events[maxCount] = {};
|
|
|
71057f |
int count = 0;
|
|
|
71057f |
|
|
|
71057f |
State stateLocal = STATE_OPEN;
|
|
|
71057f |
state = stateLocal;
|
|
|
71057f |
|
|
|
71057f |
while(true) {
|
|
|
71057f |
for(int i = 0; i < count; ++i) {
|
|
|
b42f0a |
if (Socket *socket = (Socket*)events[i].data.ptr) {
|
|
|
b42f0a |
socket->handleEvents(events[i].events);
|
|
|
b42f0a |
} else {
|
|
|
71057f |
unsigned long long num = 0;
|
|
|
71057f |
while(::read(eventFd, &num, sizeof(num)) > 0 && !num);
|
|
|
71057f |
}
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
71057f |
State sw = stateWanted;
|
|
|
71057f |
if (stateLocal == STATE_OPEN && sw == STATE_CLOSE_REQ) {
|
|
|
51b3f0 |
DebugMSG("to closse req");
|
|
|
71057f |
state = stateLocal = STATE_CLOSE_REQ;
|
|
|
71057f |
stateProtector.wait();
|
|
|
b42f0a |
for(Socket *socket = sockFirst; socket; socket = socket->next)
|
|
|
b42f0a |
socket->closeReq();
|
|
|
71057f |
} else
|
|
|
71057f |
if (stateLocal >= STATE_OPEN && stateLocal <= STATE_CLOSE_REQ && sw == STATE_CLOSING) {
|
|
|
51b3f0 |
DebugMSG("to closing");
|
|
|
51b3f0 |
state = stateLocal = STATE_CLOSING;
|
|
|
71057f |
stateProtector.wait();
|
|
|
b42f0a |
for(Socket *socket = sockFirst; socket; socket = socket->next)
|
|
|
b42f0a |
socket->close(true);
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
b42f0a |
while(sockQueue.peek()) {
|
|
|
b42f0a |
Socket &socket = sockQueue.pop();
|
|
|
b42f0a |
socket.enqueued = false;
|
|
|
b42f0a |
socket.handleState();
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
b42f0a |
if (stateLocal == STATE_CLOSING)
|
|
|
71057f |
break;
|
|
|
71057f |
|
|
|
51b3f0 |
count = epoll_wait(epollFd, events, maxCount, 1000);
|
|
|
6ebd3b |
DebugMSG("epoll count: %d", count);
|
|
|
71057f |
}
|
|
|
71057f |
|
|
|
b42f0a |
assert(!sockFirst);
|
|
|
b42f0a |
|
|
|
51b3f0 |
DebugMSG("closeWaiters");
|
|
|
6ebd3b |
state = STATE_CLOSED;
|
|
|
71057f |
while(closeWaiters > 0)
|
|
|
71057f |
{ closeCondition.notify_all(); std::this_thread::yield(); }
|
|
|
71057f |
|
|
|
b42f0a |
epoll_ctl(epollFd, EPOLL_CTL_DEL, eventFd, &event);
|
|
|
b42f0a |
|
|
|
71057f |
::close(epollFd);
|
|
|
71057f |
::close(eventFd);
|
|
|
51b3f0 |
epollFd = -1;
|
|
|
51b3f0 |
eventFd = -1;
|
|
|
b42f0a |
assert(!sockFirst);
|
|
|
b42f0a |
assert(!sockQueue.peek());
|
|
|
71057f |
|
|
|
51b3f0 |
stateWanted = STATE_NONE;
|
|
|
51b3f0 |
stateLocal = STATE_NONE;
|
|
|
71057f |
state = STATE_FINISHED;
|
|
|
51b3f0 |
DebugMSG("finished");
|
|
|
71057f |
}
|
|
|
71057f |
|