diff --git a/src/sortbin.cpp b/src/sortbin.cpp index 23935e5..2c32806 100644 --- a/src/sortbin.cpp +++ b/src/sortbin.cpp @@ -1342,16 +1342,28 @@ void quicksort_records_parallel( } -/** Sort the specified block of records (in-place). */ +/** + * Sort the specified block of records (in-place). + * + * Optionally use a thread pool for parallel sorting. + */ void sort_records( unsigned char * buffer, size_t record_size, - size_t num_records) + size_t num_records, + unsigned int num_threads, + ThreadPool * thread_pool) { - // TODO : multi-threaded quicksort - -// heap_sort_records(buffer, record_size, num_records); - quicksort_records(buffer, record_size, num_records); + if (num_threads > 1 && thread_pool != NULL) { + quicksort_records_parallel( + buffer, + record_size, + num_records, + num_threads, + thread_pool); + } else { + quicksort_records(buffer, record_size, num_records); + } } @@ -1430,7 +1442,7 @@ void single_pass( timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); - // Set up thread pool. + // Create thread pool for parallel sorting. std::unique_ptr thread_pool; if (ctx.num_threads > 1) { log(ctx, "creating thread pool with %u threads\n", ctx.num_threads); @@ -1440,16 +1452,12 @@ void single_pass( // Sort records in memory buffer. log(ctx, "sorting records using %u threads\n", ctx.num_threads); timer.start(); - if (ctx.num_threads > 1) { - quicksort_records_parallel( - buffer.data(), - ctx.record_size, - num_records, - ctx.num_threads, - thread_pool.get()); - } else { - quicksort_records(buffer.data(), ctx.record_size, num_records); - } + sort_records( + buffer.data(), + ctx.record_size, + num_records, + ctx.num_threads, + thread_pool.get()); timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); @@ -1517,7 +1525,12 @@ void sort_pass( assert(records_per_block < SIZE_MAX / record_size); std::vector buffer(records_per_block * record_size); -// TODO : multi-threaded sorting with thread pool + // Create thread pool for parallel sorting. + std::unique_ptr thread_pool; + if (ctx.num_threads > 1) { + log(ctx, "creating thread pool with %u threads\n", ctx.num_threads); + thread_pool.reset(new ThreadPool(ctx.num_threads)); + } // Loop over blocks to be sorted. for (uint64_t block_index = 0; block_index < num_blocks; block_index++) { @@ -1542,7 +1555,9 @@ void sort_pass( sort_records( buffer.data(), record_size, - block_num_records); + block_num_records, + ctx.num_threads, + thread_pool.get()); // Write block. BinaryFile& output_file =