From aa77300fd2aa2745f5ccbc62deb5302c77688f65 Mon Sep 17 00:00:00 2001 From: Pantelis Antoniou Date: Mon, 11 Sep 2023 18:46:59 +0300 Subject: [PATCH 1/3] Remove unneeded memset for clearing state We can get rid of unneeded memsets when clearing state, since the count is always reset. We can clear the hasher state on init if we need to start from zeroes. Signed-off-by: Pantelis Antoniou --- c/blake3.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/c/blake3.c b/c/blake3.c index 692f4b0..69d59d7 100644 --- a/c/blake3.c +++ b/c/blake3.c @@ -11,7 +11,6 @@ INLINE void chunk_state_init(blake3_chunk_state *self, const uint32_t key[8], uint8_t flags) { memcpy(self->cv, key, BLAKE3_KEY_LEN); self->chunk_counter = 0; - memset(self->buf, 0, BLAKE3_BLOCK_LEN); self->buf_len = 0; self->blocks_compressed = 0; self->flags = flags; @@ -22,7 +21,6 @@ INLINE void chunk_state_reset(blake3_chunk_state *self, const uint32_t key[8], memcpy(self->cv, key, BLAKE3_KEY_LEN); self->chunk_counter = chunk_counter; self->blocks_compressed = 0; - memset(self->buf, 0, BLAKE3_BLOCK_LEN); self->buf_len = 0; } @@ -121,7 +119,6 @@ INLINE void chunk_state_update(blake3_chunk_state *self, const uint8_t *input, self->flags | chunk_state_maybe_start_flag(self)); self->blocks_compressed += 1; self->buf_len = 0; - memset(self->buf, 0, BLAKE3_BLOCK_LEN); } } @@ -361,6 +358,7 @@ INLINE void compress_subtree_to_parent_node( INLINE void hasher_init_base(blake3_hasher *self, const uint32_t key[8], uint8_t flags) { + memset(self, 0, sizeof(*self)); memcpy(self->key, key, BLAKE3_KEY_LEN); chunk_state_init(&self->chunk, key, flags); self->cv_stack_len = 0; From b8ef7c2441455912b360f7bde6b4a03d06df7af2 Mon Sep 17 00:00:00 2001 From: Pantelis Antoniou Date: Mon, 11 Sep 2023 17:48:52 +0300 Subject: [PATCH 2/3] Introduce C multi-threading MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is a minimal thread-pool implementation for BLAKE3. 
As part of the commit a very simple file mmap-ing example is introduced that provides file checksumming. At least on my machines (x86-64 linux boxes) the C implementation is within a 1% point in performance of the rust b3sum tool. > $ ls -l ggml-vicuna-7b-1.1-q4_0.bin -rw-rw-r-- 1 panto panto 3791725184 Ιουν 17 13:34 ggml-vicuna-7b-1.1-q4_0.bin > $ time b3sum ggml-vicuna-7b-1.1-q4_0.bin > fe87fecc4427c4661aa46f52a7e286ecd6dd7f5f788564abb2c46d2ed5341584 ggml-vicuna-7b-1.1-q4_0.bin > > real 0m0,190s > user 0m1,583s > sys 0m0,196s > $ time ./example-mmap ggml-vicuna-7b-1.1-q4_0.bin > fe87fecc4427c4661aa46f52a7e286ecd6dd7f5f788564abb2c46d2ed5341584 ggml-vicuna-7b-1.1-q4_0.bin > > real 0m0,204s > user 0m1,525s > sys 0m0,247s At my local copy of BLAKE3 with more extensive changes I can get the C implementation to be slightly faster than the rust one. But those require more extensive changes which will have to come much later. Signed-off-by: Pantelis Antoniou --- c/.gitignore | 1 + c/CMakeLists.txt | 1 + c/Makefile.testing | 9 +- c/blake3.c | 71 ++++++- c/blake3_dispatch.c | 20 ++ c/blake3_impl.h | 2 + c/blake3_thread.c | 503 ++++++++++++++++++++++++++++++++++++++++++++ c/blake3_thread.h | 64 ++++++ c/example-mmap.c | 137 ++++++++++++ 9 files changed, 799 insertions(+), 9 deletions(-) create mode 100644 c/blake3_thread.c create mode 100644 c/blake3_thread.h create mode 100644 c/example-mmap.c diff --git a/c/.gitignore b/c/.gitignore index 3d4b704..4fa6158 100644 --- a/c/.gitignore +++ b/c/.gitignore @@ -1,4 +1,5 @@ blake3 example +example-mmap build/ *.o diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt index 3541a7e..1252350 100644 --- a/c/CMakeLists.txt +++ b/c/CMakeLists.txt @@ -35,6 +35,7 @@ add_library(blake3 blake3.c blake3_dispatch.c blake3_portable.c + blake3_thread.c ) add_library(BLAKE3::blake3 ALIAS blake3) diff --git a/c/Makefile.testing b/c/Makefile.testing index b540528..c8a94c4 100644 --- a/c/Makefile.testing +++ b/c/Makefile.testing @@ -46,7 +46,7 @@ ifdef 
BLAKE3_NO_NEON EXTRAFLAGS += -DBLAKE3_USE_NEON=0 endif -all: blake3.c blake3_dispatch.c blake3_portable.c main.c $(TARGETS) +all: blake3.c blake3_dispatch.c blake3_portable.c main.c blake3_thread.c $(TARGETS) $(CC) $(CFLAGS) $(EXTRAFLAGS) $^ -o $(NAME) $(LDFLAGS) blake3_sse2.o: blake3_sse2.c @@ -68,14 +68,17 @@ test: CFLAGS += -DBLAKE3_TESTING -fsanitize=address,undefined test: all ./test.py -asm: blake3.c blake3_dispatch.c blake3_portable.c main.c $(ASM_TARGETS) +asm: blake3.c blake3_dispatch.c blake3_portable.c main.c blake3_thread.c $(ASM_TARGETS) $(CC) $(CFLAGS) $(EXTRAFLAGS) $^ -o $(NAME) $(LDFLAGS) test_asm: CFLAGS += -DBLAKE3_TESTING -fsanitize=address,undefined test_asm: asm ./test.py -example: example.c blake3.c blake3_dispatch.c blake3_portable.c $(ASM_TARGETS) +example: example.c blake3.c blake3_dispatch.c blake3_portable.c blake3_thread.c $(ASM_TARGETS) + $(CC) $(CFLAGS) $(EXTRAFLAGS) $^ -o $@ $(LDFLAGS) + +example-mmap: example-mmap.c blake3.c blake3_dispatch.c blake3_portable.c blake3_thread.c $(ASM_TARGETS) $(CC) $(CFLAGS) $(EXTRAFLAGS) $^ -o $@ $(LDFLAGS) clean: diff --git a/c/blake3.c b/c/blake3.c index 69d59d7..512adda 100644 --- a/c/blake3.c +++ b/c/blake3.c @@ -4,6 +4,7 @@ #include "blake3.h" #include "blake3_impl.h" +#include "blake3_thread.h" const char *blake3_version(void) { return BLAKE3_VERSION_STRING; } @@ -241,6 +242,44 @@ INLINE size_t compress_parents_parallel(const uint8_t *child_chaining_values, } } + +// the state for the thread when doing compress subtree +typedef struct { + // inputs + const uint8_t *input; + size_t input_len; + const uint32_t *key; + uint64_t chunk_counter; + uint8_t flags; + // outputs + uint8_t *out; + size_t n; +} blake3_compress_subtree_state; + +static size_t blake3_compress_subtree_wide(const uint8_t *input, + size_t input_len, + const uint32_t key[8], + uint64_t chunk_counter, + uint8_t flags, uint8_t *out); + +static bool blake3_compress_subtree_wide_work_check(const void *arg) { + const 
blake3_compress_subtree_state *s = arg; + + /* only off-load to thread if we have enough input (8 chunks at least) */ + return s->input_len >= 8 * BLAKE3_CHUNK_LEN; +} + +static void blake3_compress_subtree_wide_thread(void *arg) { + blake3_compress_subtree_state *s = arg; + + s->n = blake3_compress_subtree_wide( + s->input, s->input_len, + s->key, + s->chunk_counter, + s->flags, + s->out); +} + // The wide helper function returns (writes out) an array of chaining values // and returns the length of that array. The number of chaining values returned // is the dynamically detected SIMD degree, at most MAX_SIMD_DEGREE. Or fewer, @@ -296,12 +335,32 @@ static size_t blake3_compress_subtree_wide(const uint8_t *input, } uint8_t *right_cvs = &cv_array[degree * BLAKE3_OUT_LEN]; - // Recurse! If this implementation adds multi-threading support in the - // future, this is where it will go. - size_t left_n = blake3_compress_subtree_wide(input, left_input_len, key, - chunk_counter, flags, cv_array); - size_t right_n = blake3_compress_subtree_wide( - right_input, right_input_len, key, right_chunk_counter, flags, right_cvs); + // Recurse! this is the multi-threaded implementation + blake3_compress_subtree_state states[2]; + + /* common */ + states[0].key = states[1].key = key; + states[0].flags = states[1].flags = flags; + + /* left */ + states[0].input = input; + states[0].input_len = left_input_len; + states[0].chunk_counter = chunk_counter; + states[0].out = cv_array; + + /* right */ + states[1].input = right_input; + states[1].input_len = right_input_len; + states[1].chunk_counter = right_chunk_counter; + states[1].out = right_cvs; + + blake3_thread_arg_array_join(blake3_get_thread_pool(), + blake3_compress_subtree_wide_thread, + blake3_compress_subtree_wide_work_check, + states, sizeof(states[0]), 2); + + size_t left_n = states[0].n; + size_t right_n = states[1].n; // The special case again. If simd_degree=1, then we'll have left_n=1 and // right_n=1. 
Rather than compressing them into a single output, return diff --git a/c/blake3_dispatch.c b/c/blake3_dispatch.c index af6c3da..c9984cc 100644 --- a/c/blake3_dispatch.c +++ b/c/blake3_dispatch.c @@ -1,8 +1,10 @@ #include #include #include +#include #include "blake3_impl.h" +#include "blake3_thread.h" #if defined(IS_X86) #if defined(_MSC_VER) @@ -303,3 +305,21 @@ size_t blake3_simd_degree(void) { #endif return 1; } + +blake3_thread_pool *blake3_get_thread_pool(void) { + static _Atomic(blake3_thread_pool *)g_thread_pool; + blake3_thread_pool *tp, *exp_tp; + + if ((tp = atomic_load(&g_thread_pool)) == NULL) { + tp = blake3_thread_pool_create(0); /* let the pool implementation choose */ + assert(tp); + exp_tp = NULL; + /* store it, if the comparison fails, some other thread won, use theirs */ + if (!atomic_compare_exchange_strong(&g_thread_pool, &exp_tp, tp)) { + blake3_thread_pool_destroy(tp); + return exp_tp; + } + } + + return tp; +} diff --git a/c/blake3_impl.h b/c/blake3_impl.h index 3ba9ceb..8357c26 100644 --- a/c/blake3_impl.h +++ b/c/blake3_impl.h @@ -8,6 +8,7 @@ #include #include "blake3.h" +#include "blake3_thread.h" // internal flags enum blake3_flags { @@ -277,5 +278,6 @@ void blake3_hash_many_neon(const uint8_t *const *inputs, size_t num_inputs, uint8_t flags_end, uint8_t *out); #endif +blake3_thread_pool *blake3_get_thread_pool(void); #endif /* BLAKE3_IMPL_H */ diff --git a/c/blake3_thread.c b/c/blake3_thread.c new file mode 100644 index 0000000..e5a3dc0 --- /dev/null +++ b/c/blake3_thread.c @@ -0,0 +1,503 @@ +/* + * blake3_thread.c - minimal thread pool implementation for BLAKE3 + * + * Copyright (c) 2023 Pantelis Antoniou + * + * Released under the BLAKE3 License (CC0 1.0 or Apache License 2.0) + */ +#define _DEFAULT_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(__linux__) +#include +#include +#endif + +#include "blake3_impl.h" +#include "blake3_thread.h" + +#undef BIT64 +#define BIT64(x) 
((uint64_t)1 << (x)) + +#define B3WORK_SHUTDOWN ((const blake3_thread_work *)(void *)-1) + +#if defined(__linux__) && !defined(BLAKE3_THREAD_PORTABLE) + +/* linux pedal to the metal implementation */ +static inline int futex(_Atomic(uint32_t) *uaddr, int futex_op, uint32_t val, const struct timespec *timeout, uint32_t *uaddr2, uint32_t val3) +{ + return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3); +} + +static inline int fwait(_Atomic(uint32_t) *futexp) +{ + long s; + uint32_t one = 1; + + while (!atomic_compare_exchange_strong(futexp, &one, 0)) { + s = futex(futexp, FUTEX_WAIT, 0, NULL, NULL, 0); + if (s == -1 && errno != EAGAIN) + return -1; + } + return 0; +} + +static inline int fpost(_Atomic(uint32_t) *futexp) +{ + long s; + uint32_t zero = 0; + + if (atomic_compare_exchange_strong(futexp, &zero, 1)) { + s = futex(futexp, FUTEX_WAKE, 1, NULL, NULL, 0); + if (s == -1) + return -1; + } + return 0; +} + +static inline void blake3_thread_init_sync(blake3_thread *t) +{ + /* nothing more needed for futexes */ + atomic_store(&t->submit, 0); + atomic_store(&t->done, 0); +} + +static inline const blake3_thread_work *blake3_worker_wait_for_work(blake3_thread *t) +{ + int ret; + + (void)ret; /* for when NDEBUG is set */ + ret = fwait(&t->submit); + assert(!ret); + return t->work; +} + +static inline void blake3_worker_signal_work_done(blake3_thread *t, const blake3_thread_work *work) +{ + const blake3_thread_work *exp_work; + + /* note that the work won't be replaced if it's a shutdown */ + exp_work = work; + if (!atomic_compare_exchange_strong(&t->work, &exp_work, NULL)) { + assert(exp_work == B3WORK_SHUTDOWN); + return; + } + + (void)fpost(&t->done); +} + +int blake3_thread_submit_work(blake3_thread *t, const blake3_thread_work *work) +{ + const blake3_thread_work *exp_work; + + /* atomically update the work */ + exp_work = NULL; + if (!atomic_compare_exchange_strong(&t->work, &exp_work, work)) { + assert(exp_work == B3WORK_SHUTDOWN); + return -1; + 
} + + return fpost(&t->submit); +} + +int blake3_thread_wait_work(blake3_thread *t) +{ + const blake3_thread_work *work; + + while ((work = atomic_load(&t->work)) != NULL) + fwait(&t->done); + + atomic_store(&t->done, 0); + + return 0; +} + +void blake3_worker_thread_shutdown(blake3_thread *t) +{ + atomic_store(&t->work, B3WORK_SHUTDOWN); + fpost(&t->submit); + pthread_join(t->tid, NULL); +} + +#else + +/* portable pthread implementation */ + +static inline void blake3_thread_init_sync(blake3_thread *t) +{ + pthread_mutex_init(&t->lock, NULL); + pthread_cond_init(&t->cond, NULL); + + pthread_mutex_init(&t->wait_lock, NULL); + pthread_cond_init(&t->wait_cond, NULL); +} + +static inline const blake3_thread_work *blake3_worker_wait_for_work(blake3_thread *t) +{ + const blake3_thread_work *work; + + pthread_mutex_lock(&t->lock); + while ((work = atomic_load(&t->work)) == NULL) + pthread_cond_wait(&t->cond, &t->lock); + pthread_mutex_unlock(&t->lock); + + return work; +} + +static inline void blake3_worker_signal_work_done(blake3_thread *t, const blake3_thread_work *work) +{ + const blake3_thread_work *exp_work; + + /* clear the work, so that the user knows we're done */ + pthread_mutex_lock(&t->wait_lock); + + /* note that the work won't be replaced if it's a shutdown */ + exp_work = work; + if (!atomic_compare_exchange_strong(&t->work, &exp_work, NULL)) + assert(exp_work == B3WORK_SHUTDOWN); + pthread_cond_signal(&t->wait_cond); + pthread_mutex_unlock(&t->wait_lock); +} + +int blake3_thread_submit_work(blake3_thread *t, const blake3_thread_work *work) +{ + const blake3_thread_work *exp_work; + int ret; + + /* atomically update the work */ + + assert(t); + assert(work); + + pthread_mutex_lock(&t->lock); + exp_work = NULL; + if (!atomic_compare_exchange_strong(&t->work, &exp_work, work)) { + assert(exp_work == B3WORK_SHUTDOWN); + ret = -1; + } else { + pthread_cond_signal(&t->cond); + ret = 0; + } + pthread_mutex_unlock(&t->lock); + + return ret; +} + +int 
blake3_thread_wait_work(blake3_thread *t) +{ + const blake3_thread_work *work; + + pthread_mutex_lock(&t->wait_lock); + while ((work = atomic_load(&t->work)) != NULL) + pthread_cond_wait(&t->wait_cond, &t->wait_lock); + pthread_mutex_unlock(&t->wait_lock); + + return 0; +} + +void blake3_worker_thread_shutdown(blake3_thread *t) +{ + pthread_mutex_lock(&t->lock); + atomic_store(&t->work, B3WORK_SHUTDOWN); + pthread_cond_signal(&t->cond); + pthread_mutex_unlock(&t->lock); + pthread_join(t->tid, NULL); +} + +#endif + +void *blake3_worker_thread(void *arg) +{ + blake3_thread *t = arg; + const blake3_thread_work *work; + + while ((work = blake3_worker_wait_for_work(t)) != B3WORK_SHUTDOWN) { + work->fn(work->arg); + blake3_worker_signal_work_done(t, work); + } + + return NULL; +} + +blake3_thread *blake3_thread_pool_reserve(blake3_thread_pool *tp) +{ + blake3_thread *t; + unsigned int slot; + _Atomic(uint64_t) *free; + uint64_t exp, v; + unsigned int i; + + t = NULL; + for (i = 0, free = tp->freep; i < tp->free_count; i++, free++) { + v = atomic_load(free); + while (v) { + slot = highest_one(v); + assert(v & BIT64(slot)); + exp = v; /* expecting the previous value */ + v &= ~BIT64(slot); /* clear this bit */ + if (atomic_compare_exchange_strong(free, &exp, v)) { + slot += i * 64; + t = tp->threads + slot; + assert(slot == t->id); + return t; + } + v = exp; + } + } + + return NULL; +} + +void blake3_thread_pool_unreserve(blake3_thread_pool *tp, blake3_thread *t) +{ + _Atomic(uint64_t) *free; + + free = tp->freep + (unsigned int)(t->id / 64); + atomic_fetch_or(free, BIT64(t->id & 63)); +} + +void blake3_thread_pool_cleanup(blake3_thread_pool *tp) +{ + blake3_thread *t; + unsigned int i; + + if (!tp) + return; + + if (tp->threads) { + for (i = 0; i < tp->num_threads; i++) { + t = &tp->threads[i]; + if (t->id == i) + blake3_worker_thread_shutdown(t); + } + + free(tp->threads); + } + + if (tp->freep) + free(tp->freep); + + memset(tp, 0, sizeof(*tp)); +} + +int 
blake3_thread_pool_init(blake3_thread_pool *tp, unsigned int num_threads) +{ + blake3_thread *t; + unsigned int i; + int rc; + + assert(tp); + + if (!num_threads) { + long scval; + scval = sysconf(_SC_NPROCESSORS_ONLN); + assert(scval > 0); + /* we spin num_cpus * 3 / 2 threads to cover I/O bubbles */ + num_threads = (unsigned int)((scval * 3) / 2); + } + + memset(tp, 0, sizeof(*tp)); + + tp->num_threads = num_threads; + + tp->free_count = (tp->num_threads / 64) + ((tp->num_threads & 63) ? 1 : 0); + + tp->freep = malloc(tp->free_count * sizeof(uint64_t)); + if (!tp->freep) + goto err_out; + + for (i = 0; i < tp->free_count; i++) + tp->freep[i] = (uint64_t)-1; + if (tp->num_threads & 63) + tp->freep[tp->free_count - 1] = BIT64(tp->num_threads & 63) - 1; + + tp->threads = malloc(sizeof(*tp->threads) * tp->num_threads); + if (!tp->threads) + goto err_out; + + memset(tp->threads, 0, sizeof(*tp->threads) * tp->num_threads); + + for (i = 0, t = tp->threads; i < tp->num_threads; i++, t++) { + + t->tp = tp; + t->id = (unsigned int)-1; + + blake3_thread_init_sync(t); + } + + for (i = 0, t = tp->threads; i < tp->num_threads; i++, t++) { + + rc = pthread_create(&t->tid, NULL, blake3_worker_thread, t); + if (rc) + goto err_out; + t->id = i; + } + + return 0; + +err_out: + blake3_thread_pool_cleanup(tp); + return -1; +} + +blake3_thread_pool *blake3_thread_pool_create(unsigned int num_threads) +{ + blake3_thread_pool *tp; + int rc; + + tp = malloc(sizeof(*tp)); + if (!tp) + return NULL; + + rc = blake3_thread_pool_init(tp, num_threads); + if (rc) { + free(tp); + return NULL; + } + + return tp; +} + +void blake3_thread_pool_destroy(blake3_thread_pool *tp) +{ + if (!tp) + return; + + blake3_thread_pool_cleanup(tp); + free(tp); +} + +void blake3_thread_work_join(blake3_thread_pool *tp, const blake3_thread_work *works, size_t work_count, blake3_work_check_fn check_fn) +{ + const blake3_thread_work **direct_work, **thread_work, *w; + blake3_thread **threads, *t; + size_t i, 
direct_work_count, thread_work_count; + int rc; + + /* just a single (or no) work, or no threads? execute directly */ + if (work_count <= 1 || !tp || !tp->num_threads) { + for (i = 0, w = works; i < work_count; i++, w++) + w->fn(w->arg); + return; + } + + /* allocate the keeper of direct work */ + direct_work = alloca(work_count * sizeof(*direct_work)); + direct_work_count = 0; + + threads = alloca(work_count * sizeof(*threads)); + thread_work = alloca(work_count * sizeof(*thread_work)); + thread_work_count = 0; + + for (i = 0, w = works; i < work_count; i++, w++) { + + t = NULL; + if (!check_fn || check_fn(w->arg)) + t = blake3_thread_pool_reserve(tp); + + if (t) { + threads[thread_work_count] = t; + thread_work[thread_work_count++] = w; + } else + direct_work[direct_work_count++] = w; + } + + /* if we don't have any direct_work, steal the last threaded work as direct */ + if (!direct_work_count) { + assert(thread_work_count > 0); + t = threads[thread_work_count - 1]; + w = thread_work[thread_work_count - 1]; + thread_work_count--; + + /* unreserve this */ + blake3_thread_pool_unreserve(tp, t); + direct_work[direct_work_count++] = w; + } + + /* submit the threaded work */ + for (i = 0; i < thread_work_count; i++) { + t = threads[i]; + w = thread_work[i]; + rc = blake3_thread_submit_work(t, w); + if (rc) { + /* unable to submit? 
remove work, and move to direct */ + threads[i] = NULL; + thread_work[i] = NULL; + blake3_thread_pool_unreserve(tp, t); + direct_work[direct_work_count++] = w; + } + } + + /* now perform the direct work while the threaded work is being performed in parallel */ + for (i = 0; i < direct_work_count; i++) { + w = direct_work[i]; + w->fn(w->arg); + } + + /* finally wait for all threaded work to complete */ + for (i = 0; i < thread_work_count; i++) { + t = threads[i]; + assert(t); + blake3_thread_wait_work(t); + blake3_thread_pool_unreserve(tp, t); + } +} + +void blake3_thread_args_join(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void **args, size_t count) +{ + blake3_thread_work *works; + size_t i; + + if (!count) + return; + + works = alloca(sizeof(*works) * count); + for (i = 0; i < count; i++) { + works[i].fn = fn; + works[i].arg = args ? args[i] : NULL; + } + + blake3_thread_work_join(tp, works, count, check_fn); +} + +void blake3_thread_arg_array_join(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void *args, size_t argsize, size_t count) +{ + blake3_thread_work *works; + uint8_t *p; + size_t i; + + if (!count) + return; + + works = alloca(sizeof(*works) * count); + for (i = 0, p = args; i < count; i++, p += argsize) { + works[i].fn = fn; + works[i].arg = p; + } + + blake3_thread_work_join(tp, works, count, check_fn); +} + +void blake3_thread_arg_join(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void *arg, size_t count) +{ + blake3_thread_work *works; + size_t i; + + if (!count) + return; + + works = alloca(sizeof(*works) * count); + for (i = 0; i < count; i++) { + works[i].fn = fn; + works[i].arg = arg; + } + + blake3_thread_work_join(tp, works, count, check_fn); +} diff --git a/c/blake3_thread.h b/c/blake3_thread.h new file mode 100644 index 0000000..a374d94 --- /dev/null +++ b/c/blake3_thread.h @@ -0,0 +1,64 @@ +/* + * blake3_thread.h - minimal thread pool 
implementation for BLAKE3 + * + * Copyright (c) 2023 Pantelis Antoniou + * + * Released under the BLAKE3 License (CC0 1.0 or Apache License 2.0) + */ +#ifndef BLAKE3_THREAD_H +#define BLAKE3_THREAD_H + +#include +#include +#include +#include +#include + +struct blake3_thread_pool; + +typedef void (*blake3_work_exec_fn)(void *arg); +typedef bool (*blake3_work_check_fn)(const void *arg); + +typedef struct blake3_thread_work { + blake3_work_exec_fn fn; + void *arg; +} blake3_thread_work; + +//#define BLAKE3_THREAD_PORTABLE +typedef struct blake3_thread { + struct blake3_thread_pool *tp; + unsigned int id; + pthread_t tid; + void *arg; + _Atomic(const blake3_thread_work *)work; +#if defined(__linux__) && !defined(BLAKE3_THREAD_PORTABLE) + _Atomic(uint32_t) submit; + _Atomic(uint32_t) done; +#else + pthread_mutex_t lock; + pthread_cond_t cond; + pthread_mutex_t wait_lock; + pthread_cond_t wait_cond; +#endif +} blake3_thread; + +typedef struct blake3_thread_pool { + unsigned int num_threads; + struct blake3_thread *threads; + _Atomic(uint64_t) *freep; + unsigned int free_count; +} blake3_thread_pool; + +blake3_thread_pool *blake3_thread_pool_create(unsigned int num_threads); +void blake3_thread_pool_destroy(blake3_thread_pool *tp); +blake3_thread *blake3_thread_pool_reserve(blake3_thread_pool *tp); +void blake3_thread_pool_unreserve(blake3_thread_pool *tp, blake3_thread *t); +int blake3_thread_submit_work(blake3_thread *t, const blake3_thread_work *work); +int blake3_thread_wait_work(blake3_thread *t); + +void blake3_thread_work_join(blake3_thread_pool *tp, const blake3_thread_work *works, size_t work_count, blake3_work_check_fn check_fn); +void blake3_thread_args_join(blake3_thread_pool *tp, blake3_work_exec_fn, blake3_work_check_fn check_fn, void **args, size_t count); +void blake3_thread_arg_array_join(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void *args, size_t argsize, size_t count); +void 
blake3_thread_arg_join(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void *arg, size_t count); + +#endif diff --git a/c/example-mmap.c b/c/example-mmap.c new file mode 100644 index 0000000..ecaa241 --- /dev/null +++ b/c/example-mmap.c @@ -0,0 +1,137 @@ +// vim: ts=2 sw=2 et +#include "blake3.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* 256K threshold for using alloca */ +#define BLAKE3_ALLOCA_BUFFER_SIZE (256U << 10) + +int blake3_hash_file(const char *filename, uint8_t output[BLAKE3_OUT_LEN]) +{ + blake3_hasher hasher; + FILE *fp = NULL; + void *mem = NULL; + void *buf = NULL; + int fd = -1, ret = -1; + size_t rdn, filesize, bufsz = BLAKE3_ALLOCA_BUFFER_SIZE; + struct stat sb; + int rc; + + memset(output, 0, BLAKE3_OUT_LEN); + + blake3_hasher_init(&hasher); + + if (!strcmp(filename, "-")) { + fp = stdin; + } else { + fp = NULL; + + fd = open(filename, O_RDONLY); + if (fd < 0) + goto err_out; + + rc = fstat(fd, &sb); + if (rc < 0) + goto err_out; + + filesize = (size_t)-1; + + /* try to mmap */ + if (sb.st_size > 0) { + filesize = sb.st_size; + mem = mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0); + if (mem != MAP_FAILED) { + close(fd); + fd = -1; + } else + mem = NULL; + } + + /* unable to map? fallback to stream mode */ + if (!mem) { + close(fd); + fd = -1; + fp = fopen(filename, "r"); + if (!fp) + goto err_out; + } + } + + /* mmap case, very simple */ + if (mem) { + blake3_hasher_update(&hasher, mem, filesize); + } else { + /* slow path using file reads */ + + assert(fp); + if (bufsz <= BLAKE3_ALLOCA_BUFFER_SIZE) { + buf = alloca(bufsz); + } else { + buf = malloc(bufsz); + if (!buf) + goto err_out; + } + + do { + rdn = fread(buf, 1, bufsz, fp); + if (rdn == 0) + break; + blake3_hasher_update(&hasher, buf, rdn); + } while (rdn >= bufsz); + } + + // Finalize the hash. BLAKE3_OUT_LEN is the default output length, 32 bytes. 
+ blake3_hasher_finalize(&hasher, output, BLAKE3_OUT_LEN); + + ret = 0; + +out: + if (mem) + munmap(mem, filesize); + + if (fp && fp != stdin) + fclose(fp); + + if (fd >= 0) + close(fd); + + if (buf && (bufsz > BLAKE3_ALLOCA_BUFFER_SIZE)) + free(buf); + + return ret; + +err_out: + ret = -1; + goto out; +} + +int main(int argc, char *argv[]) +{ + uint8_t output[BLAKE3_OUT_LEN]; + int i, ok, rc; + + ok = 1; + for (i = 1; i < argc; i++) { + rc = blake3_hash_file(argv[i], output); + if (rc) { + fprintf(stderr, "Error hashing file \"%s\": %s\n", argv[i], strerror(errno)); + continue; + } + + // Print the hash as hexadecimal. + for (size_t j = 0; j < BLAKE3_OUT_LEN; j++) + printf("%02x", output[j]); + printf(" %s\n", argv[i]); + ok++; + } + return ok == argc ? EXIT_SUCCESS : EXIT_FAILURE; +} From 7973cc5a378687f21c29fcfa04975e2ded6456ff Mon Sep 17 00:00:00 2001 From: Pantelis Antoniou Date: Tue, 12 Sep 2023 11:08:43 +0300 Subject: [PATCH 3/3] Fix expected zeroes at output When buffer sizes are small in streaming mode the partial output must have the rest of the buffer filled with zeroes. Signed-off-by: Pantelis Antoniou --- c/blake3.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/c/blake3.c b/c/blake3.c index 512adda..3fdec6c 100644 --- a/c/blake3.c +++ b/c/blake3.c @@ -64,7 +64,9 @@ INLINE output_t make_output(const uint32_t input_cv[8], uint8_t flags) { output_t ret; memcpy(ret.input_cv, input_cv, 32); - memcpy(ret.block, block, BLAKE3_BLOCK_LEN); + // copy out what's there and fill the rest with zeroes + memcpy(ret.block, block, block_len); + memset(ret.block + block_len, 0, BLAKE3_BLOCK_LEN - block_len); ret.block_len = block_len; ret.counter = counter; ret.flags = flags;