1
0
Fork 0

Implement background I/O for the sorting pass

This commit is contained in:
Joris van Rantwijk 2022-06-28 20:42:36 +02:00
parent 57c20dbca3
commit 4f6b76e785
1 changed files with 182 additions and 26 deletions

View File

@ -31,6 +31,7 @@
#include <unistd.h> #include <unistd.h>
#include <algorithm> #include <algorithm>
#include <array>
#include <condition_variable> #include <condition_variable>
#include <deque> #include <deque>
#include <functional> #include <functional>
@ -327,7 +328,7 @@ public:
} }
// Wait on this future. // Wait on this future.
fut.wait(); fut.get();
} }
} }
@ -1506,6 +1507,7 @@ void single_pass(
* @param records_per_block Number of records per sort block. * @param records_per_block Number of records per sort block.
* @param num_blocks Number of sort blocks. * @param num_blocks Number of sort blocks.
* @param num_blocks_file1 Number of blocks for the first output file. * @param num_blocks_file1 Number of blocks for the first output file.
* @param io_pool Optional thread pool for background I/O.
* @param ctx Reference to context structure. * @param ctx Reference to context structure.
*/ */
void sort_pass( void sort_pass(
@ -1515,6 +1517,7 @@ void sort_pass(
uint64_t records_per_block, uint64_t records_per_block,
uint64_t num_blocks, uint64_t num_blocks,
uint64_t num_blocks_file1, uint64_t num_blocks_file1,
ThreadPool * io_pool,
const SortContext& ctx) const SortContext& ctx)
{ {
unsigned int record_size = ctx.record_size; unsigned int record_size = ctx.record_size;
@ -1526,11 +1529,6 @@ void sort_pass(
Timer timer; Timer timer;
timer.start(); timer.start();
// TODO : double-buffer with I/O in separate thread
// Allocate sort buffer.
assert(records_per_block < SIZE_MAX / record_size);
std::vector<unsigned char> buffer(records_per_block * record_size);
// Create thread pool for parallel sorting. // Create thread pool for parallel sorting.
std::unique_ptr<ThreadPool> thread_pool; std::unique_ptr<ThreadPool> thread_pool;
if (ctx.num_threads > 1) { if (ctx.num_threads > 1) {
@ -1538,12 +1536,86 @@ void sort_pass(
thread_pool.reset(new ThreadPool(ctx.num_threads)); thread_pool.reset(new ThreadPool(ctx.num_threads));
} }
assert(records_per_block < SIZE_MAX / record_size);
// Allocate sort buffer(s).
std::array<std::vector<unsigned char>, 2> buffers;
buffers[0].resize(records_per_block * record_size);
if (io_pool != NULL) {
// Allocate a second buffer for background I/O.
buffers[1].resize(records_per_block * record_size);
}
// Helper function that returns the output file for a specified block.
// This function deals with the nasty complication that the first bunch
// of blocks must be written to a separate output file.
auto output_file_chooser =
[&output_file1, &output_file2, num_blocks_file1]
(uint64_t block_index) -> BinaryFile& {
if (block_index < num_blocks_file1) {
return output_file1;
} else {
return output_file2;
}
};
// Helper function that returns the buffer for a specified block.
// This function deals with the nasty complication that we may or may not
// be using two separate buffers for alternating blocks.
auto buffer_chooser =
[io_pool, &buffers]
(uint64_t block_index) -> unsigned char * {
if (io_pool != NULL) {
// Blocks alternate between the two buffers.
return buffers[block_index % 2].data();
} else {
// All blocks use the same buffer.
return buffers[0].data();
}
};
// Helper function to read a block from file to buffer.
// This function deals with block offset calculations and with
// the nasty complication that the last block may be shorter.
auto read_block =
[records_per_block, record_size, file_size]
(BinaryFile& input_file, uint64_t block_index, unsigned char * buf) {
uint64_t block_offset =
block_index * records_per_block * record_size;
size_t block_size = records_per_block * record_size;
if (block_size > file_size - block_offset) {
block_size = file_size - block_offset;
}
input_file.read(buf, block_offset, block_size);
};
// Helper function to write a block from buffer to file.
// This function deals with block offset calculations and with
// the nasty complication that the last block may be shorter.
auto write_block =
[records_per_block, record_size, file_size]
(BinaryFile& output_file, uint64_t block_index, unsigned char * buf) {
uint64_t block_offset =
block_index * records_per_block * record_size;
size_t block_size = records_per_block * record_size;
if (block_size > file_size - block_offset) {
block_size = file_size - block_offset;
}
output_file.write(buf, block_offset, block_size);
};
// Future object to wait for background I/O.
ThreadPool::FutureType io_future;
// Loop over blocks to be sorted. // Loop over blocks to be sorted.
for (uint64_t block_index = 0; block_index < num_blocks; block_index++) { for (uint64_t block_index = 0; block_index < num_blocks; block_index++) {
uint64_t first_record_idx = block_index * records_per_block; // Determine number of records in this block
size_t block_num_records = // (the last block may be shorter).
std::min(records_per_block, num_records - first_record_idx); uint64_t block_start_idx = block_index * records_per_block;
size_t block_num_records =
(records_per_block < num_records - block_start_idx) ?
records_per_block : (num_records - block_start_idx);
log(ctx, log(ctx,
" sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n", " sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n",
@ -1551,27 +1623,88 @@ void sort_pass(
num_blocks, num_blocks,
block_num_records); block_num_records);
// Read block. // Choose a buffer to sort this block.
input_file.read( unsigned char * block_buffer = buffer_chooser(block_index);
buffer.data(),
first_record_idx * record_size,
block_num_records * record_size);
// Sort records in this block. // Read block via blocking I/O, if necessary.
// Use this to read all blocks when we do not use background I/O.
// Use this only for the first block when we have background I/O.
if (io_pool == NULL || block_index == 0) {
read_block(input_file, block_index, block_buffer);
}
// Handle background I/O, if necessary.
if (io_pool != NULL) {
ThreadPool::FutureType new_future;
if (block_index > 0 && block_index < num_blocks - 1) {
// Start background I/O to write the previous block, then
// read the next block when writing has finished.
new_future = io_pool->submit(
[&input_file, &output_file_chooser, &buffer_chooser,
&read_block, &write_block, block_index]
() {
write_block(
output_file_chooser(block_index - 1),
block_index - 1,
buffer_chooser(block_index - 1));
read_block(
input_file,
block_index + 1,
buffer_chooser(block_index + 1));
});
} else if (block_index < num_blocks - 1) {
// Start background I/O to read next block.
new_future = io_pool->submit(
[&input_file, &buffer_chooser, &read_block, block_index]
() {
read_block(
input_file,
block_index + 1,
buffer_chooser(block_index + 1));
});
} else if (block_index > 0) {
// Start background I/O to write previous block.
new_future = io_pool->submit(
[&output_file_chooser, &buffer_chooser, &write_block,
block_index]
() {
write_block(
output_file_chooser(block_index - 1),
block_index - 1,
buffer_chooser(block_index - 1));
});
}
// Wait for completion of previous background I/O, if any.
if (block_index > 0) {
io_future.get();
}
// Keep future for the new background I/O, if any.
io_future = std::move(new_future);
}
// Sort records in this block in-place.
sort_records( sort_records(
buffer.data(), block_buffer,
record_size, record_size,
block_num_records, block_num_records,
ctx.num_threads, ctx.num_threads,
thread_pool.get()); thread_pool.get());
// Write block. // If we use background I/O and this is the last block, wait for
BinaryFile& output_file = // any previous background write to complete.
(block_index < num_blocks_file1) ? output_file1 : output_file2; if (io_pool != NULL && block_index == num_blocks - 1) {
output_file.write( io_future.get();
buffer.data(), }
first_record_idx * record_size,
block_num_records * record_size); // Write block via blocking I/O, if necessary.
// Use this to write all blocks when we do not use background I/O.
// Use this only for the last block when we have background I/O.
if (io_pool == NULL || block_index == num_blocks - 1) {
BinaryFile& output_file = output_file_chooser(block_index);
write_block(output_file, block_index, block_buffer);
}
} }
timer.stop(); timer.stop();
@ -1696,6 +1829,8 @@ void merge_blocks(
* @param num_blocks Number of input blocks. * @param num_blocks Number of input blocks.
* @param branch_factor Number of blocks to merge per output block. * @param branch_factor Number of blocks to merge per output block.
* @param filter_dupl True to eliminate duplicate records. * @param filter_dupl True to eliminate duplicate records.
* @param read_thread_pool Optional thread pool for background read I/O.
* @param write_thread_pool Optional thread pool for background write I/O.
* @param ctx Reference to context structure. * @param ctx Reference to context structure.
*/ */
void merge_pass( void merge_pass(
@ -1703,6 +1838,8 @@ void merge_pass(
BinaryFile& output_file, BinaryFile& output_file,
const SortStrategy::MergePass& merge_pass, const SortStrategy::MergePass& merge_pass,
bool filter_dupl, bool filter_dupl,
ThreadPool * read_thread_pool,
ThreadPool * write_thread_pool,
const SortContext& ctx) const SortContext& ctx)
{ {
size_t num_blocks = merge_pass.records_per_block.size(); size_t num_blocks = merge_pass.records_per_block.size();
@ -1811,9 +1948,11 @@ SortStrategy plan_multi_pass_strategy(
const SortContext& ctx) const SortContext& ctx)
{ {
// Plan the initial sort pass. // Plan the initial sort pass.
// Use blocks that are at most half of available memory,
// so we can use two buffers to overlap I/O and sorting. // When background I/O is enabled, use blocks that are at most half of
uint64_t max_sort_block_size = ctx.memory_size / 2; // available memory so we can use two buffers to overlap I/O and sorting.
uint64_t max_sort_block_size =
ctx.flag_io_thread ? (ctx.memory_size / 2) : ctx.memory_size;
// Calculate number of records per block. // Calculate number of records per block.
// Make sure this is a multiple of the transfer alignment size. // Make sure this is a multiple of the transfer alignment size.
@ -2057,6 +2196,20 @@ void sortbin(
log(ctx, "creating temporary file\n"); log(ctx, "creating temporary file\n");
BinaryTempFile temp_file(ctx.temporary_directory, file_size); BinaryTempFile temp_file(ctx.temporary_directory, file_size);
// Create thread pools for background I/O.
std::array<std::unique_ptr<ThreadPool>, 2> io_thread_pool;
if (ctx.flag_io_thread) {
// Create separate threads for reading and writing.
// Just 1 thread per pool because
// - More than 1 write thread is useless because there is
// only one sequential output stream.
// - More than 1 read thread may increase seeking while
// in fact we want to encourage big sequential transfers.
log(ctx, "creating background I/O threads\n");
io_thread_pool[0].reset(new ThreadPool(1));
io_thread_pool[1].reset(new ThreadPool(1));
}
// The merge passes alternate between tempfile-to-outputfile and // The merge passes alternate between tempfile-to-outputfile and
// outputfile-to-tempfile. // outputfile-to-tempfile.
BinaryFile * output_or_temp_file[2] = { &output_file, &temp_file }; BinaryFile * output_or_temp_file[2] = { &output_file, &temp_file };
@ -2082,6 +2235,7 @@ void sortbin(
strategy.records_per_sort_block, strategy.records_per_sort_block,
strategy.num_sort_blocks, strategy.num_sort_blocks,
strategy.num_sort_blocks_first_merge, strategy.num_sort_blocks_first_merge,
io_thread_pool[0].get(),
ctx); ctx);
} }
@ -2117,6 +2271,8 @@ void sortbin(
*pass_output_file, *pass_output_file,
strategy.merge_pass[mp], strategy.merge_pass[mp],
filter_dupl, filter_dupl,
io_thread_pool[0].get(),
io_thread_pool[1].get(),
ctx); ctx);
} }
} }