diff --git a/.gitignore b/.gitignore index 3dcdb6bb5ab..96c794b1c73 100644 --- a/.gitignore +++ b/.gitignore @@ -33,6 +33,7 @@ /git-check-mailmap /git-check-ref-format /git-checkout +/git-checkout--worker /git-checkout-index /git-cherry /git-cherry-pick diff --git a/Documentation/Makefile b/Documentation/Makefile index 874a01d7a86..c2baad0bd82 100644 --- a/Documentation/Makefile +++ b/Documentation/Makefile @@ -91,6 +91,7 @@ TECH_DOCS += technical/multi-pack-index TECH_DOCS += technical/pack-format TECH_DOCS += technical/pack-heuristics TECH_DOCS += technical/pack-protocol +TECH_DOCS += technical/parallel-checkout TECH_DOCS += technical/partial-clone TECH_DOCS += technical/protocol-capabilities TECH_DOCS += technical/protocol-common diff --git a/Documentation/config/checkout.txt b/Documentation/config/checkout.txt index 2cddf7b4b40..bfbca90f0e9 100644 --- a/Documentation/config/checkout.txt +++ b/Documentation/config/checkout.txt @@ -21,3 +21,24 @@ checkout.guess:: Provides the default value for the `--guess` or `--no-guess` option in `git checkout` and `git switch`. See linkgit:git-switch[1] and linkgit:git-checkout[1]. + +checkout.workers:: + The number of parallel workers to use when updating the working tree. + The default is one, i.e. sequential execution. If set to a value less + than one, Git will use as many workers as the number of logical cores + available. This setting and `checkout.thresholdForParallelism` affect + all commands that perform checkout. E.g. checkout, clone, reset, + sparse-checkout, etc. ++ +Note: parallel checkout usually delivers better performance for repositories +located on SSDs or over NFS. For repositories on spinning disks and/or machines +with a small number of cores, the default sequential checkout often performs +better. The size and compression level of a repository might also influence how +well the parallel version performs. + +checkout.thresholdForParallelism:: + When running parallel checkout with a small number of files, the cost + of subprocess spawning and inter-process communication might outweigh + the parallelization gains. This setting allows to define the minimum + number of files for which parallel checkout should be attempted. The + default is 100. diff --git a/Documentation/technical/parallel-checkout.txt b/Documentation/technical/parallel-checkout.txt new file mode 100644 index 00000000000..e790258a1ad --- /dev/null +++ b/Documentation/technical/parallel-checkout.txt @@ -0,0 +1,270 @@ +Parallel Checkout Design Notes +============================== + +The "Parallel Checkout" feature attempts to use multiple processes to +parallelize the work of uncompressing the blobs, applying in-core +filters, and writing the resulting contents to the working tree during a +checkout operation. It can be used by all checkout-related commands, +such as `clone`, `checkout`, `reset`, `sparse-checkout`, and others. + +These commands share the following basic structure: + +* Step 1: Read the current index file into memory. + +* Step 2: Modify the in-memory index based upon the command, and + temporarily mark all cache entries that need to be updated. + +* Step 3: Populate the working tree to match the new candidate index. + This includes iterating over all of the to-be-updated cache entries + and delete, create, or overwrite the associated files in the working + tree. + +* Step 4: Write the new index to disk. + +Step 3 is the focus of the "parallel checkout" effort described here. + +Sequential Implementation +------------------------- + +For the purposes of discussion here, the current sequential +implementation of Step 3 is divided in 3 parts, each one implemented in +its own function: + +* Step 3a: `unpack-trees.c:check_updates()` contains a series of + sequential loops iterating over the `cache_entry`'s array. The main + loop in this function calls the Step 3b function for each of the + to-be-updated entries. + +* Step 3b: `entry.c:checkout_entry()` examines the existing working tree + for file conflicts, collisions, and unsaved changes. It removes files + and creates leading directories as necessary. It calls the Step 3c + function for each entry to be written. + +* Step 3c: `entry.c:write_entry()` loads the blob into memory, smudges + it if necessary, creates the file in the working tree, writes the + smudged contents, calls `fstat()` or `lstat()`, and updates the + associated `cache_entry` struct with the stat information gathered. + +It wouldn't be safe to perform Step 3b in parallel, as there could be +race conditions between file creations and removals. Instead, the +parallel checkout framework lets the sequential code handle Step 3b, +and uses parallel workers to replace the sequential +`entry.c:write_entry()` calls from Step 3c. + +Rejected Multi-Threaded Solution +-------------------------------- + +The most "straightforward" implementation would be to spread the set of +to-be-updated cache entries across multiple threads. But due to the +thread-unsafe functions in the ODB code, we would have to use locks to +coordinate the parallel operation. An early prototype of this solution +showed that the multi-threaded checkout would bring performance +improvements over the sequential code, but there was still too much lock +contention. A `perf` profiling indicated that around 20% of the runtime +during a local Linux clone (on an SSD) was spent in locking functions. +For this reason this approach was rejected in favor of using multiple +child processes, which led to a better performance. + +Multi-Process Solution +---------------------- + +Parallel checkout alters the aforementioned Step 3 to use multiple +`checkout--worker` background processes to distribute the work. The +long-running worker processes are controlled by the foreground Git +command using the existing run-command API. + +Overview +~~~~~~~~ + +Step 3b is only slightly altered; for each entry to be checked out, the +main process performs the following steps: + +* M1: Check whether there is any untracked or unclean file in the + working tree which would be overwritten by this entry, and decide + whether to proceed (removing the file(s)) or not. + +* M2: Create the leading directories. + +* M3: Load the conversion attributes for the entry's path. + +* M4: Check, based on the entry's type and conversion attributes, + whether the entry is eligible for parallel checkout (more on this + later). If it is eligible, enqueue the entry and the loaded + attributes to later write the entry in parallel. If not, write the + entry right away, using the default sequential code. + +Note: we save the conversion attributes associated with each entry +because the workers don't have access to the main process' index state, +so they can't load the attributes by themselves (and the attributes are +needed to properly smudge the entry). Additionally, this has a positive +impact on performance as (1) we don't need to load the attributes twice +and (2) the attributes machinery is optimized to handle paths in +sequential order. + +After all entries have passed through the above steps, the main process +checks if the number of enqueued entries is sufficient to spread among +the workers. If not, it just writes them sequentially. Otherwise, it +spawns the workers and distributes the queued entries uniformly in +continuous chunks. This aims to minimize the chances of two workers +writing to the same directory simultaneously, which could increase lock +contention in the kernel. + +Then, for each assigned item, each worker: + +* W1: Checks if there is any non-directory file in the leading part of + the entry's path or if there already exists a file at the entry' path. + If so, mark the entry with `PC_ITEM_COLLIDED` and skip it (more on + this later). + +* W2: Creates the file (with O_CREAT and O_EXCL). + +* W3: Loads the blob into memory (inflating and delta reconstructing + it). + +* W4: Applies any required in-process filter, like end-of-line + conversion and re-encoding. + +* W5: Writes the result to the file descriptor opened at W2. + +* W6: Calls `fstat()` or lstat()` on the just-written path, and sends + the result back to the main process, together with the end status of + the operation and the item's identification number. + +Note that, when possible, steps W3 to W5 are delegated to the streaming +machinery, removing the need to keep the entire blob in memory. + +If the worker fails to read the blob or to write it to the working tree, +it removes the created file to avoid leaving empty files behind. This is +the *only* time a worker is allowed to remove a file. + +As mentioned earlier, it is the responsibility of the main process to +remove any file that blocks the checkout operation (or abort if the +removal(s) would cause data loss and the user didn't ask to `--force`). +This is crucial to avoid race conditions and also to properly detect +path collisions at Step W1. + +After the workers finish writing the items and sending back the required +information, the main process handles the results in two steps: + +- First, it updates the in-memory index with the `lstat()` information + sent by the workers. (This must be done first as this information + might me required in the following step.) + +- Then it writes the items which collided on disk (i.e. items marked + with `PC_ITEM_COLLIDED`). More on this below. + +Path Collisions +--------------- + +Path collisions happen when two different paths correspond to the same +entry in the file system. E.g. the paths 'a' and 'A' would collide in a +case-insensitive file system. + +The sequential checkout deals with collisions in the same way that it +deals with files that were already present in the working tree before +checkout. Basically, it checks if the path that it wants to write +already exists on disk, makes sure the existing file doesn't have +unsaved data, and then overwrites it. (To be more pedantic: it deletes +the existing file and creates the new one.) So, if there are multiple +colliding files to be checked out, the sequential code will write each +one of them but only the last will actually survive on disk. + +Parallel checkout aims to reproduce the same behavior. However, we +cannot let the workers racily write to the same file on disk. Instead, +the workers detect when the entry that they want to check out would +collide with an existing file, and mark it with `PC_ITEM_COLLIDED`. +Later, the main process can sequentially feed these entries back to +`checkout_entry()` without the risk of race conditions. On clone, this +also has the effect of marking the colliding entries to later emit a +warning for the user, like the classic sequential checkout does. + +The workers are able to detect both collisions among the entries being +concurrently written and collisions between a parallel-eligible entry +and an ineligible entry. The general idea for collision detection is +quite straightforward: for each parallel-eligible entry, the main +process must remove all files that prevent this entry from being written +(before enqueueing it). This includes any non-directory file in the +leading path of the entry. Later, when a worker gets assigned the entry, +it looks again for the non-directories files and for an already existing +file at the entry's path. If any of these checks finds something, the +worker knows that there was a path collision. + +Because parallel checkout can distinguish path collisions from the case +where the file was already present in the working tree before checkout, +we could alternatively choose to skip the checkout of colliding entries. +However, each entry that doesn't get written would have NULL `lstat()` +fields on the index. This could cause performance penalties for +subsequent commands that need to refresh the index, as they would have +to go to the file system to see if the entry is dirty. Thus, if we have +N entries in a colliding group and we decide to write and `lstat()` only +one of them, every subsequent `git-status` will have to read, convert, +and hash the written file N - 1 times. By checking out all colliding +entries (like the sequential code does), we only pay the overhead once, +during checkout. + +Eligible Entries for Parallel Checkout +-------------------------------------- + +As previously mentioned, not all entries passed to `checkout_entry()` +will be considered eligible for parallel checkout. More specifically, we +exclude: + +- Symbolic links; to avoid race conditions that, in combination with + path collisions, could cause workers to write files at the wrong + place. For example, if we were to concurrently check out a symlink + 'a' -> 'b' and a regular file 'A/f' in a case-insensitive file system, + we could potentially end up writing the file 'A/f' at 'a/f', due to a + race condition. + +- Regular files that require external filters (either "one shot" filters + or long-running process filters). These filters are black-boxes to Git + and may have their own internal locking or non-concurrent assumptions. + So it might not be safe to run multiple instances in parallel. ++ +Besides, long-running filters may use the delayed checkout feature to +postpone the return of some filtered blobs. The delayed checkout queue +and the parallel checkout queue are not compatible and should remain +separate. ++ +Note: regular files that only require internal filters, like end-of-line +conversion and re-encoding, are eligible for parallel checkout. + +Ineligible entries are checked out by the classic sequential codepath +*before* spawning workers. + +Note: submodules's files are also eligible for parallel checkout (as +long as they don't fall into any of the excluding categories mentioned +above). But since each submodule is checked out in its own child +process, we don't mix the superproject's and the submodules' files in +the same parallel checkout process or queue. + +The API +------- + +The parallel checkout API was designed with the goal of minimizing +changes to the current users of the checkout machinery. This means that +they don't have to call a different function for sequential or parallel +checkout. As already mentioned, `checkout_entry()` will automatically +insert the given entry in the parallel checkout queue when this feature +is enabled and the entry is eligible; otherwise, it will just write the +entry right away, using the sequential code. In general, callers of the +parallel checkout API should look similar to this: + +---------------------------------------------- +int pc_workers, pc_threshold, err = 0; +struct checkout state; + +get_parallel_checkout_configs(&pc_workers, &pc_threshold); + +/* + * This check is not strictly required, but it + * should save some time in sequential mode. + */ +if (pc_workers > 1) + init_parallel_checkout(); + +for (each cache_entry ce to-be-updated) + err |= checkout_entry(ce, &state, NULL, NULL); + +err |= run_parallel_checkout(&state, pc_workers, pc_threshold, NULL, NULL); +---------------------------------------------- diff --git a/Makefile b/Makefile index bebc734046e..93664d67146 100644 --- a/Makefile +++ b/Makefile @@ -948,6 +948,7 @@ LIB_OBJS += pack-revindex.o LIB_OBJS += pack-write.o LIB_OBJS += packfile.o LIB_OBJS += pager.o +LIB_OBJS += parallel-checkout.o LIB_OBJS += parse-options-cb.o LIB_OBJS += parse-options.o LIB_OBJS += patch-delta.o @@ -1064,6 +1065,7 @@ BUILTIN_OBJS += builtin/check-attr.o BUILTIN_OBJS += builtin/check-ignore.o BUILTIN_OBJS += builtin/check-mailmap.o BUILTIN_OBJS += builtin/check-ref-format.o +BUILTIN_OBJS += builtin/checkout--worker.o BUILTIN_OBJS += builtin/checkout-index.o BUILTIN_OBJS += builtin/checkout.o BUILTIN_OBJS += builtin/clean.o diff --git a/builtin.h b/builtin.h index b6ce981b737..16ecd5586f0 100644 --- a/builtin.h +++ b/builtin.h @@ -123,6 +123,7 @@ int cmd_bugreport(int argc, const char **argv, const char *prefix); int cmd_bundle(int argc, const char **argv, const char *prefix); int cmd_cat_file(int argc, const char **argv, const char *prefix); int cmd_checkout(int argc, const char **argv, const char *prefix); +int cmd_checkout__worker(int argc, const char **argv, const char *prefix); int cmd_checkout_index(int argc, const char **argv, const char *prefix); int cmd_check_attr(int argc, const char **argv, const char *prefix); int cmd_check_ignore(int argc, const char **argv, const char *prefix); diff --git a/builtin/checkout--worker.c b/builtin/checkout--worker.c new file mode 100644 index 00000000000..31e0de2f7ec --- /dev/null +++ b/builtin/checkout--worker.c @@ -0,0 +1,145 @@ +#include "builtin.h" +#include "config.h" +#include "entry.h" +#include "parallel-checkout.h" +#include "parse-options.h" +#include "pkt-line.h" + +static void packet_to_pc_item(const char *buffer, int len, + struct parallel_checkout_item *pc_item) +{ + const struct pc_item_fixed_portion *fixed_portion; + const char *variant; + char *encoding; + + if (len < sizeof(struct pc_item_fixed_portion)) + BUG("checkout worker received too short item (got %dB, exp %dB)", + len, (int)sizeof(struct pc_item_fixed_portion)); + + fixed_portion = (struct pc_item_fixed_portion *)buffer; + + if (len - sizeof(struct pc_item_fixed_portion) != + fixed_portion->name_len + fixed_portion->working_tree_encoding_len) + BUG("checkout worker received corrupted item"); + + variant = buffer + sizeof(struct pc_item_fixed_portion); + + /* + * Note: the main process uses zero length to communicate that the + * encoding is NULL. There is no use case that requires sending an + * actual empty string, since convert_attrs() never sets + * ca.working_tree_enconding to "". + */ + if (fixed_portion->working_tree_encoding_len) { + encoding = xmemdupz(variant, + fixed_portion->working_tree_encoding_len); + variant += fixed_portion->working_tree_encoding_len; + } else { + encoding = NULL; + } + + memset(pc_item, 0, sizeof(*pc_item)); + pc_item->ce = make_empty_transient_cache_entry(fixed_portion->name_len); + pc_item->ce->ce_namelen = fixed_portion->name_len; + pc_item->ce->ce_mode = fixed_portion->ce_mode; + memcpy(pc_item->ce->name, variant, pc_item->ce->ce_namelen); + oidcpy(&pc_item->ce->oid, &fixed_portion->oid); + + pc_item->id = fixed_portion->id; + pc_item->ca.crlf_action = fixed_portion->crlf_action; + pc_item->ca.ident = fixed_portion->ident; + pc_item->ca.working_tree_encoding = encoding; +} + +static void report_result(struct parallel_checkout_item *pc_item) +{ + struct pc_item_result res; + size_t size; + + res.id = pc_item->id; + res.status = pc_item->status; + + if (pc_item->status == PC_ITEM_WRITTEN) { + res.st = pc_item->st; + size = sizeof(res); + } else { + size = PC_ITEM_RESULT_BASE_SIZE; + } + + packet_write(1, (const char *)&res, size); +} + +/* Free the worker-side malloced data, but not pc_item itself. */ +static void release_pc_item_data(struct parallel_checkout_item *pc_item) +{ + free((char *)pc_item->ca.working_tree_encoding); + discard_cache_entry(pc_item->ce); +} + +static void worker_loop(struct checkout *state) +{ + struct parallel_checkout_item *items = NULL; + size_t i, nr = 0, alloc = 0; + + while (1) { + int len = packet_read(0, NULL, NULL, packet_buffer, + sizeof(packet_buffer), 0); + + if (len < 0) + BUG("packet_read() returned negative value"); + else if (!len) + break; + + ALLOC_GROW(items, nr + 1, alloc); + packet_to_pc_item(packet_buffer, len, &items[nr++]); + } + + for (i = 0; i < nr; i++) { + struct parallel_checkout_item *pc_item = &items[i]; + write_pc_item(pc_item, state); + report_result(pc_item); + release_pc_item_data(pc_item); + } + + packet_flush(1); + + free(items); +} + +static const char * const checkout_worker_usage[] = { + N_("git checkout--worker []"), + NULL +}; + +int cmd_checkout__worker(int argc, const char **argv, const char *prefix) +{ + struct checkout state = CHECKOUT_INIT; + struct option checkout_worker_options[] = { + OPT_STRING(0, "prefix", &state.base_dir, N_("string"), + N_("when creating files, prepend ")), + OPT_END() + }; + + if (argc == 2 && !strcmp(argv[1], "-h")) + usage_with_options(checkout_worker_usage, + checkout_worker_options); + + git_config(git_default_config, NULL); + argc = parse_options(argc, argv, prefix, checkout_worker_options, + checkout_worker_usage, 0); + if (argc > 0) + usage_with_options(checkout_worker_usage, checkout_worker_options); + + if (state.base_dir) + state.base_dir_len = strlen(state.base_dir); + + /* + * Setting this on a worker won't actually update the index. We just + * need to tell the checkout machinery to lstat() the written entries, + * so that we can send this data back to the main process. + */ + state.refresh_cache = 1; + + worker_loop(&state); + return 0; +} diff --git a/entry.c b/entry.c index 9fdc843135f..3f376b9fdf2 100644 --- a/entry.c +++ b/entry.c @@ -7,6 +7,7 @@ #include "progress.h" #include "fsmonitor.h" #include "entry.h" +#include "parallel-checkout.h" static void create_directories(const char *path, int path_len, const struct checkout *state) @@ -428,8 +429,17 @@ static void mark_colliding_entries(const struct checkout *state, for (i = 0; i < state->istate->cache_nr; i++) { struct cache_entry *dup = state->istate->cache[i]; - if (dup == ce) - break; + if (dup == ce) { + /* + * Parallel checkout doesn't create the files in index + * order. So the other side of the collision may appear + * after the given cache_entry in the array. + */ + if (parallel_checkout_status() == PC_RUNNING) + continue; + else + break; + } if (dup->ce_flags & (CE_MATCHED | CE_VALID | CE_SKIP_WORKTREE)) continue; @@ -538,6 +548,9 @@ int checkout_entry_ca(struct cache_entry *ce, struct conv_attrs *ca, ca = &ca_buf; } + if (!enqueue_checkout(ce, ca)) + return 0; + return write_entry(ce, path.buf, ca, state, 0); } diff --git a/git.c b/git.c index b53e6656713..06d681cd593 100644 --- a/git.c +++ b/git.c @@ -490,6 +490,8 @@ static struct cmd_struct commands[] = { { "check-mailmap", cmd_check_mailmap, RUN_SETUP }, { "check-ref-format", cmd_check_ref_format, NO_PARSEOPT }, { "checkout", cmd_checkout, RUN_SETUP | NEED_WORK_TREE }, + { "checkout--worker", cmd_checkout__worker, + RUN_SETUP | NEED_WORK_TREE | SUPPORT_SUPER_PREFIX }, { "checkout-index", cmd_checkout_index, RUN_SETUP | NEED_WORK_TREE}, { "cherry", cmd_cherry, RUN_SETUP }, diff --git a/parallel-checkout.c b/parallel-checkout.c new file mode 100644 index 00000000000..09e8b10a359 --- /dev/null +++ b/parallel-checkout.c @@ -0,0 +1,655 @@ +#include "cache.h" +#include "config.h" +#include "entry.h" +#include "parallel-checkout.h" +#include "pkt-line.h" +#include "progress.h" +#include "run-command.h" +#include "sigchain.h" +#include "streaming.h" +#include "thread-utils.h" + +struct pc_worker { + struct child_process cp; + size_t next_item_to_complete, nr_items_to_complete; +}; + +struct parallel_checkout { + enum pc_status status; + struct parallel_checkout_item *items; /* The parallel checkout queue. */ + size_t nr, alloc; + struct progress *progress; + unsigned int *progress_cnt; +}; + +static struct parallel_checkout parallel_checkout; + +enum pc_status parallel_checkout_status(void) +{ + return parallel_checkout.status; +} + +static const int DEFAULT_THRESHOLD_FOR_PARALLELISM = 100; +static const int DEFAULT_NUM_WORKERS = 1; + +void get_parallel_checkout_configs(int *num_workers, int *threshold) +{ + if (git_config_get_int("checkout.workers", num_workers)) + *num_workers = DEFAULT_NUM_WORKERS; + else if (*num_workers < 1) + *num_workers = online_cpus(); + + if (git_config_get_int("checkout.thresholdForParallelism", threshold)) + *threshold = DEFAULT_THRESHOLD_FOR_PARALLELISM; +} + +void init_parallel_checkout(void) +{ + if (parallel_checkout.status != PC_UNINITIALIZED) + BUG("parallel checkout already initialized"); + + parallel_checkout.status = PC_ACCEPTING_ENTRIES; +} + +static void finish_parallel_checkout(void) +{ + if (parallel_checkout.status == PC_UNINITIALIZED) + BUG("cannot finish parallel checkout: not initialized yet"); + + free(parallel_checkout.items); + memset(¶llel_checkout, 0, sizeof(parallel_checkout)); +} + +static int is_eligible_for_parallel_checkout(const struct cache_entry *ce, + const struct conv_attrs *ca) +{ + enum conv_attrs_classification c; + size_t packed_item_size; + + /* + * Symlinks cannot be checked out in parallel as, in case of path + * collision, they could racily replace leading directories of other + * entries being checked out. Submodules are checked out in child + * processes, which have their own parallel checkout queues. + */ + if (!S_ISREG(ce->ce_mode)) + return 0; + + packed_item_size = sizeof(struct pc_item_fixed_portion) + ce->ce_namelen + + (ca->working_tree_encoding ? strlen(ca->working_tree_encoding) : 0); + + /* + * The amount of data we send to the workers per checkout item is + * typically small (75~300B). So unless we find an insanely huge path + * of 64KB, we should never reach the 65KB limit of one pkt-line. If + * that does happen, we let the sequential code handle the item. + */ + if (packed_item_size > LARGE_PACKET_DATA_MAX) + return 0; + + c = classify_conv_attrs(ca); + switch (c) { + case CA_CLASS_INCORE: + return 1; + + case CA_CLASS_INCORE_FILTER: + /* + * It would be safe to allow concurrent instances of + * single-file smudge filters, like rot13, but we should not + * assume that all filters are parallel-process safe. So we + * don't allow this. + */ + return 0; + + case CA_CLASS_INCORE_PROCESS: + /* + * The parallel queue and the delayed queue are not compatible, + * so they must be kept completely separated. And we can't tell + * if a long-running process will delay its response without + * actually asking it to perform the filtering. Therefore, this + * type of filter is not allowed in parallel checkout. + * + * Furthermore, there should only be one instance of the + * long-running process filter as we don't know how it is + * managing its own concurrency. So, spreading the entries that + * requisite such a filter among the parallel workers would + * require a lot more inter-process communication. We would + * probably have to designate a single process to interact with + * the filter and send all the necessary data to it, for each + * entry. + */ + return 0; + + case CA_CLASS_STREAMABLE: + return 1; + + default: + BUG("unsupported conv_attrs classification '%d'", c); + } +} + +int enqueue_checkout(struct cache_entry *ce, struct conv_attrs *ca) +{ + struct parallel_checkout_item *pc_item; + + if (parallel_checkout.status != PC_ACCEPTING_ENTRIES || + !is_eligible_for_parallel_checkout(ce, ca)) + return -1; + + ALLOC_GROW(parallel_checkout.items, parallel_checkout.nr + 1, + parallel_checkout.alloc); + + pc_item = ¶llel_checkout.items[parallel_checkout.nr]; + pc_item->ce = ce; + memcpy(&pc_item->ca, ca, sizeof(pc_item->ca)); + pc_item->status = PC_ITEM_PENDING; + pc_item->id = parallel_checkout.nr; + parallel_checkout.nr++; + + return 0; +} + +size_t pc_queue_size(void) +{ + return parallel_checkout.nr; +} + +static void advance_progress_meter(void) +{ + if (parallel_checkout.progress) { + (*parallel_checkout.progress_cnt)++; + display_progress(parallel_checkout.progress, + *parallel_checkout.progress_cnt); + } +} + +static int handle_results(struct checkout *state) +{ + int ret = 0; + size_t i; + int have_pending = 0; + + /* + * We first update the successfully written entries with the collected + * stat() data, so that they can be found by mark_colliding_entries(), + * in the next loop, when necessary. + */ + for (i = 0; i < parallel_checkout.nr; i++) { + struct parallel_checkout_item *pc_item = ¶llel_checkout.items[i]; + if (pc_item->status == PC_ITEM_WRITTEN) + update_ce_after_write(state, pc_item->ce, &pc_item->st); + } + + for (i = 0; i < parallel_checkout.nr; i++) { + struct parallel_checkout_item *pc_item = ¶llel_checkout.items[i]; + + switch(pc_item->status) { + case PC_ITEM_WRITTEN: + /* Already handled */ + break; + case PC_ITEM_COLLIDED: + /* + * The entry could not be checked out due to a path + * collision with another entry. Since there can only + * be one entry of each colliding group on the disk, we + * could skip trying to check out this one and move on. + * However, this would leave the unwritten entries with + * null stat() fields on the index, which could + * potentially slow down subsequent operations that + * require refreshing it: git would not be able to + * trust st_size and would have to go to the filesystem + * to see if the contents match (see ie_modified()). + * + * Instead, let's pay the overhead only once, now, and + * call checkout_entry_ca() again for this file, to + * have its stat() data stored in the index. This also + * has the benefit of adding this entry and its + * colliding pair to the collision report message. + * Additionally, this overwriting behavior is consistent + * with what the sequential checkout does, so it doesn't + * add any extra overhead. + */ + ret |= checkout_entry_ca(pc_item->ce, &pc_item->ca, + state, NULL, NULL); + advance_progress_meter(); + break; + case PC_ITEM_PENDING: + have_pending = 1; + /* fall through */ + case PC_ITEM_FAILED: + ret = -1; + break; + default: + BUG("unknown checkout item status in parallel checkout"); + } + } + + if (have_pending) + error("parallel checkout finished with pending entries"); + + return ret; +} + +static int reset_fd(int fd, const char *path) +{ + if (lseek(fd, 0, SEEK_SET) != 0) + return error_errno("failed to rewind descriptor of '%s'", path); + if (ftruncate(fd, 0)) + return error_errno("failed to truncate file '%s'", path); + return 0; +} + +static int write_pc_item_to_fd(struct parallel_checkout_item *pc_item, int fd, + const char *path) +{ + int ret; + struct stream_filter *filter; + struct strbuf buf = STRBUF_INIT; + char *blob; + unsigned long size; + ssize_t wrote; + + /* Sanity check */ + assert(is_eligible_for_parallel_checkout(pc_item->ce, &pc_item->ca)); + + filter = get_stream_filter_ca(&pc_item->ca, &pc_item->ce->oid); + if (filter) { + if (stream_blob_to_fd(fd, &pc_item->ce->oid, filter, 1)) { + /* On error, reset fd to try writing without streaming */ + if (reset_fd(fd, path)) + return -1; + } else { + return 0; + } + } + + blob = read_blob_entry(pc_item->ce, &size); + if (!blob) + return error("cannot read object %s '%s'", + oid_to_hex(&pc_item->ce->oid), pc_item->ce->name); + + /* + * checkout metadata is used to give context for external process + * filters. Files requiring such filters are not eligible for parallel + * checkout, so pass NULL. Note: if that changes, the metadata must also + * be passed from the main process to the workers. + */ + ret = convert_to_working_tree_ca(&pc_item->ca, pc_item->ce->name, + blob, size, &buf, NULL); + + if (ret) { + size_t newsize; + free(blob); + blob = strbuf_detach(&buf, &newsize); + size = newsize; + } + + wrote = write_in_full(fd, blob, size); + free(blob); + if (wrote < 0) + return error("unable to write file '%s'", path); + + return 0; +} + +static int close_and_clear(int *fd) +{ + int ret = 0; + + if (*fd >= 0) { + ret = close(*fd); + *fd = -1; + } + + return ret; +} + +void write_pc_item(struct parallel_checkout_item *pc_item, + struct checkout *state) +{ + unsigned int mode = (pc_item->ce->ce_mode & 0100) ? 0777 : 0666; + int fd = -1, fstat_done = 0; + struct strbuf path = STRBUF_INIT; + const char *dir_sep; + + strbuf_add(&path, state->base_dir, state->base_dir_len); + strbuf_add(&path, pc_item->ce->name, pc_item->ce->ce_namelen); + + dir_sep = find_last_dir_sep(path.buf); + + /* + * The leading dirs should have been already created by now. But, in + * case of path collisions, one of the dirs could have been replaced by + * a symlink (checked out after we enqueued this entry for parallel + * checkout). Thus, we must check the leading dirs again. + */ + if (dir_sep && !has_dirs_only_path(path.buf, dir_sep - path.buf, + state->base_dir_len)) { + pc_item->status = PC_ITEM_COLLIDED; + goto out; + } + + fd = open(path.buf, O_WRONLY | O_CREAT | O_EXCL, mode); + + if (fd < 0) { + if (errno == EEXIST || errno == EISDIR) { + /* + * Errors which probably represent a path collision. + * Suppress the error message and mark the item to be + * retried later, sequentially. ENOTDIR and ENOENT are + * also interesting, but the above has_dirs_only_path() + * call should have already caught these cases. + */ + pc_item->status = PC_ITEM_COLLIDED; + } else { + error_errno("failed to open file '%s'", path.buf); + pc_item->status = PC_ITEM_FAILED; + } + goto out; + } + + if (write_pc_item_to_fd(pc_item, fd, path.buf)) { + /* Error was already reported. */ + pc_item->status = PC_ITEM_FAILED; + close_and_clear(&fd); + unlink(path.buf); + goto out; + } + + fstat_done = fstat_checkout_output(fd, state, &pc_item->st); + + if (close_and_clear(&fd)) { + error_errno("unable to close file '%s'", path.buf); + pc_item->status = PC_ITEM_FAILED; + goto out; + } + + if (state->refresh_cache && !fstat_done && lstat(path.buf, &pc_item->st) < 0) { + error_errno("unable to stat just-written file '%s'", path.buf); + pc_item->status = PC_ITEM_FAILED; + goto out; + } + + pc_item->status = PC_ITEM_WRITTEN; + +out: + strbuf_release(&path); +} + +static void send_one_item(int fd, struct parallel_checkout_item *pc_item) +{ + size_t len_data; + char *data, *variant; + struct pc_item_fixed_portion *fixed_portion; + const char *working_tree_encoding = pc_item->ca.working_tree_encoding; + size_t name_len = pc_item->ce->ce_namelen; + size_t working_tree_encoding_len = working_tree_encoding ? + strlen(working_tree_encoding) : 0; + + /* + * Any changes in the calculation of the message size must also be made + * in is_eligible_for_parallel_checkout(). + */ + len_data = sizeof(struct pc_item_fixed_portion) + name_len + + working_tree_encoding_len; + + data = xcalloc(1, len_data); + + fixed_portion = (struct pc_item_fixed_portion *)data; + fixed_portion->id = pc_item->id; + fixed_portion->ce_mode = pc_item->ce->ce_mode; + fixed_portion->crlf_action = pc_item->ca.crlf_action; + fixed_portion->ident = pc_item->ca.ident; + fixed_portion->name_len = name_len; + fixed_portion->working_tree_encoding_len = working_tree_encoding_len; + /* + * We use hashcpy() instead of oidcpy() because the hash[] positions + * after `the_hash_algo->rawsz` might not be initialized. And Valgrind + * would complain about passing uninitialized bytes to a syscall + * (write(2)). There is no real harm in this case, but the warning could + * hinder the detection of actual errors. + */ + hashcpy(fixed_portion->oid.hash, pc_item->ce->oid.hash); + + variant = data + sizeof(*fixed_portion); + if (working_tree_encoding_len) { + memcpy(variant, working_tree_encoding, working_tree_encoding_len); + variant += working_tree_encoding_len; + } + memcpy(variant, pc_item->ce->name, name_len); + + packet_write(fd, data, len_data); + + free(data); +} + +static void send_batch(int fd, size_t start, size_t nr) +{ + size_t i; + sigchain_push(SIGPIPE, SIG_IGN); + for (i = 0; i < nr; i++) + send_one_item(fd, ¶llel_checkout.items[start + i]); + packet_flush(fd); + sigchain_pop(SIGPIPE); +} + +static struct pc_worker *setup_workers(struct checkout *state, int num_workers) +{ + struct pc_worker *workers; + int i, workers_with_one_extra_item; + size_t base_batch_size, batch_beginning = 0; + + ALLOC_ARRAY(workers, num_workers); + + for (i = 0; i < num_workers; i++) { + struct child_process *cp = &workers[i].cp; + + child_process_init(cp); + cp->git_cmd = 1; + cp->in = -1; + cp->out = -1; + cp->clean_on_exit = 1; + strvec_push(&cp->args, "checkout--worker"); + if (state->base_dir_len) + strvec_pushf(&cp->args, "--prefix=%s", state->base_dir); + if (start_command(cp)) + die("failed to spawn checkout worker"); + } + + base_batch_size = parallel_checkout.nr / num_workers; + workers_with_one_extra_item = parallel_checkout.nr % num_workers; + + for (i = 0; i < num_workers; i++) { + struct pc_worker *worker = &workers[i]; + size_t batch_size = base_batch_size; + + /* distribute the extra work evenly */ + if (i < workers_with_one_extra_item) + batch_size++; + + send_batch(worker->cp.in, batch_beginning, batch_size); + worker->next_item_to_complete = batch_beginning; + worker->nr_items_to_complete = batch_size; + + batch_beginning += batch_size; + } + + return workers; +} + +static void finish_workers(struct pc_worker *workers, int num_workers) +{ + int i; + + /* + * Close pipes before calling finish_command() to let the workers + * exit asynchronously and avoid spending extra time on wait(). + */ + for (i = 0; i < num_workers; i++) { + struct child_process *cp = &workers[i].cp; + if (cp->in >= 0) + close(cp->in); + if (cp->out >= 0) + close(cp->out); + } + + for (i = 0; i < num_workers; i++) { + int rc = finish_command(&workers[i].cp); + if (rc > 128) { + /* + * For a normal non-zero exit, the worker should have + * already printed something useful to stderr. But a + * death by signal should be mentioned to the user. + */ + error("checkout worker %d died of signal %d", i, rc - 128); + } + } + + free(workers); +} + +static inline void assert_pc_item_result_size(int got, int exp) +{ + if (got != exp) + BUG("wrong result size from checkout worker (got %dB, exp %dB)", + got, exp); +} + +static void parse_and_save_result(const char *buffer, int len, + struct pc_worker *worker) +{ + struct pc_item_result *res; + struct parallel_checkout_item *pc_item; + struct stat *st = NULL; + + if (len < PC_ITEM_RESULT_BASE_SIZE) + BUG("too short result from checkout worker (got %dB, exp >=%dB)", + len, (int)PC_ITEM_RESULT_BASE_SIZE); + + res = (struct pc_item_result *)buffer; + + /* + * Worker should send either the full result struct on success, or + * just the base (i.e. no stat data), otherwise. + */ + if (res->status == PC_ITEM_WRITTEN) { + assert_pc_item_result_size(len, (int)sizeof(struct pc_item_result)); + st = &res->st; + } else { + assert_pc_item_result_size(len, (int)PC_ITEM_RESULT_BASE_SIZE); + } + + if (!worker->nr_items_to_complete) + BUG("received result from supposedly finished checkout worker"); + if (res->id != worker->next_item_to_complete) + BUG("unexpected item id from checkout worker (got %"PRIuMAX", exp %"PRIuMAX")", + (uintmax_t)res->id, (uintmax_t)worker->next_item_to_complete); + + worker->next_item_to_complete++; + worker->nr_items_to_complete--; + + pc_item = ¶llel_checkout.items[res->id]; + pc_item->status = res->status; + if (st) + pc_item->st = *st; + + if (res->status != PC_ITEM_COLLIDED) + advance_progress_meter(); +} + +static void gather_results_from_workers(struct pc_worker *workers, + int num_workers) +{ + int i, active_workers = num_workers; + struct pollfd *pfds; + + CALLOC_ARRAY(pfds, num_workers); + for (i = 0; i < num_workers; i++) { + pfds[i].fd = workers[i].cp.out; + pfds[i].events = POLLIN; + } + + while (active_workers) { + int nr = poll(pfds, num_workers, -1); + + if (nr < 0) { + if (errno == EINTR) + continue; + die_errno("failed to poll checkout workers"); + } + + for (i = 0; i < num_workers && nr > 0; i++) { + struct pc_worker *worker = &workers[i]; + struct pollfd *pfd = &pfds[i]; + + if (!pfd->revents) + continue; + + if (pfd->revents & POLLIN) { + int len = packet_read(pfd->fd, NULL, NULL, + packet_buffer, + sizeof(packet_buffer), 0); + + if (len < 0) { + BUG("packet_read() returned negative value"); + } else if (!len) { + pfd->fd = -1; + active_workers--; + } else { + parse_and_save_result(packet_buffer, + len, worker); + } + } else if (pfd->revents & POLLHUP) { + pfd->fd = -1; + active_workers--; + } else if (pfd->revents & (POLLNVAL | POLLERR)) { + die("error polling from checkout worker"); + } + + nr--; + } + } + + free(pfds); +} + +static void write_items_sequentially(struct checkout *state) +{ + size_t i; + + for (i = 0; i < parallel_checkout.nr; i++) { + struct parallel_checkout_item *pc_item = ¶llel_checkout.items[i]; + write_pc_item(pc_item, state); + if (pc_item->status != PC_ITEM_COLLIDED) + advance_progress_meter(); + } +} + +int run_parallel_checkout(struct checkout *state, int num_workers, int threshold, + struct progress *progress, unsigned int *progress_cnt) +{ + int ret; + + if (parallel_checkout.status != PC_ACCEPTING_ENTRIES) + BUG("cannot run parallel checkout: uninitialized or already running"); + + parallel_checkout.status = PC_RUNNING; + parallel_checkout.progress = progress; + parallel_checkout.progress_cnt = progress_cnt; + + if (parallel_checkout.nr < num_workers) + num_workers = parallel_checkout.nr; + + if (num_workers <= 1 || parallel_checkout.nr < threshold) { + write_items_sequentially(state); + } else { + struct pc_worker *workers = setup_workers(state, num_workers); + gather_results_from_workers(workers, num_workers); + finish_workers(workers, num_workers); + } + + ret = handle_results(state); + + finish_parallel_checkout(); + return ret; +} diff --git a/parallel-checkout.h b/parallel-checkout.h new file mode 100644 index 00000000000..80f539bcb77 --- /dev/null +++ b/parallel-checkout.h @@ -0,0 +1,111 @@ +#ifndef PARALLEL_CHECKOUT_H +#define PARALLEL_CHECKOUT_H + +#include "convert.h" + +struct cache_entry; +struct checkout; +struct progress; + +/**************************************************************** + * Users of parallel checkout + ****************************************************************/ + +enum pc_status { + PC_UNINITIALIZED = 0, + PC_ACCEPTING_ENTRIES, + PC_RUNNING, +}; + +enum pc_status parallel_checkout_status(void); +void get_parallel_checkout_configs(int *num_workers, int *threshold); + +/* + * Put parallel checkout into the PC_ACCEPTING_ENTRIES state. Should be used + * only when in the PC_UNINITIALIZED state. + */ +void init_parallel_checkout(void); + +/* + * Return -1 if parallel checkout is currently not accepting entries or if the + * entry is not eligible for parallel checkout. Otherwise, enqueue the entry + * for later write and return 0. + */ +int enqueue_checkout(struct cache_entry *ce, struct conv_attrs *ca); +size_t pc_queue_size(void); + +/* + * Write all the queued entries, returning 0 on success. If the number of + * entries is smaller than the specified threshold, the operation is performed + * sequentially. + */ +int run_parallel_checkout(struct checkout *state, int num_workers, int threshold, + struct progress *progress, unsigned int *progress_cnt); + +/**************************************************************** + * Interface with checkout--worker + ****************************************************************/ + +enum pc_item_status { + PC_ITEM_PENDING = 0, + PC_ITEM_WRITTEN, + /* + * The entry could not be written because there was another file + * already present in its path or leading directories. Since + * checkout_entry_ca() removes such files from the working tree before + * enqueueing the entry for parallel checkout, it means that there was + * a path collision among the entries being written. + */ + PC_ITEM_COLLIDED, + PC_ITEM_FAILED, +}; + +struct parallel_checkout_item { + /* + * In main process ce points to a istate->cache[] entry. Thus, it's not + * owned by us. In workers they own the memory, which *must be* released. + */ + struct cache_entry *ce; + struct conv_attrs ca; + size_t id; /* position in parallel_checkout.items[] of main process */ + + /* Output fields, sent from workers. */ + enum pc_item_status status; + struct stat st; +}; + +/* + * The fixed-size portion of `struct parallel_checkout_item` that is sent to the + * workers. Following this will be 2 strings: ca.working_tree_encoding and + * ce.name; These are NOT null terminated, since we have the size in the fixed + * portion. + * + * Note that not all fields of conv_attrs and cache_entry are passed, only the + * ones that will be required by the workers to smudge and write the entry. + */ +struct pc_item_fixed_portion { + size_t id; + struct object_id oid; + unsigned int ce_mode; + enum convert_crlf_action crlf_action; + int ident; + size_t working_tree_encoding_len; + size_t name_len; +}; + +/* + * The fields of `struct parallel_checkout_item` that are returned by the + * workers. Note: `st` must be the last one, as it is omitted on error. + */ +struct pc_item_result { + size_t id; + enum pc_item_status status; + struct stat st; +}; + +#define PC_ITEM_RESULT_BASE_SIZE offsetof(struct pc_item_result, st) + +void write_pc_item(struct parallel_checkout_item *pc_item, + struct checkout *state); + +#endif /* PARALLEL_CHECKOUT_H */ diff --git a/unpack-trees.c b/unpack-trees.c index 9cffa88af4a..7a1804c314a 100644 --- a/unpack-trees.c +++ b/unpack-trees.c @@ -17,6 +17,7 @@ #include "object-store.h" #include "promisor-remote.h" #include "entry.h" +#include "parallel-checkout.h" /* * Error messages expected by scripts out of plumbing commands such as @@ -398,7 +399,7 @@ static int check_updates(struct unpack_trees_options *o, int errs = 0; struct progress *progress; struct checkout state = CHECKOUT_INIT; - int i; + int i, pc_workers, pc_threshold; trace_performance_enter(); state.force = 1; @@ -441,7 +442,6 @@ static int check_updates(struct unpack_trees_options *o, if (should_update_submodules()) load_gitmodules_file(index, &state); - enable_delayed_checkout(&state); if (has_promisor_remote()) { /* * Prefetch the objects that are to be checked out in the loop @@ -464,18 +464,31 @@ static int check_updates(struct unpack_trees_options *o, to_fetch.oid, to_fetch.nr); oid_array_clear(&to_fetch); } + + get_parallel_checkout_configs(&pc_workers, &pc_threshold); + + enable_delayed_checkout(&state); + if (pc_workers > 1) + init_parallel_checkout(); for (i = 0; i < index->cache_nr; i++) { struct cache_entry *ce = index->cache[i]; if (ce->ce_flags & CE_UPDATE) { + size_t last_pc_queue_size = pc_queue_size(); + if (ce->ce_flags & CE_WT_REMOVE) BUG("both update and delete flags are set on %s", ce->name); - display_progress(progress, ++cnt); ce->ce_flags &= ~CE_UPDATE; errs |= checkout_entry(ce, &state, NULL, NULL); + + if (last_pc_queue_size == pc_queue_size()) + display_progress(progress, ++cnt); } } + if (pc_workers > 1) + errs |= run_parallel_checkout(&state, pc_workers, pc_threshold, + progress, &cnt); stop_progress(&progress); errs |= finish_delayed_checkout(&state, NULL); git_attr_set_direction(GIT_ATTR_CHECKIN);