#ifndef TRAIN_INC_CPP
#define TRAIN_INC_CPP
#include <chrono>
#include <thread>
#include "layer.inc.cpp"
// Returns the number of microseconds elapsed on std::chrono::steady_clock
// since the first call to this function (the first call therefore returns a
// value close to zero).
long long timeUs() {
  using Clock = std::chrono::steady_clock;

  // Captured once, on the first call; every later call measures against it.
  static const Clock::time_point origin = Clock::now();

  const Clock::duration elapsed = Clock::now() - origin;
  const std::chrono::microseconds us =
    std::chrono::duration_cast<std::chrono::microseconds>(elapsed);
  return (long long)us.count();
}
// Pair of quality measurements that can be summed, scaled and ordered.
// Kept as a plain aggregate so it can be brace-initialised ({train, human}).
struct Quality {
  AccumReal train;
  AccumReal human;

  // Component-wise accumulation.
  inline Quality& operator+=(const Quality &b) {
    train += b.train;
    human += b.human;
    return *this;
  }

  // Component-wise scaling by x.
  inline Quality& operator*=(AccumReal x) {
    train *= x;
    human *= x;
    return *this;
  }

  // Lexicographic ordering: `human` is the primary key, `train` only breaks
  // ties between equal `human` values.
  inline bool operator<(const Quality &b) const {
    if (human < b.human)
      return true;
    if (b.human < human)
      return false;
    return train < b.train;
  }
};
// Multi-threaded training driver for a chain of Layer objects.
//
// run() executes blocks of `itersPerBlock` iterations on `threadsCount`
// threads (the calling thread acts as thread 0). Each iteration loads data,
// runs the forward pass over the layer chain, evaluates the result and, when
// allowed, runs the backward pass. Derived classes provide the data and the
// evaluation through the protected virtual hooks.
class Trainer {
private:
  std::atomic<unsigned int> barrierCounter; // shared counter handed to every thread's Barrier
  std::vector<Quality> qualities;           // per-thread quality sums of the current block

public:
  Layer *layer;        // first layer of the chain; run() asserts it has no prev and has a next
  AccumReal ratio;     // the backward pass runs only while ratio > 0
  int threadsCount;    // number of threads per block, must be > 0
  int itersPerBlock;   // iterations per block, must be > 0
  int blocksPerSaving; // save interval in blocks; values <= 0 are treated as 1
  int blocksCount;     // maximum number of blocks; values <= 0 disable the limit
  AccumReal qmin;      // run() stops once result.human <= qmin

protected:
  // Cleared before every verifyData() call; when it is non-zero after all
  // threads have finished verifyData(), the backward pass of that iteration
  // is skipped. Presumably set by verifyData() overrides - confirm with the
  // derived classes.
  std::atomic<unsigned int> skipBackpass;
  Layer *fl; // first layer, set by run()
  Layer *bl; // last layer (layer->back()), set by run()

  // Called once at the start of run(); returning false aborts the run.
  virtual bool prepare() { return true; }
  // Called before every block; returning false aborts the run.
  virtual bool prepareBlock() { return true; }
  // Called after every block, once all threads of the block have finished.
  virtual void finishBlock() { }
  // Called once at the end of run() (not called when prepare() failed).
  virtual void finish() { }
  // Called by every thread at the start of iteration `iter` of block `block`.
  virtual void loadData(Barrier &barrier, int block, int iter) { }
  // Called by every thread after the forward pass; the returned values are
  // summed over threads and iterations and divided by itersPerBlock.
  virtual Quality verifyData(Barrier &barrier, int block, int iter) { return Quality{}; }

private:
  // Body of one worker thread for one block.
  //   tid   - thread index in [0, threadsCount)
  //   seed  - seed forwarded to this thread's Barrier
  //   block - index of the block being processed
  // Stores the sum of this thread's verifyData() results in qualities[tid].
  void threadFunc(int tid, unsigned int seed, int block) {
    Barrier barrier(barrierCounter, tid, threadsCount, seed);
    Quality sumQ = {};
    for(int i = 0; i < itersPerBlock; ++i) {
      barrier.wait();
      loadData(barrier, block, i);

      // forward pass over every layer after the first one
      for(Layer *l = fl->next; l; l = l->next) {
        barrier.wait();
        l->pass(barrier);
      }

      // Reset the flag BEFORE the barrier: once the barrier is passed any
      // thread may already be inside verifyData(), and a late reset from a
      // slower thread would wipe out a value set there.
      skipBackpass = 0;
      barrier.wait();
      sumQ += verifyData(barrier, block, i);

      // Every thread reads the flag between two barriers, so all threads
      // take the same decision and the flag is not modified while it is read.
      barrier.wait();
      bool skipBp = skipBackpass;
      barrier.wait();

      if (ratio > 0 && !skipBp) {
        // deltas: from the last layer backwards, stopping before the layer
        // whose predecessor is the first layer
        for(Layer *l = bl; l->prev && l->prev->prev; l = l->prev) {
          barrier.wait();
          l->backpassDeltas(barrier);
        }
        // weights: every layer that has a predecessor
        for(Layer *l = bl; l->prev; l = l->prev) {
          barrier.wait();
          l->backpassWeights(barrier);
        }
      }
    }
    qualities[tid] = sumQ;
  }

