// (repository web-viewer residue, not part of the source: "Blob Blame Raw")
#ifndef TRAIN_INC_CPP
#define TRAIN_INC_CPP


#include <chrono>
#include <thread>


#include "layer.inc.cpp"


/// Returns the number of microseconds elapsed on the steady clock since the
/// first call of this function (the first call itself yields roughly zero).
long long timeUs() {
  using Clock = std::chrono::steady_clock;
  // the reference point is captured once, on the first call
  static const Clock::time_point origin = Clock::now();
  const Clock::duration elapsed = Clock::now() - origin;
  const auto micros = std::chrono::duration_cast<std::chrono::microseconds>(elapsed);
  return static_cast<long long>(micros.count());
}


/// Pair of quality figures that can be accumulated, scaled and ordered.
/// Ordering looks at `human` first and falls back to `train` on a tie.
struct Quality {
  AccumReal train;
  AccumReal human;

  /// Adds both figures of `b` to this value, component by component.
  inline Quality& operator+=(const Quality &b) {
    human += b.human;
    train += b.train;
    return *this;
  }

  /// Multiplies both figures by `x`.
  inline Quality& operator*=(AccumReal x) {
    human *= x;
    train *= x;
    return *this;
  }

  /// Strict ordering: smaller `human` wins, `train` breaks ties.
  inline bool operator<(const Quality &b) const {
    if (human < b.human) return true;
    if (b.human < human) return false;
    return train < b.train;
  }
};


/// Multi-threaded training driver for a chain of layers.
///
/// run() executes the training block by block. Every block consists of
/// `itersPerBlock` iterations, and every iteration is processed cooperatively
/// by `threadsCount` threads (the calling thread plus `threadsCount - 1`
/// workers) that step through the phases together via Barrier::wait():
/// loadData(), Layer::pass() for every layer after the first, verifyData(),
/// and - when `ratio > 0` and no skip was requested - the backward passes.
/// Subclasses provide the data and the quality measure through the protected
/// virtual hooks.
class Trainer {
private:
  std::atomic<unsigned int> barrierCounter; // counter shared by the Barrier objects of all threads
  std::vector<Quality> qualities;           // per-thread quality sums of the block that ran last

public:
  Layer *layer;         // head of the layer chain; run() asserts it has no prev and has a next
  AccumReal ratio;      // backward passes run only while > 0 (presumably the training rate - confirm)
  int threadsCount;     // number of threads working on each iteration, must be > 0
  int itersPerBlock;    // iterations per block, must be > 0
  int blocksPerSaving;  // blocks between save attempts; values <= 0 are treated as 1
  int blocksCount;      // run() stops after this many blocks; values <= 0 mean no limit
  AccumReal qmin;       // run() stops once the block's `human` quality is <= qmin

protected:
  // A non-zero value observed after verifyData() suppresses the backward
  // passes of the current iteration. It is cleared before every verifyData().
  std::atomic<unsigned int> skipBackpass;
  Layer *fl; // first layer, set by run()
  Layer *bl; // last layer (layer->back()), set by run()

  /// Called once by run() before the first block; returning false aborts run().
  virtual bool prepare() { return true; }
  /// Called before every block; returning false aborts run().
  virtual bool prepareBlock() { return true; }
  /// Called after every block, once all threads of the block were joined.
  virtual void finishBlock() { }
  /// Called once at the end of run() after the block loop has been left.
  virtual void finish() { }

  /// Called by every thread at the start of iteration `iter` of block `block`.
  virtual void loadData(Barrier &barrier, int block, int iter) { }
  /// Called by every thread after the forward pass. The returned values of all
  /// threads and iterations of a block are summed and divided by itersPerBlock.
  virtual Quality verifyData(Barrier &barrier, int block, int iter) { return Quality{}; }

private:
  /// Body executed by each of the cooperating threads for one block.
  /// @param tid    index of the thread, 0 .. threadsCount-1
  /// @param seed   seed handed to this thread's Barrier
  /// @param block  index of the block, forwarded to loadData()/verifyData()
  /// The quality accumulated by this thread is stored into qualities[tid].
  void threadFunc(int tid, unsigned int seed, int block) {
    Barrier barrier(barrierCounter, tid, threadsCount, seed);

    Quality sumQ = {};
    for(int i = 0; i < itersPerBlock; ++i) {
      barrier.wait();
      loadData(barrier, block, i);

      // forward pass through every layer after the first one
      for(Layer *l = fl->next; l; l = l->next) {
        barrier.wait();
        l->pass(barrier);
      }

      // Clear the flag BEFORE the barrier: if it were cleared after it, the
      // reset of a slow thread could erase a skip request already raised by
      // a faster thread inside verifyData().
      // NOTE(review): relies on Barrier::wait() releasing the threads only
      // after all of them have arrived - confirm against Barrier.
      skipBackpass = 0;
      barrier.wait();
      sumQ += verifyData(barrier, block, i);

      // every thread takes its own snapshot once all verifyData() calls are done
      barrier.wait();
      bool skipBp = skipBackpass;

      barrier.wait();
      if (ratio > 0 && !skipBp) {
        // deltas are propagated from the last layer down to the third layer of the chain
        for(Layer *l = bl; l->prev && l->prev->prev; l = l->prev) {
          barrier.wait();
          l->backpassDeltas(barrier);
        }
        // weights are updated for every layer except the first one
        for(Layer *l = bl; l->prev; l = l->prev) {
          barrier.wait();
          l->backpassWeights(barrier);
        }
      }
    }
    qualities[tid] = sumQ;
  }


