#include <chrono>
#include "utils.h"
#include "protocol.h"
#include "connection.h"
Connection::Connection():
socket(), lastId(), closeRequested(), closeTimeUs() { }
Connection::~Connection()
{ close(); }
Protocol& Connection::getProtocol() const
{ assert(socket); return socket->protocol; }
const Address& Connection::getRemoteAddress() const
{ assert(socket); return socket->address; }
const ::Handle<Server>& Connection::getServer() const
{ assert(socket); return socket->server; }
void Connection::open(Socket *socket) {
Lock lock(mutex);
close();
if (!socket) return;
this->socket = socket;
onOpen();
}
void Connection::close(ErrorCode errorCode) {
Socket *socketCopy;
{
Lock lock(mutex);
if (!socket)
return;
if (!errorCode && (!readQueue.empty() || !writeQueue.empty()))
errorCode = ERR_CONNECTION_UNFINISHED_TASKS;
onClose(errorCode);
readQueue.clear();
writeQueue.clear();
socketCopy = socket;
socket = nullptr;
closeRequested = false;
closeTimeUs = 0;
closeAwaitCondition.notify_all();
}
socketCopy->finalize();
}
Connection::ReqId Connection::writeReq(const void *data, size_t size, void *userData) {
if (!data || !size) return 0;
Lock lock(mutex);
if (!socket) return 0;
writeQueue.emplace_back(++lastId, userData, data, size);
return lastId;
}
Connection::ReqId Connection::readReq(void *data, size_t size, void *userData) {
if (!data || !size) return 0;
Lock lock(mutex);
if (!socket) return 0;
readQueue.emplace_back(++lastId, userData, data, size);
return lastId;
}
void Connection::closeReq() {
Lock lock(mutex);
if (!socket || closeRequested) return;
closeRequested = true;
onCloseRequested();
}
void Connection::closeAwait(unsigned long long timeoutUs, bool withRequest) {
std::unique_lock<Mutex> uniqlock(mutex);
if (!socket) return;
if (withRequest) closeReq();
unsigned long long timeUs = monotonicTimeUs() + timeoutUs;
if (closeTimeUs > timeUs)
{ closeTimeUs = timeUs; closeAwaitCondition.notify_all(); }
while(!socket) {
unsigned long long timeUs = monotonicTimeUs();
if (timeUs >= closeTimeUs)
{ close(); break; }
closeAwaitCondition.wait_for(uniqlock, std::chrono::microseconds(closeTimeUs - timeUs));
}
}
void Connection::onOpen() { }
void Connection::onCloseRequested() { }
void Connection::onClose(ErrorCode) { }
void Connection::onReadReady(const ReadReq&) { }
void Connection::onWriteReady(const WriteReq&) { }