forked from ak-fortuna/fortuna
retrieved from https://stackoverflow.com/a/46135882 original author LWimsey <https://stackoverflow.com/users/6651824/lwimsey> (c) 2017 edited and made into a header-only lib by wanderer <a_mirre at utb.cz> (c) 2021
175 lines
3.4 KiB
C++
175 lines
3.4 KiB
C++
// retrieved from https://stackoverflow.com/a/46135882
|
|
// original author LWimsey <https://stackoverflow.com/users/6651824/lwimsey> (c) 2017
|
|
// edited and made into a header-only lib by a_mirre <a_mirre at utb.cz> (c) 2021
|
|
#ifndef DA_THREADING_H
|
|
#define DA_THREADING_H
|
|
|
|
#include <iostream>
|
|
#include <chrono>
|
|
#include <queue>
|
|
#include <unistd.h>
|
|
#include <vector>
|
|
#include <thread>
|
|
#include <condition_variable>
|
|
#include <mutex>
|
|
#include <memory>
|
|
|
|
namespace da_threading {
|
|
|
|
class Task {
|
|
public:
|
|
virtual void run() = 0;
|
|
virtual ~Task() { }
|
|
};
|
|
|
|
class Scheduler {
|
|
public:
|
|
Scheduler();
|
|
~Scheduler();
|
|
|
|
void add(Task &task, double delayToRun);
|
|
|
|
private:
|
|
using timepoint = std::chrono::time_point<std::chrono::steady_clock>;
|
|
|
|
struct key {
|
|
timepoint tp;
|
|
Task *taskp;
|
|
};
|
|
|
|
struct TScomp {
|
|
bool operator()(const key &a, const key &b) const
|
|
{
|
|
return a.tp > b.tp;
|
|
}
|
|
};
|
|
|
|
const int ThreadPoolSize = 10;
|
|
|
|
std::vector<std::thread> ThreadPool;
|
|
std::vector<Task *> tasksReadyToRunQueue;
|
|
|
|
std::priority_queue<key, std::vector<key>, TScomp> allTasksQueue;
|
|
|
|
std::thread TimerThr;
|
|
std::mutex TimerMtx, WorkerMtx;
|
|
std::condition_variable TimerCV, WorkerCV;
|
|
|
|
bool WorkerIsRunning = true;
|
|
bool TimerIsRunning = true;
|
|
|
|
void worker_thread();
|
|
void timer_thread();
|
|
};
|
|
|
|
Scheduler::Scheduler() {
|
|
for (int i = 0; i < ThreadPoolSize; ++i) {
|
|
ThreadPool.push_back(std::thread(&Scheduler::worker_thread, this));
|
|
}
|
|
|
|
TimerThr = std::thread(&Scheduler::timer_thread, this);
|
|
}
|
|
|
|
Scheduler::~Scheduler() {
|
|
{
|
|
std::lock_guard<std::mutex> lck{TimerMtx};
|
|
TimerIsRunning = false;
|
|
TimerCV.notify_one();
|
|
}
|
|
TimerThr.join();
|
|
|
|
{
|
|
std::lock_guard<std::mutex> lck{WorkerMtx};
|
|
WorkerIsRunning = false;
|
|
WorkerCV.notify_all();
|
|
}
|
|
for (auto &t : ThreadPool)
|
|
t.join();
|
|
}
|
|
|
|
void Scheduler::add(Task &task, double delayToRun) {
|
|
auto now = std::chrono::steady_clock::now();
|
|
long delay_ms = delayToRun * 1000;
|
|
|
|
std::chrono::milliseconds duration (delay_ms);
|
|
|
|
timepoint tp = now + duration;
|
|
|
|
if (now >= tp) {
|
|
/*
|
|
* This is a short-cut
|
|
* When time is due, the task is directly dispatched to the workers
|
|
*/
|
|
std::lock_guard<std::mutex> lck{WorkerMtx};
|
|
tasksReadyToRunQueue.push_back(&task);
|
|
WorkerCV.notify_one();
|
|
|
|
} else {
|
|
std::lock_guard<std::mutex> lck{TimerMtx};
|
|
|
|
allTasksQueue.push({tp, &task});
|
|
|
|
TimerCV.notify_one();
|
|
}
|
|
}
|
|
|
|
void Scheduler::worker_thread() {
|
|
for (;;) {
|
|
std::unique_lock<std::mutex> lck{WorkerMtx};
|
|
|
|
WorkerCV.wait(lck, [this] { return tasksReadyToRunQueue.size() != 0 ||
|
|
!WorkerIsRunning; } );
|
|
|
|
if (!WorkerIsRunning) {
|
|
break;
|
|
}
|
|
|
|
Task *p = tasksReadyToRunQueue.back();
|
|
tasksReadyToRunQueue.pop_back();
|
|
|
|
lck.unlock();
|
|
|
|
p->run();
|
|
|
|
delete p; // delete Task
|
|
}
|
|
}
|
|
|
|
void Scheduler::timer_thread() {
|
|
for (;;) {
|
|
std::unique_lock<std::mutex> lck{TimerMtx};
|
|
|
|
if (!TimerIsRunning) {
|
|
break;
|
|
}
|
|
|
|
auto duration = std::chrono::nanoseconds(1000000000);
|
|
|
|
if (allTasksQueue.size() != 0) {
|
|
auto now = std::chrono::steady_clock::now();
|
|
|
|
auto head = allTasksQueue.top();
|
|
Task *p = head.taskp;
|
|
|
|
duration = head.tp - now;
|
|
if (now >= head.tp) {
|
|
/*
|
|
* A Task is due, pass to worker threads
|
|
*/
|
|
std::unique_lock<std::mutex> ulck{WorkerMtx};
|
|
tasksReadyToRunQueue.push_back(p);
|
|
WorkerCV.notify_one();
|
|
ulck.unlock();
|
|
|
|
allTasksQueue.pop();
|
|
}
|
|
}
|
|
|
|
TimerCV.wait_for(lck, duration);
|
|
}
|
|
}
|
|
|
|
} // namespace da_threading
|
|
|
|
#endif//DA_THREADING_H
|