diff --git a/CMakeLists.txt b/CMakeLists.txt index 3ef7229..94587dd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,6 +22,7 @@ if(GIT_FOUND AND EXISTS "${PROJECT_SOURCE_DIR}/.git") endif() add_subdirectory(lib/fmt EXCLUDE_FROM_ALL) +add_subdirectory(lib/da_threading EXCLUDE_FROM_ALL) if(NOT CMAKE_CXX_FLAGS MATCHES "-Wall") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall") @@ -188,4 +189,5 @@ add_executable(fortuna main.cpp generator.cpp generator.h fortuna.cpp fortuna.h target_link_libraries(fortuna PRIVATE cryptopp PRIVATE fmt::fmt-header-only + PRIVATE da_threading::da_threading PRIVATE pthread) diff --git a/lib/da_threading/CMakeLists.txt b/lib/da_threading/CMakeLists.txt new file mode 100644 index 0000000..831a519 --- /dev/null +++ b/lib/da_threading/CMakeLists.txt @@ -0,0 +1,14 @@ +# author a_mirre (c) 2021 +cmake_minimum_required (VERSION 3.20) + +project (da_threading LANGUAGES CXX) + +set(CMAKE_CXX_STANDARD 11) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +add_library(da_threading INTERFACE) +add_library(da_threading::da_threading ALIAS da_threading) + +target_compile_definitions(da_threading INTERFACE) + +target_include_directories(da_threading INTERFACE .) diff --git a/lib/da_threading/da_threading.h b/lib/da_threading/da_threading.h new file mode 100644 index 0000000..18b98c0 --- /dev/null +++ b/lib/da_threading/da_threading.h @@ -0,0 +1,174 @@ +// retrieved from https://stackoverflow.com/a/46135882 +// original author LWimsey (c) 2017 +// edited and made into a header-only lib by a_mirre (c) 2021 +#ifndef DA_THREADING_H +#define DA_THREADING_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace da_threading { + +class Task { +public: + virtual void run() = 0; + virtual ~Task() { } +}; + +class Scheduler { +public: + Scheduler(); + ~Scheduler(); + + void add(Task &task, double delayToRun); + +private: + using timepoint = std::chrono::time_point; + + struct key { + timepoint tp; + Task *taskp; + }; + + struct TScomp { + bool operator()(const key &a, const key &b) const + { + return a.tp > b.tp; + } + }; + + const int ThreadPoolSize = 10; + + std::vector ThreadPool; + std::vector tasksReadyToRunQueue; + + std::priority_queue, TScomp> allTasksQueue; + + std::thread TimerThr; + std::mutex TimerMtx, WorkerMtx; + std::condition_variable TimerCV, WorkerCV; + + bool WorkerIsRunning = true; + bool TimerIsRunning = true; + + void worker_thread(); + void timer_thread(); +}; + +Scheduler::Scheduler() { + for (int i = 0; i < ThreadPoolSize; ++i) { + ThreadPool.push_back(std::thread(&Scheduler::worker_thread, this)); + } + + TimerThr = std::thread(&Scheduler::timer_thread, this); +} + +Scheduler::~Scheduler() { + { + std::lock_guard lck{TimerMtx}; + TimerIsRunning = false; + TimerCV.notify_one(); + } + TimerThr.join(); + + { + std::lock_guard lck{WorkerMtx}; + WorkerIsRunning = false; + WorkerCV.notify_all(); + } + for (auto &t : ThreadPool) + t.join(); +} + +void Scheduler::add(Task &task, double delayToRun) { + auto now = std::chrono::steady_clock::now(); + long delay_ms = delayToRun * 1000; + + std::chrono::milliseconds duration (delay_ms); + + timepoint tp = now + duration; + + if (now >= tp) { + /* + * This is a short-cut + * When time is due, the task is directly dispatched to the workers + */ + std::lock_guard lck{WorkerMtx}; + tasksReadyToRunQueue.push_back(&task); + WorkerCV.notify_one(); + + } else { + std::lock_guard lck{TimerMtx}; + + allTasksQueue.push({tp, &task}); + + TimerCV.notify_one(); + } +} + +void Scheduler::worker_thread() { + for (;;) { + std::unique_lock lck{WorkerMtx}; + + WorkerCV.wait(lck, [this] { return tasksReadyToRunQueue.size() != 0 || + !WorkerIsRunning; } ); + + if (!WorkerIsRunning) { + break; + } + + Task *p = tasksReadyToRunQueue.back(); + tasksReadyToRunQueue.pop_back(); + + lck.unlock(); + + p->run(); + + delete p; // delete Task + } +} + +void Scheduler::timer_thread() { + for (;;) { + std::unique_lock lck{TimerMtx}; + + if (!TimerIsRunning) { + break; + } + + auto duration = std::chrono::nanoseconds(1000000000); + + if (allTasksQueue.size() != 0) { + auto now = std::chrono::steady_clock::now(); + + auto head = allTasksQueue.top(); + Task *p = head.taskp; + + duration = head.tp - now; + if (now >= head.tp) { + /* + * A Task is due, pass to worker threads + */ + std::unique_lock ulck{WorkerMtx}; + tasksReadyToRunQueue.push_back(p); + WorkerCV.notify_one(); + ulck.unlock(); + + allTasksQueue.pop(); + } + } + + TimerCV.wait_for(lck, duration); + } +} + +} // namespace da_threading + +#endif//DA_THREADING_H