From 5b2a06aabe529f694b3aa63db37010264f94f98d Mon Sep 17 00:00:00 2001 From: Joris van Rantwijk Date: Sat, 2 Jul 2022 17:50:28 +0200 Subject: [PATCH] Implement background I/O for merge passes --- src/sortbin.cpp | 260 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 217 insertions(+), 43 deletions(-) diff --git a/src/sortbin.cpp b/src/sortbin.cpp index 812ec63..be51437 100644 --- a/src/sortbin.cpp +++ b/src/sortbin.cpp @@ -53,6 +53,7 @@ /* Default branch factor while merging. */ #define DEFAULT_BRANCH_FACTOR 16 +#define MAX_BRANCH_FACTOR 16000 /* Default number of sorting threads. */ #define DEFAULT_THREADS 1 @@ -65,6 +66,10 @@ For efficiency, I/O should be done in multiples of 4096 bytes. */ #define TRANSFER_ALIGNMENT 4096 +/* Maximum size of I/O buffer during merge passes. + Big chunks are efficient, but more than 32 MB is probably not helpful. */ +#define MAX_MERGE_BUFFER_SIZE (32 * 1024 * 1024) + /* Template for temporary file name. Must end in 6 'X' characters. */ #define TEMPFILE_TEMPLATE "sortbin_tmpXXXXXX" @@ -584,7 +589,6 @@ public: */ class RecordInputStream { -// TODO : double-buffering with delayed I/O via background thread public: typedef std::vector> BlockList; @@ -599,14 +603,17 @@ public: * (file_offset, number_of_records). * @param buffer_size Buffer size in bytes. * Must be a multiple of "record_size". - * Note: Each RecordInputStream creates two buffers - * of the specified size. + * Note: If an "io_pool" is available, each input + * stream allocates 2 buffers of the specified size, + * otherwise just 1 buffer. + * @param io_pool Optional thread pool for background I/O. */ RecordInputStream( BinaryFile& input_file, unsigned int record_size, BlockList&& blocks, - size_t buffer_size) + size_t buffer_size, + ThreadPool * io_pool) : m_input_file(input_file), m_record_size(record_size), m_next_block(0), @@ -615,10 +622,22 @@ public: m_bufpos(NULL), m_bufend(NULL), m_blocks(blocks), - m_buffer(buffer_size) + m_buffer_size(buffer_size), + m_buffer_index(0), + m_io_pool(io_pool) { assert(buffer_size % record_size == 0); assert(buffer_size > record_size); + + // Allocate buffer(s). + m_buffers[0].resize(buffer_size); + if (io_pool != NULL) { + // Background I/O: allocate extra buffer. + m_buffers[1].resize(buffer_size); + + // Start reading in background thread. + pre_read_buffer(); + } } // Prevent copying and assignment. @@ -686,17 +705,86 @@ private: void refill_buffer() { if (m_block_remaining > 0) { - size_t transfer_size = - (m_buffer.size() < m_block_remaining) ? - m_buffer.size() : m_block_remaining; - m_input_file.read(m_buffer.data(), m_file_offset, transfer_size); + size_t transfer_size = std::min(uint64_t(m_buffer_size), + m_block_remaining); + + if (m_io_pool == NULL) { + // Use blocking I/O. + m_input_file.read( + m_buffers[0].data(), + m_file_offset, + transfer_size); + } else { + // The data we need is already on its way. + // Wait until the background transfer completes. + assert(m_future.valid()); + m_future.get(); + + // Flip to the refilled buffer. + m_buffer_index = (m_buffer_index + 1) & 1; + } + + // Update position administration. m_file_offset += transfer_size; m_block_remaining -= transfer_size; - m_bufpos = m_buffer.data(); - m_bufend = m_buffer.data() + transfer_size; + + // Update buffer pointers. + m_bufpos = m_buffers[m_buffer_index].data(); + m_bufend = m_bufpos + transfer_size; + + // Start reading next buffer in background thread. + if (m_io_pool != NULL) { + pre_read_buffer(); + } } } + /** Start reading into the next buffer. */ + void pre_read_buffer() + { + assert(m_io_pool != NULL); + + uint64_t pre_file_offset = m_file_offset; + uint64_t pre_block_remaining = m_block_remaining; + size_t pre_block_index = m_next_block; + + // If we are at the end of the current block, + // look ahead to the next block. + while (pre_block_remaining == 0) { + + if (pre_block_index >= m_blocks.size()) { + // Reached the end of the last block. + // There is nothing more to read in this stream. + return; + } + + // Look ahead to the next block. + uint64_t num_records; + std::tie(pre_file_offset, num_records) = m_blocks[pre_block_index]; + pre_block_remaining = num_records * m_record_size; + pre_block_index++; + + uint64_t file_size = m_input_file.size(); + assert(pre_file_offset <= file_size); + assert(pre_block_remaining <= file_size - pre_file_offset); + } + + // Read into the currently non-active buffer. + unsigned int pre_buffer_index = (m_buffer_index + 1) & 1; + + size_t transfer_size = std::min(uint64_t(m_buffer_size), + pre_block_remaining); + + // Start background I/O. + m_future = m_io_pool->submit( + std::bind( + &BinaryFile::read, + &m_input_file, + m_buffers[pre_buffer_index].data(), + pre_file_offset, + transfer_size)); + } + BinaryFile& m_input_file; const unsigned int m_record_size; size_t m_next_block; @@ -705,7 +793,11 @@ private: unsigned char * m_bufpos; unsigned char * m_bufend; BlockList m_blocks; - std::vector m_buffer; + size_t m_buffer_size; + unsigned int m_buffer_index; + std::array, 2> m_buffers; + ThreadPool::FutureType m_future; + ThreadPool * m_io_pool; }; @@ -714,7 +806,6 @@ private: */ class RecordOutputStream { -// TODO : double-buffering with delayed I/O via background thread public: /** * Construct a record output stream. @@ -723,21 +814,40 @@ public: * @param record_size Record size in bytes. * @param file_offset Start offset in the output file. * @param buffer_size Buffer size in bytes. - * Note: Each RecordOutputStream creates two buffers - * of the specified size. + * @param num_buffers Number of buffers of the specified size. + * Note: Without "io_pool", the output stream + * uses only 1 buffer. + * @param io_pool Optional thread pool for background I/O. */ RecordOutputStream( BinaryFile& output_file, unsigned int record_size, uint64_t file_offset, - size_t buffer_size) + size_t buffer_size, + unsigned int num_buffers, + ThreadPool * io_pool) : m_output_file(output_file), m_record_size(record_size), m_file_offset(file_offset), - m_buffer(buffer_size) + m_buffer_index(0), + m_io_pool(io_pool) { - m_bufpos = m_buffer.data(); - m_bufend = m_buffer.data() + m_buffer.size(); + assert(buffer_size > record_size); + assert(num_buffers > 0); + + if (io_pool == NULL) { + // No background I/O: Use just 1 buffer. + m_buffers.emplace_back(buffer_size); + } else { + // Allocate buffers and futures. + for (unsigned int b = 0; b < num_buffers; b++) { + m_buffers.emplace_back(buffer_size); + m_futures.emplace_back(); + } + } + + m_bufpos = m_buffers[0].data(); + m_bufend = m_bufpos + m_buffers[0].size(); } // Prevent copying and assignment. @@ -748,7 +858,7 @@ public: inline void put(const unsigned char *record) { if (m_record_size > m_bufend - m_bufpos) { - flush(); + flush_block(); } memcpy(m_bufpos, record, m_record_size); m_bufpos += m_record_size; @@ -757,28 +867,77 @@ public: /** Return the current file offset. Flush before calling this function. */ inline uint64_t file_offset() const { - assert(m_bufpos == m_buffer.data()); + // File offset is only accurate when the current buffer is empty. + assert(m_bufpos == m_buffers[m_buffer_index].data()); return m_file_offset; } /** Flush buffered records to the output file. */ void flush() { - size_t flush_size = m_bufpos - m_buffer.data(); - if (flush_size > 0) { - m_output_file.write(m_buffer.data(), m_file_offset, flush_size); - m_file_offset += flush_size; - m_bufpos = m_buffer.data(); + // Write the current block to file. + flush_block(); + + // Wait until all background I/O complete. + if (m_io_pool != NULL) { + for (ThreadPool::FutureType& fut : m_futures) { + if (fut.valid()) { + fut.get(); + } + } } } private: + /** Write the current block to file and obtain an empty buffer. */ + void flush_block() + { + unsigned char * buf_start = m_buffers[m_buffer_index].data(); + size_t flush_size = m_bufpos - buf_start; + + if (flush_size > 0) { + + // Write the current block to file. + if (m_io_pool == NULL) { + // Use blocking I/O. + m_output_file.write(buf_start, m_file_offset, flush_size); + } else { + // Use background I/O. + m_futures[m_buffer_index] = m_io_pool->submit( + std::bind( + &BinaryFile::write, + &m_output_file, + buf_start, + m_file_offset, + flush_size)); + + // Move on to the next buffer. + m_buffer_index = (m_buffer_index + 1) % m_buffers.size(); + + // Wait until the new buffer is available. + if (m_futures[m_buffer_index].valid()) { + m_futures[m_buffer_index].get(); + } + } + + // Update file offset. + m_file_offset += flush_size; + + // Update buffer pointers. + m_bufpos = m_buffers[m_buffer_index].data(); + m_bufend = m_bufpos + m_buffers[m_buffer_index].size(); + } + } + BinaryFile& m_output_file; const unsigned int m_record_size; uint64_t m_file_offset; unsigned char * m_bufpos; unsigned char * m_bufend; - std::vector m_buffer; + unsigned int m_buffer_index; + std::vector> m_buffers; + std::vector m_futures; + ThreadPool * m_io_pool; }; @@ -1395,7 +1554,7 @@ size_t filter_duplicate_records( size_t next_pos = 1; while (next_pos < num_records) { - if (memcmp(last_unique, next_record, record_size) == 0) { + if (record_compare(last_unique, next_record, record_size) == 0) { break; } last_unique = next_record; @@ -1406,10 +1565,10 @@ size_t filter_duplicate_records( // Scan the rest of the records and copy unique records. size_t num_unique = next_pos; while (next_pos < num_records) { - if (memcmp(last_unique, next_record, record_size) != 0) { + if (record_compare(last_unique, next_record, record_size) != 0) { num_unique++; last_unique += record_size; - memcpy(last_unique, next_record, record_size); + record_copy(last_unique, next_record, record_size); } next_record += record_size; next_pos++; @@ -1850,17 +2009,23 @@ void merge_pass( Timer timer; timer.start(); - // Calculate number of buffers: - // 2 buffers per input stream + more buffers for output stream. - size_t num_output_buffers = 2 + (ctx.branch_factor - 1) / 2; - size_t num_buffers = 2 * ctx.branch_factor + num_output_buffers; + // Choose number of buffers. + // Without background I/O: 1 input buffer per stream + 1 output buffer. + // With background I/O: 2 input buffers per stream + 4 output buffers. + unsigned int num_input_buf = (read_thread_pool != NULL) ? 2 : 1; + unsigned int num_output_buf = (write_thread_pool != NULL) ? 4 : 1; + unsigned int num_buffers = + ctx.branch_factor * num_input_buf + num_output_buf; // Calculate buffer size. + // Must not exceed the available memory. // Must be a multiple of the record size and the transfer alignment. - size_t buffer_size = ctx.memory_size / num_buffers; - buffer_size -= buffer_size % (TRANSFER_ALIGNMENT * ctx.record_size); - -// TODO : double-buffering with I/O in separate thread + // Should not exceed MAX_MERGE_BUFFER_SIZE. + size_t buffer_alignment = TRANSFER_ALIGNMENT * size_t(ctx.record_size); + size_t buffer_size = + std::min(ctx.memory_size / num_buffers, + std::max(buffer_alignment, size_t(MAX_MERGE_BUFFER_SIZE))); + buffer_size -= buffer_size % buffer_alignment; // Prepare a list of blocks for each input stream. std::vector stream_blocks(ctx.branch_factor); @@ -1879,7 +2044,8 @@ void merge_pass( input_file, ctx.record_size, std::move(stream_blocks[i]), - buffer_size)); + buffer_size, + read_thread_pool)); } // Initialize output stream. @@ -1887,7 +2053,9 @@ void merge_pass( output_file, ctx.record_size, 0, - buffer_size); + buffer_size, + num_output_buf, + write_thread_pool); // Loop over groups of blocks to be sorted. // Every group consists of "branch_factor" blocks, except the last @@ -2010,9 +2178,9 @@ SortStrategy plan_multi_pass_strategy( // The next two merge passes handle groups of exactly 3 blocks. // // [S00] [S01] [S02] [S03] [S04] [S05] [S06] [S07] [S08] [S09] [S10] [S11] - // | | | | | - // +-----+-----+ +--+--+ - // | | + // | | | | | | | | | | | | + // +-----+-----+ +--+--+ | | | | | | | + // | | | | | | | | | // [S00-S02] [S03-S04] [S05] [S06] [S07] [S08] [S09] [S10] [S11] // | | | | | | | | | // +-----------+--------+ +-----+-----+ +-----+-----+ @@ -2146,6 +2314,12 @@ void sortbin( if (ctx.branch_factor < 2) { throw std::logic_error("Invalid branch factor (must be >= 2)"); } + if (ctx.branch_factor > MAX_BRANCH_FACTOR) { + throw std::logic_error( + "Invalid branch factor (must be <= " + + std::to_string(MAX_BRANCH_FACTOR) + + ")"); + } // We want file I/O to occur on 4096-byte boundaries. // To ensure this, we want to do I/O on multiples of 4096 records.