C++ – Implementing a worker pool

With the new additions in C++11 and C++17, I wanted to create a simple implementation of a thread pool.

I would like to have your opinion on:

  • Thread safety
  • API
  • Performance
  • and general quality of the code

I would also like to know whether it is a good idea to have the wait_until_empty method.
Without it, I probably could have avoided using a mutex.

#ifndef WORKER_POOL_H
#define WORKER_POOL_H

#include <../cpp11-on-multicore/common/sema.h>

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

#if __cplusplus < 201703l
#error "Compile using c++17 or later"
#endif

/**
 * Simplistic implementation of thread pool
 * using C++17.
 */
/**
 * Simplistic implementation of a thread pool using C++17.
 *
 * A fixed set of worker threads pulls type-erased jobs from a FIFO queue.
 * All queue state is protected by a single mutex (queue_mutex); two
 * condition variables signal "a job is available / stop requested" and
 * "the queue has become empty / stop requested".
 *
 * Every public member function may be called from any thread. The pool is
 * neither copyable nor movable because the workers keep a pointer to it.
 */
class worker_pool {
private:
  /** Type-erased unit of work stored in the queue. */
  using job = std::function<void()>;

  /**
   * Result type of calling the object produced by std::bind(f, args...)
   * with no arguments, i.e. the value type delivered through the future
   * returned by submit().
   */
  template <typename F, typename... ARGS>
  using bound_result_t =
      decltype(std::bind(std::declval<F>(), std::declval<ARGS>()...)());

  /**
   * Inner class that represents an individual worker thread's entry point.
   */
  class worker {
  private:
    worker_pool *wp; // owning pool; outlives the worker (joined in terminate)
    long id;         // index of this worker; not used by the loop itself

  public:
    worker(worker_pool *_wp, long _id) : wp(_wp), id(_id) {}

    /**
     * Main worker loop: keep fetching and running jobs until a stop is
     * requested.
     */
    void operator()() {
      while (!wp->stop.load()) {
        // fetch() yields an empty optional once a stop has been requested
        if (auto t = wp->fetch())
          (*t)();
      }
    }
  };

  std::vector<std::thread> workers;
  std::queue<job> job_queue;
  // guards job_queue and is the mutex paired with both condition variables
  std::mutex queue_mutex;
  // signalled when a job is pushed or a stop is requested
  std::condition_variable cv_job;
  // signalled when the queue becomes empty or a stop is requested
  std::condition_variable cv_empty;
  // serialises terminate() so the threads are joined exactly once
  std::mutex term_mutex;

  // stop: a stop was requested, workers may still be finishing a job
  std::atomic<bool> stop{false};
  // term: all workers have been joined
  std::atomic<bool> term{false};

  /**
   * Default worker count: the hardware concurrency, or 1 when the
   * implementation cannot determine it (hardware_concurrency() returns 0).
   */
  static long default_worker_count() {
    const unsigned n = std::thread::hardware_concurrency();
    return n == 0 ? 1L : static_cast<long>(n);
  }

  /**
   * Thread-safe job fetching. Blocks until a job is available or a stop is
   * requested.
   *
   * @return the next job, or an empty optional when a stop was requested
   *         (remaining queued jobs are then left untouched).
   */
  std::optional<job> fetch() {
    std::unique_lock<std::mutex> l(queue_mutex);
    cv_job.wait(l, [this] { return stop.load() || !job_queue.empty(); });
    if (stop.load())
      return std::nullopt;
    std::optional<job> res(std::move(job_queue.front()));
    job_queue.pop();
    // we just drained the queue: wake everybody blocked in wait_until_empty
    if (job_queue.empty())
      cv_empty.notify_all();
    return res;
  }

public:
  /**
   * Initialise the pool with tcount workers.
   * By default the number of workers equals the number of hardware threads
   * reported by the machine (at least one).
   *
   * @param tcount number of worker threads to start; values <= 0 start none.
   * @throws std::system_error if a thread cannot be started; workers that
   *         were already started are stopped and joined first.
   */
  worker_pool(long tcount = default_worker_count()) {
    if (tcount > 0)
      workers.reserve(static_cast<std::size_t>(tcount));
    try {
      for (long i = 0; i < tcount; ++i)
        workers.emplace_back(worker(this, i));
    } catch (...) {
      // the destructor will not run for a half-built object, so join the
      // threads that did start before propagating the failure
      terminate();
      throw;
    }
  }

