#ifndef NNTRAIN_INC_CPP
#define NNTRAIN_INC_CPP
#include <chrono>
#include <thread>
#include "layer.inc.cpp"
/// Returns the number of microseconds elapsed on std::chrono::steady_clock
/// since the first call to this function.
///
/// The reference time point is captured lazily, on the first call, so the
/// very first result is close to zero. Later results never decrease because
/// the steady clock is monotonic.
///
/// Declared `inline` because the definition lives in an include file:
/// including it from more than one translation unit would otherwise produce
/// multiple-definition link errors. The function-local static is still a
/// single shared object across translation units.
inline long long timeUs() {
    static const std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
    const std::chrono::steady_clock::duration elapsed = std::chrono::steady_clock::now() - begin;
    return static_cast<long long>(
        std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count());
}
class Trainer {
private:
std::atomic<unsigned int> barrierCounter;
std::vector<AccumReal> qualities;
public:
Layer *layer;
AccumReal ratio;
int threadsCount;
int itersPerBlock;
int blocksPerSaving;
int blocksCount;
AccumReal qmin;
protected:
volatile bool doBackpassAtThisIteration;
Layer *fl;
Layer *bl;
virtual bool prepare() { return true; }
virtual bool prepareBlock() { return true; }
virtual void finishBlock() { }
virtual void finish() { }
virtual void loadData(Barrier &barrier, int block, int iter) { }
virtual AccumReal verifyData(Barrier &barrier, int block, int iter) { return 0; }
virtual void loadDataMain(int block, int iter) { };
virtual AccumReal verifyDataMain(int block, int iter) { return 0; };
private:
void threadFunc(int tid, int block) {
Barrier barrier(barrierCounter, tid, threadsCount);
volatile AccumReal &sumQ = qualities[tid] = 0;
for(int i = 0; i < itersPerBlock; ++i) {
barrier.wait();
loadData(barrier, block, i);
barrier.wait();
if (!tid) loadDataMain(block, i);
for(Layer *l = fl->next; l; l = l->next) {
barrier.wait();
l->pass(barrier);
}
barrier.wait();
sumQ += verifyData(barrier, block, i);
barrier.wait();
if (!tid) {
doBackpassAtThisIteration = true;
sumQ += verifyDataMain(block, i);
}
barrier.wait();
if (ratio > 0 && doBackpassAtThisIteration) {
for(Layer *l = bl; l->prev && l->prev->prev; l = l->prev) {
barrier.wait();
l->backpassDeltas(barrier);
}
for(Layer *l = bl; l->prev; l = l->prev) {
barrier.wait();
l->backpassWeights(barrier);
}
}
}
}
AccumReal runThreads(int block) {
barrierCounter = 0;
std::vector<std::thread*> t(threadsCount, nullptr);
for(int i = 1; i < threadsCount; ++i)
t[i] = new std::thread(&Trainer::threadFunc, this, i, block);
threadFunc(0, block);
AccumReal result = qualities[0];
for(int i = 1; i < threadsCount; ++i)
{ t[i]->join(); delete t[i]; result += qualities[i]; }
return result / itersPerBlock;
}
public:
Trainer():
barrierCounter(0),
layer(),
ratio(),
threadsCount(1),
itersPerBlock(100),
blocksPerSaving(),
blocksCount(1000),
qmin(),
doBackpassAtThisIteration(),
fl(),
bl() { }
Trainer& configure(
Layer &layer,
AccumReal ratio,
int threadsCount,
int itersPerBlock,
int blocksPerSaving,
int blocksCount,
AccumReal qmin )
{
this->layer = &layer;
this->ratio = ratio;
this->threadsCount = threadsCount;
this->itersPerBlock = itersPerBlock;
this->blocksPerSaving = blocksPerSaving;
this->blocksCount = blocksCount;
this->qmin = qmin;
return *this;
}
AccumReal run() {
assert(layer && !layer->prev && layer->next);
assert(threadsCount > 0);
assert(itersPerBlock > 0);
printf("training: threads %d, itersPerBlock %d, ratio: %lf\n", threadsCount, itersPerBlock, ratio);
fl = layer;
bl = &layer->back();
qualities.clear();
qualities.resize(threadsCount, 0);
for(Layer *l = layer; l; l = l->next)
l->split(threadsCount);
if (!prepare())
return printf("cannot prepare\n"), -1;
AccumReal result = -1;
long long fullTimeStartUs = timeUs();
int i = 0;
while(true) {
if (!prepareBlock()) {
printf("cannot prepare block\n");
result = -1;
break;
};
long long runTimeUs = timeUs();
result = runThreads(i);
runTimeUs = timeUs() - runTimeUs;
finishBlock();
long long t = timeUs();
long long fullTimeUs = t - fullTimeStartUs;
fullTimeStartUs = t;
++i;
printf("%4d, total %7d, avg.result %f, time: %f / %f\n", i, i*itersPerBlock, result, runTimeUs*0.000001, fullTimeUs*0.000001);
bool done = (blocksCount > 0 && i >= blocksCount) || result <= qmin;
if (ratio > 0 && (blocksPerSaving <= 0 || i%blocksPerSaving == 0 || done) && !layer->save()) {
printf("saving failed\n");
result = -1;
break;
}
if (done) break;
}
finish();
return result;
}
};
#endif