Blame connection.h

f07ad6
#ifndef CONNECTION_H
f07ad6
#define CONNECTION_H
f07ad6
f07ad6
f07ad6
#include <cstring></cstring>
f07ad6
f07ad6
#include <deque></deque>
f07ad6
#include <mutex></mutex>
541903
#include <condition_variable></condition_variable>
f07ad6
541903
#include "common.h"
f07ad6
#include "address.h"
f07ad6
f07ad6
f07ad6
enum : ErrorCode  {
f07ad6
	ERR_CONNECTION_COMMON = ERR_CONNECTION,
f07ad6
	ERR_CONNECTION_FAILED,
f07ad6
	ERR_CONNECTION_UNFINISHED_TASKS,
f07ad6
	ERR_CONNECTION_LOST,
646228
	ERR_CONNECTION_IS_SWITCHING,
f07ad6
};
f07ad6
f07ad6
f07ad6
class Protocol;
f07ad6
class Socket;
f07ad6
class Server;
f07ad6
f07ad6
541903
class Connection: public Shared {
f07ad6
public:
98bb38
	typedef THandle<connection> Handle;</connection>
541903
	
f07ad6
	typedef unsigned long long ReqId;
2499ad
	typedef RecursiveMutex Mutex;
f07ad6
	typedef std::lock_guard<mutex> Lock;</mutex>
f07ad6
	
f07ad6
	template<typename t=""></typename>
f07ad6
	struct TRWReq {
f07ad6
		typedef T Type;
f07ad6
		typedef std::deque< TRWReq<t> > Queue;</t>
f07ad6
		
f07ad6
		ReqId id;
f07ad6
		void *userData;
f07ad6
		Type *data;
f07ad6
		size_t size;
f07ad6
		size_t doneSize;
f07ad6
f07ad6
		inline TRWReq(ReqId id = 0, void *userData = nullptr, Type *data = nullptr, size_t size = 0):
f07ad6
			id(id), userData(userData), data(data), size(size), doneSize() { }
f07ad6
		
f07ad6
		inline bool success() const
f07ad6
			{ return doneSize >= size; }
f07ad6
		inline size_t remains() const
f07ad6
			{ assert(size >= doneSize); return size - doneSize; }
f07ad6
		inline void processed(size_t size) {
f07ad6
			assert(size <= remains());
f07ad6
			if (doneSize > size || size > remains())
f07ad6
				doneSize = size; else doneSize += size;
f07ad6
		}
f07ad6
		inline Type* current() const
f07ad6
			{ assert(size >= doneSize); return const_cast<char*>((const char*)data + doneSize); }</char*>
f07ad6
	};
f07ad6
	
f07ad6
	typedef TRWReq<void> ReadReq;</void>
f07ad6
	typedef TRWReq<const void=""> WriteReq;</const>
f07ad6
	typedef ReadReq::Queue ReadQueue;
f07ad6
	typedef WriteReq::Queue WriteQueue;
f07ad6
	
f07ad6
	template<typename t=""></typename>
f07ad6
	class TRWLock {
f07ad6
	public:
f07ad6
		typedef TRWReq<t> ReqType;</t>
f07ad6
		typedef TRWLock<t> LockType;</t>
f07ad6
		
f07ad6
		Lock lock;
f07ad6
		Connection &connection;
f07ad6
		ReqType * const request;
f07ad6
		
f07ad6
	protected:
f07ad6
		inline TRWLock(Connection &connection, typename ReqType::Queue &queue):
9bbba5
			lock(connection.mutex),
9bbba5
			connection(connection),
2499ad
			request(connection.started && !connection.switching && !queue.empty() ? &queue.front() : nullptr)
9bbba5
			{ }
f07ad6
	
f07ad6
	public:
f07ad6
		inline operator bool() const
f07ad6
			{ return request; }
f07ad6
		inline bool success() const
f07ad6
			{ return request && request->success(); }
f07ad6
		inline ReqType& operator*() const
f07ad6
			{ assert(request); return request; }
f07ad6
		inline ReqType* operator->() const
f07ad6
			{ assert(request); return request; }
f07ad6
	};
f07ad6
	
f07ad6
	class ReadLock: public TRWLock<void> {</void>
f07ad6
	public:
f07ad6
		inline ReadLock(Connection &connection):
f07ad6
			LockType(connection, connection.readQueue) { }
f07ad6
		inline ~ReadLock()
2499ad
			{ if (success()) connection.readDone(); }
f07ad6
	};
f07ad6
	
f07ad6
	class WriteLock: public TRWLock<const void=""> {</const>
f07ad6
	public:
f07ad6
		inline WriteLock(Connection &connection):
f07ad6
			LockType(connection, connection.writeQueue) { }
f07ad6
		inline ~WriteLock()
2499ad
			{ if (success()) connection.writeDone(); }
f07ad6
	};
f07ad6
	
f07ad6
private:
f07ad6
	Mutex mutex;
646228
	bool started;
646228
	bool switching;
f07ad6
	Socket *socket;
f07ad6
	
f07ad6
	ReqId lastId;
f07ad6
	ReadQueue readQueue;
f07ad6
	WriteQueue writeQueue;
f07ad6
	
541903
	bool closeRequested;
541903
	unsigned long long closeTimeUs;
646228
	std::condition_variable_any closeWaitCondition;
646228
	
646228
	// mutex must be locked before call
646228
	// used in open and close
646228
	void clean();
646228
	
2a209b
	friend class Protocol;
646228
	ErrorCode open(Socket *socket);
2499ad
	
2499ad
	void readDone();
2499ad
	void writeDone();
2499ad
	
f07ad6
public:
f07ad6
	Connection();
f07ad6
	virtual ~Connection();
f07ad6
	
f07ad6
	void close(ErrorCode errorCode = ERR_NONE);
646228
	void closeReq();
646228
	void closeWait(unsigned long long timeoutUs, bool withReqest = true);
f07ad6
	
f07ad6
	ReqId writeReq(const void *data, size_t size, void *userData = nullptr);
f07ad6
	ReqId readReq(void *data, size_t size, void *userData = nullptr);
f07ad6
	
f07ad6
protected:
541903
	// mutex must be locked before calling of all following methods
5a39be
	inline bool isCloseRequested() const { return started && closeRequested; }
5a39be
	inline bool isAllReadDone() const { return readQueue.empty() || readQueue.front().success(); }
5a39be
	inline bool isAllWriteDone() const { return writeQueue.empty() || writeQueue.front().success(); }
5a39be
	inline bool isAllDone() const { return isAllReadDone() && isAllWriteDone(); }
f07ad6
	Protocol& getProtocol() const;
f07ad6
	const Address& getRemoteAddress() const;
9bbba5
	const THandle<connection>& getConnection() const;</connection>
98bb38
	const THandle<server>& getServer() const;</server>
f07ad6
646228
	virtual ErrorCode onOpen();
541903
	virtual void onCloseRequested();
f07ad6
	virtual void onClose(ErrorCode errorCode);
f07ad6
	virtual void onReadReady(const ReadReq &req);
f07ad6
	virtual void onWriteReady(const WriteReq &req);
f07ad6
};
f07ad6
f07ad6
f07ad6
#endif