#include <cstring>
#include <chrono>
#include <thread>
#include <mutex>
#include "protocol.h"
#include "connection.h"
#define DebugMSG HideDebugMSG
Connection::Connection():
state(STATE_NONE),
stateWanted(STATE_NONE),
enqueued(false),
closeWaiters(0),
finishWaiters(0),
address(),
addressSize(),
protocol(),
server(),
prev(), next(),
srvPrev(), srvNext(),
queueNext(),
sockId(-1),
stateLocal(),
errorLocal()
{ }
Connection::~Connection() {
assert(state == STATE_NONE || state == STATE_FINISHED);
closeWait();
while(finishWaiters > 0) std::this_thread::yield();
}
bool Connection::open(Protocol &protocol, void *address, size_t addressSize, int sockId, Server *server) {
if (!address || !addressSize || addressSize > MAX_ADDR_SIZE) return false;
State s = state;
do {
if (s != STATE_NONE && s != STATE_FINISHED) return false;
} while(!state.compare_exchange_weak(s, STATE_OPENING));
if (s == STATE_FINISHED) while(finishWaiters > 0) std::this_thread::yield();
ReadLock lock(protocol.stateProtector);
Protocol::State ps = protocol.state;
if (ps < Protocol::STATE_INITIALIZING || ps >= Protocol::STATE_CLOSE_REQ)
{ state = STATE_NONE; return false; }
memcpy(this->address, address, addressSize);
this->addressSize = addressSize;
this->sockId = sockId;
this->server = server;
this->protocol = &protocol;
state = STATE_INITIALIZING;
if (!enqueued.exchange(true)) protocol.connQueue.push(*this);
protocol.wakeup();
return true;
}
void Connection::wantState(State state, bool error) {
ReadLock lock(stateProtector);
State s = this->state;
DebugMSG("%d, %d", s, state);
if (s <= STATE_OPENING || s >= STATE_CLOSING || s >= state) return;
DebugMSG("apply");
State sw = stateWanted;
do {
if (sw >= state) return;
} while(!stateWanted.compare_exchange_weak(sw, state));
if (error) { DebugMSG("error"); error = false; this->error.compare_exchange_strong(error, true); }
if (!enqueued.exchange(true)) protocol->connQueue.push(*this);
protocol->wakeup();
}
void Connection::closeReq()
{ wantState(STATE_CLOSE_REQ, false); }
void Connection::close(bool error)
{ wantState(STATE_CLOSING, error); }
void Connection::closeWait(unsigned long long timeoutUs, bool withReq, bool error) {
stateProtector.lock();
State s = state;
if (s <= STATE_OPENING || s >= STATE_FINISHED)
{ stateProtector.unlock(); return; }
if (s == STATE_CLOSED) {
finishWaiters++;
stateProtector.unlock();
} else {
if (withReq && s < STATE_CLOSE_REQ) closeReq();
closeWaiters++;
stateProtector.unlock();
std::mutex fakeMutex;
{
std::unique_lock<std::mutex> fakeLock(fakeMutex);
if (timeoutUs && s < STATE_CLOSING) {
std::chrono::steady_clock::time_point t
= std::chrono::steady_clock::now()
+ std::chrono::microseconds(timeoutUs);
while(s < STATE_CLOSED) {
if (t <= std::chrono::steady_clock::now()) { close(error); break; }
closeCondition.wait_until(fakeLock, t);
s = state;
}
}
if (s < STATE_CLOSING) close(error);
while(s < STATE_CLOSED) {
closeCondition.wait(fakeLock);
s = state;
}
}
finishWaiters++;
closeWaiters--;
}
while(s != STATE_FINISHED) { std::this_thread::yield(); s = state; }
finishWaiters--;
}
bool Connection::read(Task &task) {
DebugMSG();
ReadLock lock(stateProtector);
State s = state;
if (s < STATE_OPEN || s >= STATE_CLOSING) return false;
DebugMSG();
readQueue.push(task);
protocol->updateEvents(*this);
return true;
}
bool Connection::write(Task &task) {
DebugMSG();
ReadLock lock(stateProtector);
State s = state;
if (s < STATE_OPEN || s >= STATE_CLOSING) return false;
DebugMSG();
writeQueue.push(task);
protocol->updateEvents(*this);
return true;
}
void Connection::onOpeningError() { }
void Connection::onOpen(const void*, size_t) { }
bool Connection::onReadReady(Task&, bool) { return true; }
bool Connection::onWriteReady(Task&, bool) { return true; }
void Connection::onCloseReqested() { }
void Connection::onClose(bool) { }