From af0d74dc21eb4debb91f3c656bd7eb9f2534b640 Mon Sep 17 00:00:00 2001 From: Joris van Rantwijk Date: Wed, 22 Jun 2022 22:45:09 +0200 Subject: [PATCH] Implement duplicate filtering - not yet tested --- sortbin.cpp | 95 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 56 insertions(+), 39 deletions(-) diff --git a/sortbin.cpp b/sortbin.cpp index a397fc0..88fb770 100644 --- a/sortbin.cpp +++ b/sortbin.cpp @@ -1224,42 +1224,42 @@ void sort_pass( * @param instream2 Input stream containing block 2. * @param output_stream Output stream for the merged block. * @param record_size Record size in bytes. - * @param filter_dupl True to eliminate duplicate records. */ void merge_2_blocks( RecordInputStream& instream1, RecordInputStream& instream2, RecordOutputStream& output_stream, - size_t record_size, - bool filter_dupl) + size_t record_size) { + // Input blocks should not be empty. + assert(!instream1.empty()); + assert(!instream2.empty()); + + const unsigned char * rec1 = instream1.record(); + const unsigned char * rec2 = instream2.record(); + // Merge until one stream runs empty. - if (!instream1.empty() && !instream2.empty()) { - const unsigned char * rec1 = instream1.record(); - const unsigned char * rec2 = instream2.record(); + while (true) { - while (true) { - -// TODO TODO : filter duplicates - - // Choose which record should go first. - if (record_compare(rec1, rec2, record_size) < 0) { - // Push record from stream 1 and load next record. - output_stream.put(rec1); - instream1.next_record(); - if (instream1.empty()) { - break; - } - rec1 = instream1.record(); - } else { - // Push record from stream 2 and load next record. - output_stream.put(rec2); - instream2.next_record(); - if (instream2.empty()) { - break; - } - rec2 = instream2.record(); + // Choose which record should go first. + if (record_compare(rec1, rec2, record_size) < 0) { + // Push record from stream 1 and load next record. + output_stream.put(rec1); + instream1.next_record(); + if (instream1.empty()) { + rec1 = NULL; + break; } + rec1 = instream1.record(); + } else { + // Push record from stream 2 and load next record. + output_stream.put(rec2); + instream2.next_record(); + if (instream2.empty()) { + rec2 = NULL; + break; + } + rec2 = instream2.record(); } } @@ -1320,28 +1320,46 @@ void merge_n_blocks( // Get the first element of each block. for (unsigned int i = 0; i < branch_factor; i++) { - if (!input_streams[i]->empty()) { - heap.emplace_back(input_streams[i]->record(), - input_streams[i].get()); - } + // Input blocks should not be empty. + assert(!input_streams[i]->empty()); + heap.emplace_back(input_streams[i]->record(), input_streams[i].get()); } // Make a heap of the first blocks. std::make_heap(heap.begin(), heap.end(), cmp_heap_elem); + // Allocate a temporary record for duplicate filtering. + std::vector temp_record(record_size); + + // The very first record can not be filtered out. + bool filter_first_pass = true; + // Keep merging until the heap runs empty. while (!heap.empty()) { -// TODO TODO : filter duplicates - // Extract the first element from the heap. const unsigned char * rec; RecordInputStream * instream; std::tie(rec, instream) = heap[0]; std::pop_heap(heap.begin(), heap.end()); - // Push this element to the output block. - output_stream.put(rec); + if (filter_dupl) { + + // Compare against previous record, only output if different. + if (filter_first_pass + || record_compare(temp_record.data(), rec, record_size) != 0) + { + output_stream.put(rec); + record_copy(temp_record.data(), rec, record_size); + } + filter_first_pass = false; + + } else { + + // No filtering, just push record to the output block. + output_stream.put(rec); + + } // Try to pull the next record from this input stream. instream->next_record(); @@ -1462,19 +1480,18 @@ void merge_pass( instream->next_record(); } - } else if (this_branch_factor == 2) { + } else if (this_branch_factor == 2 && !filter_dupl) { // Special case for merging 2 blocks. merge_2_blocks( *input_streams[0], *input_streams[1], output_stream, - ctx.record_size, - filter_dupl); + ctx.record_size); } else { - // Merge more than 2 blocks. + // Merge more than 2 blocks or filter duplicates. merge_n_blocks( input_streams, output_stream,