Blame projects/neural/train.inc.cpp

e865c9
#ifndef NNTRAIN_INC_CPP
e865c9
#define NNTRAIN_INC_CPP
e865c9
e865c9
e865c9
#include <chrono>
e865c9
#include <thread>
e865c9
e865c9
e865c9
#include "layer.inc.cpp"
e865c9
e865c9
e865c9
/// Returns the number of microseconds elapsed on std::chrono::steady_clock
/// since the first call to this function.
///
/// The reference point is a function-local static that is initialised on the
/// first call, so the first call returns a value close to zero and later
/// calls never return a smaller value than earlier ones.
long long timeUs() {
  static const std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
  const std::chrono::steady_clock::duration elapsed = std::chrono::steady_clock::now() - begin;
  return static_cast<long long>(
    std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count() );
}
e865c9
e865c9
e865c9
class Trainer {
e865c9
private:
e865c9
  std::atomic<unsigned int=""> barrierCounter;</unsigned>
e865c9
  std::vector<accumreal> qualities;</accumreal>
e865c9
e865c9
public:
e865c9
  Layer *layer;
e865c9
  AccumReal ratio;
e865c9
  int threadsCount;
e865c9
  int itersPerBlock;
e865c9
  int blocksPerSaving;
e865c9
  int blocksCount;
e865c9
  AccumReal qmin;
e865c9
e865c9
protected:
e865c9
  volatile bool doBackpassAtThisIteration;
e865c9
  Layer *fl;
e865c9
  Layer *bl;
e865c9
  
e865c9
  virtual bool prepare() { return true; }
e865c9
  virtual bool prepareBlock() { return true; }
e865c9
  virtual void finishBlock() { }
e865c9
  virtual void finish() { }
e865c9
e865c9
  virtual void loadData(Barrier &barrier, int block, int iter) { }
e865c9
  virtual AccumReal verifyData(Barrier &barrier, int block, int iter) { return 0; }
e865c9
e865c9
  virtual void loadDataMain(int block, int iter) { };
e865c9
  virtual AccumReal verifyDataMain(int block, int iter) { return 0; };
e865c9
e865c9
private:
e865c9
  void threadFunc(int tid, int block) {
e865c9
    Barrier barrier(barrierCounter, tid, threadsCount);
e865c9
e865c9
    volatile AccumReal &sumQ = qualities[tid] = 0;
e865c9
    for(int i = 0; i < itersPerBlock; ++i) {
e865c9
      barrier.wait();
e865c9
      loadData(barrier, block, i);
e865c9
      barrier.wait();
e865c9
      if (!tid) loadDataMain(block, i);
e865c9
e865c9
      for(Layer *l = fl->next; l; l = l->next) {
e865c9
        barrier.wait();
e865c9
        l->pass(barrier);
e865c9
      }
e865c9
e865c9
      barrier.wait();
e865c9
      sumQ += verifyData(barrier, block, i);
e865c9
      barrier.wait();
e865c9
      if (!tid) {
e865c9
        doBackpassAtThisIteration = true;
e865c9
        sumQ += verifyDataMain(block, i);
e865c9
      }
e865c9
      
e865c9
      barrier.wait();
e865c9
      if (ratio > 0 && doBackpassAtThisIteration) {
e865c9
        for(Layer *l = bl; l->prev && l->prev->prev; l = l->prev) {
e865c9
          barrier.wait();
e865c9
          l->backpassDeltas(barrier);
e865c9
        }
e865c9
        for(Layer *l = bl; l->prev; l = l->prev) {
e865c9
          barrier.wait();
e865c9
          l->backpassWeights(barrier);
e865c9
        }
e865c9
      }
e865c9
    }
e865c9
  }
e865c9
e865c9
e865c9
  AccumReal runThreads(int block) {
e865c9
    barrierCounter = 0;
e865c9
    std::vector<std::thread*> t(threadsCount, nullptr);</std::thread*>
e865c9
    for(int i = 1; i < threadsCount; ++i)
e865c9
      t[i] = new std::thread(&Trainer::threadFunc, this, i, block);
e865c9
    threadFunc(0, block);
e865c9
e865c9
    AccumReal result = qualities[0];
e865c9
    for(int i = 1; i < threadsCount; ++i)
e865c9
      { t[i]->join(); delete t[i]; result += qualities[i]; }
e865c9
    return result / itersPerBlock;
e865c9
  }
e865c9
e865c9
e865c9
public:
e865c9
  Trainer():
e865c9
    barrierCounter(0),
e865c9
    layer(),
e865c9
    ratio(),
e865c9
    threadsCount(1),
e865c9
    itersPerBlock(100),
e865c9
    blocksPerSaving(),
e865c9
    blocksCount(1000),
e865c9
    qmin(),
e865c9
    doBackpassAtThisIteration(),
e865c9
    fl(),
e865c9
    bl() { }
e865c9
e865c9
e865c9
  Trainer& configure(
e865c9
    Layer &layer,
e865c9
    AccumReal ratio,
e865c9
    int threadsCount,
e865c9
    int itersPerBlock,
e865c9
    int blocksPerSaving,
e865c9
    int blocksCount,
e865c9
    AccumReal qmin )
e865c9
  {
e865c9
    this->layer           = &layer;
e865c9
    this->ratio           = ratio;
e865c9
    this->threadsCount    = threadsCount;
e865c9
    this->itersPerBlock   = itersPerBlock;
e865c9
    this->blocksPerSaving = blocksPerSaving;
e865c9
    this->blocksCount     = blocksCount;
e865c9
    this->qmin            = qmin;
e865c9
    return *this;
e865c9
  }
e865c9
e865c9
e865c9
  AccumReal run() {
e865c9
    assert(layer && !layer->prev && layer->next);
e865c9
    assert(threadsCount > 0);
e865c9
    assert(itersPerBlock > 0);
e865c9
e865c9
    printf("training: threads %d, itersPerBlock %d, ratio: %lf\n", threadsCount, itersPerBlock, ratio);
e865c9
e865c9
    fl = layer;
e865c9
    bl = &layer->back();
e865c9
    
e865c9
    qualities.clear();
e865c9
    qualities.resize(threadsCount, 0);
e865c9
    for(Layer *l = layer; l; l = l->next)
e865c9
      l->split(threadsCount);
e865c9
    
e865c9
    if (!prepare())
e865c9
      return printf("cannot prepare\n"), -1;
e865c9
e865c9
    AccumReal result = -1;
e865c9
    long long fullTimeStartUs = timeUs();
e865c9
    int i = 0;
e865c9
    while(true) {
e865c9
      if (!prepareBlock()) {
e865c9
        printf("cannot prepare block\n");
e865c9
        result = -1;
e865c9
        break;
e865c9
      };
e865c9
e865c9
      long long runTimeUs = timeUs();
e865c9
      result = runThreads(i);
e865c9
      runTimeUs = timeUs() - runTimeUs;
e865c9
e865c9
      finishBlock();
e865c9
e865c9
      long long t = timeUs();
e865c9
      long long fullTimeUs = t - fullTimeStartUs;
e865c9
      fullTimeStartUs = t;
e865c9
      ++i;
e865c9
e865c9
      printf("%4d, total %7d, avg.result %f, time: %f / %f\n", i, i*itersPerBlock, result, runTimeUs*0.000001, fullTimeUs*0.000001);
e865c9
e865c9
      bool done = (blocksCount > 0 && i >= blocksCount) || result <= qmin;
e865c9
e865c9
      if (ratio > 0 && (blocksPerSaving <= 0 || i%blocksPerSaving == 0 || done) && !layer->save()) {
e865c9
        printf("saving failed\n");
e865c9
        result = -1;
e865c9
        break;
e865c9
      }
e865c9
e865c9
      if (done) break;
e865c9
    }
e865c9
e865c9
    finish();
e865c9
e865c9
    return result;
e865c9
  }
e865c9
};
e865c9
e865c9
e865c9
#endif