  /// Runs one block on `threadsCount` threads and returns the summed quality
  /// of all threads divided by itersPerBlock.
  Quality runThreads(int block) {
    barrierCounter = 0;

    // threads are held by value so nothing has to be deleted by hand
    std::vector<std::thread> workers;
    workers.reserve(threadsCount > 1 ? (size_t)(threadsCount - 1) : 0);

    // Argument order must follow threadFunc(tid, seed, block): the random
    // value is the barrier seed, `block` is the block index.
    for(int i = 1; i < threadsCount; ++i)
      workers.emplace_back(&Trainer::threadFunc, this, i, (unsigned int)rand(), block);
    threadFunc(0, (unsigned int)rand(), block);

    Quality result = qualities[0];
    for(int i = 1; i < threadsCount; ++i) {
      workers[i - 1].join(); // qualities[i] is complete only after the join
      result += qualities[i];
    }
    return result *= 1/(AccumReal)itersPerBlock;
  }


public:
  Trainer():
    barrierCounter(0),
    layer(),
    ratio(),
    threadsCount(1),
    itersPerBlock(100),
    blocksPerSaving(),
    blocksCount(1000),
    qmin(),
    skipBackpass(0),
    fl(),
    bl() { }

  // the class is meant to be subclassed, so destruction through a base pointer must be safe
  virtual ~Trainer() { }


  /// Sets all public parameters in one call and returns *this for chaining.
  Trainer& configure(
    Layer &layer,
    AccumReal ratio,
    int threadsCount,
    int itersPerBlock,
    int blocksPerSaving,
    int blocksCount,
    AccumReal qmin )
  {
    this->layer           = &layer;
    this->ratio           = ratio;
    this->threadsCount    = threadsCount;
    this->itersPerBlock   = itersPerBlock;
    this->blocksPerSaving = blocksPerSaving;
    this->blocksCount     = blocksCount;
    this->qmin            = qmin;
    return *this;
  }


  /// Runs the training loop.
  ///
  /// The first block is executed with `ratio` forced to 0, so it performs no
  /// backward passes and its result becomes the baseline for saving. After
  /// that, blocks are executed with the configured ratio until `blocksCount`
  /// blocks are done (when blocksCount > 0) or the `human` quality of a block
  /// drops to `qmin` or below. The layers are saved through layer->save() when
  /// a save is due (or the loop is finishing) and the block's result is better
  /// than the one saved before.
  ///
  /// @return the quality of the last executed block, or {INFINITY, INFINITY}
  ///         when prepare(), prepareBlock() or layer->save() fails.
  Quality run() {
    assert(layer && !layer->prev && layer->next);
    assert(threadsCount > 0);
    assert(itersPerBlock > 0);

    Quality bad = {INFINITY, INFINITY};

    // values go through printf varargs, so they are passed explicitly as double
    printf("training: threads %d, itersPerBlock %d, ratio: %lf\n", threadsCount, itersPerBlock, (double)ratio);
    fflush(stdout);

    fl = layer;
    bl = &layer->back();

    qualities.clear();
    qualities.resize(threadsCount, Quality{});
    for(Layer *l = layer; l; l = l->next)
      l->split(threadsCount);

    if (!prepare()) {
      printf("cannot prepare\n");
      return bad;
    }

    AccumReal ratioCopy = ratio;
    Quality result = bad, best = result, saved = result;
    long long fullTimeStartUs = timeUs();
    ratio = 0; // first block: no backward passes, see the restore at the end of the loop body
    int i = 0;
    int bps = blocksPerSaving > 0 ? blocksPerSaving : 1;
    int nextSave = i + bps;
    while(true) {
      if (!prepareBlock()) {
        printf("cannot prepare block\n");
        result = bad;
        break;
      }

      long long runTimeUs = timeUs();
      result = runThreads(i);
      runTimeUs = timeUs() - runTimeUs;

      finishBlock();

      // full time covers everything since the end of the previous block
      long long t = timeUs();
      long long fullTimeUs = t - fullTimeStartUs;
      fullTimeStartUs = t;
      ++i;

      if (i == 1) saved = result; // baseline measured with ratio == 0
      bool good = result < best;
      bool done = (blocksCount > 0 && i >= blocksCount) || result.human <= qmin;
      bool saving = ratio > 0 && (i >= nextSave || done) && result < saved;
      if (good) best = result;

      printf("%4d, total %7d, avg.result %f (%f), best %f (%f), time: %f / %f%s\n",
        i, i*itersPerBlock,
        (double)result.human, (double)result.train, (double)best.human, (double)best.train,
        runTimeUs*0.000001, fullTimeUs*0.000001,
        (saving ? ", saving" : "" ) );
      fflush(stdout);

      if (saving) {
        if (!layer->save()) {
          printf("saving failed\n");
          result = bad;
          break;
        }
        saved = result;
        nextSave += bps;
      }

      if (done) break;
      ratio = ratioCopy; // from the second block on, use the configured ratio
    }
    ratio = ratioCopy; // restore the public field on every exit path of the loop

    finish();

    return result;
  }
};


#endif