#include <chrono>
#include "utils.h"
#include "protocol.h"
Connection::Connection():
started(), switching(), socket(), lastId(), closeRequested(), closeTimeUs() { }
// Destroys the connection. The connection is expected to be closed already:
// the assertion fires if it is still started or still holds a socket.
Connection::~Connection()
{
    Lock guard(mutex);
    assert(!started && !socket);
    // Drop any queued requests and reset the remaining state.
    clean();
}
// Returns the protocol object stored in the attached socket.
// Precondition (asserted): a socket is attached.
Protocol& Connection::getProtocol() const
{
    assert(socket);
    return socket->protocol;
}
// Returns the address stored in the attached socket.
// Precondition (asserted): a socket is attached.
const Address& Connection::getRemoteAddress() const
{
    assert(socket);
    return socket->address;
}
// Returns the connection handle stored in the attached socket.
// Precondition (asserted): a socket is attached.
const Connection::Handle& Connection::getConnection() const
{
    assert(socket);
    return socket->connection;
}
// Returns the server handle stored in the attached socket.
// Precondition (asserted): a socket is attached.
const Server::Handle& Connection::getServer() const
{
    assert(socket);
    return socket->server;
}
// Resets the per-session state: discards all queued read/write requests,
// detaches the socket pointer (without finalizing it) and clears the started
// flag, the close-request flag and the close deadline.
// Takes no lock itself; the callers visible in this file (destructor, open,
// close) already hold `mutex`. `switching` and `lastId` are left untouched.
void Connection::clean()
{
    // Pending requests first.
    readQueue.clear();
    writeQueue.clear();

    // Then the session flags and the socket binding.
    socket = nullptr;
    started = false;
    closeRequested = false;
    closeTimeUs = 0;
}
// Completes the oldest pending read request: removes it from the read queue,
// reports it through onReadReady(), and stops asking the socket for read
// events once isAllReadDone() says nothing is left.
// Precondition (asserted): the read queue is not empty.
// NOTE(review): no lock is taken here; presumably the caller already holds
// `mutex` - confirm against the call site.
void Connection::readDone()
{
    assert(!readQueue.empty());

    // Copy the request out: pop_front() destroys the queued element.
    ReadReq req = readQueue.front();
    readQueue.pop_front();

    onReadReady(req);

    // onReadReady() is an overridable hook and may close the connection;
    // close() -> clean() resets `socket` to null, so it must be re-checked
    // before it is dereferenced.
    if (socket && isAllReadDone())
        socket->setWantRead(false);
}
// Completes the oldest pending write request: removes it from the write
// queue, reports it through onWriteReady(), and stops asking the socket for
// write events once isAllWriteDone() says nothing is left.
// Precondition (asserted): the write queue is not empty.
// NOTE(review): no lock is taken here; presumably the caller already holds
// `mutex` - confirm against the call site.
void Connection::writeDone()
{
    assert(!writeQueue.empty());

    // Copy the request out: pop_front() destroys the queued element.
    WriteReq req = writeQueue.front();
    writeQueue.pop_front();

    onWriteReady(req);

    // onWriteReady() is an overridable hook and may close the connection;
    // close() -> clean() resets `socket` to null, so it must be re-checked
    // before it is dereferenced.
    if (socket && isAllWriteDone())
        socket->setWantWrite(false);
}
// Attaches the connection to `socket` and starts a new session.
//
// Any session that is currently running is closed first - this happens even
// when the arguments later turn out to be invalid. While onOpen() runs the
// connection is flagged as switching and the socket is already attached.
//
// Returns:
//   ERR_INVALID_ARGS             - `socket` is null;
//   ERR_CONNECTION_IS_SWITCHING  - an open/close transition is in progress;
//   otherwise whatever onOpen() returned. A non-zero result rolls the state
//   back through clean(); the socket itself is not finalized here.
ErrorCode Connection::open(Socket *socket)
{
    Lock guard(mutex);

    // Tear down the previous session, if there is one.
    close();

    if (socket == nullptr)
        return ERR_INVALID_ARGS;
    if (switching)
        return ERR_CONNECTION_IS_SWITCHING;

    this->socket = socket;

    switching = true;
    const ErrorCode result = onOpen();
    if (result) {
        clean();
    } else {
        started = true;
    }
    switching = false;

    return result;
}
// Closes the running session. Does nothing when the connection is not started
// or when an open/close transition is already in progress.
//
// errorCode - reason reported to onClose(). If it is zero but read or write
//             requests are still queued, ERR_CONNECTION_UNFINISHED_TASKS is
//             reported instead.
//
// Sequence: onClose() hook, serverDisconnect() on the protocol, state reset
// via clean(), wake-up of closeWait() callers, and finally finalize() on the
// socket that was attached.
void Connection::close(ErrorCode errorCode)
{
    Lock guard(mutex);

    if (!started)
        return;
    if (switching)
        return;

    // A clean close with work still queued is reported as an error.
    if (!errorCode && !(readQueue.empty() && writeQueue.empty()))
        errorCode = ERR_CONNECTION_UNFINISHED_TASKS;

    switching = true;
    onClose(errorCode);
    getProtocol().serverDisconnect(getServer(), getConnection());

    // clean() nulls the member, so keep the pointer for the final call.
    Socket *detached = socket;
    clean();
    switching = false;

    closeWaitCondition.notify_all();
    detached->finalize();
}
// Records a close request for the running session and fires
// onCloseRequested(). Ignored when the connection is not started, is in the
// middle of an open/close transition, or a request was already recorded.
void Connection::closeReq()
{
    Lock guard(mutex);

    const bool running = started && !switching;
    if (!running || closeRequested)
        return;

    closeRequested = true;
    onCloseRequested();
}
// Blocks until the running session is closed, forcing the close with
// ERR_CONNECTION_LOST once the deadline passes.
//
// timeoutUs   - maximum time to wait, in microseconds, counted from now.
// withRequest - when true, closeReq() is issued before waiting.
//
// `closeTimeUs` holds the deadline shared by all waiters; 0 means that no
// deadline has been set for the current session (clean() resets it to 0).
// A waiter that brings an earlier deadline lowers the shared value and wakes
// the other waiters so that they re-evaluate their remaining time.
//
// Returns immediately when the connection is not started or is in the middle
// of an open/close transition.
void Connection::closeWait(unsigned long long timeoutUs, bool withRequest) {
    std::unique_lock<Mutex> uniqlock(mutex);
    if (!started || switching) return;

    if (withRequest) {
        closeReq();
        // The onCloseRequested() hook may have closed the connection already;
        // do not leave a stale deadline behind on a closed connection.
        if (!started) return;
    }

    const unsigned long long maxTimeUs = ~0ULL;
    // Upper bound for a single wait_for() call. The loop re-checks the
    // deadline on every wake-up, so the cap only keeps the duration well
    // inside the range of std::chrono::microseconds.
    const unsigned long long maxWaitUs = 3600ULL * 1000000ULL;

    // Saturating add: a very large timeout must not wrap into the past.
    const unsigned long long nowUs = monotonicTimeUs();
    const unsigned long long deadlineUs =
        timeoutUs > maxTimeUs - nowUs ? maxTimeUs : nowUs + timeoutUs;

    // Install the deadline when none is set yet (0) or when ours is earlier.
    // Comparing against the initial 0 alone would never install it, and the
    // loop below would then force-close at once, ignoring timeoutUs.
    if (!closeTimeUs || closeTimeUs > deadlineUs) {
        closeTimeUs = deadlineUs;
        closeWaitCondition.notify_all();
    }

    while (started) {
        const unsigned long long currentUs = monotonicTimeUs();
        if (currentUs >= closeTimeUs) {
            close(ERR_CONNECTION_LOST);
            break;
        }

        unsigned long long waitUs = closeTimeUs - currentUs;
        if (waitUs > maxWaitUs) waitUs = maxWaitUs;
        closeWaitCondition.wait_for(uniqlock, std::chrono::microseconds(waitUs));
    }
}
// Queues a write request for `size` bytes starting at `data` and asks the
// socket for write events.
//
// data     - buffer to send; must not be null.
// size     - number of bytes; must not be zero.
// userData - opaque pointer stored with the request.
//
// Returns the id assigned to the request, or 0 when the arguments are invalid
// or the connection is closed / currently closing. Requests are accepted
// while the connection is running and also while it is being opened.
Connection::ReqId Connection::writeReq(const void *data, size_t size, void *userData)
{
    // Argument check happens before the lock is taken.
    if (data == nullptr || size == 0)
        return 0;

    Lock guard(mutex);

    const bool closed  = !started && !switching;
    const bool closing =  started &&  switching;
    if (closed || closing)
        return 0;

    writeQueue.emplace_back(++lastId, userData, data, size);
    socket->setWantWrite(true);
    return lastId;
}
// Queues a read request for `size` bytes into `data` and asks the socket for
// read events.
//
// data     - destination buffer; must not be null.
// size     - number of bytes; must not be zero.
// userData - opaque pointer stored with the request.
//
// Returns the id assigned to the request, or 0 when the arguments are invalid
// or the connection is closed / currently closing. Requests are accepted
// while the connection is running and also while it is being opened.
Connection::ReqId Connection::readReq(void *data, size_t size, void *userData)
{
    // Argument check happens before the lock is taken.
    if (data == nullptr || size == 0)
        return 0;

    Lock guard(mutex);

    const bool closed  = !started && !switching;
    const bool closing =  started &&  switching;
    if (closed || closing)
        return 0;

    readQueue.emplace_back(++lastId, userData, data, size);
    socket->setWantRead(true);
    return lastId;
}
// Default open hook: accepts the session. open() calls it after the socket is
// attached; a non-zero return value makes open() roll back and report it.
ErrorCode Connection::onOpen() { return ERR_NONE; }
// Default close-request hook: does nothing. closeReq() calls it right after
// setting the closeRequested flag.
void Connection::onCloseRequested() { }
// Default close hook: does nothing. close() calls it with the final error
// code before the protocol is notified and the state is reset.
void Connection::onClose(ErrorCode) { }
// Default read-completion hook: does nothing. readDone() calls it with the
// request it has just removed from the front of the read queue.
void Connection::onReadReady(const ReadReq&) { }
// Default write-completion hook: does nothing. writeDone() calls it with the
// request it has just removed from the front of the write queue.
void Connection::onWriteReady(const WriteReq&) { }