  /**
   * Stop and join all workers before being destroyed.
   */
  ~worker_pool() { terminate(); }

  /**
   * No copy and no move.
   */
  worker_pool(worker_pool const &) = delete;
  worker_pool &operator=(worker_pool const &) = delete;
  worker_pool(worker_pool &&) = delete;
  worker_pool &operator=(worker_pool &&) = delete;

  /**
   * Thread-safe task submission. Accepts any callable plus its arguments
   * (bound with std::bind, so arguments are copied/moved into the task) and
   * returns a future for the result.
   *
   * If a stop has already been requested the task is not queued; the
   * returned future then reports std::future_error (broken_promise) from
   * get() instead of blocking forever.
   *
   * @param f    callable to run on a worker thread.
   * @param args arguments forwarded to std::bind together with f.
   * @return future delivering the callable's result or its exception.
   */
  template <typename F, typename... ARGS>
  auto submit(F &&f, ARGS &&... args)
      -> std::future<bound_result_t<F, ARGS...>> {
    using result_t = bound_result_t<F, ARGS...>;
    // build the task outside the lock: allocation does not need the queue
    auto task_ptr = std::make_shared<std::packaged_task<result_t()>>(
        std::bind(std::forward<F>(f), std::forward<ARGS>(args)...));
    auto fut = task_ptr->get_future();
    {
      std::lock_guard<std::mutex> l(queue_mutex);
      // dropping task_ptr unqueued breaks the promise, see above
      if (stop.load())
        return fut;
      // std::function needs a copyable callable, hence the shared_ptr wrapper
      job_queue.push([task_ptr] { (*task_ptr)(); });
    }
    cv_job.notify_one();
    return fut;
  }

  /**
   * Stop all workers, ignoring the jobs still waiting in the queue, and
   * join them. Jobs that are currently running are allowed to finish.
   * Calling it more than once, or from several threads, is harmless.
   */
  void terminate() {
    std::lock_guard<std::mutex> guard(term_mutex);
    // do nothing if it is already done
    if (term.load())
      return;
    {
      // set the flag under the queue mutex so no waiter can miss the wakeup
      std::lock_guard<std::mutex> l(queue_mutex);
      stop.store(true);
    }
    // wake up idle workers and anybody blocked in wait_until_empty
    cv_job.notify_all();
    cv_empty.notify_all();
    // wait for each worker to finish
    for (auto &w : workers) {
      if (w.joinable())
        w.join();
    }
    term.store(true);
  }

  /**
   * Check how many jobs are still waiting in the queue (jobs currently
   * being executed are not counted).
   */
  long jobs_remaining() {
    std::lock_guard<std::mutex> l(queue_mutex);
    return static_cast<long>(job_queue.size());
  }

  /**
   * Block until the queue holds no waiting jobs or a stop was requested.
   * Note: jobs already taken by a worker may still be running on return.
   */
  void wait_until_empty() {
    std::unique_lock<std::mutex> l(queue_mutex);
    cv_empty.wait(l, [this] { return job_queue.empty() || stop.load(); });
  }

  /**
   * Check if there was a request to stop.
   * Note: there may still be workers running.
   */
  bool stopped() const { return stop.load(); }

  /**
   * Check if the workers have been joined.
   */
  bool completed() const { return term.load(); }
};

#endif // WORKER_POOL_H
`` `