#ifndef CONNECTION_H
#define CONNECTION_H
#include <cstring>
#include <deque>
#include <mutex>
#include <condition_variable>
#include "common.h"
#include "address.h"
enum : ErrorCode {
ERR_CONNECTION_COMMON = ERR_CONNECTION,
ERR_CONNECTION_FAILED,
ERR_CONNECTION_UNFINISHED_TASKS,
ERR_CONNECTION_LOST,
ERR_CONNECTION_IS_SWITCHING,
};
class Protocol;
class Socket;
class Server;
class Connection: public Shared {
public:
typedef THandle<Connection> Handle;
typedef unsigned long long ReqId;
typedef RecursiveMutex Mutex;
typedef std::lock_guard<Mutex> Lock;
template<typename T>
struct TRWReq {
typedef T Type;
typedef std::deque< TRWReq<T> > Queue;
ReqId id;
void *userData;
Type *data;
size_t size;
size_t doneSize;
inline TRWReq(ReqId id = 0, void *userData = nullptr, Type *data = nullptr, size_t size = 0):
id(id), userData(userData), data(data), size(size), doneSize() { }
inline bool success() const
{ return doneSize >= size; }
inline size_t remains() const
{ assert(size >= doneSize); return size - doneSize; }
inline void processed(size_t size) {
assert(size <= remains());
if (doneSize > size || size > remains())
doneSize = size; else doneSize += size;
}
inline Type* current() const
{ assert(size >= doneSize); return const_cast<char*>((const char*)data + doneSize); }
};
typedef TRWReq<void> ReadReq;
typedef TRWReq<const void> WriteReq;
typedef ReadReq::Queue ReadQueue;
typedef WriteReq::Queue WriteQueue;
template<typename T>
class TRWLock {
public:
typedef TRWReq<T> ReqType;
typedef TRWLock<T> LockType;
Lock lock;
Connection &connection;
ReqType * const request;
protected:
inline TRWLock(Connection &connection, typename ReqType::Queue &queue):
lock(connection.mutex),
connection(connection),
request(connection.started && !connection.switching && !queue.empty() ? &queue.front() : nullptr)
{ }
public:
inline operator bool() const
{ return request; }
inline bool success() const
{ return request && request->success(); }
inline ReqType& operator*() const
{ assert(request); return request; }
inline ReqType* operator->() const
{ assert(request); return request; }
};
class ReadLock: public TRWLock<void> {
public:
inline ReadLock(Connection &connection):
LockType(connection, connection.readQueue) { }
inline ~ReadLock()
{ if (success()) connection.readDone(); }
};
class WriteLock: public TRWLock<const void> {
public:
inline WriteLock(Connection &connection):
LockType(connection, connection.writeQueue) { }
inline ~WriteLock()
{ if (success()) connection.writeDone(); }
};
private:
Mutex mutex;
bool started;
bool switching;
Socket *socket;
ReqId lastId;
ReadQueue readQueue;
WriteQueue writeQueue;
bool closeRequested;
unsigned long long closeTimeUs;
std::condition_variable_any closeWaitCondition;
// mutex must be locked before call
// used in open and close
void clean();
friend class Protocol;
ErrorCode open(Socket *socket);
void readDone();
void writeDone();
public:
Connection();
virtual ~Connection();
void close(ErrorCode errorCode = ERR_NONE);
void closeReq();
void closeWait(unsigned long long timeoutUs, bool withReqest = true);
ReqId writeReq(const void *data, size_t size, void *userData = nullptr);
ReqId readReq(void *data, size_t size, void *userData = nullptr);
protected:
// mutex must be locked before calling of all following methods
inline bool isCloseRequested() const { return started && closeRequested; }
inline bool isAllReadDone() const { return readQueue.empty() || readQueue.front().success(); }
inline bool isAllWriteDone() const { return writeQueue.empty() || writeQueue.front().success(); }
inline bool isAllDone() const { return isAllReadDone() && isAllWriteDone(); }
Protocol& getProtocol() const;
const Address& getRemoteAddress() const;
const THandle<Connection>& getConnection() const;
const THandle<Server>& getServer() const;
virtual ErrorCode onOpen();
virtual void onCloseRequested();
virtual void onClose(ErrorCode errorCode);
virtual void onReadReady(const ReadReq &req);
virtual void onWriteReady(const WriteReq &req);
};
#endif