|
|
71057f |
|
|
|
71057f |
#include <cstring>
#include <climits>

#include <chrono>
#include <thread>
#include <mutex>

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>

#include "protocol.h"
#include "connection.h"
|
|
|
71057f |
|
|
|
71057f |
|
|
|
b42f0a |
// Every DebugMSG(...) call in this file expands to ShowDebugMSG(...).
// NOTE(review): ShowDebugMSG is not defined in this file; presumably it comes
// from one of the project headers included above - confirm.
#define DebugMSG ShowDebugMSG
|
|
|
6ebd3b |
|
|
|
6ebd3b |
|
|
|
6ebd3b |
|
|
|
71057f |
// Creates a connection that is not attached to any server: the server link
// and both neighbour links of the server's connection list are
// value-initialized.
Connection::Connection()
    : server()
    , srvPrev()
    , srvNext()
{
}
|
|
|
b42f0a |
|
|
|
b42f0a |
|
|
|
b42f0a |
// Tears the connection down by delegating to destroy().
Connection::~Connection()
{
    destroy();
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
// Starts opening the connection.
//
// An OpenLock is taken for (protocol, address, addressSize); when it reports
// failure nothing else is touched and false is returned. Otherwise the given
// socket descriptor and owning server are remembered and true is returned.
//
//   sockId - descriptor of an existing socket, or a negative value when the
//            connection has to create its own socket later (see handleState)
//   server - server this connection belongs to, may be null
bool Connection::open(Protocol &protocol, void *address, size_t addressSize, int sockId, Server *server) {
    OpenLock guard(*this, protocol, address, addressSize);
    if (guard.success) {
        this->sockId = sockId;
        this->server = server;
        return true;
    }
    return false;
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
b42f0a |
// Convenience overload: opens the connection without a pre-existing socket
// (sockId = -1) and without an owning server (nullptr).
bool Connection::open(Protocol &protocol, void *address, size_t addressSize)
{
    const int noSocket = -1;
    return open(protocol, address, addressSize, noSocket, nullptr);
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
// Queues a read task.
//
// The task is accepted only while the published state lies in
// [STATE_OPEN, STATE_CLOSING); otherwise false is returned and the task is
// not queued. After queuing, the event subscription is refreshed from the
// current sizes of both queues.
bool Connection::read(Task &task) {
    DebugMSG();
    ReadLock guard(stateProtector);

    // Take one snapshot of the state so both bounds are checked against the same value.
    const State current = state;
    const bool usable = current >= STATE_OPEN && current < STATE_CLOSING;
    if (!usable)
        return false;

    DebugMSG();
    readQueue.push(task);
    updateEvents(readQueue.count(), writeQueue.count());
    return true;
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
71057f |
// Queues a write task.
//
// The task is accepted only while the published state lies in
// [STATE_OPEN, STATE_CLOSING); otherwise false is returned and the task is
// not queued. After queuing, the event subscription is refreshed from the
// current sizes of both queues.
bool Connection::write(Task &task) {
    DebugMSG();
    ReadLock guard(stateProtector);

    // Take one snapshot of the state so both bounds are checked against the same value.
    const State current = state;
    const bool usable = current >= STATE_OPEN && current < STATE_CLOSING;
    if (!usable)
        return false;

    DebugMSG();
    writeQueue.push(task);
    updateEvents(readQueue.count(), writeQueue.count());
    return true;
}
|
|
|
71057f |
|
|
|
71057f |
|
|
|
b42f0a |
void Connection::handleState() {
|
|
|
b42f0a |
if (stateLocal == STATE_NONE) {
|
|
|
b42f0a |
DebugMSG("none");
|
|
|
b42f0a |
bool connected = false;
|
|
|
b42f0a |
|
|
|
b42f0a |
if (sockId >= 0) {
|
|
|
b42f0a |
DebugMSG("accepting");
|
|
|
b42f0a |
initSocket(sockId);
|
|
|
b42f0a |
connected = true;
|
|
|
b42f0a |
} else
|
|
|
b42f0a |
if (addressSize >= sizeof(sa_family_t)) {
|
|
|
b42f0a |
DebugMSG("connecting");
|
|
|
b42f0a |
struct sockaddr *addr = (struct sockaddr*)address;
|
|
|
b42f0a |
sockId = ::socket(addr->sa_family, SOCK_STREAM, 0);
|
|
|
b42f0a |
if (sockId >= 0) {
|
|
|
b42f0a |
initSocket(sockId);
|
|
|
b42f0a |
if (0 == ::connect(sockId, addr, (int)addressSize))
|
|
|
b42f0a |
connected = true;
|
|
|
b42f0a |
} else {
|
|
|
b42f0a |
DebugMSG("none -> cannot create socket");
|
|
|
b42f0a |
}
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
if (sockId >= 0) {
|
|
|
b42f0a |
DebugMSG("opening");
|
|
|
b42f0a |
prev = protocol->sockLast;
|
|
|
b42f0a |
(prev ? prev->next : protocol->sockFirst) = this;
|
|
|
b42f0a |
if (server) {
|
|
|
b42f0a |
srvPrev = server->connLast;
|
|
|
b42f0a |
(srvPrev ? srvPrev->srvNext : server->connFirst) = this;
|
|
|
b42f0a |
}
|
|
|
b42f0a |
updateEvents(false, !connected);
|
|
|
b42f0a |
|
|
|
b42f0a |
if (connected) {
|
|
|
b42f0a |
DebugMSG("to open");
|
|
|
b42f0a |
state = stateLocal = STATE_OPEN;
|
|
|
b42f0a |
stateProtector.wait();
|
|
|
b42f0a |
onOpen(address, addressSize);
|
|
|
b42f0a |
} else {
|
|
|
b42f0a |
assert(!server);
|
|
|
b42f0a |
DebugMSG("to connecting");
|
|
|
b42f0a |
state = stateLocal = STATE_CONNECTING;
|
|
|
b42f0a |
stateProtector.wait();
|
|
|
b42f0a |
}
|
|
|
b42f0a |
} else {
|
|
|
b42f0a |
DebugMSG("none -> to closed");
|
|
|
b42f0a |
Server *srv = server;
|
|
|
b42f0a |
assert(connected || !server);
|
|
|
b42f0a |
server = nullptr;
|
|
|
b42f0a |
sockId = -1;
|
|
|
b42f0a |
onOpeningError();
|
|
|
b42f0a |
state = STATE_CLOSED;
|
|
|
b42f0a |
stateProtector.wait();
|
|
|
b42f0a |
while(closeWaiters > 0)
|
|
|
b42f0a |
{ closeCondition.notify_all(); std::this_thread::yield(); }
|
|
|
b42f0a |
state = STATE_FINISHED;
|
|
|
b42f0a |
DebugMSG("none -> to finished");
|
|
|
b42f0a |
if (srv) server->handleDisconnect(*this, true);
|
|
|
b42f0a |
}
|
|
|
b42f0a |
} else {
|
|
|
b42f0a |
State sw = stateWanted;
|
|
|
b42f0a |
|
|
|
b42f0a |
if ( stateLocal == STATE_CONNECTING
|
|
|
b42f0a |
&& sw == STATE_CLOSING )
|
|
|
b42f0a |
{
|
|
|
b42f0a |
DebugMSG("connecting -> to closing");
|
|
|
b42f0a |
onOpeningError();
|
|
|
b42f0a |
state = stateLocal = STATE_CLOSING;
|
|
|
b42f0a |
stateProtector.wait();
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
if ( stateLocal == STATE_OPEN
|
|
|
b42f0a |
&& sw == STATE_CLOSE_REQ )
|
|
|
b42f0a |
{
|
|
|
b42f0a |
DebugMSG("to close req");
|
|
|
b42f0a |
state = stateLocal = STATE_CLOSE_REQ;
|
|
|
b42f0a |
stateProtector.wait();
|
|
|
b42f0a |
onCloseReqested();
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
if ( stateLocal >= STATE_OPEN
|
|
|
b42f0a |
&& stateLocal <= STATE_CLOSE_REQ
|
|
|
b42f0a |
&& sw == STATE_CLOSING )
|
|
|
b42f0a |
{
|
|
|
b42f0a |
DebugMSG("to closing");
|
|
|
b42f0a |
state = stateLocal = STATE_CLOSING;
|
|
|
b42f0a |
stateProtector.wait();
|
|
|
b42f0a |
while(Task *task = readQueue.peek())
|
|
|
b42f0a |
{ readQueue.pop(); onReadReady(*task, true); }
|
|
|
b42f0a |
while(Task *task = writeQueue.peek())
|
|
|
b42f0a |
{ writeQueue.pop(); onWriteReady(*task, true); }
|
|
|
b42f0a |
onClose(error);
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
if ( stateLocal == STATE_CLOSING && !enqueued ) {
|
|
|
b42f0a |
DebugMSG("to closed");
|
|
|
b42f0a |
Server *srv = server;
|
|
|
b42f0a |
bool err = error;
|
|
|
b42f0a |
|
|
|
b42f0a |
if (server) {
|
|
|
b42f0a |
(srvPrev ? srvPrev->srvNext : server->connFirst) = srvNext;
|
|
|
b42f0a |
(srvNext ? srvNext->srvPrev : server->connLast ) = srvPrev;
|
|
|
b42f0a |
server = nullptr;
|
|
|
b42f0a |
}
|
|
|
b42f0a |
error = false;
|
|
|
b42f0a |
|
|
|
b42f0a |
(prev ? prev->next : protocol->sockFirst) = next;
|
|
|
b42f0a |
(next ? next->prev : protocol->sockLast ) = prev;
|
|
|
b42f0a |
if (sockId >= 0) {
|
|
|
b42f0a |
struct epoll_event event = {};
|
|
|
b42f0a |
epoll_ctl(protocol->epollFd, EPOLL_CTL_DEL, sockId, &event);
|
|
|
b42f0a |
::close(sockId);
|
|
|
b42f0a |
sockId = -1;
|
|
|
b42f0a |
}
|
|
|
b42f0a |
events = 0xffffffff;
|
|
|
b42f0a |
protocol = nullptr;
|
|
|
b42f0a |
|
|
|
b42f0a |
memset(address, 0, sizeof(address));
|
|
|
b42f0a |
addressSize = 0;
|
|
|
b42f0a |
stateLocal = STATE_NONE;
|
|
|
b42f0a |
state = STATE_CLOSED;
|
|
|
b42f0a |
stateWanted = STATE_NONE;
|
|
|
b42f0a |
while(closeWaiters > 0)
|
|
|
b42f0a |
{ closeCondition.notify_all(); std::this_thread::yield(); }
|
|
|
b42f0a |
state = STATE_FINISHED;
|
|
|
b42f0a |
DebugMSG("to finished");
|
|
|
b42f0a |
if (srv) srv->handleDisconnect(*this, err);
|
|
|
b42f0a |
}
|
|
|
b42f0a |
}
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
|
|
|
b42f0a |
void Connection::handleEvents(unsigned int events) {
|
|
|
b42f0a |
bool needUpdate = false;
|
|
|
b42f0a |
DebugMSG("%08x", events);
|
|
|
b42f0a |
|
|
|
b42f0a |
if (stateLocal == STATE_CONNECTING) {
|
|
|
b42f0a |
DebugMSG("connecting");
|
|
|
b42f0a |
if (events & (EPOLLHUP | EPOLLERR)) {
|
|
|
b42f0a |
close(true);
|
|
|
b42f0a |
return;
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
if (events & EPOLLOUT) {
|
|
|
b42f0a |
DebugMSG("connecting -> to open");
|
|
|
b42f0a |
state = stateLocal = STATE_OPEN;
|
|
|
b42f0a |
stateProtector.wait();
|
|
|
b42f0a |
onOpen(address, addressSize);
|
|
|
b42f0a |
needUpdate = true;
|
|
|
b42f0a |
}
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
if ( stateLocal < STATE_OPEN
|
|
|
b42f0a |
|| stateLocal >= STATE_CLOSING )
|
|
|
b42f0a |
return;
|
|
|
b42f0a |
|
|
|
b42f0a |
bool closing = (bool)(events & (EPOLLHUP | EPOLLRDHUP | EPOLLERR));
|
|
|
b42f0a |
|
|
|
b42f0a |
if (events & EPOLLIN) {
|
|
|
b42f0a |
DebugMSG("input");
|
|
|
b42f0a |
bool ready = true;
|
|
|
b42f0a |
int pops = 0;
|
|
|
b42f0a |
int count = (readQueue.count() + 2)*2;
|
|
|
b42f0a |
while(ready) {
|
|
|
b42f0a |
Task *task = readQueue.peek();
|
|
|
b42f0a |
if (!task) {
|
|
|
b42f0a |
if (pops) needUpdate = true;
|
|
|
b42f0a |
break;
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
while(task->size > task->completion) {
|
|
|
b42f0a |
size_t size = task->size - task->completion;
|
|
|
b42f0a |
if (size > INT_MAX) size = INT_MAX;
|
|
|
b42f0a |
int res = ::recv(
|
|
|
b42f0a |
sockId,
|
|
|
b42f0a |
(unsigned char*)task->data + task->completion,
|
|
|
b42f0a |
(int)size, MSG_DONTWAIT );
|
|
|
b42f0a |
if (res <= 0) {
|
|
|
b42f0a |
ready = false;
|
|
|
b42f0a |
break;
|
|
|
b42f0a |
} else {
|
|
|
b42f0a |
task->completion += res;
|
|
|
b42f0a |
}
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
if (task->size <= task->completion) {
|
|
|
b42f0a |
readQueue.pop();
|
|
|
b42f0a |
onReadReady(*task, false);
|
|
|
b42f0a |
++pops;
|
|
|
b42f0a |
if (!closing && --count < 0) ready = false;
|
|
|
b42f0a |
}
|
|
|
b42f0a |
}
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
if (closing) {
|
|
|
b42f0a |
DebugMSG("hup");
|
|
|
b42f0a |
close((bool)(events & EPOLLERR));
|
|
|
b42f0a |
return;
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
if (events & EPOLLOUT) {
|
|
|
b42f0a |
DebugMSG("output");
|
|
|
b42f0a |
bool ready = true;
|
|
|
b42f0a |
int pops = 0;
|
|
|
b42f0a |
int count = writeQueue.count() + 2;
|
|
|
b42f0a |
while(ready) {
|
|
|
b42f0a |
Task *task = writeQueue.peek();
|
|
|
b42f0a |
if (!task) {
|
|
|
b42f0a |
if (pops) needUpdate = true;
|
|
|
b42f0a |
break;
|
|
|
b42f0a |
}
|
|
|
b42f0a |
while(task->size > task->completion) {
|
|
|
b42f0a |
size_t size = task->size - task->completion;
|
|
|
b42f0a |
if (size > INT_MAX) size = INT_MAX;
|
|
|
b42f0a |
int res = ::send(
|
|
|
b42f0a |
sockId,
|
|
|
b42f0a |
(const unsigned char*)task->data + task->completion,
|
|
|
b42f0a |
(int)size, MSG_DONTWAIT );
|
|
|
b42f0a |
if (res <= 0) {
|
|
|
b42f0a |
ready = false;
|
|
|
b42f0a |
break;
|
|
|
b42f0a |
} else {
|
|
|
b42f0a |
task->completion += res;
|
|
|
b42f0a |
}
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
if (task->size <= task->completion) {
|
|
|
b42f0a |
writeQueue.pop();
|
|
|
b42f0a |
onWriteReady(*task, false);
|
|
|
b42f0a |
++pops;
|
|
|
b42f0a |
if (--count <= 0) ready = false;
|
|
|
b42f0a |
}
|
|
|
b42f0a |
}
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
if (needUpdate)
|
|
|
b42f0a |
updateEvents(readQueue.count(), writeQueue.count());
|
|
|
b42f0a |
}
|
|
|
b42f0a |
|
|
|
b42f0a |
|
|
|
b42f0a |
// Default read hook: does nothing. Within this file it is invoked with
// `false` once a read task has been completely filled, and with `true` when
// pending read tasks are flushed while the connection is closing.
void Connection::onReadReady(Task &, bool)
{
}
|
|
|
b42f0a |
// Default write hook: does nothing. Within this file it is invoked with
// `false` once a write task has been completely sent, and with `true` when
// pending write tasks are flushed while the connection is closing.
void Connection::onWriteReady(Task &, bool)
{
}
|
|
|
71057f |
|