diff --git a/sortbin.cpp b/sortbin.cpp index 17478c1..bc2c7a5 100644 --- a/sortbin.cpp +++ b/sortbin.cpp @@ -62,6 +62,7 @@ // TODO : use a background thread for file I/O +// TODO : use fadvise to drop used data from system cache /* Maximum amount of RAM to use (in MBytes). */ @@ -498,8 +499,6 @@ public: m_bufpos += m_record_size; if (m_bufpos == m_bufend) { - m_bufpos = m_buffer.data(); - m_bufend = m_buffer.data(); refill_buffer(); } } @@ -514,13 +513,15 @@ public: assert(m_bufpos == m_bufend); uint64_t file_size = m_input_file.size(); - assert(m_block_stride < file_size - m_block_offset); + assert(m_block_offset <= file_size); - m_block_offset += m_block_stride; m_file_offset = m_block_offset; m_block_remaining = std::min(m_block_size, file_size - m_block_offset); + m_block_offset += std::min(m_block_stride, + file_size - m_block_offset); + refill_buffer(); } @@ -535,6 +536,7 @@ private: m_input_file.read(m_buffer.data(), m_file_offset, transfer_size); m_file_offset += transfer_size; m_block_remaining -= transfer_size; + m_bufpos = m_buffer.data(); m_bufend = m_buffer.data() + transfer_size; } } @@ -577,9 +579,11 @@ public: : m_output_file(output_file), m_record_size(record_size), m_file_offset(file_offset), - m_bufpos(0), m_buffer(buffer_size) - { } + { + m_bufpos = m_buffer.data(); + m_bufend = m_buffer.data() + m_buffer.size(); + } // Prevent copying and assignment. RecordOutputStream(const RecordOutputStream&) = delete; @@ -588,19 +592,28 @@ public: /** Append a record to the output stream. */ inline void put(const unsigned char *record) { - if (m_record_size > m_buffer.size() - m_bufpos) { + if (m_record_size > m_bufend - m_bufpos) { flush(); } - memcpy(m_buffer.data() + m_bufpos, record, m_record_size); + memcpy(m_bufpos, record, m_record_size); + m_bufpos += m_record_size; + } + + /** Return the current file offset. Flush before calling this function. */ + inline uint64_t file_offset() const + { + assert(m_bufpos == m_buffer.data()); + return m_file_offset; } /** Flush buffered records to the output file. */ void flush() { - if (m_bufpos > 0) { - m_output_file.write(m_buffer.data(), m_file_offset, m_bufpos); - m_file_offset += m_bufpos; - m_bufpos = 0; + size_t flush_size = m_bufpos - m_buffer.data(); + if (flush_size > 0) { + m_output_file.write(m_buffer.data(), m_file_offset, flush_size); + m_file_offset += flush_size; + m_bufpos = m_buffer.data(); } } @@ -608,7 +621,8 @@ private: BinaryFile& m_output_file; const unsigned int m_record_size; uint64_t m_file_offset; - size_t m_bufpos; + unsigned char * m_bufpos; + unsigned char * m_bufend; std::vector m_buffer; }; @@ -1343,7 +1357,7 @@ void merge_n_blocks( const unsigned char * rec; RecordInputStream * instream; std::tie(rec, instream) = heap[0]; - std::pop_heap(heap.begin(), heap.end()); + std::pop_heap(heap.begin(), heap.end(), cmp_heap_elem); if (filter_dupl) { @@ -1372,7 +1386,7 @@ void merge_n_blocks( } else { // Push next record from the stream into the heap. heap.back() = std::make_tuple(instream->record(), instream); - std::push_heap(heap.begin(), heap.end()); + std::push_heap(heap.begin(), heap.end(), cmp_heap_elem); } } } @@ -1510,6 +1524,14 @@ void merge_pass( // Flush output stream buffers. output_stream.flush(); + // Shrink output file if duplicate records were removed. + if (filter_dupl) { + uint64_t output_size = output_stream.file_offset(); + uint64_t num_output_records = output_size / ctx.record_size; + log(ctx, "found %zu unique records\n", num_output_records); + output_file.truncate_file(output_size); + } + timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); } @@ -1740,7 +1762,7 @@ int main(int argc, char **argv) std::string tempdir = get_default_tmpdir(); int opt; - while ((opt = getopt_long(argc, argv, "uv", longopts, NULL)) != -1) { + while ((opt = getopt_long(argc, argv, "s:T:uv", longopts, NULL)) != -1) { switch (opt) { case 's': record_size = atoi(optarg);