diff --git a/src/sortbin.cpp b/src/sortbin.cpp index fc7d2c1..b669640 100644 --- a/src/sortbin.cpp +++ b/src/sortbin.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -87,15 +88,8 @@ struct SortStrategy { /** Strategy for a single merge pass. */ struct MergePass { - - /** Number of records per input block into this merge pass. */ - uint64_t records_per_input_block; - - /** Total number of input blocks into this merge pass. */ - uint64_t num_input_blocks; - - /** Number of input blocks to merge into each output block. */ - unsigned int branch_factor; + /** Number of records in each input block into this pass. */ + std::vector records_per_block; }; /** Number of records per block during the initial sort pass. */ @@ -104,6 +98,9 @@ struct SortStrategy /** Number of blocks for the initial sort pass. */ uint64_t num_sort_blocks; + /** Number of blocks that are processed during the first merge pass. */ + uint64_t num_sort_blocks_first_merge; + /** List of merge passes. */ std::vector merge_pass; }; @@ -411,10 +408,9 @@ public: /** * Read binary records from an input file with buffering. * - * The input stream reads from a sequence of discontinuous, equally spaced - * blocks in the input file. All blocks have the same size, except for - * the last block which may be shorter if it runs to the end of the file. - * Each block contains a flat array of binary records. + * The input stream reads from a sequence of discontinuous blocks in + * the input file. Each block contains a flat array of binary records. + * An explicit list of these blocks is passed to the stream constructor. * * The input stream starts in the "empty" state. * The first call to "next_block()" enables reading records from the @@ -426,6 +422,8 @@ class RecordInputStream { // TODO : double-buffering with delayed I/O via background thread public: + typedef std::vector> BlockList; + /** * Construct a record input stream. * @@ -433,12 +431,8 @@ public: * * @param input_file Input file where records read from. * @param record_size Record size in bytes. - * @param start_offset Offset in input file of first input section. - * @param block_size Size of each input block in bytes. - * Must be a multiple of "record_size". - * The last input block may be shorter if it runs - * to the end of the file. - * @param block_stride Distance between start of blocks in bytes. + * @param blocks Vector of input blocks specified as tuple + * (file_offset, number_of_records). * @param buffer_size Buffer size in bytes. * Must be a multiple of "record_size". * Note: Each RecordInputStream creates two buffers @@ -447,24 +441,18 @@ public: RecordInputStream( BinaryFile& input_file, unsigned int record_size, - uint64_t start_offset, - uint64_t block_size, - uint64_t block_stride, + BlockList&& blocks, size_t buffer_size) : m_input_file(input_file), m_record_size(record_size), - m_block_offset(start_offset), - m_block_size(block_size), - m_block_stride(block_stride), + m_next_block(0), m_block_remaining(0), m_file_offset(0), m_bufpos(NULL), m_bufend(NULL), + m_blocks(blocks), m_buffer(buffer_size) { - assert(start_offset <= input_file.size()); - assert(block_size % record_size == 0); - assert(block_size <= block_stride); assert(buffer_size % record_size == 0); assert(buffer_size > record_size); } @@ -515,16 +503,16 @@ public: void next_block() { assert(m_bufpos == m_bufend); + assert(m_next_block < m_blocks.size()); + + uint64_t num_records; + std::tie(m_file_offset, num_records) = m_blocks[m_next_block]; + m_block_remaining = num_records * m_record_size; + m_next_block++; uint64_t file_size = m_input_file.size(); - assert(m_block_offset <= file_size); - - m_file_offset = m_block_offset; - m_block_remaining = - std::min(m_block_size, file_size - m_block_offset); - - m_block_offset += std::min(m_block_stride, - file_size - m_block_offset); + assert(m_file_offset <= file_size); + assert(m_block_remaining <= file_size - m_file_offset); refill_buffer(); } @@ -547,13 +535,12 @@ private: BinaryFile& m_input_file; const unsigned int m_record_size; - uint64_t m_block_offset; - uint64_t m_block_size; - uint64_t m_block_stride; + size_t m_next_block; uint64_t m_block_remaining; uint64_t m_file_offset; unsigned char * m_bufpos; unsigned char * m_bufend; + BlockList m_blocks; std::vector m_buffer; }; @@ -1171,16 +1158,20 @@ void single_pass( * the last block in the file which may be smaller. * * @param input_file Input file. - * @param output_file Output file for this pass. + * @param output_file1 Output file for this pass. + * @param output_file2 Second output file for this pass. * @param records_per_block Number of records per sort block. * @param num_blocks Number of sort blocks. + * @param num_blocks_file1 Number of blocks for the first output file. * @param ctx Reference to context structure. */ void sort_pass( BinaryFile& input_file, - BinaryFile& output_file, + BinaryFile& output_file1, + BinaryFile& output_file2, uint64_t records_per_block, uint64_t num_blocks, + uint64_t num_blocks_file1, const SortContext& ctx) { unsigned int record_size = ctx.record_size; @@ -1207,7 +1198,7 @@ void sort_pass( std::min(records_per_block, num_records - first_record_idx); log(ctx, - "sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n", + " sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n", block_index, num_blocks, block_num_records); @@ -1225,6 +1216,8 @@ void sort_pass( block_num_records); // Write block. + BinaryFile& output_file = + (block_index < num_blocks_file1) ? output_file1 : output_file2; output_file.write( buffer.data(), first_record_idx * record_size, @@ -1232,72 +1225,11 @@ void sort_pass( } timer.stop(); - log(ctx, "initial sort pass finished\n"); + log(ctx, "end initial sort pass\n"); log(ctx, " t = %.3f seconds\n", timer.value()); } -/** - * Merge 2 sorted blocks of records into a single sorted block. - * - * @param instream1 Input stream containing block 1. - * @param instream2 Input stream containing block 2. - * @param output_stream Output stream for the merged block. - * @param record_size Record size in bytes. - */ -void merge_2_blocks( - RecordInputStream& instream1, - RecordInputStream& instream2, - RecordOutputStream& output_stream, - size_t record_size) -{ - // Input blocks should not be empty. - assert(!instream1.empty()); - assert(!instream2.empty()); - - const unsigned char * rec1 = instream1.record(); - const unsigned char * rec2 = instream2.record(); - - // Merge until one stream runs empty. - while (true) { - - // Choose which record should go first. - if (record_compare(rec1, rec2, record_size) < 0) { - // Push record from stream 1 and load next record. - output_stream.put(rec1); - instream1.next_record(); - if (instream1.empty()) { - rec1 = NULL; - break; - } - rec1 = instream1.record(); - } else { - // Push record from stream 2 and load next record. - output_stream.put(rec2); - instream2.next_record(); - if (instream2.empty()) { - rec2 = NULL; - break; - } - rec2 = instream2.record(); - } - } - - // At most one of the streams still has records left. - // Copy those records to the output. - - while (!instream1.empty()) { - output_stream.put(instream1.record()); - instream1.next_record(); - } - - while (!instream2.empty()) { - output_stream.put(instream2.record()); - instream2.next_record(); - } -} - - /** * Merge sorted blocks of records into a single sorted block. * @@ -1308,7 +1240,7 @@ void merge_2_blocks( * May be less than the length of input_streams. * @param filter_dupl True to eliminate duplicate records. */ -void merge_n_blocks( +void merge_blocks( std::vector>& input_streams, RecordOutputStream& output_stream, size_t record_size, @@ -1419,25 +1351,22 @@ void merge_n_blocks( void merge_pass( BinaryFile& input_file, BinaryFile& output_file, - uint64_t records_per_block, - uint64_t num_blocks, - unsigned int branch_factor, + const SortStrategy::MergePass& merge_pass, bool filter_dupl, const SortContext& ctx) { - assert(branch_factor > 1); - assert(branch_factor <= num_blocks); + size_t num_blocks = merge_pass.records_per_block.size(); // Only filter duplicates when the output is a single block. - assert((!filter_dupl) || (branch_factor == num_blocks)); + assert((!filter_dupl) || num_blocks <= ctx.branch_factor); Timer timer; timer.start(); // Calculate number of buffers: // 2 buffers per input stream + more buffers for output stream. - size_t num_output_buffers = 2 + (branch_factor - 1) / 2; - size_t num_buffers = 2 * branch_factor + num_output_buffers; + size_t num_output_buffers = 2 + (ctx.branch_factor - 1) / 2; + size_t num_buffers = 2 * ctx.branch_factor + num_output_buffers; // Calculate buffer size. // Must be a multiple of the record size and the transfer alignment. @@ -1446,21 +1375,23 @@ void merge_pass( // TODO : double-buffering with I/O in separate thread + // Prepare a list of blocks for each input stream. + std::vector stream_blocks(ctx.branch_factor); + uint64_t file_offset = 0; + for (size_t p = 0; p < num_blocks; p++) { + uint64_t num_records = merge_pass.records_per_block[p]; + unsigned int streamidx = p % ctx.branch_factor; + stream_blocks[streamidx].emplace_back(file_offset, num_records); + file_offset += num_records * ctx.record_size; + } + // Initialize input streams. std::vector> input_streams; - for (unsigned int i = 0; i < branch_factor; i++) { - uint64_t block_size = records_per_block * ctx.record_size; - uint64_t start_offset = i * block_size; - uint64_t block_stride = branch_factor * block_size; - if (start_offset >= input_file.size()) { - break; - } + for (unsigned int i = 0; i < ctx.branch_factor; i++) { input_streams.emplace_back(new RecordInputStream( input_file, ctx.record_size, - start_offset, - block_size, - block_stride, + std::move(stream_blocks[i]), buffer_size)); } @@ -1475,54 +1406,35 @@ void merge_pass( // Every group consists of "branch_factor" blocks, except the last // group which may contain fewer blocks. // Each group produces one output block. - uint64_t block_index = 0; + size_t block_index = 0; while (block_index < num_blocks) { - // Determine how many blocks will be merged in this group. - unsigned int this_branch_factor = branch_factor; - if (branch_factor > num_blocks - block_index) { - this_branch_factor = num_blocks - block_index; + // If this is the first merge pass, the last group may have + // fewer than "branch_factor" blocks. + unsigned int blocks_this_group = ctx.branch_factor; + if (blocks_this_group > num_blocks - block_index) { + blocks_this_group = num_blocks - block_index; } + // Merging a single block with itself would be dumb. + // And our strategy planner is not that dumb. + assert(blocks_this_group > 1); + // Skip to the next section of each active input stream. - for (unsigned int i = 0; i < this_branch_factor; i++) { + for (unsigned int i = 0; i < blocks_this_group; i++) { input_streams[i]->next_block(); } - if (this_branch_factor == 1) { - - // Last group contains just 1 block. - // Copy it to the output. - assert(!filter_dupl); - RecordInputStream * instream = input_streams[0].get(); - while (!instream->empty()) { - output_stream.put(instream->record()); - instream->next_record(); - } - - } else if (this_branch_factor == 2 && !filter_dupl) { - - // Special case for merging 2 blocks. - merge_2_blocks( - *input_streams[0], - *input_streams[1], - output_stream, - ctx.record_size); - - } else { - - // Merge more than 2 blocks or filter duplicates. - merge_n_blocks( - input_streams, - output_stream, - ctx.record_size, - this_branch_factor, - filter_dupl); - - } + // Merge the blocks. + merge_blocks( + input_streams, + output_stream, + ctx.record_size, + blocks_this_group, + filter_dupl); // Skip to the start of the next block group. - block_index += this_branch_factor; + block_index += blocks_this_group; } // Flush output stream buffers. @@ -1562,34 +1474,168 @@ SortStrategy plan_multi_pass_strategy( uint64_t num_records = file_size / ctx.record_size; uint64_t num_sort_blocks = 1 + (num_records - 1) / records_per_sort_block; + // A list of blocks is constructed in memory for each merge pass. + // There are several of these lists. Together they will consume + // about 40 bytes per block. If we get an insanely large input file + // with a small memory limit, this metadata could by itself consume + // too much memory. + // Let's do a sanity check to ensure that the metadata uses less than + // 25% of the memory limit. + if (num_sort_blocks >= ctx.memory_size / 4 / 40) { + throw std::logic_error( + "Not enough memory to manage the list of blocks"); + } + + // Plan the merge passes. + // + // In prinicple, every pass merges groups of "branch_factor" input blocks + // into one output block per group, thus reducing the number of remaining + // blocks by a factor "branch_factor". + // + // However, this gets more complicated if the merge tree is not perfectly + // balanced, which happens if the number of sort blocks is not a power + // of "branch_factor". In that case, the first merge pass will have + // to make things right by handling only a subset of the data. + // + // The first merge pass processes a subset of the sort blocks. + // It merges groups of "branch_factor" sort blocks into one output block + // per group. The last group in this pass may contain fewer than + // "branch_factor" sort blocks. After the first merge pass, the number + // of remaining blocks is an exact power of "branch_factor". + // The remaining blocks are in general not all the same size. + // + // After the first merge pass, each subsequent pass (if any) merges + // groups of exactly "branch_factor" blocks into one output block per + // group. These blocks are in general not all the same size. + // + // Example: + // + // branch_factor = 3 + // num_sort_blocks = 12 + // + // Sorted blocks: + // [S00] [S01] [S02] [S03] [S04] [S05] [S06] [S07] [S08] [S09] [S10] [S11] + // + // There are 3 merge passes. + // The first merge pass handles only sort blocks S00 - S04. + // The next two merge passes handle groups of exactly 3 blocks. + // + // [S00] [S01] [S02] [S03] [S04] [S05] [S06] [S07] [S08] [S09] [S10] [S11] + // | | | | | + // +-----+-----+ +--+--+ + // | | + // [S00-S02] [S03-S04] [S05] [S06] [S07] [S08] [S09] [S10] [S11] + // | | | | | | | | | + // +-----------+--------+ +-----+-----+ +-----+-----+ + // | | | + // [S00-S05] [S06-S08] [S09-S11] + // | | | + // +--------------------+-----------------+ + // | + // [S00-S11] + // + // Note that a subset of sort blocks enter into the first merge pass + // while the remaining sort blocks go directly into the second merge pass. + // (It is also possible that all sort blocks go into the first pass, + // if the merge tree is perfectly balanced.) + // + + // Determine the number of full merge passes (2nd pass and later). + unsigned int num_merge_pass = 0; + uint64_t num_merge_blocks = 1; + while (num_merge_blocks * ctx.branch_factor < num_sort_blocks) { + num_merge_blocks *= ctx.branch_factor; + num_merge_pass++; + } + + // Add a first merge pass. + num_merge_pass++; + + // Determine the number of merge groups in the first merge pass. + // The last group may have fewer than "branch_factor" input blocks. + uint64_t num_merge_ops_first_pass = + (num_sort_blocks - num_merge_blocks + (ctx.branch_factor - 1) - 1) + / (ctx.branch_factor - 1); + + // Determine the number of sort blocks to process in the first merge pass. + uint64_t num_sort_blocks_first_merge = + num_sort_blocks - num_merge_blocks + num_merge_ops_first_pass; + + assert(num_sort_blocks_first_merge <= num_sort_blocks); + SortStrategy strategy; strategy.records_per_sort_block = records_per_sort_block; strategy.num_sort_blocks = num_sort_blocks; + strategy.num_sort_blocks_first_merge = num_sort_blocks_first_merge; - // Plan the merge passes. - // Start with the result of the initial sort pass. - uint64_t records_per_block = records_per_sort_block; - uint64_t num_blocks = num_sort_blocks; - - // Keep merging until there is only one block left. - while (num_blocks > 1) { - // Calculate the number of blocks out of this merge pass. - uint64_t num_merged_blocks = 1 + (num_blocks - 1) / ctx.branch_factor; - - // Choose the smallest branch factor that produces this nr of blocks. - unsigned int branch_factor = 1 + (num_blocks - 1) / num_merged_blocks; - - SortStrategy::MergePass merge_pass; - merge_pass.records_per_input_block = records_per_block; - merge_pass.num_input_blocks = num_blocks; - merge_pass.branch_factor = branch_factor; - strategy.merge_pass.push_back(merge_pass); - - // Result of this merge pass will go into the next pass. - records_per_block *= branch_factor; - num_blocks = num_merged_blocks; + // Plan the details of each merge pass. + // + // The first merge pass handles a subset of the sort blocks. + // All of these sort blocks are the same size, except possibly the + // last block if it runs to the end of the file. + { + strategy.merge_pass.emplace_back(); + SortStrategy::MergePass& merge_pass = strategy.merge_pass.back(); + for (size_t i = 0; i < num_sort_blocks_first_merge; i++) { + uint64_t records_this_block = + std::min(records_per_sort_block, + num_records - i * records_per_sort_block); + merge_pass.records_per_block.push_back(records_this_block); + } } + // Plan the rest of the passes. + for (unsigned int mp = 1; mp < num_merge_pass; mp++) { + + strategy.merge_pass.emplace_back(); + SortStrategy::MergePass& merge_pass = strategy.merge_pass.back(); + SortStrategy::MergePass& prev_pass = *(strategy.merge_pass.end() - 2); + + uint64_t records_this_pass = 0; + + // Output from the previous pass is input into this pass. + uint64_t records_this_block = 0; + for (size_t i = 0; i < prev_pass.records_per_block.size(); i++) { + records_this_block += prev_pass.records_per_block[i]; + if ((i + 1) % ctx.branch_factor == 0) { + merge_pass.records_per_block.push_back(records_this_block); + records_this_pass += records_this_block; + records_this_block = 0; + } + } + + // The last group of the first pass may merge fewer than + // "branch_factor" blocks. + if (records_this_block > 0) { + merge_pass.records_per_block.push_back(records_this_block); + records_this_pass += records_this_block; + } + + if (mp == 1) { + // The second pass handles sort blocks that were skipped + // during the first pass. + for (size_t i = num_sort_blocks_first_merge; + i < num_sort_blocks; + i++) { + records_this_block = std::min( + records_per_sort_block, + num_records - i * records_per_sort_block); + merge_pass.records_per_block.push_back(records_this_block); + records_this_pass += records_this_block; + } + } + + // Check that number of input blocks is divisible by branch_factor. + assert(merge_pass.records_per_block.size() % ctx.branch_factor == 0); + + // Check that all records are accounted for. + assert(records_this_pass == num_records); + } + + // Double-check that the last pass produces a single output block. + SortStrategy::MergePass& last_pass = strategy.merge_pass.back(); + assert(last_pass.records_per_block.size() <= ctx.branch_factor); + return strategy; } @@ -1608,6 +1654,10 @@ void sortbin( { log(ctx, "using memory_size = %" PRIu64 " bytes\n", ctx.memory_size); + if (ctx.branch_factor < 2) { + throw std::logic_error("Invalid branch factor (must be >= 2)"); + } + // We want file I/O to occur on 4096-byte boundaries. // To ensure this, we want to do I/O on multiples of 4096 records. // To ensure this is possible, we need room for ~ 32k records per branch. @@ -1659,30 +1709,48 @@ void sortbin( // The merge passes alternate between tempfile-to-outputfile and // outputfile-to-tempfile. - // The final merge pass will be tempfile-to-outputfile. - // Depending on the number of merge passes, the initial sort pass - // will either be inputfile-to-tempfile or inputfile-to-outputfile. BinaryFile * output_or_temp_file[2] = { &output_file, &temp_file }; - BinaryFile * sort_output_file = - output_or_temp_file[num_merge_pass % 2]; - // Execute the initial sort pass. - sort_pass( - input_file, - *sort_output_file, - strategy.records_per_sort_block, - strategy.num_sort_blocks, - ctx); + // The final merge pass will be tempfile-to-outputfile. + // Depending on the number of merge passes, the first merge pass + // reads either from the output file or from the tempfile. + // + // The sort pass feeds blocks into the first merge pass, + // but may also feed blocks into the second merge pass if the + // merge tree is unbalanced. + { + BinaryFile * sort_output_first_merge_pass = + output_or_temp_file[num_merge_pass % 2]; + BinaryFile * sort_output_second_merge_pass = + output_or_temp_file[(num_merge_pass - 1) % 2]; + + // Execute the initial sort pass. + sort_pass( + input_file, + *sort_output_first_merge_pass, + *sort_output_second_merge_pass, + strategy.records_per_sort_block, + strategy.num_sort_blocks, + strategy.num_sort_blocks_first_merge, + ctx); + } // Execute the merge passes. for (unsigned int mp = 0; mp < num_merge_pass; mp++) { + const SortStrategy::MergePass& mpass = strategy.merge_pass[mp]; + size_t num_blocks = mpass.records_per_block.size(); + uint64_t num_records = std::accumulate( + mpass.records_per_block.begin(), + mpass.records_per_block.end(), + uint64_t(0)); + log(ctx, - "running merge pass %u / %u: " - "%" PRIu64 " blocks, branch factor %u\n", + "running merge pass %u / %u:" + " %zu blocks, %" PRIu64 " records\n", mp, num_merge_pass, - strategy.merge_pass[mp].num_input_blocks, - strategy.merge_pass[mp].branch_factor); + num_blocks, + num_records); // Filter duplicate records only on the last pass. bool filter_dupl = ctx.flag_unique && (mp + 1 == num_merge_pass); @@ -1697,9 +1765,7 @@ void sortbin( merge_pass( *pass_input_file, *pass_output_file, - strategy.merge_pass[mp].records_per_input_block, - strategy.merge_pass[mp].num_input_blocks, - strategy.merge_pass[mp].branch_factor, + strategy.merge_pass[mp], filter_dupl, ctx); }