  // Runs one block on all threads and returns the per-iteration average of
  // the summed verifyData() results.
  Quality runThreads(int block) {
    barrierCounter = 0;

    // Threads 1..threadsCount-1 are spawned; the calling thread is thread 0.
    std::vector<std::thread> workers;
    if (threadsCount > 1)
      workers.reserve(threadsCount - 1);
    for(int i = 1; i < threadsCount; ++i)
      workers.emplace_back(&Trainer::threadFunc, this, i, static_cast<unsigned int>(rand()), block);
    // Argument order is (tid, seed, block): the random value is the seed.
    threadFunc(0, static_cast<unsigned int>(rand()), block);

    Quality result = qualities[0];
    for(int i = 1; i < threadsCount; ++i) {
      workers[i - 1].join(); // qualities[i] is final only after the join
      result += qualities[i];
    }
    return result *= 1/(AccumReal)itersPerBlock;
  }

public:
  Trainer():
    barrierCounter(0),
    layer(),
    ratio(),
    threadsCount(1),
    itersPerBlock(100),
    blocksPerSaving(),
    blocksCount(1000),
    qmin(),
    skipBackpass(0),
    fl(),
    bl() { }

  // The class is used polymorphically (virtual hooks), so destruction through
  // a Trainer pointer must reach the derived destructor.
  virtual ~Trainer() { }

  // Sets all public parameters at once; returns *this for call chaining.
  Trainer& configure(
    Layer &layer,
    AccumReal ratio,
    int threadsCount,
    int itersPerBlock,
    int blocksPerSaving,
    int blocksCount,
    AccumReal qmin )
  {
    this->layer = &layer;
    this->ratio = ratio;
    this->threadsCount = threadsCount;
    this->itersPerBlock = itersPerBlock;
    this->blocksPerSaving = blocksPerSaving;
    this->blocksCount = blocksCount;
    this->qmin = qmin;
    return *this;
  }

  // Runs the training loop.
  //
  // The first block is executed with ratio forced to 0 (no backward pass), so
  // its result serves as the baseline for the "saved" quality. The loop ends
  // when blocksCount blocks were processed (if blocksCount > 0) or when
  // result.human <= qmin. The layer chain is saved via layer->save() when the
  // save interval has elapsed (or the loop is ending) and the block result is
  // better than the last saved/baseline result.
  //
  // Returns the quality of the last executed block, or {INFINITY, INFINITY}
  // when prepare(), prepareBlock() or layer->save() fails.
  Quality run() {
    assert(layer && !layer->prev && layer->next);
    assert(threadsCount > 0);
    assert(itersPerBlock > 0);

    Quality bad = {INFINITY, INFINITY};

    printf("training: threads %d, itersPerBlock %d, ratio: %lf\n", threadsCount, itersPerBlock, ratio);
    fflush(stdout);

    fl = layer;
    bl = &layer->back();

    qualities.clear();
    qualities.resize(threadsCount, Quality{});
    for(Layer *l = layer; l; l = l->next)
      l->split(threadsCount);

    if (!prepare())
      return printf("cannot prepare\n"), bad;

    AccumReal ratioCopy = ratio;
    Quality result = bad, best = result, saved = result;
    long long fullTimeStartUs = timeUs();

    ratio = 0; // first block: evaluation only, restored at the end of the iteration
    int i = 0;
    int bps = blocksPerSaving > 0 ? blocksPerSaving : 1;
    int nextSave = i + bps;
    while(true) {
      if (!prepareBlock()) {
        printf("cannot prepare block\n");
        result = bad;
        break;
      }

      long long runTimeUs = timeUs();
      result = runThreads(i);
      runTimeUs = timeUs() - runTimeUs;

      finishBlock();

      // time since the previous block finished, including the hooks
      long long t = timeUs();
      long long fullTimeUs = t - fullTimeStartUs;
      fullTimeStartUs = t;

      ++i;

      if (i == 1) saved = result; // baseline measured without training

      bool good = result < best;
      bool done = (blocksCount > 0 && i >= blocksCount) || result.human <= qmin;
      bool saving = ratio > 0 && (i >= nextSave || done) && result < saved;
      if (good) best = result;

      printf("%4d, total %7d, avg.result %f (%f), best %f (%f), time: %f / %f%s\n",
        i, i*itersPerBlock,
        result.human, result.train, best.human, best.train,
        runTimeUs*0.000001, fullTimeUs*0.000001,
        (saving ? ", saving" : "" ) );
      fflush(stdout);

      if (saving) {
        if (!layer->save()) {
          printf("saving failed\n");
          result = bad;
          break;
        }
        saved = result;
        nextSave += bps;
      }

      if (done) break;
      ratio = ratioCopy;
    }

    ratio = ratioCopy; // always leave the public parameter as the caller set it
    finish();

    return result;
  }
};
#endif