1
0
Fork 0

Use parallel quicksort for multi-pass sorts

This commit is contained in:
Joris van Rantwijk 2022-06-27 18:16:30 +02:00
parent 8114068ca7
commit c47ff98883
1 changed files with 34 additions and 19 deletions

View File

@ -1342,16 +1342,28 @@ void quicksort_records_parallel(
} }
/** Sort the specified block of records (in-place). */ /**
* Sort the specified block of records (in-place).
*
* Optionally use a thread pool for parallel sorting.
*/
void sort_records( void sort_records(
unsigned char * buffer, unsigned char * buffer,
size_t record_size, size_t record_size,
size_t num_records) size_t num_records,
unsigned int num_threads,
ThreadPool * thread_pool)
{ {
// TODO : multi-threaded quicksort if (num_threads > 1 && thread_pool != NULL) {
quicksort_records_parallel(
// heap_sort_records(buffer, record_size, num_records); buffer,
record_size,
num_records,
num_threads,
thread_pool);
} else {
quicksort_records(buffer, record_size, num_records); quicksort_records(buffer, record_size, num_records);
}
} }
@ -1430,7 +1442,7 @@ void single_pass(
timer.stop(); timer.stop();
log(ctx, " t = %.3f seconds\n", timer.value()); log(ctx, " t = %.3f seconds\n", timer.value());
// Set up thread pool. // Create thread pool for parallel sorting.
std::unique_ptr<ThreadPool> thread_pool; std::unique_ptr<ThreadPool> thread_pool;
if (ctx.num_threads > 1) { if (ctx.num_threads > 1) {
log(ctx, "creating thread pool with %u threads\n", ctx.num_threads); log(ctx, "creating thread pool with %u threads\n", ctx.num_threads);
@ -1440,16 +1452,12 @@ void single_pass(
// Sort records in memory buffer. // Sort records in memory buffer.
log(ctx, "sorting records using %u threads\n", ctx.num_threads); log(ctx, "sorting records using %u threads\n", ctx.num_threads);
timer.start(); timer.start();
if (ctx.num_threads > 1) { sort_records(
quicksort_records_parallel(
buffer.data(), buffer.data(),
ctx.record_size, ctx.record_size,
num_records, num_records,
ctx.num_threads, ctx.num_threads,
thread_pool.get()); thread_pool.get());
} else {
quicksort_records(buffer.data(), ctx.record_size, num_records);
}
timer.stop(); timer.stop();
log(ctx, " t = %.3f seconds\n", timer.value()); log(ctx, " t = %.3f seconds\n", timer.value());
@ -1517,7 +1525,12 @@ void sort_pass(
assert(records_per_block < SIZE_MAX / record_size); assert(records_per_block < SIZE_MAX / record_size);
std::vector<unsigned char> buffer(records_per_block * record_size); std::vector<unsigned char> buffer(records_per_block * record_size);
// TODO : multi-threaded sorting with thread pool // Create thread pool for parallel sorting.
std::unique_ptr<ThreadPool> thread_pool;
if (ctx.num_threads > 1) {
log(ctx, "creating thread pool with %u threads\n", ctx.num_threads);
thread_pool.reset(new ThreadPool(ctx.num_threads));
}
// Loop over blocks to be sorted. // Loop over blocks to be sorted.
for (uint64_t block_index = 0; block_index < num_blocks; block_index++) { for (uint64_t block_index = 0; block_index < num_blocks; block_index++) {
@ -1542,7 +1555,9 @@ void sort_pass(
sort_records( sort_records(
buffer.data(), buffer.data(),
record_size, record_size,
block_num_records); block_num_records,
ctx.num_threads,
thread_pool.get());
// Write block. // Write block.
BinaryFile& output_file = BinaryFile& output_file =