diff --git a/sortbin.cpp b/sortbin.cpp index 9081bc7..a397fc0 100644 --- a/sortbin.cpp +++ b/sortbin.cpp @@ -51,6 +51,7 @@ #include #include +#include #include #include #include @@ -70,6 +71,10 @@ /* Default number of sorting threads. */ #define DEFAULT_THREADS 1 +/* Align buffer sizes and I/O on this number of records. + For efficiency, I/O should be done in multiples of 4096 bytes. */ +#define TRANSFER_ALIGNMENT 4096 + namespace { // anonymous namespace @@ -400,58 +405,59 @@ public: * Read binary records from an input file with buffering. * * The input stream reads from a sequence of discontinuous, equally spaced - * sections in the input file. All sections have the same size, except for - * the last section which may be shorter if it runs to the end of the file. - * Each section contains a flat array of binary records. + * blocks in the input file. All blocks have the same size, except for + * the last block which may be shorter if it runs to the end of the file. + * Each block contains a flat array of binary records. * - * The input stream starts in the "end_of_stream" state. - * The first call to "next_section()" enables reading records from the - * first input section. When the input stream reaches the end of the - * current section, it again enters the "end_of_stream" state until - * the next call to "next_section()". + * The input stream starts in the "empty" state. + * The first call to "next_block()" enables reading records from the + * first input block. When the input stream reaches the end of the + * current block, it again enters the "empty" state until the following + * call to "next_block()". */ class RecordInputStream { +// TODO : double-buffering with delayed I/O via background thread public: /** * Construct a record input stream. * * The stream will initially contain no input sections. * - * @param input_file Input file where records read from. - * @param record_size Record size in bytes. 
- * @param section_offset Offset in input file of first input section. - * @param section_size Size of each input section in bytes. - * Must be a multiple of "record_size". - * The last input section may be shorter if it - * runs to the end of the file. - * @param section_stride Distance between start of sections in bytes. - * @param buffer_size Buffer size in bytes. - * Must be a multiple of "record_size". - * Note: Each RecordInputStream creates two buffers - * of the specified size. + * @param input_file Input file where records are read from. + * @param record_size Record size in bytes. + * @param start_offset Offset in input file of first input block. + * @param block_size Size of each input block in bytes. + * Must be a multiple of "record_size". + * The last input block may be shorter if it runs + * to the end of the file. + * @param block_stride Distance between start of blocks in bytes. + * @param buffer_size Buffer size in bytes. + * Must be a multiple of "record_size". + * Note: Each RecordInputStream creates two buffers + * of the specified size. 
*/ RecordInputStream( BinaryFile& input_file, unsigned int record_size, - uint64_t section_offset, - uint64_t section_size, - uint64_t section_stride, + uint64_t start_offset, + uint64_t block_size, + uint64_t block_stride, size_t buffer_size) : m_input_file(input_file), m_record_size(record_size), - m_section_offset(section_offset), - m_section_size(section_size), - m_section_stride(section_stride), - m_section_remaining(0), + m_block_offset(start_offset), + m_block_size(block_size), + m_block_stride(block_stride), + m_block_remaining(0), m_file_offset(0), m_bufpos(NULL), m_bufend(NULL), m_buffer(buffer_size) { - assert(section_size % record_size == 0); - assert(section_size > record_size); - assert(section_size <= section_stride); + assert(start_offset <= input_file.size()); + assert(block_size % record_size == 0); + assert(block_size <= block_stride); assert(buffer_size % record_size == 0); assert(buffer_size > record_size); } @@ -460,8 +466,8 @@ public: RecordInputStream(const RecordInputStream&) = delete; RecordInputStream& operator=(const RecordInputStream&) = delete; - /** Return true if the end of the current input section is reached. */ - inline bool end_of_stream() const + /** Return true if the end of the current input block is reached. */ + inline bool empty() const { return (m_bufpos == m_bufend); } @@ -472,7 +478,7 @@ public: * This function must only be used if "end_of_stream()" returns false. * The returned pointer becomes invalid after a call to "next_record()". */ - inline const unsigned char * current_record() const + inline const unsigned char * record() const { return m_bufpos; } @@ -480,7 +486,7 @@ public: /** * Move to the next record of the current section. * - * This function must only be used if "end_of_stream()" returns false. + * This function must only be used if "empty()" returns false. * Calling this function invalidates all pointers previously returned * by "current_record()". 
*/ @@ -499,19 +505,19 @@ public: /** * Start reading from the next input section. * - * This function may only be called if "end_of_stream()" returns true. + * This function may only be called if "empty()" returns true. */ - void next_section() + void next_block() { assert(m_bufpos == m_bufend); uint64_t file_size = m_input_file.size(); - assert(m_section_stride < file_size - m_section_offset); + assert(m_block_stride < file_size - m_block_offset); - m_section_offset += m_section_stride; - m_file_offset = m_section_offset; - m_section_remaining = - std::min(m_section_size, file_size - m_section_offset); + m_block_offset += m_block_stride; + m_file_offset = m_block_offset; + m_block_remaining = + std::min(m_block_size, file_size - m_block_offset); refill_buffer(); } @@ -520,28 +526,27 @@ private: /** Refill the buffer from the current input section. */ void refill_buffer() { - if (m_section_remaining > 0) { - size_t block_size = - (m_buffer.size() < m_section_remaining) ? - m_buffer.size() : m_section_remaining; - m_input_file.read(m_buffer.data(), m_file_offset, block_size); - m_file_offset += block_size; - m_section_remaining -= block_size; - m_bufend = m_buffer.data() + block_size; + if (m_block_remaining > 0) { + size_t transfer_size = + (m_buffer.size() < m_block_remaining) ? 
+ m_buffer.size() : m_block_remaining; + m_input_file.read(m_buffer.data(), m_file_offset, transfer_size); + m_file_offset += transfer_size; + m_block_remaining -= transfer_size; + m_bufend = m_buffer.data() + transfer_size; } } BinaryFile& m_input_file; const unsigned int m_record_size; - uint64_t m_section_offset; - uint64_t m_section_size; - uint64_t m_section_stride; - uint64_t m_section_remaining; + uint64_t m_block_offset; + uint64_t m_block_size; + uint64_t m_block_stride; + uint64_t m_block_remaining; uint64_t m_file_offset; unsigned char * m_bufpos; unsigned char * m_bufend; std::vector m_buffer; -// TODO : double-buffering with delayed I/O via background thread }; @@ -550,6 +555,7 @@ private: */ class RecordOutputStream { +// TODO : double-buffering with delayed I/O via background thread public: /** * Construct a record output stream. @@ -602,7 +608,6 @@ private: uint64_t m_file_offset; size_t m_bufpos; std::vector m_buffer; -// TODO : double-buffering with delayed I/O via background thread }; @@ -1212,6 +1217,147 @@ void sort_pass( } +/** + * Merge 2 sorted blocks of records into a single sorted block. + * + * @param instream1 Input stream containing block 1. + * @param instream2 Input stream containing block 2. + * @param output_stream Output stream for the merged block. + * @param record_size Record size in bytes. + * @param filter_dupl True to eliminate duplicate records. + */ +void merge_2_blocks( + RecordInputStream& instream1, + RecordInputStream& instream2, + RecordOutputStream& output_stream, + size_t record_size, + bool filter_dupl) +{ + // Merge until one stream runs empty. + if (!instream1.empty() && !instream2.empty()) { + const unsigned char * rec1 = instream1.record(); + const unsigned char * rec2 = instream2.record(); + + while (true) { + +// TODO TODO : filter duplicates + + // Choose which record should go first. + if (record_compare(rec1, rec2, record_size) < 0) { + // Push record from stream 1 and load next record. 
+                output_stream.put(rec1);
+                instream1.next_record();
+                if (instream1.empty()) {
+                    break;
+                }
+                rec1 = instream1.record();
+            } else {
+                // Push record from stream 2 and load next record.
+                output_stream.put(rec2);
+                instream2.next_record();
+                if (instream2.empty()) {
+                    break;
+                }
+                rec2 = instream2.record();
+            }
+        }
+    }
+
+    // At most one of the streams still has records left.
+    // Copy those records to the output.
+
+    while (!instream1.empty()) {
+        output_stream.put(instream1.record());
+        instream1.next_record();
+    }
+
+    while (!instream2.empty()) {
+        output_stream.put(instream2.record());
+        instream2.next_record();
+    }
+}
+
+
+/**
+ * Merge sorted blocks of records into a single sorted block.
+ *
+ * @param input_streams   One input stream for each input block.
+ * @param output_stream   Output stream for the merged block.
+ * @param record_size     Record size in bytes.
+ * @param branch_factor   Number of input blocks.
+ *                        May be less than the length of input_streams.
+ * @param filter_dupl     True to eliminate duplicate records.
+ */
+void merge_n_blocks(
+    std::vector<std::unique_ptr<RecordInputStream>>& input_streams,
+    RecordOutputStream& output_stream,
+    size_t record_size,
+    unsigned int branch_factor,
+    bool filter_dupl)
+{
+    assert(branch_factor > 1);
+    assert(branch_factor <= input_streams.size());
+
+    // Put the head element of each block into a heap.
+    // The heap will determine which block contains the element that
+    // should go first in the merged block.
+    typedef std::tuple<const unsigned char *, RecordInputStream *> HeapElement;
+
+    // Function which compares records and returns true if
+    // record A comes after record B in sort order.
+    // If this function is used as the compare operator of a max-heap,
+    // the record that comes first in sort order will be at the top
+    // of the heap.
+    auto cmp_heap_elem =
+        [record_size](const HeapElement& a, const HeapElement& b) {
+            const unsigned char *reca = std::get<0>(a);
+            const unsigned char *recb = std::get<0>(b);
+            return record_compare(reca, recb, record_size) > 0;
+        };
+
+    // Initialize empty heap.
+    std::vector<HeapElement> heap;
+
+    // Get the first element of each block.
+    for (unsigned int i = 0; i < branch_factor; i++) {
+        if (!input_streams[i]->empty()) {
+            heap.emplace_back(input_streams[i]->record(),
+                              input_streams[i].get());
+        }
+    }
+
+    // Make a heap of the first record of each block.
+    std::make_heap(heap.begin(), heap.end(), cmp_heap_elem);
+
+    // Keep merging until the heap runs empty.
+    while (!heap.empty()) {
+
+// TODO TODO : filter duplicates
+
+        // Extract the first element; same comparator as make_heap is required.
+        const unsigned char * rec;
+        RecordInputStream * instream;
+        std::tie(rec, instream) = heap[0];
+        std::pop_heap(heap.begin(), heap.end(), cmp_heap_elem);
+
+        // Push this element to the output block.
+        output_stream.put(rec);
+
+        // Try to pull the next record from this input stream.
+        instream->next_record();
+        if (instream->empty()) {
+            // Stream is empty. This stream is now out of the game.
+            // The heap shrinks by 1 element.
+            heap.pop_back();
+        } else {
+            // Push next record from the stream into the heap.
+            heap.back() = std::make_tuple(instream->record(), instream);
+            std::push_heap(heap.begin(), heap.end(), cmp_heap_elem);
+        }
+    }
+}
+
+
 /**
  * Perform a merge pass of multi-pass external sorting.
  *
@@ -1228,8 +1374,8 @@ void sort_pass(
  * @param output_file         Output file for this pass.
  * @param records_per_block   Number of records per input block.
  * @param num_blocks          Number of input blocks.
- * @param branch_factor       Number of blocks to merge per an output block.
- * @param filter_duplicates   true to eliminate duplicate elements.
+ * @param branch_factor       Number of blocks to merge per output block.
+ * @param filter_dupl         True to eliminate duplicate records.
  * @param ctx                 Reference to context structure.
*/ void merge_pass( @@ -1238,23 +1384,59 @@ void merge_pass( uint64_t records_per_block, uint64_t num_blocks, unsigned int branch_factor, - bool filter_duplicates, + bool filter_dupl, const SortContext& ctx) { -// TODO TODO TODO TODO -#if 0 - // Eliminating duplicates is only supported for a single output block. - assert((!filter_duplicates) || (num_blocks <= branch_factor)); + assert(branch_factor > 1); + assert(branch_factor <= num_blocks); + + // Only filter duplicates when the output is a single block. + assert((!filter_dupl) || (branch_factor == num_blocks)); Timer timer; timer.start(); -// TODO : double-buffer with I/O in separate thread - // Allocate sort buffer. - assert(records_per_block < SIZE_MAX / record_size); - std::vector buffer(records_per_block * record_size); + // Calculate number of buffers: + // 2 buffers per input stream + buffers for output 1 stream. + size_t num_output_buffers = 2 + (branch_factor - 1) / 2; + size_t num_buffers = 2 * branch_factor + num_output_buffers; + + // Calculate buffer size. + // Must be a multiple of the record size and the transfer alignment. + size_t buffer_size = ctx.memory_size / num_buffers; + buffer_size -= buffer_size % (TRANSFER_ALIGNMENT * ctx.record_size); + +// TODO : double-buffering with I/O in separate thread + + // Initialize input streams. + std::vector> input_streams; + for (unsigned int i = 0; i < branch_factor; i++) { + uint64_t block_size = records_per_block * ctx.record_size; + uint64_t start_offset = i * block_size; + uint64_t block_stride = branch_factor * block_size; + if (start_offset >= input_file.size()) { + break; + } + input_streams.emplace_back(new RecordInputStream( + input_file, + ctx.record_size, + start_offset, + block_size, + block_stride, + buffer_size)); + } + + // Initialize output stream. + RecordOutputStream output_stream( + output_file, + ctx.record_size, + 0, + buffer_size); // Loop over groups of blocks to be sorted. 
+ // Every group consists of "branch_factor" blocks, except the last + // group which may contain fewer blocks. + // Each group produces one output block. uint64_t block_index = 0; while (block_index < num_blocks) { @@ -1264,44 +1446,53 @@ void merge_pass( branch_factor = num_blocks - block_index; } + // Skip to the next section of each active input stream. + for (unsigned int i = 0; i < this_branch_factor; i++) { + input_streams[i]->next_block(); + } + if (this_branch_factor == 1) { + + // Last group contains just 1 block. + // Copy it to the output. + assert(!filter_dupl); + RecordInputStream * instream = input_streams[0].get(); + while (!instream->empty()) { + output_stream.put(instream->record()); + instream->next_record(); + } + + } else if (this_branch_factor == 2) { + + // Special case for merging 2 blocks. + merge_2_blocks( + *input_streams[0], + *input_streams[1], + output_stream, + ctx.record_size, + filter_dupl); + + } else { + + // Merge more than 2 blocks. + merge_n_blocks( + input_streams, + output_stream, + ctx.record_size, + this_branch_factor, + filter_dupl); + + } + + // Skip to the start of the next block group. + block_index += this_branch_factor; } - // Loop over blocks to be sorted. - for (uint64_t block_index = 0; block_index < num_blocks; block_index++) { - - uint64_t block_start_idx = block_index * records_per_block; - size_t block_num_records = - std::min(records_per_block, num_records - block_start_idx); - - log(ctx, - "sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n", - block_index, - num_blocks, - block_num_records); - - // Read block. - input_file.read( - buffer.data(), - block_start_idx * record_size, - block_num_records * record_size); - - // Sort records in this block. - sort_records( - buffer.data(), - record_size, - block_num_records); - - // Write block. - output_file.write( - buffer.data(), - block_start_idx * record_size, - block_num_records * record_size); - } + // Flush output stream buffers. 
+ output_stream.flush(); timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); -#endif } @@ -1313,17 +1504,16 @@ SortStrategy plan_multi_pass_strategy( const SortContext& ctx) { // Plan the initial sort pass. - // Use blocks that are at most half of available memory - // (so we can use two buffers to overlap I/O and sorting). + // Use blocks that are at most half of available memory, + // so we can use two buffers to overlap I/O and sorting. uint64_t max_sort_block_size = ctx.memory_size / 2; // Calculate number of records per block. + // Make sure this is a multiple of the transfer alignment size. uint64_t records_per_sort_block = max_sort_block_size / ctx.record_size; + records_per_sort_block -= records_per_sort_block % TRANSFER_ALIGNMENT; - // Make sure the block size is a multiple of 4096 bytes. - records_per_sort_block -= records_per_sort_block % 4096; - - // Calculate the number of blocks during the initial sort pass. + // Calculate number of blocks during the initial sort pass. uint64_t num_records = file_size / ctx.record_size; uint64_t num_sort_blocks = 1 + (num_records - 1) / records_per_sort_block; @@ -1374,7 +1564,8 @@ void sortbin( // We want file I/O to occur on 4096-byte boundaries. // To ensure this, we want to do I/O on multiples of 4096 records. // To ensure this is possible, we need room for ~ 32k records per branch. - if (ctx.branch_factor > ctx.memory_size / ctx.record_size / 32768) { + if (ctx.memory_size / ctx.record_size / ctx.branch_factor < + 8 * TRANSFER_ALIGNMENT) { throw std::logic_error( "Not enough memory for this combination of record size" " and branch factor"); @@ -1603,7 +1794,7 @@ int main(int argc, char **argv) SortContext ctx; ctx.record_size = record_size; - ctx.memory_size = uint64_t(memory_size) * 1024 * 1024; + ctx.memory_size = size_t(memory_size) * 1024 * 1024; ctx.branch_factor = branch_factor; ctx.flag_unique = flag_unique; ctx.flag_verbose = flag_verbose;