//Qt includes
#include <QCoreApplication>
#include <QThread>
#include <QTime>
#include <QSharedMemory>
#include <QProcess>
#include <QMutex>
#include <QSemaphore>
#include <QAtomicInt>
#include <QEventLoop>
#include <QTimer>
//System-specific includes
#ifdef _WIN32
#include <windows.h>
#elif MACOSX
#include <sys/sysctl.h>
#include <unistd.h>
#elif LINUX
#include <sys/time.h>
#include <unistd.h>
#endif
#include "tipc.h"
/*
PLATFORM-SPECIFIC REMINDERS:
There are a few remarks to be aware of when maintaining this code.
Please, be careful that:
- It seems that, on Windows, QLocalSocket::waitForBytesWritten does not return
success unless the data is actually read on the other end. On Unix this is not
the case, presumably because the data is written to a buffer which can be read by
the other process at some later point.
Thus, *BE SURE* that all data written is received on the other side.
- On MACOSX, the default shared memory settings can be quite restrictive.
On a standard machine, the maximum size of a shared segment is 4 MB, exactly
the same as the TOTAL size of shared memory available.
Whereas tipc respects the former parameter, there must be a way to circumvent the
latter in order to allow the allocation of multiple shared segments of the maximum
size.
*/
//********************************************************
// Diagnostics Stuff
//********************************************************
// Uncomment the following define to enable tipc's diagnostic output.
//#define TIPC_DEBUG
// tipc_debug(expr) expands to expr when TIPC_DEBUG is defined, and to nothing
// otherwise - so the wrapped diagnostic statements disappear from regular builds.
#ifdef TIPC_DEBUG
#define tipc_debug(expr) expr
#else
#define tipc_debug(expr)
#endif
// The timing diagnostics use QTime (which is also included unconditionally
// at the top of this file).
#ifdef TIPC_DEBUG
#include <QTime>
#endif
//********************************************************
// Local namespace Stuff
//********************************************************
// File-local cache of the shared memory limits.
// Every value starts at -1, meaning "not retrieved yet": the tipc::shm_max*
// accessors below fill the corresponding entry the first time they are called,
// and tipc::shm_set() overwrites an entry (MACOSX only) when the matching
// sysctl write succeeds.
namespace
{
int shm_max = -1; // cached by tipc::shm_maxSegmentSize()
int shm_all = -1; // cached by tipc::shm_maxSharedPages()
int shm_seg = -1; // cached by tipc::shm_maxSegmentCount()
int shm_mni = -1; // cached by tipc::shm_maxSharedCount()
}
//********************************************************
// tipc Stream Implementation
//********************************************************
int tipc::Stream::readSize()
{
if (m_socket->bytesAvailable() < sizeof(TINT32))
return -1;
TINT32 msgSize = -1;
m_socket->peek((char *)&msgSize, sizeof(TINT32));
return msgSize;
}
//-------------------------------------------------------------
bool tipc::Stream::messageReady()
{
TINT32 msgSize;
return (msgSize = readSize()) >= 0 && m_socket->bytesAvailable() >= msgSize;
}
//-------------------------------------------------------------
bool tipc::Stream::readData(char *data, qint64 dataSize, int msecs)
{
tipc_debug(qDebug("tipc::Stream::readData entry"));
qint64 r, dataRead = 0;
char *currData = data;
while (dataRead < dataSize) {
if ((m_socket->bytesAvailable() == 0) && !m_socket->waitForReadyRead(msecs)) {
tipc_debug(qDebug("tipc::Stream::readData exit (unexpected loss of data)"));
return false;
}
//Read the supplied data
currData += r = m_socket->read(currData, dataSize - dataRead);
dataRead += r;
}
tipc_debug(qDebug("tipc::Stream::readData exit"));
return true;
}
//-------------------------------------------------------------
bool tipc::Stream::readDataNB(char *data, qint64 dataSize, int msecs,
QEventLoop::ProcessEventsFlag flag)
{
tipc_debug(qDebug("tipc::Stream::readDataNB entry"));
qint64 r, dataRead = 0;
char *currData = data;
QEventLoop loop;
QObject::connect(m_socket, SIGNAL(readyRead()), &loop, SLOT(quit()));
QObject::connect(m_socket, SIGNAL(error(QLocalSocket::LocalSocketError)), &loop, SLOT(quit()));
if (msecs >= 0)
QTimer::singleShot(msecs, &loop, SLOT(quit()));
while (dataRead < dataSize) {
if (m_socket->bytesAvailable() == 0) {
loop.exec(flag);
if (m_socket->bytesAvailable() == 0) {
tipc_debug(qDebug("tipc::Stream::readDataNB exit (unexpected loss of data)"));
return false;
}
}
//Read the supplied data
currData += r = m_socket->read(currData, dataSize - dataRead);
dataRead += r;
}
tipc_debug(qDebug("tipc::Stream::readDataNB exit"));
return true;
}
//-------------------------------------------------------------
/*!
  Reads a complete message from the socket.
  This function reads the size header first and then the message payload,
  waiting until each of them is completely available. The function accepts
  an inactivity timeout which can be supplied to drop the operation
  after msecs milliseconds in which no data has been received.
  Returns true if the message was entirely read into msg, false if
  the read failed or the size header is invalid (negative).
*/
bool tipc::Stream::readMessage(Message &msg, int msecs)
{
  TINT32 msgSize = 0;
  if (!readData((char *)&msgSize, sizeof(TINT32), msecs))
    return false;
  // A negative size can only come from a corrupted stream: do not report it
  // as a successfully read (empty) message.
  if (msgSize < 0)
    return false;
  msg.ba().resize(msgSize);
  return readData(msg.ba().data(), msgSize, msecs);
}
//-------------------------------------------------------------
/*!
  The non-blocking equivalent to readMessage(), this function
  performs event processing in a local event loop until all
  message data has been received.
  Returns true if the message was entirely read into msg, false if
  the read failed or the size header is invalid (negative).
*/
bool tipc::Stream::readMessageNB(Message &msg, int msecs,
                                 QEventLoop::ProcessEventsFlag flag)
{
  TINT32 msgSize = 0;
  if (!readDataNB((char *)&msgSize, sizeof(TINT32), msecs, flag))
    return false;
  // A negative size can only come from a corrupted stream: do not report it
  // as a successfully read (empty) message.
  if (msgSize < 0)
    return false;
  msg.ba().resize(msgSize);
  return readDataNB(msg.ba().data(), msgSize, msecs, flag);
}
//-------------------------------------------------------------
/*!
Flushes all data written to the stream.
This function waits until all data written on the stream
has been successfully delivered in output.
Returns true if the operation was successful, false if
it timed out or an error occurred.
*/
bool tipc::Stream::flush(int msecs)
{
tipc_debug(qDebug("tipc:flush entry"));
while (m_socket->bytesToWrite() > 0) {
tipc_debug(qDebug() << "bytes to write:" << m_socket->bytesToWrite());
bool ok = m_socket->flush();
tipc_debug(qDebug() << "flush success:" << ok << "bytes to write:" << m_socket->bytesToWrite());
if (m_socket->bytesToWrite() > 0 && !m_socket->waitForBytesWritten(msecs))
return false;
}
tipc_debug(qDebug() << "tipc:flush exit - bytes to write:" << m_socket->bytesToWrite());
return (m_socket->bytesToWrite() == 0);
}
//********************************************************
// tipc Stream Operators
//********************************************************
//! Extracts the next message from the stream into \p msg.
//! \warning This operation assumes that all the message is available for read.
//! Use tipc::Stream::readMessage if this cannot be ensured.
//! If the message cannot be entirely read, \p msg is left empty.
tipc::Stream &operator>>(tipc::Stream &stream, tipc::Message &msg)
{
  QLocalSocket *socket = stream.socket();
  msg.clear();
  const qint64 headerSize = sizeof(TINT32);
  TINT32 msgSize = 0;
  // Without a complete and sensible header the payload size is unknown:
  // never resize the buffer with an uninitialized or negative value.
  if (socket->read((char *)&msgSize, headerSize) != headerSize || msgSize <= 0)
    return stream;
  msg.ba().resize(msgSize);
  // A short read would leave uninitialized bytes at the end of the buffer
  if (socket->read(msg.ba().data(), msgSize) != msgSize)
    msg.clear();
  return stream;
}
//! Writes \p msg to the stream's socket: a TINT32 header holding the
//! payload size, followed by the payload bytes themselves.
tipc::Stream &operator<<(tipc::Stream &stream, tipc::Message &msg)
{
  const TINT32 payloadSize = msg.ba().size();
  QLocalSocket *out = stream.socket();
  out->write((const char *)&payloadSize, sizeof(TINT32));
  out->write(msg.ba().data(), payloadSize);
  return stream;
}
//********************************************************
// tipc Utilities
//********************************************************
/*!
  Returns srvName followed by the pid of the invoking process.
*/
QString tipc::applicationSpecificServerName(QString srvName)
{
  const QString pidSuffix = QString::number(QCoreApplication::applicationPid());
  return srvName + pidSuffix;
}
//-------------------------------------------------------------
/*!
  Launches the passed command line as a background process.
  On Windows the process is run through a heap-allocated QProcess which
  deletes itself when the process finishes or reports an error; on the
  other platforms the process is started detached.
  Returns true if the process could be started, false otherwise.
*/
bool tipc::startBackgroundProcess(QString cmdline)
{
#ifdef _WIN32
  QProcess *proc = new QProcess;
  proc->start(cmdline);
  if (proc->state() != QProcess::NotRunning) {
    //The QProcess object is released as soon as the process is over
    QObject::connect(proc, SIGNAL(finished(int, QProcess::ExitStatus)), proc, SLOT(deleteLater()));
    QObject::connect(proc, SIGNAL(error(QProcess::ProcessError)), proc, SLOT(deleteLater()));
    return true;
  }
  delete proc;
  return false;
#else
  return QProcess::startDetached(cmdline);
#endif
}
//-------------------------------------------------------------
/*!
  Invokes the passed command line to run a slave server.
  A slave server is hereby intended as a 'child' server process which
  automatically destroys itself in case the calling application
  crashes.
  This process \b MUST support one server, running in the \b MAIN \b THREAD,
  whose name is <srvName>_main.
  This function waits until the main server is up and ready to
  listen for incoming connections - no timeout accepted.
  Returns true if the server was started and acknowledged the
  '$quit_on_error' command, false otherwise.
  \warning Please, observe that a correct slave server name should be
  ensured to be unique to the system.
*/
bool tipc::startSlaveServer(QString srvName, QString cmdline)
{
  if (!tipc::startBackgroundProcess(cmdline))
    return false;
  QString mainSrvName(srvName + "_main");
  //Establish a dummy socket connection to provide a means for the process
  //to tell whether the calling process exited unexpectedly.
  QLocalSocket *dummySock = new QLocalSocket;
  dummySock->connectToServer(mainSrvName);
  //Keep retrying, with a small pause between attempts, until the server is up
  //and listening for connections (there is no other way to tell).
  while (dummySock->state() == QLocalSocket::UnconnectedState) {
#ifdef _WIN32
    Sleep(10);
#else
    usleep(10 << 10); //10.24 msecs
#endif
    dummySock->connectToServer(mainSrvName);
  }
  dummySock->waitForConnected(-1);
  bool handshakeOk;
  {
    //The stream is scoped so that it is gone before the socket may be deleted
    tipc::Stream stream(dummySock);
    tipc::Message msg;
    //Supply the 'quit if this socket connection fails' command
    //This command ensures termination of the child process in case of some errors
    //or ending of the program
    stream << (msg << QString("$quit_on_error"));
    handshakeOk = (tipc::readMessage(stream, msg, 3000) != QString());
  }
  if (!handshakeOk) {
    std::cout << "tipc::startSlaveServer - tipc::readMessage TIMEOUT" << std::endl;
    //The socket is owned by this function until the handshake succeeds:
    //release it, or it would be leaked at every failed attempt.
    delete dummySock;
    return false;
  }
  //The server should die if dummySock is destroyed. This should happen when the *MAIN* thread
  //in *this process* exits. So, if this is not the main thread, we must move the socket there.
  if (QThread::currentThread() != QCoreApplication::instance()->thread())
    dummySock->moveToThread(QCoreApplication::instance()->thread());
  //If a connection error takes place, release the dummy socket.
  //Please, observe that this QObject::connect is invoked *AFTER* the connection trials above...
  QObject::connect(dummySock, SIGNAL(error(QLocalSocket::LocalSocketError)), dummySock, SLOT(deleteLater()));
  return true;
}
//-------------------------------------------------------------
/*!
  Connects the passed socket to the server with name <srvName> + <threadName>.
  Awaits for the connection up to msecs milliseconds before returning false
  (-1 meaning no practical limit).
  If no server was found and cmdline is not empty, a new slave server is
  started by invoking the supplied command line and connection is re-attempted.
  Returns true on success, false otherwise.
  \warning Please, observe that a correct slave server name should be
  ensured to be unique to the parent process.
*/
bool tipc::startSlaveConnection(QLocalSocket *socket, QString srvName, int msecs,
                                QString cmdline, QString threadName)
{
  QTime time;
  time.start();
  if (msecs == -1)
    msecs = (std::numeric_limits<int>::max)();
  QString fullSrvName(srvName + threadName);
  socket->connectToServer(fullSrvName);
  //If the socket is not connecting, the server lookup table returned that no server with
  //the passed name exists. This means that a server must be created.
  if (socket->state() == QLocalSocket::UnconnectedState && !cmdline.isEmpty()) {
    //Completely serialize the server start
    static QMutex mutex;
    QMutexLocker locker(&mutex);
    //Retry connection - this is required due to the mutex: another thread
    //may have started the server while this one was waiting.
    socket->connectToServer(fullSrvName);
    if (socket->state() == QLocalSocket::UnconnectedState) {
      //Invoke the supplied command line to start the server
      if (!tipc::startSlaveServer(srvName, cmdline))
        return false;
      //Reconnect to the server
      socket->connectToServer(fullSrvName);
      if (socket->state() == QLocalSocket::UnconnectedState)
        return false;
    }
  }
  //Now, the server is connecting or already connected. Wait until the socket is connected.
  //The remaining time is clamped to 0: a negative value must not be passed on, since
  //-1 is documented as 'no timeout' for waitForConnected.
  socket->waitForConnected(qMax(0, msecs - time.elapsed()));
  return socket->state() == QLocalSocket::ConnectedState;
}
//-------------------------------------------------------------
/*!
  Waits for the next message on stream and reads it.
  This convenience function wraps the following steps in a single call:
  \li Flush the write buffer (output messages)
  \li Wait until an input message is completely readable
  \li Read the message from stream
  \li Extract the first string of the message and return it
  An empty QString is returned if the message could not be entirely
  retrieved from the stream.
*/
QString tipc::readMessage(Stream &stream, Message &msg, int msecs)
{
  msg.clear();
  stream.flush();
  QString header;
  if (stream.readMessage(msg, msecs))
    msg >> header;
  return header;
}
//-------------------------------------------------------------
/*!
  The non-blocking counterpart of tipc::readMessage: the message is
  retrieved through Stream::readMessageNB, and its first string is returned.
  Unlike tipc::readMessage, the write buffer is not flushed first.
  An empty QString is returned if the message could not be read.
*/
QString tipc::readMessageNB(Stream &stream, Message &msg, int msecs,
                            QEventLoop::ProcessEventsFlag flag)
{
  msg.clear();
  QString header;
  if (stream.readMessageNB(msg, msecs, flag))
    msg >> header;
  return header;
}
//-------------------------------------------------------------
/*!
  Returns an inter-process unique id string; the returned
  id should be used to create QSharedMemory objects.
  The id is built as <pid>_<counter>, where counter is a per-process
  value incremented at each call.
*/
QString tipc::uniqueId()
{
  static QAtomicInt count;
  // Increment and retrieve the counter in a single atomic step: reading it
  // separately after the increment would allow two concurrent callers to
  // observe the same value, and thus to obtain the same id.
  const int id = count.fetchAndAddOrdered(1) + 1;
  return QString::number(QCoreApplication::applicationPid()) + "_" + QString::number(id);
}
//-------------------------------------------------------------
//! Returns the maximum size of a shared memory segment allowed by the system.
//! The value is retrieved once and then cached. On MACOSX it is read from
//! the kern.sysv.shmmax sysctl entry; elsewhere, no limit is queried and the
//! maximum int value is returned.
int tipc::shm_maxSegmentSize()
{
  if (shm_max < 0) {
    //Observe that QSharedMemory accepts only an int size - so the num_lim is against int.
    const int intMax = (std::numeric_limits<int>::max)();
#ifdef MACOSX
    //Retrieve it by invoking sysctl
    size_t valSize = sizeof(TINT64);
    TINT64 val = 0;
    if (sysctlbyname("kern.sysv.shmmax", &val, &valSize, NULL, 0) == 0)
      shm_max = (int)std::min(val, (TINT64)intMax);
    else
      shm_max = intMax; //Query failed: val is meaningless, assume no known limit
#else
    //Non-MACOSX case: no limit is queried
    shm_max = intMax;
#endif
  }
  return shm_max;
}
//-------------------------------------------------------------
//! Returns the maximum number of shared segments allowed by the system.
//! The value is retrieved once and then cached. On MACOSX it is read from
//! the kern.sysv.shmseg sysctl entry; elsewhere, no limit is queried and the
//! maximum int value is returned.
int tipc::shm_maxSegmentCount()
{
  if (shm_seg < 0) {
    const int intMax = (std::numeric_limits<int>::max)();
#ifdef MACOSX
    size_t valSize = sizeof(TINT64);
    TINT64 val = 0;
    if (sysctlbyname("kern.sysv.shmseg", &val, &valSize, NULL, 0) == 0)
      shm_seg = (int)std::min(val, (TINT64)intMax);
    else
      shm_seg = intMax; //Query failed: val is meaningless, assume no known limit
#else
    //Non-MACOSX case: no limit is queried - again, using limit against max due to Qt
    shm_seg = intMax;
#endif
  }
  return shm_seg;
}
//-------------------------------------------------------------
//! Returns the cached value of the system's kern.sysv.shmall setting.
//! On MACOSX the value is read through sysctl the first time; elsewhere,
//! no limit is queried and the maximum int value is returned.
int tipc::shm_maxSharedPages()
{
  if (shm_all < 0) {
    const int intMax = (std::numeric_limits<int>::max)();
#ifdef MACOSX
    size_t valSize = sizeof(TINT64);
    TINT64 val = 0;
    if (sysctlbyname("kern.sysv.shmall", &val, &valSize, NULL, 0) == 0)
      shm_all = (int)std::min(val, (TINT64)intMax);
    else
      shm_all = intMax; //Query failed: val is meaningless, assume no known limit
#else
    shm_all = intMax;
#endif
  }
  return shm_all;
}
//-------------------------------------------------------------
//! Returns the cached value of the system's kern.sysv.shmmni setting.
//! On MACOSX the value is read through sysctl the first time; elsewhere,
//! no limit is queried and the maximum int value is returned.
int tipc::shm_maxSharedCount()
{
  if (shm_mni < 0) {
    const int intMax = (std::numeric_limits<int>::max)();
#ifdef MACOSX
    size_t valSize = sizeof(TINT64);
    TINT64 val = 0;
    if (sysctlbyname("kern.sysv.shmmni", &val, &valSize, NULL, 0) == 0)
      shm_mni = (int)std::min(val, (TINT64)intMax);
    else
      shm_mni = intMax; //Query failed: val is meaningless, assume no known limit
#else
    shm_mni = intMax;
#endif
  }
  return shm_mni;
}
//-------------------------------------------------------------
/*!
  Tries to apply the passed shared memory parameters to the system.
  Only strictly positive parameters are considered; each one is written
  to the matching kern.sysv.* sysctl entry, and the locally cached value is
  updated only if the write succeeds.
  This is only working on MAC's SystemV shm, it's a no-op on Win.
  This function will fail anyway if the process is not owned by an
  admin.
*/
void tipc::shm_set(int shmmax, int shmseg, int shmall, int shmmni)
{
  tipc_debug(qDebug("shmmax: %i, shmseg: %i, shmall: %i, shmmni: %i", shmmax, shmseg, shmall, shmmni));
#ifdef MACOSX
  TINT64 newValue;
  if (shmmax > 0) {
    newValue = shmmax;
    if (sysctlbyname("kern.sysv.shmmax", NULL, NULL, &newValue, sizeof(TINT64)) == 0)
      shm_max = shmmax;
  }
  if (shmseg > 0) {
    newValue = shmseg;
    if (sysctlbyname("kern.sysv.shmseg", NULL, NULL, &newValue, sizeof(TINT64)) == 0)
      shm_seg = shmseg;
  }
  if (shmall > 0) {
    newValue = shmall;
    if (sysctlbyname("kern.sysv.shmall", NULL, NULL, &newValue, sizeof(TINT64)) == 0)
      shm_all = shmall;
  }
  if (shmmni > 0) {
    newValue = shmmni;
    if (sysctlbyname("kern.sysv.shmmni", NULL, NULL, &newValue, sizeof(TINT64)) == 0)
      shm_mni = shmmni;
  }
#endif
}
//-------------------------------------------------------------
/*!
  Creates the shared memory segment of the passed QSharedMemory.
  This function wraps QSharedMemory::create with the following
  \b UNIX-specific distinctions:
  <LI> If strictSize is false, a requested size beyond the one supported by
  the system is reduced to the maximum supported size; otherwise
  the size is passed on unchanged. <\LI>
  <LI> Unlike QSharedMemory::create, this function attempts once to
  reclaim an already existing memory id before creating a new one. <\LI>
  Returns the size of the created segment, or -1 on failure.
*/
int tipc::create(QSharedMemory &shmem, int size, bool strictSize)
{
  if (!strictSize)
    size = std::min(size, (int)shm_maxSegmentSize());
  tipc_debug(qDebug() << "shMem create: size =" << size);
  //At most two attempts: the plain one, plus one after the recovery below
  for (int attempt = 0; attempt < 2; ++attempt) {
    if (shmem.create(size))
      return size;
    tipc_debug(qDebug() << "Error: Shared Segment could not be created: #" << shmem.errorString());
    //Unix-specific error recovery follows. See Qt's docs about it.
    //Only error #AlreadyExists is recovered, and only once - supposedly, the server
    //crashed in a previous instance.
    if (attempt > 0 || shmem.error() != QSharedMemory::AlreadyExists)
      break;
    //As shared memory segments that happen to go this way are owned by the server
    //process with 1 reference count, attaching and detaching now may solve the issue.
    shmem.attach();
    shmem.detach();
  }
  return -1;
}
//-------------------------------------------------------------
/*!
  Sends bufSize bytes of data through a shared memory segment.
  The segment id and bufSize are appended to msg and sent together
  with the first chunk notification; data is then supplied by dataWriter
  chunk by chunk, each chunk being acknowledged by the reader with an
  "ok" message before the next one is written.
  The number of concurrent invocations is limited to
  tipc::shm_maxSegmentCount() by a static semaphore.
  Returns true if all the data was transferred, false otherwise.
*/
bool tipc::writeShMemBuffer(Stream &stream, Message &msg, int bufSize, ShMemWriter *dataWriter)
{
  tipc_debug(QTime time; time.start());
  tipc_debug(qDebug("tipc::writeShMemBuffer entry"));
  static QSemaphore sem(tipc::shm_maxSegmentCount());
  sem.acquire(1);
  bool transferred = false;
  {
    //Create a shared memory segment, possibly of passed size.
    //The segment lives in this scope only, so it is destroyed before
    //the semaphore is released.
    QSharedMemory shmem(tipc::uniqueId());
    if (tipc::create(shmem, bufSize) > 0) {
      //Communicate the shared memory id and bufSize to the reader
      msg << QString("shm") << shmem.key() << bufSize;
      //Fill in data until all the buffer has been sent
      int remainingData = bufSize;
      transferred = true;
      while (remainingData > 0) {
        //Write to the shared memory segment
        tipc_debug(QTime xchTime; xchTime.start());
        shmem.lock();
        int chunkData = dataWriter->write((char *)shmem.data(), std::min(shmem.size(), remainingData));
        shmem.unlock();
        remainingData -= chunkData;
        tipc_debug(qDebug() << "exchange time:" << xchTime.elapsed());
        stream << (msg << QString("chk") << chunkData);
        //The reader must acknowledge each chunk before the segment is overwritten
        if (tipc::readMessage(stream, msg) != "ok") {
          transferred = false;
          break;
        }
        msg.clear();
      }
    }
  }
  if (!transferred) {
    tipc_debug(qDebug("tipc::writeShMemBuffer exit (error)"));
    msg.clear();
    sem.release(1);
    return false;
  }
  sem.release(1);
  tipc_debug(qDebug("tipc::writeShMemBuffer exit"));
  tipc_debug(qDebug() << "tipc::writeShMemBuffer time:" << time.elapsed());
  return true;
}
//-------------------------------------------------------------
/*!
  Receives data through a shared memory segment.
  The function expects a "shm" message carrying the segment id, the overall
  buffer size and the first chunk notification; each chunk is handed to
  dataReader and acknowledged with an "ok" message, until the whole
  buffer has been consumed.
  Returns true if all the data was read, false if an unexpected message was
  received or the shared memory segment could not be attached.
*/
bool tipc::readShMemBuffer(Stream &stream, Message &msg, ShMemReader *dataReader)
{
  tipc_debug(QTime time; time.start(););
  tipc_debug(qDebug("tipc::readShMemBuffer entry"));
  //Read the id from stream
  if (tipc::readMessage(stream, msg) != "shm") {
    tipc_debug(qDebug("tipc::readShMemBuffer exit (res != \"shm\")"));
    return false;
  }
  //Extract the segment id, the buffer size and the first chunk tag
  QString id, chkStr;
  int bufSize;
  msg >> id >> bufSize >> chkStr;
  //Data is ready to be read - attach to the shared memory segment.
  QSharedMemory shmem(id);
  shmem.attach();
  if (!shmem.isAttached()) {
    tipc_debug(qDebug("tipc::readShMemBuffer exit (shmem not attached)"));
    return false;
  }
  //Start reading from it
  int chunkData, remainingData = bufSize;
  bool moreChunks;
  do {
    //The current message holds the size of the chunk to be read
    msg >> chunkData;
    tipc_debug(QTime xchTime; xchTime.start());
    shmem.lock();
    remainingData -= dataReader->read((const char *)shmem.data(), chunkData);
    shmem.unlock();
    tipc_debug(qDebug() << "exchange time:" << xchTime.elapsed());
    //Data was read. Inform the writer
    stream << (msg << clr << QString("ok"));
    stream.flush();
    moreChunks = (remainingData > 0);
    //Wait for more chunks
    if (moreChunks && tipc::readMessage(stream, msg) != "chk") {
      tipc_debug(qDebug("tipc::readShMemBuffer exit (unexpected chunk absence)"));
      return false;
    }
  } while (moreChunks);
  shmem.detach();
  tipc_debug(qDebug("tipc::readShMemBuffer exit"));
  tipc_debug(qDebug() << "tipc::readShMemBuffer time:" << time.elapsed());
  return true;
}