diff --git a/c/.gitignore b/c/.gitignore
index 3d4b704..4fa6158 100644
--- a/c/.gitignore
+++ b/c/.gitignore
@@ -1,4 +1,5 @@
 blake3
 example
+example-mmap
 build/
 *.o
diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index 3a3b232..a8d7298 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -46,6 +46,7 @@ add_library(blake3
   blake3.c
   blake3_dispatch.c
   blake3_portable.c
+  blake3_thread.c
 )
 add_library(BLAKE3::blake3 ALIAS blake3)
diff --git a/c/Makefile.testing b/c/Makefile.testing
index b540528..c8a94c4 100644
--- a/c/Makefile.testing
+++ b/c/Makefile.testing
@@ -46,7 +46,7 @@ ifdef BLAKE3_NO_NEON
 EXTRAFLAGS += -DBLAKE3_USE_NEON=0
 endif
 
-all: blake3.c blake3_dispatch.c blake3_portable.c main.c $(TARGETS)
+all: blake3.c blake3_dispatch.c blake3_portable.c main.c blake3_thread.c $(TARGETS)
 	$(CC) $(CFLAGS) $(EXTRAFLAGS) $^ -o $(NAME) $(LDFLAGS)
 
 blake3_sse2.o: blake3_sse2.c
@@ -68,14 +68,17 @@ test: CFLAGS += -DBLAKE3_TESTING -fsanitize=address,undefined
 test: all
 	./test.py
 
-asm: blake3.c blake3_dispatch.c blake3_portable.c main.c $(ASM_TARGETS)
+asm: blake3.c blake3_dispatch.c blake3_portable.c main.c blake3_thread.c $(ASM_TARGETS)
 	$(CC) $(CFLAGS) $(EXTRAFLAGS) $^ -o $(NAME) $(LDFLAGS)
 
 test_asm: CFLAGS += -DBLAKE3_TESTING -fsanitize=address,undefined
 test_asm: asm
 	./test.py
 
-example: example.c blake3.c blake3_dispatch.c blake3_portable.c $(ASM_TARGETS)
+example: example.c blake3.c blake3_dispatch.c blake3_portable.c blake3_thread.c $(ASM_TARGETS)
+	$(CC) $(CFLAGS) $(EXTRAFLAGS) $^ -o $@ $(LDFLAGS)
+
+example-mmap: example-mmap.c blake3.c blake3_dispatch.c blake3_portable.c blake3_thread.c $(ASM_TARGETS)
 	$(CC) $(CFLAGS) $(EXTRAFLAGS) $^ -o $@ $(LDFLAGS)
 
 clean:
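Every target now links blake3_thread.c, but the public API is unchanged, so existing callers pick up the threaded subtree compression transparently. A minimal smoke test (hypothetical `smoke.c`, compiled the same way as the `example` target above) that pushes a buffer well past the threading threshold through the ordinary incremental API:

```c
// smoke.c - hypothetical smoke test, built together with blake3.c,
// blake3_dispatch.c, blake3_portable.c, blake3_thread.c and the SIMD objects.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "blake3.h"

int main(void) {
  // 8 MiB is well past the 8-chunk (8 KiB) threading threshold.
  size_t len = 8u << 20;
  uint8_t *buf = malloc(len);
  if (!buf)
    return EXIT_FAILURE;
  memset(buf, 0xa5, len);

  blake3_hasher hasher;
  blake3_hasher_init(&hasher);
  blake3_hasher_update(&hasher, buf, len);

  uint8_t out[BLAKE3_OUT_LEN];
  blake3_hasher_finalize(&hasher, out, BLAKE3_OUT_LEN);

  for (size_t i = 0; i < BLAKE3_OUT_LEN; i++)
    printf("%02x", out[i]);
  printf("\n");
  free(buf);
  return EXIT_SUCCESS;
}
```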
diff --git a/c/blake3.c b/c/blake3.c
index 1b44c71..e3046de 100644
--- a/c/blake3.c
+++ b/c/blake3.c
@@ -4,6 +4,7 @@
 
 #include "blake3.h"
 #include "blake3_impl.h"
+#include "blake3_thread.h"
 
 const char *blake3_version(void) { return BLAKE3_VERSION_STRING; }
 
@@ -11,7 +12,6 @@ INLINE void chunk_state_init(blake3_chunk_state *self, const uint32_t key[8],
                              uint8_t flags) {
   memcpy(self->cv, key, BLAKE3_KEY_LEN);
   self->chunk_counter = 0;
-  memset(self->buf, 0, BLAKE3_BLOCK_LEN);
   self->buf_len = 0;
   self->blocks_compressed = 0;
   self->flags = flags;
@@ -22,7 +22,6 @@ INLINE void chunk_state_reset(blake3_chunk_state *self, const uint32_t key[8],
   memcpy(self->cv, key, BLAKE3_KEY_LEN);
   self->chunk_counter = chunk_counter;
   self->blocks_compressed = 0;
-  memset(self->buf, 0, BLAKE3_BLOCK_LEN);
   self->buf_len = 0;
 }
 
@@ -65,7 +64,9 @@ INLINE output_t make_output(const uint32_t input_cv[8],
                             uint8_t flags) {
   output_t ret;
   memcpy(ret.input_cv, input_cv, 32);
-  memcpy(ret.block, block, BLAKE3_BLOCK_LEN);
+  // copy out what's there and fill the rest with zeroes
+  memcpy(ret.block, block, block_len);
+  memset(ret.block + block_len, 0, BLAKE3_BLOCK_LEN - block_len);
   ret.block_len = block_len;
   ret.counter = counter;
   ret.flags = flags;
@@ -121,7 +122,6 @@ INLINE void chunk_state_update(blake3_chunk_state *self, const uint8_t *input,
                    self->flags | chunk_state_maybe_start_flag(self));
     self->blocks_compressed += 1;
     self->buf_len = 0;
-    memset(self->buf, 0, BLAKE3_BLOCK_LEN);
   }
 }
 
@@ -244,6 +244,44 @@ INLINE size_t compress_parents_parallel(const uint8_t *child_chaining_values,
   }
 }
 
+
+// the per-thread state used when compressing a subtree
+typedef struct {
+  // inputs
+  const uint8_t *input;
+  size_t input_len;
+  const uint32_t *key;
+  uint64_t chunk_counter;
+  uint8_t flags;
+  // outputs
+  uint8_t *out;
+  size_t n;
+} blake3_compress_subtree_state;
+
+static size_t blake3_compress_subtree_wide(const uint8_t *input,
+                                           size_t input_len,
+                                           const uint32_t key[8],
+                                           uint64_t chunk_counter,
+                                           uint8_t flags, uint8_t *out);
+
+static bool blake3_compress_subtree_wide_work_check(const void *arg) {
+  const blake3_compress_subtree_state *s = arg;
+
+  /* only off-load to a thread if we have enough input (at least 8 chunks) */
+  return s->input_len >= 8 * BLAKE3_CHUNK_LEN;
+}
+
+static void blake3_compress_subtree_wide_thread(void *arg) {
+  blake3_compress_subtree_state *s = arg;
+
+  s->n = blake3_compress_subtree_wide(
+      s->input, s->input_len,
+      s->key,
+      s->chunk_counter,
+      s->flags,
+      s->out);
+}
+
 // The wide helper function returns (writes out) an array of chaining values
 // and returns the length of that array. The number of chaining values returned
 // is the dynamically detected SIMD degree, at most MAX_SIMD_DEGREE. Or fewer,
@@ -299,12 +337,32 @@ static size_t blake3_compress_subtree_wide(const uint8_t *input,
   }
   uint8_t *right_cvs = &cv_array[degree * BLAKE3_OUT_LEN];
 
-  // Recurse! If this implementation adds multi-threading support in the
-  // future, this is where it will go.
-  size_t left_n = blake3_compress_subtree_wide(input, left_input_len, key,
-                                               chunk_counter, flags, cv_array);
-  size_t right_n = blake3_compress_subtree_wide(
-      right_input, right_input_len, key, right_chunk_counter, flags, right_cvs);
+  // Recurse! This is where the multi-threading happens.
+  blake3_compress_subtree_state states[2];
+
+  /* common */
+  states[0].key = states[1].key = key;
+  states[0].flags = states[1].flags = flags;
+
+  /* left */
+  states[0].input = input;
+  states[0].input_len = left_input_len;
+  states[0].chunk_counter = chunk_counter;
+  states[0].out = cv_array;
+
+  /* right */
+  states[1].input = right_input;
+  states[1].input_len = right_input_len;
+  states[1].chunk_counter = right_chunk_counter;
+  states[1].out = right_cvs;
+
+  blake3_thread_arg_array_join(blake3_get_thread_pool(),
+                               blake3_compress_subtree_wide_thread,
+                               blake3_compress_subtree_wide_work_check,
+                               states, sizeof(states[0]), 2);
+
+  size_t left_n = states[0].n;
+  size_t right_n = states[1].n;
 
   // The special case again. If simd_degree=1, then we'll have left_n=1 and
   // right_n=1. Rather than compressing them into a single output, return
@@ -364,6 +422,7 @@ INLINE void compress_subtree_to_parent_node(
 
 INLINE void hasher_init_base(blake3_hasher *self, const uint32_t key[8],
                              uint8_t flags) {
+  memset(self, 0, sizeof(*self));
   memcpy(self->key, key, BLAKE3_KEY_LEN);
   chunk_state_init(&self->chunk, key, flags);
   self->cv_stack_len = 0;
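The fork/join shape above is generic: fill one state struct per sub-task, hand the array to blake3_thread_arg_array_join(), and read the outputs back out of the structs. A sketch of the same pattern on a toy task (`parallel_sum`, `sum_state`, and the 1024-element cutoff are illustrative, not part of this patch; it links against the pool via the library internals):

```c
#include <stdbool.h>
#include <stddef.h>
#include "blake3_impl.h"   // for blake3_get_thread_pool()
#include "blake3_thread.h"

typedef struct {
  const int *vals;  // input slice
  size_t count;
  long long sum;    // output
} sum_state;

static void sum_work(void *arg) {
  sum_state *s = arg;
  s->sum = 0;
  for (size_t i = 0; i < s->count; i++)
    s->sum += s->vals[i];
}

// Mirror of blake3_compress_subtree_wide_work_check(): only bother a worker
// thread when the slice is big enough to amortize the handoff cost.
static bool sum_check(const void *arg) {
  const sum_state *s = arg;
  return s->count >= 1024;
}

long long parallel_sum(const int *vals, size_t count) {
  sum_state states[2] = {
    { .vals = vals, .count = count / 2 },
    { .vals = vals + count / 2, .count = count - count / 2 },
  };
  blake3_thread_arg_array_join(blake3_get_thread_pool(), sum_work, sum_check,
                               states, sizeof(states[0]), 2);
  return states[0].sum + states[1].sum;
}
```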
diff --git a/c/blake3_dispatch.c b/c/blake3_dispatch.c
index c9abc13..0885f3f 100644
--- a/c/blake3_dispatch.c
+++ b/c/blake3_dispatch.c
@@ -1,8 +1,10 @@
 #include <stdbool.h>
 #include <stdint.h>
 #include <string.h>
+#include <stdatomic.h>
 
 #include "blake3_impl.h"
+#include "blake3_thread.h"
 
 #if defined(_MSC_VER)
 #include <intrin.h>
@@ -306,3 +308,21 @@ size_t blake3_simd_degree(void) {
 #endif
   return 1;
 }
+
+blake3_thread_pool *blake3_get_thread_pool(void) {
+  static _Atomic(blake3_thread_pool *)g_thread_pool;
+  blake3_thread_pool *tp, *exp_tp;
+
+  if ((tp = atomic_load(&g_thread_pool)) == NULL) {
+    tp = blake3_thread_pool_create(0); /* let the pool implementation choose */
+    assert(tp);
+    exp_tp = NULL;
+    /* store it; if the compare-exchange fails, another thread won the race, so use theirs */
+    if (!atomic_compare_exchange_strong(&g_thread_pool, &exp_tp, tp)) {
+      blake3_thread_pool_destroy(tp);
+      return exp_tp;
+    }
+  }
+
+  return tp;
+}
diff --git a/c/blake3_impl.h b/c/blake3_impl.h
index 98611c3..18fea36 100644
--- a/c/blake3_impl.h
+++ b/c/blake3_impl.h
@@ -8,6 +8,7 @@
 #include <stdint.h>
 #include <string.h>
 
 #include "blake3.h"
+#include "blake3_thread.h"
 // internal flags
 enum blake3_flags {
@@ -281,5 +282,6 @@ void blake3_hash_many_neon(const uint8_t *const *inputs, size_t num_inputs,
                            uint8_t flags_end, uint8_t *out);
 #endif
 
+blake3_thread_pool *blake3_get_thread_pool(void);
 
 #endif /* BLAKE3_IMPL_H */
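blake3_get_thread_pool() is a lock-free lazy singleton: every racing thread may build a candidate pool, exactly one compare-exchange wins and publishes it, and losers destroy their candidate and adopt the winner's. The same shape in isolation (a sketch; `thing`, `thing_create`, and `thing_destroy` are placeholders, not library API):

```c
#include <stdatomic.h>
#include <stdlib.h>

typedef struct { int value; } thing;

static thing *thing_create(void) { return calloc(1, sizeof(thing)); }
static void thing_destroy(thing *t) { free(t); }

// Lazy, lock-free initialization of a shared object: no lock, no dedicated
// init call, safe against concurrent first use.
thing *get_shared_thing(void) {
  static _Atomic(thing *) g_thing;
  thing *t, *expected;

  if ((t = atomic_load(&g_thing)) == NULL) {
    t = thing_create();              /* every racer builds a candidate... */
    expected = NULL;
    if (!atomic_compare_exchange_strong(&g_thing, &expected, t)) {
      thing_destroy(t);              /* ...but only the CAS winner publishes */
      return expected;               /* the loser adopts the winner's object */
    }
  }
  return t;
}
```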
diff --git a/c/blake3_thread.c b/c/blake3_thread.c
new file mode 100644
index 0000000..e5a3dc0
--- /dev/null
+++ b/c/blake3_thread.c
@@ -0,0 +1,503 @@
+/*
+ * blake3_thread.c - minimal thread pool implementation for BLAKE3
+ *
+ * Copyright (c) 2023 Pantelis Antoniou
+ *
+ * Released under the BLAKE3 License (CC0 1.0 or Apache License 2.0)
+ */
+#define _DEFAULT_SOURCE
+#include <assert.h>
+#include <errno.h>
+#include <pthread.h>
+#include <stdatomic.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#if defined(__linux__)
+#include <linux/futex.h>
+#include <sys/syscall.h>
+#endif
+
+#include "blake3_impl.h"
+#include "blake3_thread.h"
+
+#undef BIT64
+#define BIT64(x) ((uint64_t)1 << (x))
+
+#define B3WORK_SHUTDOWN ((const blake3_thread_work *)(void *)-1)
+
+#if defined(__linux__) && !defined(BLAKE3_THREAD_PORTABLE)
+
+/* Linux pedal-to-the-metal implementation */
+static inline int futex(_Atomic(uint32_t) *uaddr, int futex_op, uint32_t val, const struct timespec *timeout, uint32_t *uaddr2, uint32_t val3)
+{
+	return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
+}
+
+static inline int fwait(_Atomic(uint32_t) *futexp)
+{
+	long s;
+	uint32_t one = 1;
+
+	while (!atomic_compare_exchange_strong(futexp, &one, 0)) {
+		s = futex(futexp, FUTEX_WAIT, 0, NULL, NULL, 0);
+		if (s == -1 && errno != EAGAIN)
+			return -1;
+		one = 1;	/* reset the expected value clobbered by the failed exchange */
+	}
+	return 0;
+}
+
+static inline int fpost(_Atomic(uint32_t) *futexp)
+{
+	long s;
+	uint32_t zero = 0;
+
+	if (atomic_compare_exchange_strong(futexp, &zero, 1)) {
+		s = futex(futexp, FUTEX_WAKE, 1, NULL, NULL, 0);
+		if (s == -1)
+			return -1;
+	}
+	return 0;
+}
+
+static inline void blake3_thread_init_sync(blake3_thread *t)
+{
+	/* nothing more needed for futexes */
+	atomic_store(&t->submit, 0);
+	atomic_store(&t->done, 0);
+}
+
+static inline const blake3_thread_work *blake3_worker_wait_for_work(blake3_thread *t)
+{
+	int ret;
+
+	(void)ret;	/* for when NDEBUG is set */
+	ret = fwait(&t->submit);
+	assert(!ret);
+	return t->work;
+}
+
+static inline void blake3_worker_signal_work_done(blake3_thread *t, const blake3_thread_work *work)
+{
+	const blake3_thread_work *exp_work;
+
+	/* note that the work won't be replaced if it's a shutdown */
+	exp_work = work;
+	if (!atomic_compare_exchange_strong(&t->work, &exp_work, NULL)) {
+		assert(exp_work == B3WORK_SHUTDOWN);
+		return;
+	}
+
+	(void)fpost(&t->done);
+}
+
+int blake3_thread_submit_work(blake3_thread *t, const blake3_thread_work *work)
+{
+	const blake3_thread_work *exp_work;
+
+	/* atomically update the work */
+	exp_work = NULL;
+	if (!atomic_compare_exchange_strong(&t->work, &exp_work, work)) {
+		assert(exp_work == B3WORK_SHUTDOWN);
+		return -1;
+	}
+
+	return fpost(&t->submit);
+}
+
+int blake3_thread_wait_work(blake3_thread *t)
+{
+	const blake3_thread_work *work;
+
+	while ((work = atomic_load(&t->work)) != NULL)
+		fwait(&t->done);
+
+	atomic_store(&t->done, 0);
+
+	return 0;
+}
+
+void blake3_worker_thread_shutdown(blake3_thread *t)
+{
+	atomic_store(&t->work, B3WORK_SHUTDOWN);
+	fpost(&t->submit);
+	pthread_join(t->tid, NULL);
+}
+
+#else
+
+/* portable pthread implementation */
+
+static inline void blake3_thread_init_sync(blake3_thread *t)
+{
+	pthread_mutex_init(&t->lock, NULL);
+	pthread_cond_init(&t->cond, NULL);
+
+	pthread_mutex_init(&t->wait_lock, NULL);
+	pthread_cond_init(&t->wait_cond, NULL);
+}
+
+static inline const blake3_thread_work *blake3_worker_wait_for_work(blake3_thread *t)
+{
+	const blake3_thread_work *work;
+
+	pthread_mutex_lock(&t->lock);
+	while ((work = atomic_load(&t->work)) == NULL)
+		pthread_cond_wait(&t->cond, &t->lock);
+	pthread_mutex_unlock(&t->lock);
+
+	return work;
+}
+
+static inline void blake3_worker_signal_work_done(blake3_thread *t, const blake3_thread_work *work)
+{
+	const blake3_thread_work *exp_work;
+
+	/* clear the work, so that the user knows we're done */
+	pthread_mutex_lock(&t->wait_lock);
+
+	/* note that the work won't be replaced if it's a shutdown */
+	exp_work = work;
+	if (!atomic_compare_exchange_strong(&t->work, &exp_work, NULL))
+		assert(exp_work == B3WORK_SHUTDOWN);
+	pthread_cond_signal(&t->wait_cond);
+	pthread_mutex_unlock(&t->wait_lock);
+}
+
+int blake3_thread_submit_work(blake3_thread *t, const blake3_thread_work *work)
+{
+	const blake3_thread_work *exp_work;
+	int ret;
+
+	/* atomically update the work */
+	assert(t);
+	assert(work);
+
+	pthread_mutex_lock(&t->lock);
+	exp_work = NULL;
+	if (!atomic_compare_exchange_strong(&t->work, &exp_work, work)) {
+		assert(exp_work == B3WORK_SHUTDOWN);
+		ret = -1;
+	} else {
+		pthread_cond_signal(&t->cond);
+		ret = 0;
+	}
+	pthread_mutex_unlock(&t->lock);
+
+	return ret;
+}
+
+int blake3_thread_wait_work(blake3_thread *t)
+{
+	const blake3_thread_work *work;
+
+	pthread_mutex_lock(&t->wait_lock);
+	while ((work = atomic_load(&t->work)) != NULL)
+		pthread_cond_wait(&t->wait_cond, &t->wait_lock);
+	pthread_mutex_unlock(&t->wait_lock);
+
+	return 0;
+}
+
+void blake3_worker_thread_shutdown(blake3_thread *t)
+{
+	pthread_mutex_lock(&t->lock);
+	atomic_store(&t->work, B3WORK_SHUTDOWN);
+	pthread_cond_signal(&t->cond);
+	pthread_mutex_unlock(&t->lock);
+	pthread_join(t->tid, NULL);
+}
+
+#endif
+
+void *blake3_worker_thread(void *arg)
+{
+	blake3_thread *t = arg;
+	const blake3_thread_work *work;
+
+	while ((work = blake3_worker_wait_for_work(t)) != B3WORK_SHUTDOWN) {
+		work->fn(work->arg);
+		blake3_worker_signal_work_done(t, work);
+	}
+
+	return NULL;
+}
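Each worker owns a one-slot mailbox (its `work` pointer) guarded by two binary semaphores: `submit` wakes the worker, `done` wakes the waiter; on Linux these are futexes, elsewhere mutex/condvar pairs. A sketch of driving a single worker through this low-level interface, which blake3_thread_work_join() below builds on (`run_one` and its string payloads are illustrative; note the work item must stay alive until blake3_thread_wait_work() returns):

```c
#include <stdio.h>
#include "blake3_thread.h"

static void hello_work(void *arg) {
  printf("hello from %s\n", (const char *)arg);
}

// Hand one work item to one pooled worker and wait for completion.
// Assumes tp is a live pool, e.g. from blake3_thread_pool_create(0).
void run_one(blake3_thread_pool *tp) {
  blake3_thread *t = blake3_thread_pool_reserve(tp);
  if (!t) {
    hello_work("inline (pool exhausted)");  /* no free worker: run it ourselves */
    return;
  }

  blake3_thread_work w = { .fn = hello_work, .arg = "worker" };
  if (blake3_thread_submit_work(t, &w) != 0) {
    /* submission refused (worker shutting down): fall back to inline */
    blake3_thread_pool_unreserve(tp, t);
    hello_work("inline (submit failed)");
    return;
  }

  blake3_thread_wait_work(t);        /* blocks until the worker clears w */
  blake3_thread_pool_unreserve(tp, t);
}
```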
+
+blake3_thread *blake3_thread_pool_reserve(blake3_thread_pool *tp)
+{
+	blake3_thread *t;
+	unsigned int slot;
+	_Atomic(uint64_t) *free;
+	uint64_t exp, v;
+	unsigned int i;
+
+	t = NULL;
+	for (i = 0, free = tp->freep; i < tp->free_count; i++, free++) {
+		v = atomic_load(free);
+		while (v) {
+			slot = highest_one(v);
+			assert(v & BIT64(slot));
+			exp = v;		/* expecting the previous value */
+			v &= ~BIT64(slot);	/* clear this bit */
+			if (atomic_compare_exchange_strong(free, &exp, v)) {
+				slot += i * 64;
+				t = tp->threads + slot;
+				assert(slot == t->id);
+				return t;
+			}
+			v = exp;
+		}
+	}
+
+	return NULL;
+}
+
+void blake3_thread_pool_unreserve(blake3_thread_pool *tp, blake3_thread *t)
+{
+	_Atomic(uint64_t) *free;
+
+	free = tp->freep + (unsigned int)(t->id / 64);
+	atomic_fetch_or(free, BIT64(t->id & 63));
+}
+
+void blake3_thread_pool_cleanup(blake3_thread_pool *tp)
+{
+	blake3_thread *t;
+	unsigned int i;
+
+	if (!tp)
+		return;
+
+	if (tp->threads) {
+		for (i = 0; i < tp->num_threads; i++) {
+			t = &tp->threads[i];
+			if (t->id == i)
+				blake3_worker_thread_shutdown(t);
+		}
+
+		free(tp->threads);
+	}
+
+	if (tp->freep)
+		free(tp->freep);
+
+	memset(tp, 0, sizeof(*tp));
+}
+
+int blake3_thread_pool_init(blake3_thread_pool *tp, unsigned int num_threads)
+{
+	blake3_thread *t;
+	unsigned int i;
+	int rc;
+
+	assert(tp);
+
+	if (!num_threads) {
+		long scval;
+
+		scval = sysconf(_SC_NPROCESSORS_ONLN);
+		assert(scval > 0);
+		/* we spin up num_cpus * 3 / 2 threads to cover I/O bubbles */
+		num_threads = (unsigned int)((scval * 3) / 2);
+	}
+
+	memset(tp, 0, sizeof(*tp));
+
+	tp->num_threads = num_threads;
+
+	tp->free_count = (tp->num_threads / 64) + ((tp->num_threads & 63) ? 1 : 0);
+	tp->freep = malloc(tp->free_count * sizeof(uint64_t));
+	if (!tp->freep)
+		goto err_out;
+
+	for (i = 0; i < tp->free_count; i++)
+		tp->freep[i] = (uint64_t)-1;
+	if (tp->num_threads & 63)
+		tp->freep[tp->free_count - 1] = BIT64(tp->num_threads & 63) - 1;
+
+	tp->threads = malloc(sizeof(*tp->threads) * tp->num_threads);
+	if (!tp->threads)
+		goto err_out;
+	memset(tp->threads, 0, sizeof(*tp->threads) * tp->num_threads);
+
+	for (i = 0, t = tp->threads; i < tp->num_threads; i++, t++) {
+		t->tp = tp;
+		t->id = (unsigned int)-1;
+		blake3_thread_init_sync(t);
+	}
+
+	for (i = 0, t = tp->threads; i < tp->num_threads; i++, t++) {
+		rc = pthread_create(&t->tid, NULL, blake3_worker_thread, t);
+		if (rc)
+			goto err_out;
+		t->id = i;
+	}
+
+	return 0;
+
+err_out:
+	blake3_thread_pool_cleanup(tp);
+	return -1;
+}
+
+blake3_thread_pool *blake3_thread_pool_create(unsigned int num_threads)
+{
+	blake3_thread_pool *tp;
+	int rc;
+
+	tp = malloc(sizeof(*tp));
+	if (!tp)
+		return NULL;
+
+	rc = blake3_thread_pool_init(tp, num_threads);
+	if (rc) {
+		free(tp);
+		return NULL;
+	}
+
+	return tp;
+}
+
+void blake3_thread_pool_destroy(blake3_thread_pool *tp)
+{
+	if (!tp)
+		return;
+
+	blake3_thread_pool_cleanup(tp);
+	free(tp);
+}
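Free workers are tracked in a bitmap of 64-bit words: a set bit means the thread is available, reservation clears the highest set bit with a compare-exchange, and unreserve ORs it back. A standalone worked example of the index math (the 70-thread pool size is picked purely for illustration):

```c
#include <stdint.h>
#include <stdio.h>

#define BIT64(x) ((uint64_t)1 << (x))

// Free-bitmap math for a hypothetical 70-thread pool:
// word 0 covers thread ids 0..63, word 1 covers ids 64..69.
int main(void) {
  unsigned int num_threads = 70;
  unsigned int free_count = num_threads / 64 + ((num_threads & 63) ? 1 : 0);
  printf("bitmap words: %u\n", free_count);        /* 2 */

  /* the last word only gets bits for the 6 remaining threads */
  uint64_t last = BIT64(num_threads & 63) - 1;
  printf("last word mask: %#llx\n", (unsigned long long)last);  /* 0x3f */

  /* reserving id 67 clears bit (67 & 63) == 3 in word 67 / 64 == 1 */
  unsigned int id = 67;
  printf("id %u -> word %u, bit %u\n", id, id / 64, id & 63);
  return 0;
}
```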
+
+void blake3_thread_work_join(blake3_thread_pool *tp, const blake3_thread_work *works, size_t work_count, blake3_work_check_fn check_fn)
+{
+	const blake3_thread_work **direct_work, **thread_work, *w;
+	blake3_thread **threads, *t;
+	size_t i, direct_work_count, thread_work_count;
+	int rc;
+
+	/* just a single (or no) work, or no threads? execute directly */
+	if (work_count <= 1 || !tp || !tp->num_threads) {
+		for (i = 0, w = works; i < work_count; i++, w++)
+			w->fn(w->arg);
+		return;
+	}
+
+	/* allocate the keeper of direct work */
+	direct_work = alloca(work_count * sizeof(*direct_work));
+	direct_work_count = 0;
+
+	threads = alloca(work_count * sizeof(*threads));
+	thread_work = alloca(work_count * sizeof(*thread_work));
+	thread_work_count = 0;
+
+	for (i = 0, w = works; i < work_count; i++, w++) {
+		t = NULL;
+		if (!check_fn || check_fn(w->arg))
+			t = blake3_thread_pool_reserve(tp);
+
+		if (t) {
+			threads[thread_work_count] = t;
+			thread_work[thread_work_count++] = w;
+		} else
+			direct_work[direct_work_count++] = w;
+	}
+
+	/* if we don't have any direct work, steal the last threaded work as direct */
+	if (!direct_work_count) {
+		assert(thread_work_count > 0);
+		t = threads[thread_work_count - 1];
+		w = thread_work[thread_work_count - 1];
+		thread_work_count--;
+
+		/* unreserve this thread */
+		blake3_thread_pool_unreserve(tp, t);
+		direct_work[direct_work_count++] = w;
+	}
+
+	/* submit the threaded work */
+	for (i = 0; i < thread_work_count; i++) {
+		t = threads[i];
+		w = thread_work[i];
+		rc = blake3_thread_submit_work(t, w);
+		if (rc) {
+			/* unable to submit? remove the work and move it to direct */
+			threads[i] = NULL;
+			thread_work[i] = NULL;
+			blake3_thread_pool_unreserve(tp, t);
+			direct_work[direct_work_count++] = w;
+		}
+	}
+
+	/* now perform the direct work while the threaded work runs in parallel */
+	for (i = 0; i < direct_work_count; i++) {
+		w = direct_work[i];
+		w->fn(w->arg);
+	}
+
+	/* finally, wait for all threaded work to complete */
+	for (i = 0; i < thread_work_count; i++) {
+		t = threads[i];
+		if (!t)	/* was moved to the direct list at submit time */
+			continue;
+		blake3_thread_wait_work(t);
+		blake3_thread_pool_unreserve(tp, t);
+	}
+}
+
+void blake3_thread_args_join(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void **args, size_t count)
+{
+	blake3_thread_work *works;
+	size_t i;
+
+	if (!count)
+		return;
+
+	works = alloca(sizeof(*works) * count);
+	for (i = 0; i < count; i++) {
+		works[i].fn = fn;
+		works[i].arg = args ? args[i] : NULL;
+	}
+
+	blake3_thread_work_join(tp, works, count, check_fn);
+}
+
+void blake3_thread_arg_array_join(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void *args, size_t argsize, size_t count)
+{
+	blake3_thread_work *works;
+	uint8_t *p;
+	size_t i;
+
+	if (!count)
+		return;
+
+	works = alloca(sizeof(*works) * count);
+	for (i = 0, p = args; i < count; i++, p += argsize) {
+		works[i].fn = fn;
+		works[i].arg = p;
+	}
+
+	blake3_thread_work_join(tp, works, count, check_fn);
+}
+
+void blake3_thread_arg_join(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void *arg, size_t count)
+{
+	blake3_thread_work *works;
+	size_t i;
+
+	if (!count)
+		return;
+
+	works = alloca(sizeof(*works) * count);
+	for (i = 0; i < count; i++) {
+		works[i].fn = fn;
+		works[i].arg = arg;
+	}
+
+	blake3_thread_work_join(tp, works, count, check_fn);
+}
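Note that blake3_thread_work_join() always keeps at least one item for the calling thread, stealing the last threaded item back if everything was farmed out, so the caller contributes work instead of merely blocking. The three convenience wrappers differ only in how each item's `arg` is derived; a usage sketch (`demo` and the string payloads are illustrative, and `tp` is assumed to be a live pool):

```c
#include <stdio.h>
#include "blake3_thread.h"

static void print_work(void *arg) {
  printf("%s\n", (const char *)arg);
}

// All three wrappers funnel into blake3_thread_work_join().
void demo(blake3_thread_pool *tp) {
  /* 1. explicit array of pointers, one per work item */
  void *args[] = { "a", "b", "c" };
  blake3_thread_args_join(tp, print_work, NULL, args, 3);

  /* 2. contiguous array of fixed-size state records (what blake3.c uses) */
  char states[3][8] = { "x", "y", "z" };
  blake3_thread_arg_array_join(tp, print_work, NULL, states,
                               sizeof(states[0]), 3);

  /* 3. the same argument repeated for every work item */
  blake3_thread_arg_join(tp, print_work, NULL, "shared", 2);
}
```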
diff --git a/c/blake3_thread.h b/c/blake3_thread.h
new file mode 100644
index 0000000..a374d94
--- /dev/null
+++ b/c/blake3_thread.h
@@ -0,0 +1,64 @@
+/*
+ * blake3_thread.h - minimal thread pool implementation for BLAKE3
+ *
+ * Copyright (c) 2023 Pantelis Antoniou
+ *
+ * Released under the BLAKE3 License (CC0 1.0 or Apache License 2.0)
+ */
+#ifndef BLAKE3_THREAD_H
+#define BLAKE3_THREAD_H
+
+#include <pthread.h>
+#include <stdatomic.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+struct blake3_thread_pool;
+
+typedef void (*blake3_work_exec_fn)(void *arg);
+typedef bool (*blake3_work_check_fn)(const void *arg);
+
+typedef struct blake3_thread_work {
+	blake3_work_exec_fn fn;
+	void *arg;
+} blake3_thread_work;
+
+//#define BLAKE3_THREAD_PORTABLE
+typedef struct blake3_thread {
+	struct blake3_thread_pool *tp;
+	unsigned int id;
+	pthread_t tid;
+	void *arg;
+	_Atomic(const blake3_thread_work *)work;
+#if defined(__linux__) && !defined(BLAKE3_THREAD_PORTABLE)
+	_Atomic(uint32_t) submit;
+	_Atomic(uint32_t) done;
+#else
+	pthread_mutex_t lock;
+	pthread_cond_t cond;
+	pthread_mutex_t wait_lock;
+	pthread_cond_t wait_cond;
+#endif
+} blake3_thread;
+
+typedef struct blake3_thread_pool {
+	unsigned int num_threads;
+	struct blake3_thread *threads;
+	_Atomic(uint64_t) *freep;
+	unsigned int free_count;
+} blake3_thread_pool;
+
+blake3_thread_pool *blake3_thread_pool_create(unsigned int num_threads);
+void blake3_thread_pool_destroy(blake3_thread_pool *tp);
+blake3_thread *blake3_thread_pool_reserve(blake3_thread_pool *tp);
+void blake3_thread_pool_unreserve(blake3_thread_pool *tp, blake3_thread *t);
+int blake3_thread_submit_work(blake3_thread *t, const blake3_thread_work *work);
+int blake3_thread_wait_work(blake3_thread *t);
+
+void blake3_thread_work_join(blake3_thread_pool *tp, const blake3_thread_work *works, size_t work_count, blake3_work_check_fn check_fn);
+void blake3_thread_args_join(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void **args, size_t count);
+void blake3_thread_arg_array_join(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void *args, size_t argsize, size_t count);
+void blake3_thread_arg_join(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void *arg, size_t count);
+
+#endif
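The header is self-contained, so the pool can also be used on its own, independently of the global instance that blake3_get_thread_pool() manages. A minimal lifecycle sketch (the 4-thread count is arbitrary; passing 0 lets the pool pick num_cpus * 3 / 2, as blake3_thread_pool_init() does):

```c
#include <stdio.h>
#include "blake3_thread.h"

static void work_fn(void *arg) {
  (void)arg;  /* a real task would do its slice of work here */
}

int main(void) {
  blake3_thread_pool *tp = blake3_thread_pool_create(4);
  if (!tp) {
    fprintf(stderr, "failed to create thread pool\n");
    return 1;
  }

  /* run 8 copies of work_fn: up to 4 on workers, the rest inline */
  blake3_thread_arg_join(tp, work_fn, NULL, NULL, 8);

  blake3_thread_pool_destroy(tp);
  return 0;
}
```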
diff --git a/c/example-mmap.c b/c/example-mmap.c
new file mode 100644
index 0000000..ecaa241
--- /dev/null
+++ b/c/example-mmap.c
@@ -0,0 +1,137 @@
+// vim: ts=2 sw=2 et
+#include "blake3.h"
+#include <alloca.h>
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+/* 256K threshold for using alloca */
+#define BLAKE3_ALLOCA_BUFFER_SIZE (256U << 10)
+
+int blake3_hash_file(const char *filename, uint8_t output[BLAKE3_OUT_LEN])
+{
+  blake3_hasher hasher;
+  FILE *fp = NULL;
+  void *mem = NULL;
+  void *buf = NULL;
+  int fd = -1, ret = -1;
+  size_t rdn, filesize, bufsz = BLAKE3_ALLOCA_BUFFER_SIZE;
+  struct stat sb;
+  int rc;
+
+  memset(output, 0, BLAKE3_OUT_LEN);
+
+  blake3_hasher_init(&hasher);
+
+  if (!strcmp(filename, "-")) {
+    fp = stdin;
+  } else {
+    fp = NULL;
+
+    fd = open(filename, O_RDONLY);
+    if (fd < 0)
+      goto err_out;
+
+    rc = fstat(fd, &sb);
+    if (rc < 0)
+      goto err_out;
+
+    filesize = (size_t)-1;
+
+    /* try to mmap */
+    if (sb.st_size > 0) {
+      filesize = sb.st_size;
+      mem = mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
+      if (mem != MAP_FAILED) {
+        close(fd);
+        fd = -1;
+      } else
+        mem = NULL;
+    }
+
+    /* unable to map? fall back to stream mode */
+    if (!mem) {
+      close(fd);
+      fd = -1;
+      fp = fopen(filename, "r");
+      if (!fp)
+        goto err_out;
+    }
+  }
+
+  /* mmap case, very simple */
+  if (mem) {
+    blake3_hasher_update(&hasher, mem, filesize);
+  } else {
+    /* slow path using file reads */
+    assert(fp);
+
+    if (bufsz <= BLAKE3_ALLOCA_BUFFER_SIZE) {
+      buf = alloca(bufsz);
+    } else {
+      buf = malloc(bufsz);
+      if (!buf)
+        goto err_out;
+    }
+
+    do {
+      rdn = fread(buf, 1, bufsz, fp);
+      if (rdn == 0)
+        break;
+      blake3_hasher_update(&hasher, buf, rdn);
+    } while (rdn >= bufsz);
+  }
+
+  // Finalize the hash. BLAKE3_OUT_LEN is the default output length, 32 bytes.
+  blake3_hasher_finalize(&hasher, output, BLAKE3_OUT_LEN);
+
+  ret = 0;
+
+out:
+  if (mem)
+    munmap(mem, filesize);
+
+  if (fp && fp != stdin)
+    fclose(fp);
+
+  if (fd >= 0)
+    close(fd);
+
+  if (buf && (bufsz > BLAKE3_ALLOCA_BUFFER_SIZE))
+    free(buf);
+
+  return ret;
+
+err_out:
+  ret = -1;
+  goto out;
+}
+
+int main(int argc, char *argv[])
+{
+  uint8_t output[BLAKE3_OUT_LEN];
+  int i, ok, rc;
+
+  ok = 1;
+  for (i = 1; i < argc; i++) {
+    rc = blake3_hash_file(argv[i], output);
+    if (rc) {
+      fprintf(stderr, "Error hashing file \"%s\": %s\n", argv[i], strerror(errno));
+      continue;
+    }
+
+    // Print the hash as hexadecimal.
+    for (size_t j = 0; j < BLAKE3_OUT_LEN; j++)
+      printf("%02x", output[j]);
+    printf(" %s\n", argv[i]);
+    ok++;
+  }
+  return ok == argc ? EXIT_SUCCESS : EXIT_FAILURE;
+}
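Threading changes how the tree is traversed, but it must not change the digest. One way to sanity-check the patch (a hypothetical test, not part of it) is to compare one-shot updates, which can take the threaded subtree path, against small incremental updates, which cannot, for sizes straddling the 8-chunk (8 KiB) threshold:

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "blake3.h"

// Hash the same buffer in one shot and in 4 KiB increments; the two digests
// must agree for sizes below and above the threading threshold.
int main(void) {
  size_t sizes[] = { 1, 1024, 8192, 8193, 1u << 20 };
  uint8_t *buf = malloc(1u << 20);
  if (!buf)
    return 1;
  for (size_t i = 0; i < (1u << 20); i++)
    buf[i] = (uint8_t)(i * 131);

  for (size_t s = 0; s < sizeof(sizes) / sizeof(sizes[0]); s++) {
    uint8_t a[BLAKE3_OUT_LEN], b[BLAKE3_OUT_LEN];
    blake3_hasher h;

    blake3_hasher_init(&h);
    blake3_hasher_update(&h, buf, sizes[s]);
    blake3_hasher_finalize(&h, a, BLAKE3_OUT_LEN);

    blake3_hasher_init(&h);
    for (size_t off = 0; off < sizes[s]; off += 4096)
      blake3_hasher_update(&h, buf + off,
                           sizes[s] - off < 4096 ? sizes[s] - off : 4096);
    blake3_hasher_finalize(&h, b, BLAKE3_OUT_LEN);

    if (memcmp(a, b, BLAKE3_OUT_LEN) != 0) {
      fprintf(stderr, "mismatch at size %zu\n", sizes[s]);
      free(buf);
      return 1;
    }
  }
  free(buf);
  printf("all sizes match\n");
  return 0;
}
```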