1
0
Fork 0

Implement merge pass - not yet tested

This commit is contained in:
Joris van Rantwijk 2022-06-22 22:17:14 +02:00
parent 5b5c090bc5
commit 1da4983c7d
1 changed files with 297 additions and 106 deletions

View File

@ -51,6 +51,7 @@
#include <algorithm>
#include <iterator>
#include <memory>
#include <stdexcept>
#include <string>
#include <system_error>
@ -70,6 +71,10 @@
/* Default number of sorting threads. */
#define DEFAULT_THREADS 1
/* Align buffer sizes and I/O on this number of records.
For efficiency, I/O should be done in multiples of 4096 bytes. */
#define TRANSFER_ALIGNMENT 4096
namespace { // anonymous namespace
@ -400,18 +405,19 @@ public:
* Read binary records from an input file with buffering.
*
* The input stream reads from a sequence of discontinuous, equally spaced
* sections in the input file. All sections have the same size, except for
* the last section which may be shorter if it runs to the end of the file.
* Each section contains a flat array of binary records.
* blocks in the input file. All blocks have the same size, except for
* the last block which may be shorter if it runs to the end of the file.
* Each block contains a flat array of binary records.
*
* The input stream starts in the "end_of_stream" state.
* The first call to "next_section()" enables reading records from the
* first input section. When the input stream reaches the end of the
* current section, it again enters the "end_of_stream" state until
* the next call to "next_section()".
* The input stream starts in the "empty" state.
* The first call to "next_block()" enables reading records from the
* first input block. When the input stream reaches the end of the
* current block, it again enters the "empty" state until the following
* call to "next_block()".
*/
class RecordInputStream
{
// TODO : double-buffering with delayed I/O via background thread
public:
/**
* Construct a record input stream.
@ -420,12 +426,12 @@ public:
*
* @param input_file Input file where records read from.
* @param record_size Record size in bytes.
* @param section_offset Offset in input file of first input section.
* @param section_size Size of each input section in bytes.
* @param start_offset Offset in input file of first input section.
* @param block_size Size of each input block in bytes.
* Must be a multiple of "record_size".
* The last input section may be shorter if it
* runs to the end of the file.
* @param section_stride Distance between start of sections in bytes.
* The last input block may be shorter if it runs
* to the end of the file.
* @param block_stride Distance between start of blocks in bytes.
* @param buffer_size Buffer size in bytes.
* Must be a multiple of "record_size".
* Note: Each RecordInputStream creates two buffers
@ -434,24 +440,24 @@ public:
RecordInputStream(
BinaryFile& input_file,
unsigned int record_size,
uint64_t section_offset,
uint64_t section_size,
uint64_t section_stride,
uint64_t start_offset,
uint64_t block_size,
uint64_t block_stride,
size_t buffer_size)
: m_input_file(input_file),
m_record_size(record_size),
m_section_offset(section_offset),
m_section_size(section_size),
m_section_stride(section_stride),
m_section_remaining(0),
m_block_offset(start_offset),
m_block_size(block_size),
m_block_stride(block_stride),
m_block_remaining(0),
m_file_offset(0),
m_bufpos(NULL),
m_bufend(NULL),
m_buffer(buffer_size)
{
assert(section_size % record_size == 0);
assert(section_size > record_size);
assert(section_size <= section_stride);
assert(start_offset <= input_file.size());
assert(block_size % record_size == 0);
assert(block_size <= block_stride);
assert(buffer_size % record_size == 0);
assert(buffer_size > record_size);
}
@ -460,8 +466,8 @@ public:
RecordInputStream(const RecordInputStream&) = delete;
RecordInputStream& operator=(const RecordInputStream&) = delete;
/** Return true if the end of the current input section is reached. */
inline bool end_of_stream() const
/** Return true if the end of the current input block is reached. */
inline bool empty() const
{
return (m_bufpos == m_bufend);
}
@ -472,7 +478,7 @@ public:
* This function must only be used if "end_of_stream()" returns false.
* The returned pointer becomes invalid after a call to "next_record()".
*/
inline const unsigned char * current_record() const
inline const unsigned char * record() const
{
return m_bufpos;
}
@ -480,7 +486,7 @@ public:
/**
* Move to the next record of the current section.
*
* This function must only be used if "end_of_stream()" returns false.
* This function must only be used if "empty()" returns false.
* Calling this function invalidates all pointers previously returned
* by "current_record()".
*/
@ -499,19 +505,19 @@ public:
/**
* Start reading from the next input section.
*
* This function may only be called if "end_of_stream()" returns true.
* This function may only be called if "empty()" returns true.
*/
void next_section()
void next_block()
{
assert(m_bufpos == m_bufend);
uint64_t file_size = m_input_file.size();
assert(m_section_stride < file_size - m_section_offset);
assert(m_block_stride < file_size - m_block_offset);
m_section_offset += m_section_stride;
m_file_offset = m_section_offset;
m_section_remaining =
std::min(m_section_size, file_size - m_section_offset);
m_block_offset += m_block_stride;
m_file_offset = m_block_offset;
m_block_remaining =
std::min(m_block_size, file_size - m_block_offset);
refill_buffer();
}
@ -520,28 +526,27 @@ private:
/** Refill the buffer from the current input section. */
void refill_buffer()
{
if (m_section_remaining > 0) {
size_t block_size =
(m_buffer.size() < m_section_remaining) ?
m_buffer.size() : m_section_remaining;
m_input_file.read(m_buffer.data(), m_file_offset, block_size);
m_file_offset += block_size;
m_section_remaining -= block_size;
m_bufend = m_buffer.data() + block_size;
if (m_block_remaining > 0) {
size_t transfer_size =
(m_buffer.size() < m_block_remaining) ?
m_buffer.size() : m_block_remaining;
m_input_file.read(m_buffer.data(), m_file_offset, transfer_size);
m_file_offset += transfer_size;
m_block_remaining -= transfer_size;
m_bufend = m_buffer.data() + transfer_size;
}
}
BinaryFile& m_input_file;
const unsigned int m_record_size;
uint64_t m_section_offset;
uint64_t m_section_size;
uint64_t m_section_stride;
uint64_t m_section_remaining;
uint64_t m_block_offset;
uint64_t m_block_size;
uint64_t m_block_stride;
uint64_t m_block_remaining;
uint64_t m_file_offset;
unsigned char * m_bufpos;
unsigned char * m_bufend;
std::vector<unsigned char> m_buffer;
// TODO : double-buffering with delayed I/O via background thread
};
@ -550,6 +555,7 @@ private:
*/
class RecordOutputStream
{
// TODO : double-buffering with delayed I/O via background thread
public:
/**
* Construct a record output stream.
@ -602,7 +608,6 @@ private:
uint64_t m_file_offset;
size_t m_bufpos;
std::vector<unsigned char> m_buffer;
// TODO : double-buffering with delayed I/O via background thread
};
@ -1212,6 +1217,147 @@ void sort_pass(
}
/**
* Merge 2 sorted blocks of records into a single sorted block.
*
* @param instream1 Input stream containing block 1.
* @param instream2 Input stream containing block 2.
* @param output_stream Output stream for the merged block.
* @param record_size Record size in bytes.
* @param filter_dupl True to eliminate duplicate records.
*/
void merge_2_blocks(
RecordInputStream& instream1,
RecordInputStream& instream2,
RecordOutputStream& output_stream,
size_t record_size,
bool filter_dupl)
{
// Merge until one stream runs empty.
if (!instream1.empty() && !instream2.empty()) {
const unsigned char * rec1 = instream1.record();
const unsigned char * rec2 = instream2.record();
while (true) {
// TODO TODO : filter duplicates
// Choose which record should go first.
if (record_compare(rec1, rec2, record_size) < 0) {
// Push record from stream 1 and load next record.
output_stream.put(rec1);
instream1.next_record();
if (instream1.empty()) {
break;
}
rec1 = instream1.record();
} else {
// Push record from stream 2 and load next record.
output_stream.put(rec2);
instream2.next_record();
if (instream2.empty()) {
break;
}
rec2 = instream2.record();
}
}
}
// At most one of the streams still has records left.
// Copy those records to the output.
while (!instream1.empty()) {
output_stream.put(instream1.record());
instream1.next_record();
}
while (!instream2.empty()) {
output_stream.put(instream2.record());
instream2.next_record();
}
}
/**
* Merge sorted blocks of records into a single sorted block.
*
* @param input_streams One input stream for each input blocks.
* @param output_stream Output stream for the merged block.
* @param record_size Record size in bytes.
* @param branch_factor Number of input blocks.
* May be less than the length of input_streams.
* @param filter_dupl True to eliminate duplicate records.
*/
void merge_n_blocks(
std::vector<std::unique_ptr<RecordInputStream>>& input_streams,
RecordOutputStream& output_stream,
size_t record_size,
unsigned int branch_factor,
bool filter_dupl)
{
assert(branch_factor > 1);
assert(branch_factor <= input_streams.size());
// Put the head element of each block into a heap.
// The heap will determine which block contains the element that
// should go first in the merged block.
typedef std::tuple<const unsigned char*, RecordInputStream*> HeapElement;
// Function which compares records and returns true if
// record A comes after record B in sort order.
// If this function is used as the compare operator of a max-heap,
// the record that comes first in sort order will be at the top
// of the heap.
auto cmp_heap_elem =
[record_size](const HeapElement& a, const HeapElement& b) {
const unsigned char *reca = std::get<0>(a);
const unsigned char *recb = std::get<0>(b);
return record_compare(reca, recb, record_size) > 0;
};
// Initialize empty heap.
std::vector<HeapElement> heap;
// Get the first element of each block.
for (unsigned int i = 0; i < branch_factor; i++) {
if (!input_streams[i]->empty()) {
heap.emplace_back(input_streams[i]->record(),
input_streams[i].get());
}
}
// Make a heap of the first blocks.
std::make_heap(heap.begin(), heap.end(), cmp_heap_elem);
// Keep merging until the heap runs empty.
while (!heap.empty()) {
// TODO TODO : filter duplicates
// Extract the first element from the heap.
const unsigned char * rec;
RecordInputStream * instream;
std::tie(rec, instream) = heap[0];
std::pop_heap(heap.begin(), heap.end());
// Push this element to the output block.
output_stream.put(rec);
// Try to pull the next record from this input stream.
instream->next_record();
if (instream->empty()) {
// Stream is empty. This stream is now out of the game.
// The heap shrinks by 1 element.
heap.pop_back();
} else {
// Push next record from the stream into the heap.
heap.back() = std::make_tuple(instream->record(), instream);
std::push_heap(heap.begin(), heap.end());
}
}
}
/**
* Perform a merge pass of multi-pass external sorting.
*
@ -1228,8 +1374,8 @@ void sort_pass(
* @param output_file Output file for this pass.
* @param records_per_block Number of records per input block.
* @param num_blocks Number of input blocks.
* @param branch_factor Number of blocks to merge per an output block.
* @param filter_duplicates true to eliminate duplicate elements.
* @param branch_factor Number of blocks to merge per output block.
* @param filter_dupl True to eliminate duplicate records.
* @param ctx Reference to context structure.
*/
void merge_pass(
@ -1238,23 +1384,59 @@ void merge_pass(
uint64_t records_per_block,
uint64_t num_blocks,
unsigned int branch_factor,
bool filter_duplicates,
bool filter_dupl,
const SortContext& ctx)
{
// TODO TODO TODO TODO
#if 0
// Eliminating duplicates is only supported for a single output block.
assert((!filter_duplicates) || (num_blocks <= branch_factor));
assert(branch_factor > 1);
assert(branch_factor <= num_blocks);
// Only filter duplicates when the output is a single block.
assert((!filter_dupl) || (branch_factor == num_blocks));
Timer timer;
timer.start();
// TODO : double-buffer with I/O in separate thread
// Allocate sort buffer.
assert(records_per_block < SIZE_MAX / record_size);
std::vector<unsigned char> buffer(records_per_block * record_size);
// Calculate number of buffers:
// 2 buffers per input stream + buffers for output 1 stream.
size_t num_output_buffers = 2 + (branch_factor - 1) / 2;
size_t num_buffers = 2 * branch_factor + num_output_buffers;
// Calculate buffer size.
// Must be a multiple of the record size and the transfer alignment.
size_t buffer_size = ctx.memory_size / num_buffers;
buffer_size -= buffer_size % (TRANSFER_ALIGNMENT * ctx.record_size);
// TODO : double-buffering with I/O in separate thread
// Initialize input streams.
std::vector<std::unique_ptr<RecordInputStream>> input_streams;
for (unsigned int i = 0; i < branch_factor; i++) {
uint64_t block_size = records_per_block * ctx.record_size;
uint64_t start_offset = i * block_size;
uint64_t block_stride = branch_factor * block_size;
if (start_offset >= input_file.size()) {
break;
}
input_streams.emplace_back(new RecordInputStream(
input_file,
ctx.record_size,
start_offset,
block_size,
block_stride,
buffer_size));
}
// Initialize output stream.
RecordOutputStream output_stream(
output_file,
ctx.record_size,
0,
buffer_size);
// Loop over groups of blocks to be sorted.
// Every group consists of "branch_factor" blocks, except the last
// group which may contain fewer blocks.
// Each group produces one output block.
uint64_t block_index = 0;
while (block_index < num_blocks) {
@ -1264,44 +1446,53 @@ void merge_pass(
branch_factor = num_blocks - block_index;
}
// Skip to the next section of each active input stream.
for (unsigned int i = 0; i < this_branch_factor; i++) {
input_streams[i]->next_block();
}
if (this_branch_factor == 1) {
// Last group contains just 1 block.
// Copy it to the output.
assert(!filter_dupl);
RecordInputStream * instream = input_streams[0].get();
while (!instream->empty()) {
output_stream.put(instream->record());
instream->next_record();
}
} else if (this_branch_factor == 2) {
// Special case for merging 2 blocks.
merge_2_blocks(
*input_streams[0],
*input_streams[1],
output_stream,
ctx.record_size,
filter_dupl);
} else {
// Merge more than 2 blocks.
merge_n_blocks(
input_streams,
output_stream,
ctx.record_size,
this_branch_factor,
filter_dupl);
}
// Loop over blocks to be sorted.
for (uint64_t block_index = 0; block_index < num_blocks; block_index++) {
uint64_t block_start_idx = block_index * records_per_block;
size_t block_num_records =
std::min(records_per_block, num_records - block_start_idx);
log(ctx,
"sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n",
block_index,
num_blocks,
block_num_records);
// Read block.
input_file.read(
buffer.data(),
block_start_idx * record_size,
block_num_records * record_size);
// Sort records in this block.
sort_records(
buffer.data(),
record_size,
block_num_records);
// Write block.
output_file.write(
buffer.data(),
block_start_idx * record_size,
block_num_records * record_size);
// Skip to the start of the next block group.
block_index += this_branch_factor;
}
// Flush output stream buffers.
output_stream.flush();
timer.stop();
log(ctx, " t = %.3f seconds\n", timer.value());
#endif
}
@ -1313,17 +1504,16 @@ SortStrategy plan_multi_pass_strategy(
const SortContext& ctx)
{
// Plan the initial sort pass.
// Use blocks that are at most half of available memory
// (so we can use two buffers to overlap I/O and sorting).
// Use blocks that are at most half of available memory,
// so we can use two buffers to overlap I/O and sorting.
uint64_t max_sort_block_size = ctx.memory_size / 2;
// Calculate number of records per block.
// Make sure this is a multiple of the transfer alignment size.
uint64_t records_per_sort_block = max_sort_block_size / ctx.record_size;
records_per_sort_block -= records_per_sort_block % TRANSFER_ALIGNMENT;
// Make sure the block size is a multiple of 4096 bytes.
records_per_sort_block -= records_per_sort_block % 4096;
// Calculate the number of blocks during the initial sort pass.
// Calculate number of blocks during the initial sort pass.
uint64_t num_records = file_size / ctx.record_size;
uint64_t num_sort_blocks = 1 + (num_records - 1) / records_per_sort_block;
@ -1374,7 +1564,8 @@ void sortbin(
// We want file I/O to occur on 4096-byte boundaries.
// To ensure this, we want to do I/O on multiples of 4096 records.
// To ensure this is possible, we need room for ~ 32k records per branch.
if (ctx.branch_factor > ctx.memory_size / ctx.record_size / 32768) {
if (ctx.memory_size / ctx.record_size / ctx.branch_factor <
8 * TRANSFER_ALIGNMENT) {
throw std::logic_error(
"Not enough memory for this combination of record size"
" and branch factor");
@ -1603,7 +1794,7 @@ int main(int argc, char **argv)
SortContext ctx;
ctx.record_size = record_size;
ctx.memory_size = uint64_t(memory_size) * 1024 * 1024;
ctx.memory_size = size_t(memory_size) * 1024 * 1024;
ctx.branch_factor = branch_factor;
ctx.flag_unique = flag_unique;
ctx.flag_verbose = flag_verbose;