add da_threading lib
All checks were successful
continuous-integration/drone/push Build is passing

retrieved from https://stackoverflow.com/a/46135882
original author LWimsey <https://stackoverflow.com/users/6651824/lwimsey> (c) 2017
edited and made into a header-only lib by wanderer <a_mirre at utb.cz> (c) 2021
This commit is contained in:
surtur 2021-12-12 22:50:06 +01:00
parent cf41d323a4
commit da48e61cc2
Signed by: wanderer
GPG Key ID: 19CE1EC1D9E0486D
3 changed files with 190 additions and 0 deletions

@ -22,6 +22,7 @@ if(GIT_FOUND AND EXISTS "${PROJECT_SOURCE_DIR}/.git")
endif()
add_subdirectory(lib/fmt EXCLUDE_FROM_ALL)
add_subdirectory(lib/da_threading EXCLUDE_FROM_ALL)
if(NOT CMAKE_CXX_FLAGS MATCHES "-Wall")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall")
@ -188,4 +189,5 @@ add_executable(fortuna main.cpp generator.cpp generator.h fortuna.cpp fortuna.h
target_link_libraries(fortuna
PRIVATE cryptopp
PRIVATE fmt::fmt-header-only
PRIVATE da_threading::da_threading
PRIVATE pthread)

@ -0,0 +1,14 @@
# author a_mirre <a_mirre at utb.cz> (c) 2021
cmake_minimum_required (VERSION 3.20)
project (da_threading LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
add_library(da_threading INTERFACE)
add_library(da_threading::da_threading ALIAS da_threading)
target_compile_definitions(da_threading INTERFACE)
target_include_directories(da_threading INTERFACE .)

@ -0,0 +1,174 @@
// retrieved from https://stackoverflow.com/a/46135882
// original author LWimsey <https://stackoverflow.com/users/6651824/lwimsey> (c) 2017
// edited and made into a header-only lib by a_mirre <a_mirre at utb.cz> (c) 2021
#ifndef DA_THREADING_H
#define DA_THREADING_H
#include <iostream>
#include <chrono>
#include <queue>
#include <unistd.h>
#include <vector>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <memory>
namespace da_threading {
class Task {
public:
virtual void run() = 0;
virtual ~Task() { }
};
class Scheduler {
public:
Scheduler();
~Scheduler();
void add(Task &task, double delayToRun);
private:
using timepoint = std::chrono::time_point<std::chrono::steady_clock>;
struct key {
timepoint tp;
Task *taskp;
};
struct TScomp {
bool operator()(const key &a, const key &b) const
{
return a.tp > b.tp;
}
};
const int ThreadPoolSize = 10;
std::vector<std::thread> ThreadPool;
std::vector<Task *> tasksReadyToRunQueue;
std::priority_queue<key, std::vector<key>, TScomp> allTasksQueue;
std::thread TimerThr;
std::mutex TimerMtx, WorkerMtx;
std::condition_variable TimerCV, WorkerCV;
bool WorkerIsRunning = true;
bool TimerIsRunning = true;
void worker_thread();
void timer_thread();
};
Scheduler::Scheduler() {
for (int i = 0; i < ThreadPoolSize; ++i) {
ThreadPool.push_back(std::thread(&Scheduler::worker_thread, this));
}
TimerThr = std::thread(&Scheduler::timer_thread, this);
}
Scheduler::~Scheduler() {
{
std::lock_guard<std::mutex> lck{TimerMtx};
TimerIsRunning = false;
TimerCV.notify_one();
}
TimerThr.join();
{
std::lock_guard<std::mutex> lck{WorkerMtx};
WorkerIsRunning = false;
WorkerCV.notify_all();
}
for (auto &t : ThreadPool)
t.join();
}
void Scheduler::add(Task &task, double delayToRun) {
auto now = std::chrono::steady_clock::now();
long delay_ms = delayToRun * 1000;
std::chrono::milliseconds duration (delay_ms);
timepoint tp = now + duration;
if (now >= tp) {
/*
* This is a short-cut
* When time is due, the task is directly dispatched to the workers
*/
std::lock_guard<std::mutex> lck{WorkerMtx};
tasksReadyToRunQueue.push_back(&task);
WorkerCV.notify_one();
} else {
std::lock_guard<std::mutex> lck{TimerMtx};
allTasksQueue.push({tp, &task});
TimerCV.notify_one();
}
}
void Scheduler::worker_thread() {
for (;;) {
std::unique_lock<std::mutex> lck{WorkerMtx};
WorkerCV.wait(lck, [this] { return tasksReadyToRunQueue.size() != 0 ||
!WorkerIsRunning; } );
if (!WorkerIsRunning) {
break;
}
Task *p = tasksReadyToRunQueue.back();
tasksReadyToRunQueue.pop_back();
lck.unlock();
p->run();
delete p; // delete Task
}
}
void Scheduler::timer_thread() {
for (;;) {
std::unique_lock<std::mutex> lck{TimerMtx};
if (!TimerIsRunning) {
break;
}
auto duration = std::chrono::nanoseconds(1000000000);
if (allTasksQueue.size() != 0) {
auto now = std::chrono::steady_clock::now();
auto head = allTasksQueue.top();
Task *p = head.taskp;
duration = head.tp - now;
if (now >= head.tp) {
/*
* A Task is due, pass to worker threads
*/
std::unique_lock<std::mutex> ulck{WorkerMtx};
tasksReadyToRunQueue.push_back(p);
WorkerCV.notify_one();
ulck.unlock();
allTasksQueue.pop();
}
}
TimerCV.wait_for(lck, duration);
}
}
} // namespace da_threading
#endif//DA_THREADING_H