#ifndef CONNECTION_H
#define CONNECTION_H
#include <cstring>
#include <deque>
#include <mutex>
#include <condition_variable>
#include "common.h"
#include "address.h"
// Error codes of the connection module. Numbering starts at ERR_CONNECTION
// (declared outside this header) and continues sequentially.
enum : ErrorCode {
    ERR_CONNECTION_COMMON           = ERR_CONNECTION,            // base value of the range
    ERR_CONNECTION_FAILED           = ERR_CONNECTION_COMMON + 1,
    ERR_CONNECTION_UNFINISHED_TASKS = ERR_CONNECTION_COMMON + 2,
    ERR_CONNECTION_LOST             = ERR_CONNECTION_COMMON + 3,
};
// Forward declarations: this header refers to these types only through
// pointers, references, handles and a friend declaration, so their full
// definitions are not required here.
class Protocol;
class Socket;
class Server;
/**
 * Connection object that a Protocol attaches to a Socket (see open()).
 *
 * It keeps two queues of pending requests - one for reads, one for writes -
 * guarded by a recursive mutex. The nested ReadLock / WriteLock helpers lock
 * that mutex and expose the request at the front of the corresponding queue.
 *
 * Derived classes customise behaviour by overriding the on*() hooks.
 */
class Connection: public Shared {
public:
    using Handle = THandle<Connection>;
    using ReqId  = unsigned long long;
    using Mutex  = std::recursive_mutex;
    using Lock   = std::lock_guard<Mutex>;

    /**
     * One pending read or write request over a caller-supplied buffer.
     *
     * @tparam T  `void` for read requests (writable buffer) or `const void`
     *            for write requests (read-only buffer).
     */
    template<typename T>
    struct TRWReq {
        using Type  = T;
        using Queue = std::deque< TRWReq<T> >;

        ReqId  id;        // request identifier
        void  *userData;  // opaque pointer stored with the request; never dereferenced here
        Type  *data;      // start of the buffer
        size_t size;      // total number of bytes in the request
        size_t doneSize;  // number of bytes already processed

        TRWReq(ReqId id = 0, void *userData = nullptr, Type *data = nullptr, size_t size = 0):
            id(id), userData(userData), data(data), size(size), doneSize() { }

        /// True once every byte of the request has been processed.
        bool success() const
            { return doneSize >= size; }

        /// Number of bytes still to be processed.
        size_t remains() const
            { assert(size >= doneSize); return size - doneSize; }

        /**
         * Records that `count` more bytes have been processed.
         *
         * In release builds (asserts disabled) an out-of-range `count`, or an
         * already inconsistent `doneSize`, clamps the progress to the full
         * request size instead of overflowing.
         *
         * The parameter is deliberately not called `size`: that name would
         * shadow the member and make the clamp compare against, and assign,
         * the byte count instead of the total request size.
         */
        void processed(size_t count) {
            assert(count <= remains());
            if (doneSize > size || count > size - doneSize)
                doneSize = size;
            else
                doneSize += count;
        }

        /// Pointer to the first not-yet-processed byte of the buffer.
        Type* current() const {
            assert(size >= doneSize);
            // Byte arithmetic is not allowed on void pointers, so step through
            // a char pointer; the result converts back to Type* implicitly.
            return const_cast<char*>(static_cast<const char*>(data) + doneSize);
        }
    };

    using ReadReq    = TRWReq<void>;
    using WriteReq   = TRWReq<const void>;
    using ReadQueue  = ReadReq::Queue;
    using WriteQueue = WriteReq::Queue;

    /**
     * Scoped lock on the connection mutex that also gives access to the
     * request at the front of a queue (or to no request if the queue is empty).
     *
     * Only constructible through the derived ReadLock / WriteLock classes.
     */
    template<typename T>
    class TRWLock {
    public:
        using ReqType  = TRWReq<T>;
        using LockType = TRWLock<T>;

        // NOTE: `lock` must stay the first data member. Members are initialised
        // in declaration order, so the mutex is taken before the queue is
        // inspected to initialise `request`.
        Lock lock;
        Connection &connection;
        ReqType * const request;  // front of the queue, or nullptr when the queue is empty

    protected:
        TRWLock(Connection &connection, typename ReqType::Queue &queue):
            lock(connection.mutex),
            connection(connection),
            request(queue.empty() ? nullptr : &queue.front()) { }

    public:
        /// True when there is a request to work on.
        operator bool() const
            { return request != nullptr; }

        /// True when there is a request and it has been fully processed.
        bool success() const
            { return request && request->success(); }

        /// Access to the current request; must not be used when the lock is empty.
        ReqType& operator*() const
            { assert(request); return *request; }

        ReqType* operator->() const
            { assert(request); return request; }
    };

    /**
     * Lock over the front read request. On destruction, if that request has
     * been completed, Connection::onReadReady() is called while the mutex is
     * still held.
     */
    class ReadLock: public TRWLock<void> {
    public:
        ReadLock(Connection &connection):
            LockType(connection, connection.readQueue) { }
        ~ReadLock()
            { if (success()) connection.onReadReady(*request); }
    };

    /**
     * Lock over the front write request. On destruction, if that request has
     * been completed, Connection::onWriteReady() is called while the mutex is
     * still held.
     */
    class WriteLock: public TRWLock<const void> {
    public:
        WriteLock(Connection &connection):
            LockType(connection, connection.writeQueue) { }
        ~WriteLock()
            { if (success()) connection.onWriteReady(*request); }
    };

private:
    Mutex mutex;              // guards the members below; recursive, so re-lockable by the owning thread
    Socket *socket;
    ReqId lastId;             // presumably the most recently issued request id - confirm in the implementation
    ReadQueue readQueue;
    WriteQueue writeQueue;
    bool closeRequested;
    unsigned long long closeTimeUs;
    std::condition_variable_any closeAwaitCondition;

    friend class Protocol;

    // Called by Protocol (a friend); defined outside this header.
    void open(Socket *socket);

public:
    Connection();
    virtual ~Connection();

    // The methods below are defined outside this header; the notes describe
    // only what the signatures show.

    /// Closes the connection with the given error code (ERR_NONE by default).
    void close(ErrorCode errorCode = ERR_NONE);

    /// Submits a write request for `size` bytes at `data`; returns the request id.
    ReqId writeReq(const void *data, size_t size, void *userData = nullptr);

    /// Submits a read request for `size` bytes into `data`; returns the request id.
    ReqId readReq(void *data, size_t size, void *userData = nullptr);

    /// Requests closing of the connection.
    void closeReq();

    /// NOTE(review): presumably waits up to `timeoutUs` microseconds for the
    /// connection to close, issuing closeReq() first when `withReqest` is
    /// true - confirm against the implementation.
    void closeAwait(unsigned long long timeoutUs, bool withReqest = true);

protected:
    // The mutex must be locked before calling any of the following methods.
    Protocol& getProtocol() const;
    const Address& getRemoteAddress() const;
    const THandle<Server>& getServer() const;

    // Hooks for derived classes.
    virtual void onOpen();
    virtual void onCloseRequested();
    virtual void onClose(ErrorCode errorCode);
    virtual void onReadReady(const ReadReq &req);    // called from ~ReadLock() for a completed request
    virtual void onWriteReady(const WriteReq &req);  // called from ~WriteLock() for a completed request
};
#endif