Blob Blame Raw
#ifndef CONNECTION_H
#define CONNECTION_H


#include <cstring>

#include <deque>
#include <mutex>
#include <condition_variable>

#include "common.h"
#include "address.h"


enum : ErrorCode  {
	ERR_CONNECTION_COMMON = ERR_CONNECTION,
	ERR_CONNECTION_FAILED,
	ERR_CONNECTION_UNFINISHED_TASKS,
	ERR_CONNECTION_LOST,
	ERR_CONNECTION_IS_SWITCHING,
};


class Protocol;
class Socket;
class Server;


class Connection: public Shared {
public:
	typedef THandle<Connection> Handle;
	
	typedef unsigned long long ReqId;
	typedef RecursiveMutex Mutex;
	typedef std::lock_guard<Mutex> Lock;
	
	template<typename T>
	struct TRWReq {
		typedef T Type;
		typedef std::deque< TRWReq<T> > Queue;
		
		ReqId id;
		void *userData;
		Type *data;
		size_t size;
		size_t doneSize;

		inline TRWReq(ReqId id = 0, void *userData = nullptr, Type *data = nullptr, size_t size = 0):
			id(id), userData(userData), data(data), size(size), doneSize() { }
		
		inline bool success() const
			{ return doneSize >= size; }
		inline size_t remains() const
			{ assert(size >= doneSize); return size - doneSize; }
		inline void processed(size_t size) {
			assert(size <= remains());
			if (doneSize > size || size > remains())
				doneSize = size; else doneSize += size;
		}
		inline Type* current() const
			{ assert(size >= doneSize); return const_cast<char*>((const char*)data + doneSize); }
	};
	
	typedef TRWReq<void> ReadReq;
	typedef TRWReq<const void> WriteReq;
	typedef ReadReq::Queue ReadQueue;
	typedef WriteReq::Queue WriteQueue;
	
	template<typename T>
	class TRWLock {
	public:
		typedef TRWReq<T> ReqType;
		typedef TRWLock<T> LockType;
		
		Lock lock;
		Connection &connection;
		ReqType * const request;
		
	protected:
		inline TRWLock(Connection &connection, typename ReqType::Queue &queue):
			lock(connection.mutex),
			connection(connection),
			request(connection.started && !connection.switching && !queue.empty() ? &queue.front() : nullptr)
			{ }
	
	public:
		inline operator bool() const
			{ return request; }
		inline bool success() const
			{ return request && request->success(); }
		inline ReqType& operator*() const
			{ assert(request); return request; }
		inline ReqType* operator->() const
			{ assert(request); return request; }
	};
	
	class ReadLock: public TRWLock<void> {
	public:
		inline ReadLock(Connection &connection):
			LockType(connection, connection.readQueue) { }
		inline ~ReadLock()
			{ if (success()) connection.readDone(); }
	};
	
	class WriteLock: public TRWLock<const void> {
	public:
		inline WriteLock(Connection &connection):
			LockType(connection, connection.writeQueue) { }
		inline ~WriteLock()
			{ if (success()) connection.writeDone(); }
	};
	
private:
	Mutex mutex;
	bool started;
	bool switching;
	Socket *socket;
	
	ReqId lastId;
	ReadQueue readQueue;
	WriteQueue writeQueue;
	
	bool closeRequested;
	unsigned long long closeTimeUs;
	std::condition_variable_any closeWaitCondition;
	
	// mutex must be locked before call
	// used in open and close
	void clean();
	
	friend class Protocol;
	ErrorCode open(Socket *socket);
	
	void readDone();
	void writeDone();
	
public:
	Connection();
	virtual ~Connection();
	
	void close(ErrorCode errorCode = ERR_NONE);
	void closeReq();
	void closeWait(unsigned long long timeoutUs, bool withReqest = true);
	
	ReqId writeReq(const void *data, size_t size, void *userData = nullptr);
	ReqId readReq(void *data, size_t size, void *userData = nullptr);
	
protected:
	// mutex must be locked before calling of all following methods
	inline bool isCloseRequested() const { return started && closeRequested; }
	inline bool isAllReadDone() const { return readQueue.empty() || readQueue.front().success(); }
	inline bool isAllWriteDone() const { return writeQueue.empty() || writeQueue.front().success(); }
	inline bool isAllDone() const { return isAllReadDone() && isAllWriteDone(); }
	Protocol& getProtocol() const;
	const Address& getRemoteAddress() const;
	const THandle<Connection>& getConnection() const;
	const THandle<Server>& getServer() const;

	virtual ErrorCode onOpen();
	virtual void onCloseRequested();
	virtual void onClose(ErrorCode errorCode);
	virtual void onReadReady(const ReadReq &req);
	virtual void onWriteReady(const WriteReq &req);
};


#endif