1
0
Fork 0

Fix bugs in multi-pass sorting

This commit is contained in:
Joris van Rantwijk 2022-06-24 16:31:09 +02:00
parent c9ea115e68
commit c1d088a3c7
1 changed files with 38 additions and 16 deletions

View File

@ -62,6 +62,7 @@
// TODO : use a background thread for file I/O // TODO : use a background thread for file I/O
// TODO : use fadvise to drop used data from system cache
/* Maximum amount of RAM to use (in MBytes). */ /* Maximum amount of RAM to use (in MBytes). */
@ -498,8 +499,6 @@ public:
m_bufpos += m_record_size; m_bufpos += m_record_size;
if (m_bufpos == m_bufend) { if (m_bufpos == m_bufend) {
m_bufpos = m_buffer.data();
m_bufend = m_buffer.data();
refill_buffer(); refill_buffer();
} }
} }
@ -514,13 +513,15 @@ public:
assert(m_bufpos == m_bufend); assert(m_bufpos == m_bufend);
uint64_t file_size = m_input_file.size(); uint64_t file_size = m_input_file.size();
assert(m_block_stride < file_size - m_block_offset); assert(m_block_offset <= file_size);
m_block_offset += m_block_stride;
m_file_offset = m_block_offset; m_file_offset = m_block_offset;
m_block_remaining = m_block_remaining =
std::min(m_block_size, file_size - m_block_offset); std::min(m_block_size, file_size - m_block_offset);
m_block_offset += std::min(m_block_stride,
file_size - m_block_offset);
refill_buffer(); refill_buffer();
} }
@ -535,6 +536,7 @@ private:
m_input_file.read(m_buffer.data(), m_file_offset, transfer_size); m_input_file.read(m_buffer.data(), m_file_offset, transfer_size);
m_file_offset += transfer_size; m_file_offset += transfer_size;
m_block_remaining -= transfer_size; m_block_remaining -= transfer_size;
m_bufpos = m_buffer.data();
m_bufend = m_buffer.data() + transfer_size; m_bufend = m_buffer.data() + transfer_size;
} }
} }
@ -577,9 +579,11 @@ public:
: m_output_file(output_file), : m_output_file(output_file),
m_record_size(record_size), m_record_size(record_size),
m_file_offset(file_offset), m_file_offset(file_offset),
m_bufpos(0),
m_buffer(buffer_size) m_buffer(buffer_size)
{ } {
m_bufpos = m_buffer.data();
m_bufend = m_buffer.data() + m_buffer.size();
}
// Prevent copying and assignment. // Prevent copying and assignment.
RecordOutputStream(const RecordOutputStream&) = delete; RecordOutputStream(const RecordOutputStream&) = delete;
@ -588,19 +592,28 @@ public:
/** Append a record to the output stream. */ /** Append a record to the output stream. */
inline void put(const unsigned char *record) inline void put(const unsigned char *record)
{ {
if (m_record_size > m_buffer.size() - m_bufpos) { if (m_record_size > m_bufend - m_bufpos) {
flush(); flush();
} }
memcpy(m_buffer.data() + m_bufpos, record, m_record_size); memcpy(m_bufpos, record, m_record_size);
m_bufpos += m_record_size;
}
/** Return the current file offset. Flush before calling this function. */
inline uint64_t file_offset() const
{
assert(m_bufpos == m_buffer.data());
return m_file_offset;
} }
/** Flush buffered records to the output file. */ /** Flush buffered records to the output file. */
void flush() void flush()
{ {
if (m_bufpos > 0) { size_t flush_size = m_bufpos - m_buffer.data();
m_output_file.write(m_buffer.data(), m_file_offset, m_bufpos); if (flush_size > 0) {
m_file_offset += m_bufpos; m_output_file.write(m_buffer.data(), m_file_offset, flush_size);
m_bufpos = 0; m_file_offset += flush_size;
m_bufpos = m_buffer.data();
} }
} }
@ -608,7 +621,8 @@ private:
BinaryFile& m_output_file; BinaryFile& m_output_file;
const unsigned int m_record_size; const unsigned int m_record_size;
uint64_t m_file_offset; uint64_t m_file_offset;
size_t m_bufpos; unsigned char * m_bufpos;
unsigned char * m_bufend;
std::vector<unsigned char> m_buffer; std::vector<unsigned char> m_buffer;
}; };
@ -1343,7 +1357,7 @@ void merge_n_blocks(
const unsigned char * rec; const unsigned char * rec;
RecordInputStream * instream; RecordInputStream * instream;
std::tie(rec, instream) = heap[0]; std::tie(rec, instream) = heap[0];
std::pop_heap(heap.begin(), heap.end()); std::pop_heap(heap.begin(), heap.end(), cmp_heap_elem);
if (filter_dupl) { if (filter_dupl) {
@ -1372,7 +1386,7 @@ void merge_n_blocks(
} else { } else {
// Push next record from the stream into the heap. // Push next record from the stream into the heap.
heap.back() = std::make_tuple(instream->record(), instream); heap.back() = std::make_tuple(instream->record(), instream);
std::push_heap(heap.begin(), heap.end()); std::push_heap(heap.begin(), heap.end(), cmp_heap_elem);
} }
} }
} }
@ -1510,6 +1524,14 @@ void merge_pass(
// Flush output stream buffers. // Flush output stream buffers.
output_stream.flush(); output_stream.flush();
// Shrink output file if duplicate records were removed.
if (filter_dupl) {
uint64_t output_size = output_stream.file_offset();
uint64_t num_output_records = output_size / ctx.record_size;
log(ctx, "found %zu unique records\n", num_output_records);
output_file.truncate_file(output_size);
}
timer.stop(); timer.stop();
log(ctx, " t = %.3f seconds\n", timer.value()); log(ctx, " t = %.3f seconds\n", timer.value());
} }
@ -1740,7 +1762,7 @@ int main(int argc, char **argv)
std::string tempdir = get_default_tmpdir(); std::string tempdir = get_default_tmpdir();
int opt; int opt;
while ((opt = getopt_long(argc, argv, "uv", longopts, NULL)) != -1) { while ((opt = getopt_long(argc, argv, "s:T:uv", longopts, NULL)) != -1) {
switch (opt) { switch (opt) {
case 's': case 's':
record_size = atoi(optarg); record_size = atoi(optarg);