This repository has been archived on 2022-02-10. You can view files and clone it, but cannot push or open issues or pull requests.
fortuna/lib/da_threading/da_threading.h
surtur da48e61cc2
All checks were successful
continuous-integration/drone/push Build is passing
add da_threading lib
retrieved from https://stackoverflow.com/a/46135882
original author LWimsey <https://stackoverflow.com/users/6651824/lwimsey> (c) 2017
edited and made into a header-only lib by wanderer <a_mirre at utb.cz> (c) 2021
2021-12-12 22:50:06 +01:00

175 lines
3.4 KiB
C++

// retrieved from https://stackoverflow.com/a/46135882
// original author LWimsey <https://stackoverflow.com/users/6651824/lwimsey> (c) 2017
// edited and made into a header-only lib by a_mirre <a_mirre at utb.cz> (c) 2021
#ifndef DA_THREADING_H
#define DA_THREADING_H
#include <iostream>
#include <chrono>
#include <queue>
#include <unistd.h>
#include <vector>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <memory>
namespace da_threading {
class Task {
public:
virtual void run() = 0;
virtual ~Task() { }
};
class Scheduler {
public:
Scheduler();
~Scheduler();
void add(Task &task, double delayToRun);
private:
using timepoint = std::chrono::time_point<std::chrono::steady_clock>;
struct key {
timepoint tp;
Task *taskp;
};
struct TScomp {
bool operator()(const key &a, const key &b) const
{
return a.tp > b.tp;
}
};
const int ThreadPoolSize = 10;
std::vector<std::thread> ThreadPool;
std::vector<Task *> tasksReadyToRunQueue;
std::priority_queue<key, std::vector<key>, TScomp> allTasksQueue;
std::thread TimerThr;
std::mutex TimerMtx, WorkerMtx;
std::condition_variable TimerCV, WorkerCV;
bool WorkerIsRunning = true;
bool TimerIsRunning = true;
void worker_thread();
void timer_thread();
};
Scheduler::Scheduler() {
for (int i = 0; i < ThreadPoolSize; ++i) {
ThreadPool.push_back(std::thread(&Scheduler::worker_thread, this));
}
TimerThr = std::thread(&Scheduler::timer_thread, this);
}
Scheduler::~Scheduler() {
{
std::lock_guard<std::mutex> lck{TimerMtx};
TimerIsRunning = false;
TimerCV.notify_one();
}
TimerThr.join();
{
std::lock_guard<std::mutex> lck{WorkerMtx};
WorkerIsRunning = false;
WorkerCV.notify_all();
}
for (auto &t : ThreadPool)
t.join();
}
void Scheduler::add(Task &task, double delayToRun) {
auto now = std::chrono::steady_clock::now();
long delay_ms = delayToRun * 1000;
std::chrono::milliseconds duration (delay_ms);
timepoint tp = now + duration;
if (now >= tp) {
/*
* This is a short-cut
* When time is due, the task is directly dispatched to the workers
*/
std::lock_guard<std::mutex> lck{WorkerMtx};
tasksReadyToRunQueue.push_back(&task);
WorkerCV.notify_one();
} else {
std::lock_guard<std::mutex> lck{TimerMtx};
allTasksQueue.push({tp, &task});
TimerCV.notify_one();
}
}
void Scheduler::worker_thread() {
for (;;) {
std::unique_lock<std::mutex> lck{WorkerMtx};
WorkerCV.wait(lck, [this] { return tasksReadyToRunQueue.size() != 0 ||
!WorkerIsRunning; } );
if (!WorkerIsRunning) {
break;
}
Task *p = tasksReadyToRunQueue.back();
tasksReadyToRunQueue.pop_back();
lck.unlock();
p->run();
delete p; // delete Task
}
}
void Scheduler::timer_thread() {
for (;;) {
std::unique_lock<std::mutex> lck{TimerMtx};
if (!TimerIsRunning) {
break;
}
auto duration = std::chrono::nanoseconds(1000000000);
if (allTasksQueue.size() != 0) {
auto now = std::chrono::steady_clock::now();
auto head = allTasksQueue.top();
Task *p = head.taskp;
duration = head.tp - now;
if (now >= head.tp) {
/*
* A Task is due, pass to worker threads
*/
std::unique_lock<std::mutex> ulck{WorkerMtx};
tasksReadyToRunQueue.push_back(p);
WorkerCV.notify_one();
ulck.unlock();
allTasksQueue.pop();
}
}
TimerCV.wait_for(lck, duration);
}
}
} // namespace da_threading
#endif//DA_THREADING_H