From 4f6b76e78509dabb466ba0f834b51df4e66c425f Mon Sep 17 00:00:00 2001
From: Joris van Rantwijk
Date: Tue, 28 Jun 2022 20:42:36 +0200
Subject: [PATCH] Implement background I/O for the sorting pass

---
 src/sortbin.cpp | 208 ++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 182 insertions(+), 26 deletions(-)

diff --git a/src/sortbin.cpp b/src/sortbin.cpp
index d06a128..812ec63 100644
--- a/src/sortbin.cpp
+++ b/src/sortbin.cpp
@@ -31,6 +31,7 @@
 #include
 #include
+#include <array>
 #include
 #include
 #include
@@ -327,7 +328,7 @@ public:
         }
 
         // Wait on this future.
-        fut.wait();
+        fut.get();
     }
 }
 
@@ -1506,6 +1507,7 @@ void single_pass(
  * @param records_per_block  Number of records per sort block.
  * @param num_blocks         Number of sort blocks.
  * @param num_blocks_file1   Number of blocks for the first output file.
+ * @param io_pool            Optional thread pool for background I/O.
  * @param ctx                Reference to context structure.
  */
 void sort_pass(
@@ -1515,6 +1517,7 @@
     uint64_t records_per_block,
     uint64_t num_blocks,
     uint64_t num_blocks_file1,
+    ThreadPool * io_pool,
     const SortContext& ctx)
 {
     unsigned int record_size = ctx.record_size;
@@ -1526,11 +1529,6 @@
     Timer timer;
     timer.start();
 
-// TODO : double-buffer with I/O in separate thread
-    // Allocate sort buffer.
-    assert(records_per_block < SIZE_MAX / record_size);
-    std::vector<unsigned char> buffer(records_per_block * record_size);
-
     // Create thread pool for parallel sorting.
     std::unique_ptr<ThreadPool> thread_pool;
     if (ctx.num_threads > 1) {
@@ -1538,12 +1536,86 @@
         thread_pool.reset(new ThreadPool(ctx.num_threads));
     }
 
+    assert(records_per_block < SIZE_MAX / record_size);
+
+    // Allocate sort buffer(s).
+    std::array<std::vector<unsigned char>, 2> buffers;
+    buffers[0].resize(records_per_block * record_size);
+    if (io_pool != NULL) {
+        // Allocate a second buffer for background I/O.
+        buffers[1].resize(records_per_block * record_size);
+    }
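+
+    // Note: with background I/O this becomes a classic double-buffering
+    // scheme: while block N is sorted in buffers[N % 2], the I/O thread
+    // writes sorted block N-1 and reads block N+1, both of which map to
+    // buffers[(N + 1) % 2].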
+
+    // Helper function that returns the output file for a specified block.
+    // This function deals with the nasty complication that the first bunch
+    // of blocks must be written to a separate output file.
+    auto output_file_chooser =
+        [&output_file1, &output_file2, num_blocks_file1]
+        (uint64_t block_index) -> BinaryFile& {
+            if (block_index < num_blocks_file1) {
+                return output_file1;
+            } else {
+                return output_file2;
+            }
+        };
+
+    // Helper function that returns the buffer for a specified block.
+    // This function deals with the nasty complication that we may or may not
+    // be using two separate buffers for alternating blocks.
+    auto buffer_chooser =
+        [io_pool, &buffers]
+        (uint64_t block_index) -> unsigned char * {
+            if (io_pool != NULL) {
+                // Blocks alternate between the two buffers.
+                return buffers[block_index % 2].data();
+            } else {
+                // All blocks use the same buffer.
+                return buffers[0].data();
+            }
+        };
+
+    // Helper function to read a block from file to buffer.
+    // This function deals with block offset calculations and with
+    // the nasty complication that the last block may be shorter.
+    auto read_block =
+        [records_per_block, record_size, file_size]
+        (BinaryFile& input_file, uint64_t block_index, unsigned char * buf) {
+            uint64_t block_offset =
+                block_index * records_per_block * record_size;
+            size_t block_size = records_per_block * record_size;
+            if (block_size > file_size - block_offset) {
+                block_size = file_size - block_offset;
+            }
+            input_file.read(buf, block_offset, block_size);
+        };
+
+    // Helper function to write a block from buffer to file.
+    // This function deals with block offset calculations and with
+    // the nasty complication that the last block may be shorter.
+    auto write_block =
+        [records_per_block, record_size, file_size]
+        (BinaryFile& output_file, uint64_t block_index, unsigned char * buf) {
+            uint64_t block_offset =
+                block_index * records_per_block * record_size;
+            size_t block_size = records_per_block * record_size;
+            if (block_size > file_size - block_offset) {
+                block_size = file_size - block_offset;
+            }
+            output_file.write(buf, block_offset, block_size);
+        };
+
+    // Future object to wait for background I/O.
+    ThreadPool::FutureType io_future;
+
     // Loop over blocks to be sorted.
     for (uint64_t block_index = 0; block_index < num_blocks; block_index++) {
-        uint64_t first_record_idx = block_index * records_per_block;
-        size_t block_num_records =
-            std::min(records_per_block, num_records - first_record_idx);
+        // Determine number of records in this block
+        // (the last block may be shorter).
+        uint64_t block_start_idx = block_index * records_per_block;
+        size_t block_num_records =
+            (records_per_block < num_records - block_start_idx) ?
+                records_per_block : (num_records - block_start_idx);
 
         log(ctx,
             "  sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n",
@@ -1551,27 +1623,88 @@
             num_blocks,
             block_num_records);
 
-        // Read block.
-        input_file.read(
-            buffer.data(),
-            first_record_idx * record_size,
-            block_num_records * record_size);
+        // Choose a buffer to sort this block.
+        unsigned char * block_buffer = buffer_chooser(block_index);
 
-        // Sort records in this block.
+        // Read block via blocking I/O, if necessary.
+        // Use this to read all blocks when we do not use background I/O.
+        // Use this only for the first block when we have background I/O.
+        if (io_pool == NULL || block_index == 0) {
+            read_block(input_file, block_index, block_buffer);
+        }
+
+        // Handle background I/O, if necessary.
+        if (io_pool != NULL) {
+            ThreadPool::FutureType new_future;
+            if (block_index > 0 && block_index < num_blocks - 1) {
+                // Start background I/O to write the previous block, then
+                // read the next block when writing has finished.
+                new_future = io_pool->submit(
+                    [&input_file, &output_file_chooser, &buffer_chooser,
+                     &read_block, &write_block, block_index]
+                    () {
+                        write_block(
+                            output_file_chooser(block_index - 1),
+                            block_index - 1,
+                            buffer_chooser(block_index - 1));
+                        read_block(
+                            input_file,
+                            block_index + 1,
+                            buffer_chooser(block_index + 1));
+                    });
+            } else if (block_index < num_blocks - 1) {
+                // Start background I/O to read next block.
+                new_future = io_pool->submit(
+                    [&input_file, &buffer_chooser, &read_block, block_index]
+                    () {
+                        read_block(
+                            input_file,
+                            block_index + 1,
+                            buffer_chooser(block_index + 1));
+                    });
+            } else if (block_index > 0) {
+                // Start background I/O to write previous block.
+                new_future = io_pool->submit(
+                    [&output_file_chooser, &buffer_chooser, &write_block,
+                     block_index]
+                    () {
+                        write_block(
+                            output_file_chooser(block_index - 1),
+                            block_index - 1,
+                            buffer_chooser(block_index - 1));
+                    });
+            }
+
+            // Wait for completion of previous background I/O, if any.
+            if (block_index > 0) {
+                io_future.get();
+            }
+
+            // Keep future for the new background I/O, if any.
+            io_future = std::move(new_future);
+        }
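+
+        // Note: in the combined case above, the previous and next block
+        // share a buffer, so the background task must finish writing the
+        // previous block before it may overwrite that buffer by reading
+        // the next block.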
+
+        // Sort records in this block in-place.
         sort_records(
-            buffer.data(),
+            block_buffer,
             record_size,
             block_num_records,
             ctx.num_threads,
             thread_pool.get());
 
-        // Write block.
-        BinaryFile& output_file =
-            (block_index < num_blocks_file1) ?
-                output_file1 : output_file2;
-        output_file.write(
-            buffer.data(),
-            first_record_idx * record_size,
-            block_num_records * record_size);
+        // If we use background I/O and this is the last block, wait for
+        // any previous background write to complete.
+        if (io_pool != NULL && block_index == num_blocks - 1) {
+            io_future.get();
+        }
+
+        // Write block via blocking I/O, if necessary.
+        // Use this to write all blocks when we do not use background I/O.
+        // Use this only for the last block when we have background I/O.
+        if (io_pool == NULL || block_index == num_blocks - 1) {
+            BinaryFile& output_file = output_file_chooser(block_index);
+            write_block(output_file, block_index, block_buffer);
+        }
     }
 
     timer.stop();
@@ -1696,6 +1829,8 @@ void merge_blocks(
  * @param num_blocks         Number of input blocks.
  * @param branch_factor      Number of blocks to merge per output block.
  * @param filter_dupl        True to eliminate duplicate records.
+ * @param read_thread_pool   Optional thread pool for background read I/O.
+ * @param write_thread_pool  Optional thread pool for background write I/O.
  * @param ctx                Reference to context structure.
  */
 void merge_pass(
@@ -1703,6 +1838,8 @@
     BinaryFile& output_file,
     const SortStrategy::MergePass& merge_pass,
     bool filter_dupl,
+    ThreadPool * read_thread_pool,
+    ThreadPool * write_thread_pool,
     const SortContext& ctx)
 {
     size_t num_blocks = merge_pass.records_per_block.size();
@@ -1811,9 +1948,11 @@ SortStrategy plan_multi_pass_strategy(
     const SortContext& ctx)
 {
     // Plan the initial sort pass.
-    // Use blocks that are at most half of available memory,
-    // so we can use two buffers to overlap I/O and sorting.
-    uint64_t max_sort_block_size = ctx.memory_size / 2;
+
+    // When background I/O is enabled, use blocks that are at most half of
+    // available memory so we can use two buffers to overlap I/O and sorting.
+    uint64_t max_sort_block_size =
+        ctx.flag_io_thread ? (ctx.memory_size / 2) : ctx.memory_size;
 
     // Calculate number of records per block.
     // Make sure this is a multiple of the transfer alignment size.
@@ -2057,6 +2196,20 @@ void sortbin(
     log(ctx, "creating temporary file\n");
     BinaryTempFile temp_file(ctx.temporary_directory, file_size);
 
+    // Create thread pools for background I/O.
+    std::array<std::unique_ptr<ThreadPool>, 2> io_thread_pool;
+    if (ctx.flag_io_thread) {
+        // Create separate threads for reading and writing.
+        // Just 1 thread per pool, because
+        //  - more than 1 write thread is useless: there is only one
+        //    sequential output stream;
+        //  - more than 1 read thread may increase seeking, whereas we
+        //    want to encourage big sequential transfers.
+        log(ctx, "creating background I/O threads\n");
+        io_thread_pool[0].reset(new ThreadPool(1));
+        io_thread_pool[1].reset(new ThreadPool(1));
+    }
+
     // The merge passes alternate between tempfile-to-outputfile and
     // outputfile-to-tempfile.
     BinaryFile * output_or_temp_file[2] = { &output_file, &temp_file };
@@ -2082,6 +2235,7 @@
             strategy.records_per_sort_block,
             strategy.num_sort_blocks,
             strategy.num_sort_blocks_first_merge,
+            io_thread_pool[0].get(),
             ctx);
     }
@@ -2117,6 +2271,8 @@
             *pass_output_file,
             strategy.merge_pass[mp],
             filter_dupl,
+            io_thread_pool[0].get(),
+            io_thread_pool[1].get(),
             ctx);
     }
 }
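-- 
For reference, the double-buffered pipeline this patch builds out of
ThreadPool::submit and two sort buffers, reduced to a self-contained
sketch. The names read_into, process, write_from and pipeline are invented
for this sketch, and std::async stands in for the patch's single-threaded
ThreadPool; the real code uses read_block, sort_records and write_block
against positioned file I/O.

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <future>
#include <vector>

// Hypothetical stand-ins for sortbin's read_block, sort_records and
// write_block. Real implementations would do positioned file I/O.
static void read_into(std::vector<unsigned char>& buf, uint64_t block)
{
    std::printf("read block %llu (%zu bytes)\n",
                (unsigned long long) block, buf.size());
}

static void process(std::vector<unsigned char>& buf)
{
    (void) buf;  // Sort the records in "buf" here.
}

static void write_from(const std::vector<unsigned char>& buf, uint64_t block)
{
    std::printf("write block %llu (%zu bytes)\n",
                (unsigned long long) block, buf.size());
}

// Double-buffered pipeline: block N is processed in buffers[N % 2] while
// a background task writes block N-1 and then reads block N+1, both of
// which map to buffers[(N + 1) % 2] (hence write-before-read).
static void pipeline(uint64_t num_blocks, std::size_t block_size)
{
    std::vector<unsigned char> buffers[2];
    buffers[0].resize(block_size);
    buffers[1].resize(block_size);

    std::future<void> io_future;

    for (uint64_t n = 0; n < num_blocks; n++) {
        std::vector<unsigned char>& cur = buffers[n % 2];

        if (n == 0) {
            // Nothing is in flight yet; read the first block synchronously.
            read_into(cur, n);
        }

        // Wait for the previous background task before starting a new one.
        // (The patch submits first and waits afterwards; that is safe there
        // because its pool has one thread and runs tasks in order.)
        if (io_future.valid()) {
            io_future.get();
        }
        if (n > 0 || n + 1 < num_blocks) {
            io_future = std::async(
                std::launch::async,
                [&buffers, n, num_blocks] () {
                    std::vector<unsigned char>& other = buffers[(n + 1) % 2];
                    if (n > 0) {
                        write_from(other, n - 1);
                    }
                    if (n + 1 < num_blocks) {
                        read_into(other, n + 1);
                    }
                });
        }

        // Process the current block while the background I/O runs.
        process(cur);

        if (n + 1 == num_blocks) {
            // Last block: drain background I/O, then write synchronously.
            if (io_future.valid()) {
                io_future.get();
            }
            write_from(cur, n);
        }
    }
}

int main()
{
    pipeline(5, 1 << 20);
    return 0;
}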