Avoid unnecessary I/O during merging
This commit is contained in:
parent
24290acc9c
commit
b091d8b1eb
470
src/sortbin.cpp
470
src/sortbin.cpp
|
@ -32,6 +32,7 @@
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <iterator>
|
#include <iterator>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <numeric>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <system_error>
|
#include <system_error>
|
||||||
|
@ -87,15 +88,8 @@ struct SortStrategy
|
||||||
{
|
{
|
||||||
/** Strategy for a single merge pass. */
|
/** Strategy for a single merge pass. */
|
||||||
struct MergePass {
|
struct MergePass {
|
||||||
|
/** Number of records in each input block into this pass. */
|
||||||
/** Number of records per input block into this merge pass. */
|
std::vector<uint64_t> records_per_block;
|
||||||
uint64_t records_per_input_block;
|
|
||||||
|
|
||||||
/** Total number of input blocks into this merge pass. */
|
|
||||||
uint64_t num_input_blocks;
|
|
||||||
|
|
||||||
/** Number of input blocks to merge into each output block. */
|
|
||||||
unsigned int branch_factor;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Number of records per block during the initial sort pass. */
|
/** Number of records per block during the initial sort pass. */
|
||||||
|
@ -104,6 +98,9 @@ struct SortStrategy
|
||||||
/** Number of blocks for the initial sort pass. */
|
/** Number of blocks for the initial sort pass. */
|
||||||
uint64_t num_sort_blocks;
|
uint64_t num_sort_blocks;
|
||||||
|
|
||||||
|
/** Number of blocks that are processed during the first merge pass. */
|
||||||
|
uint64_t num_sort_blocks_first_merge;
|
||||||
|
|
||||||
/** List of merge passes. */
|
/** List of merge passes. */
|
||||||
std::vector<MergePass> merge_pass;
|
std::vector<MergePass> merge_pass;
|
||||||
};
|
};
|
||||||
|
@ -411,10 +408,9 @@ public:
|
||||||
/**
|
/**
|
||||||
* Read binary records from an input file with buffering.
|
* Read binary records from an input file with buffering.
|
||||||
*
|
*
|
||||||
* The input stream reads from a sequence of discontinuous, equally spaced
|
* The input stream reads from a sequence of discontinuous blocks in
|
||||||
* blocks in the input file. All blocks have the same size, except for
|
* the input file. Each block contains a flat array of binary records.
|
||||||
* the last block which may be shorter if it runs to the end of the file.
|
* An explicit list of these blocks is passed to the stream constructor.
|
||||||
* Each block contains a flat array of binary records.
|
|
||||||
*
|
*
|
||||||
* The input stream starts in the "empty" state.
|
* The input stream starts in the "empty" state.
|
||||||
* The first call to "next_block()" enables reading records from the
|
* The first call to "next_block()" enables reading records from the
|
||||||
|
@ -426,6 +422,8 @@ class RecordInputStream
|
||||||
{
|
{
|
||||||
// TODO : double-buffering with delayed I/O via background thread
|
// TODO : double-buffering with delayed I/O via background thread
|
||||||
public:
|
public:
|
||||||
|
typedef std::vector<std::tuple<uint64_t, uint64_t>> BlockList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a record input stream.
|
* Construct a record input stream.
|
||||||
*
|
*
|
||||||
|
@ -433,12 +431,8 @@ public:
|
||||||
*
|
*
|
||||||
* @param input_file Input file where records read from.
|
* @param input_file Input file where records read from.
|
||||||
* @param record_size Record size in bytes.
|
* @param record_size Record size in bytes.
|
||||||
* @param start_offset Offset in input file of first input section.
|
* @param blocks Vector of input blocks specified as tuple
|
||||||
* @param block_size Size of each input block in bytes.
|
* (file_offset, number_of_records).
|
||||||
* Must be a multiple of "record_size".
|
|
||||||
* The last input block may be shorter if it runs
|
|
||||||
* to the end of the file.
|
|
||||||
* @param block_stride Distance between start of blocks in bytes.
|
|
||||||
* @param buffer_size Buffer size in bytes.
|
* @param buffer_size Buffer size in bytes.
|
||||||
* Must be a multiple of "record_size".
|
* Must be a multiple of "record_size".
|
||||||
* Note: Each RecordInputStream creates two buffers
|
* Note: Each RecordInputStream creates two buffers
|
||||||
|
@ -447,24 +441,18 @@ public:
|
||||||
RecordInputStream(
|
RecordInputStream(
|
||||||
BinaryFile& input_file,
|
BinaryFile& input_file,
|
||||||
unsigned int record_size,
|
unsigned int record_size,
|
||||||
uint64_t start_offset,
|
BlockList&& blocks,
|
||||||
uint64_t block_size,
|
|
||||||
uint64_t block_stride,
|
|
||||||
size_t buffer_size)
|
size_t buffer_size)
|
||||||
: m_input_file(input_file),
|
: m_input_file(input_file),
|
||||||
m_record_size(record_size),
|
m_record_size(record_size),
|
||||||
m_block_offset(start_offset),
|
m_next_block(0),
|
||||||
m_block_size(block_size),
|
|
||||||
m_block_stride(block_stride),
|
|
||||||
m_block_remaining(0),
|
m_block_remaining(0),
|
||||||
m_file_offset(0),
|
m_file_offset(0),
|
||||||
m_bufpos(NULL),
|
m_bufpos(NULL),
|
||||||
m_bufend(NULL),
|
m_bufend(NULL),
|
||||||
|
m_blocks(blocks),
|
||||||
m_buffer(buffer_size)
|
m_buffer(buffer_size)
|
||||||
{
|
{
|
||||||
assert(start_offset <= input_file.size());
|
|
||||||
assert(block_size % record_size == 0);
|
|
||||||
assert(block_size <= block_stride);
|
|
||||||
assert(buffer_size % record_size == 0);
|
assert(buffer_size % record_size == 0);
|
||||||
assert(buffer_size > record_size);
|
assert(buffer_size > record_size);
|
||||||
}
|
}
|
||||||
|
@ -515,16 +503,16 @@ public:
|
||||||
void next_block()
|
void next_block()
|
||||||
{
|
{
|
||||||
assert(m_bufpos == m_bufend);
|
assert(m_bufpos == m_bufend);
|
||||||
|
assert(m_next_block < m_blocks.size());
|
||||||
|
|
||||||
|
uint64_t num_records;
|
||||||
|
std::tie(m_file_offset, num_records) = m_blocks[m_next_block];
|
||||||
|
m_block_remaining = num_records * m_record_size;
|
||||||
|
m_next_block++;
|
||||||
|
|
||||||
uint64_t file_size = m_input_file.size();
|
uint64_t file_size = m_input_file.size();
|
||||||
assert(m_block_offset <= file_size);
|
assert(m_file_offset <= file_size);
|
||||||
|
assert(m_block_remaining <= file_size - m_file_offset);
|
||||||
m_file_offset = m_block_offset;
|
|
||||||
m_block_remaining =
|
|
||||||
std::min(m_block_size, file_size - m_block_offset);
|
|
||||||
|
|
||||||
m_block_offset += std::min(m_block_stride,
|
|
||||||
file_size - m_block_offset);
|
|
||||||
|
|
||||||
refill_buffer();
|
refill_buffer();
|
||||||
}
|
}
|
||||||
|
@ -547,13 +535,12 @@ private:
|
||||||
|
|
||||||
BinaryFile& m_input_file;
|
BinaryFile& m_input_file;
|
||||||
const unsigned int m_record_size;
|
const unsigned int m_record_size;
|
||||||
uint64_t m_block_offset;
|
size_t m_next_block;
|
||||||
uint64_t m_block_size;
|
|
||||||
uint64_t m_block_stride;
|
|
||||||
uint64_t m_block_remaining;
|
uint64_t m_block_remaining;
|
||||||
uint64_t m_file_offset;
|
uint64_t m_file_offset;
|
||||||
unsigned char * m_bufpos;
|
unsigned char * m_bufpos;
|
||||||
unsigned char * m_bufend;
|
unsigned char * m_bufend;
|
||||||
|
BlockList m_blocks;
|
||||||
std::vector<unsigned char> m_buffer;
|
std::vector<unsigned char> m_buffer;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1171,16 +1158,20 @@ void single_pass(
|
||||||
* the last block in the file which may be smaller.
|
* the last block in the file which may be smaller.
|
||||||
*
|
*
|
||||||
* @param input_file Input file.
|
* @param input_file Input file.
|
||||||
* @param output_file Output file for this pass.
|
* @param output_file1 Output file for this pass.
|
||||||
|
* @param output_file2 Second output file for this pass.
|
||||||
* @param records_per_block Number of records per sort block.
|
* @param records_per_block Number of records per sort block.
|
||||||
* @param num_blocks Number of sort blocks.
|
* @param num_blocks Number of sort blocks.
|
||||||
|
* @param num_blocks_file1 Number of blocks for the first output file.
|
||||||
* @param ctx Reference to context structure.
|
* @param ctx Reference to context structure.
|
||||||
*/
|
*/
|
||||||
void sort_pass(
|
void sort_pass(
|
||||||
BinaryFile& input_file,
|
BinaryFile& input_file,
|
||||||
BinaryFile& output_file,
|
BinaryFile& output_file1,
|
||||||
|
BinaryFile& output_file2,
|
||||||
uint64_t records_per_block,
|
uint64_t records_per_block,
|
||||||
uint64_t num_blocks,
|
uint64_t num_blocks,
|
||||||
|
uint64_t num_blocks_file1,
|
||||||
const SortContext& ctx)
|
const SortContext& ctx)
|
||||||
{
|
{
|
||||||
unsigned int record_size = ctx.record_size;
|
unsigned int record_size = ctx.record_size;
|
||||||
|
@ -1207,7 +1198,7 @@ void sort_pass(
|
||||||
std::min(records_per_block, num_records - first_record_idx);
|
std::min(records_per_block, num_records - first_record_idx);
|
||||||
|
|
||||||
log(ctx,
|
log(ctx,
|
||||||
"sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n",
|
" sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n",
|
||||||
block_index,
|
block_index,
|
||||||
num_blocks,
|
num_blocks,
|
||||||
block_num_records);
|
block_num_records);
|
||||||
|
@ -1225,6 +1216,8 @@ void sort_pass(
|
||||||
block_num_records);
|
block_num_records);
|
||||||
|
|
||||||
// Write block.
|
// Write block.
|
||||||
|
BinaryFile& output_file =
|
||||||
|
(block_index < num_blocks_file1) ? output_file1 : output_file2;
|
||||||
output_file.write(
|
output_file.write(
|
||||||
buffer.data(),
|
buffer.data(),
|
||||||
first_record_idx * record_size,
|
first_record_idx * record_size,
|
||||||
|
@ -1232,72 +1225,11 @@ void sort_pass(
|
||||||
}
|
}
|
||||||
|
|
||||||
timer.stop();
|
timer.stop();
|
||||||
log(ctx, "initial sort pass finished\n");
|
log(ctx, "end initial sort pass\n");
|
||||||
log(ctx, " t = %.3f seconds\n", timer.value());
|
log(ctx, " t = %.3f seconds\n", timer.value());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Merge 2 sorted blocks of records into a single sorted block.
|
|
||||||
*
|
|
||||||
* @param instream1 Input stream containing block 1.
|
|
||||||
* @param instream2 Input stream containing block 2.
|
|
||||||
* @param output_stream Output stream for the merged block.
|
|
||||||
* @param record_size Record size in bytes.
|
|
||||||
*/
|
|
||||||
void merge_2_blocks(
|
|
||||||
RecordInputStream& instream1,
|
|
||||||
RecordInputStream& instream2,
|
|
||||||
RecordOutputStream& output_stream,
|
|
||||||
size_t record_size)
|
|
||||||
{
|
|
||||||
// Input blocks should not be empty.
|
|
||||||
assert(!instream1.empty());
|
|
||||||
assert(!instream2.empty());
|
|
||||||
|
|
||||||
const unsigned char * rec1 = instream1.record();
|
|
||||||
const unsigned char * rec2 = instream2.record();
|
|
||||||
|
|
||||||
// Merge until one stream runs empty.
|
|
||||||
while (true) {
|
|
||||||
|
|
||||||
// Choose which record should go first.
|
|
||||||
if (record_compare(rec1, rec2, record_size) < 0) {
|
|
||||||
// Push record from stream 1 and load next record.
|
|
||||||
output_stream.put(rec1);
|
|
||||||
instream1.next_record();
|
|
||||||
if (instream1.empty()) {
|
|
||||||
rec1 = NULL;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
rec1 = instream1.record();
|
|
||||||
} else {
|
|
||||||
// Push record from stream 2 and load next record.
|
|
||||||
output_stream.put(rec2);
|
|
||||||
instream2.next_record();
|
|
||||||
if (instream2.empty()) {
|
|
||||||
rec2 = NULL;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
rec2 = instream2.record();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// At most one of the streams still has records left.
|
|
||||||
// Copy those records to the output.
|
|
||||||
|
|
||||||
while (!instream1.empty()) {
|
|
||||||
output_stream.put(instream1.record());
|
|
||||||
instream1.next_record();
|
|
||||||
}
|
|
||||||
|
|
||||||
while (!instream2.empty()) {
|
|
||||||
output_stream.put(instream2.record());
|
|
||||||
instream2.next_record();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merge sorted blocks of records into a single sorted block.
|
* Merge sorted blocks of records into a single sorted block.
|
||||||
*
|
*
|
||||||
|
@ -1308,7 +1240,7 @@ void merge_2_blocks(
|
||||||
* May be less than the length of input_streams.
|
* May be less than the length of input_streams.
|
||||||
* @param filter_dupl True to eliminate duplicate records.
|
* @param filter_dupl True to eliminate duplicate records.
|
||||||
*/
|
*/
|
||||||
void merge_n_blocks(
|
void merge_blocks(
|
||||||
std::vector<std::unique_ptr<RecordInputStream>>& input_streams,
|
std::vector<std::unique_ptr<RecordInputStream>>& input_streams,
|
||||||
RecordOutputStream& output_stream,
|
RecordOutputStream& output_stream,
|
||||||
size_t record_size,
|
size_t record_size,
|
||||||
|
@ -1419,25 +1351,22 @@ void merge_n_blocks(
|
||||||
void merge_pass(
|
void merge_pass(
|
||||||
BinaryFile& input_file,
|
BinaryFile& input_file,
|
||||||
BinaryFile& output_file,
|
BinaryFile& output_file,
|
||||||
uint64_t records_per_block,
|
const SortStrategy::MergePass& merge_pass,
|
||||||
uint64_t num_blocks,
|
|
||||||
unsigned int branch_factor,
|
|
||||||
bool filter_dupl,
|
bool filter_dupl,
|
||||||
const SortContext& ctx)
|
const SortContext& ctx)
|
||||||
{
|
{
|
||||||
assert(branch_factor > 1);
|
size_t num_blocks = merge_pass.records_per_block.size();
|
||||||
assert(branch_factor <= num_blocks);
|
|
||||||
|
|
||||||
// Only filter duplicates when the output is a single block.
|
// Only filter duplicates when the output is a single block.
|
||||||
assert((!filter_dupl) || (branch_factor == num_blocks));
|
assert((!filter_dupl) || num_blocks <= ctx.branch_factor);
|
||||||
|
|
||||||
Timer timer;
|
Timer timer;
|
||||||
timer.start();
|
timer.start();
|
||||||
|
|
||||||
// Calculate number of buffers:
|
// Calculate number of buffers:
|
||||||
// 2 buffers per input stream + more buffers for output stream.
|
// 2 buffers per input stream + more buffers for output stream.
|
||||||
size_t num_output_buffers = 2 + (branch_factor - 1) / 2;
|
size_t num_output_buffers = 2 + (ctx.branch_factor - 1) / 2;
|
||||||
size_t num_buffers = 2 * branch_factor + num_output_buffers;
|
size_t num_buffers = 2 * ctx.branch_factor + num_output_buffers;
|
||||||
|
|
||||||
// Calculate buffer size.
|
// Calculate buffer size.
|
||||||
// Must be a multiple of the record size and the transfer alignment.
|
// Must be a multiple of the record size and the transfer alignment.
|
||||||
|
@ -1446,21 +1375,23 @@ void merge_pass(
|
||||||
|
|
||||||
// TODO : double-buffering with I/O in separate thread
|
// TODO : double-buffering with I/O in separate thread
|
||||||
|
|
||||||
|
// Prepare a list of blocks for each input stream.
|
||||||
|
std::vector<RecordInputStream::BlockList> stream_blocks(ctx.branch_factor);
|
||||||
|
uint64_t file_offset = 0;
|
||||||
|
for (size_t p = 0; p < num_blocks; p++) {
|
||||||
|
uint64_t num_records = merge_pass.records_per_block[p];
|
||||||
|
unsigned int streamidx = p % ctx.branch_factor;
|
||||||
|
stream_blocks[streamidx].emplace_back(file_offset, num_records);
|
||||||
|
file_offset += num_records * ctx.record_size;
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize input streams.
|
// Initialize input streams.
|
||||||
std::vector<std::unique_ptr<RecordInputStream>> input_streams;
|
std::vector<std::unique_ptr<RecordInputStream>> input_streams;
|
||||||
for (unsigned int i = 0; i < branch_factor; i++) {
|
for (unsigned int i = 0; i < ctx.branch_factor; i++) {
|
||||||
uint64_t block_size = records_per_block * ctx.record_size;
|
|
||||||
uint64_t start_offset = i * block_size;
|
|
||||||
uint64_t block_stride = branch_factor * block_size;
|
|
||||||
if (start_offset >= input_file.size()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
input_streams.emplace_back(new RecordInputStream(
|
input_streams.emplace_back(new RecordInputStream(
|
||||||
input_file,
|
input_file,
|
||||||
ctx.record_size,
|
ctx.record_size,
|
||||||
start_offset,
|
std::move(stream_blocks[i]),
|
||||||
block_size,
|
|
||||||
block_stride,
|
|
||||||
buffer_size));
|
buffer_size));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1475,54 +1406,35 @@ void merge_pass(
|
||||||
// Every group consists of "branch_factor" blocks, except the last
|
// Every group consists of "branch_factor" blocks, except the last
|
||||||
// group which may contain fewer blocks.
|
// group which may contain fewer blocks.
|
||||||
// Each group produces one output block.
|
// Each group produces one output block.
|
||||||
uint64_t block_index = 0;
|
size_t block_index = 0;
|
||||||
while (block_index < num_blocks) {
|
while (block_index < num_blocks) {
|
||||||
|
|
||||||
// Determine how many blocks will be merged in this group.
|
// If this is the first merge pass, the last group may have
|
||||||
unsigned int this_branch_factor = branch_factor;
|
// fewer than "branch_factor" blocks.
|
||||||
if (branch_factor > num_blocks - block_index) {
|
unsigned int blocks_this_group = ctx.branch_factor;
|
||||||
this_branch_factor = num_blocks - block_index;
|
if (blocks_this_group > num_blocks - block_index) {
|
||||||
|
blocks_this_group = num_blocks - block_index;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Merging a single block with itself would be dumb.
|
||||||
|
// And our strategy planner is not that dumb.
|
||||||
|
assert(blocks_this_group > 1);
|
||||||
|
|
||||||
// Skip to the next section of each active input stream.
|
// Skip to the next section of each active input stream.
|
||||||
for (unsigned int i = 0; i < this_branch_factor; i++) {
|
for (unsigned int i = 0; i < blocks_this_group; i++) {
|
||||||
input_streams[i]->next_block();
|
input_streams[i]->next_block();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this_branch_factor == 1) {
|
// Merge the blocks.
|
||||||
|
merge_blocks(
|
||||||
// Last group contains just 1 block.
|
input_streams,
|
||||||
// Copy it to the output.
|
output_stream,
|
||||||
assert(!filter_dupl);
|
ctx.record_size,
|
||||||
RecordInputStream * instream = input_streams[0].get();
|
blocks_this_group,
|
||||||
while (!instream->empty()) {
|
filter_dupl);
|
||||||
output_stream.put(instream->record());
|
|
||||||
instream->next_record();
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if (this_branch_factor == 2 && !filter_dupl) {
|
|
||||||
|
|
||||||
// Special case for merging 2 blocks.
|
|
||||||
merge_2_blocks(
|
|
||||||
*input_streams[0],
|
|
||||||
*input_streams[1],
|
|
||||||
output_stream,
|
|
||||||
ctx.record_size);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
|
|
||||||
// Merge more than 2 blocks or filter duplicates.
|
|
||||||
merge_n_blocks(
|
|
||||||
input_streams,
|
|
||||||
output_stream,
|
|
||||||
ctx.record_size,
|
|
||||||
this_branch_factor,
|
|
||||||
filter_dupl);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// Skip to the start of the next block group.
|
// Skip to the start of the next block group.
|
||||||
block_index += this_branch_factor;
|
block_index += blocks_this_group;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush output stream buffers.
|
// Flush output stream buffers.
|
||||||
|
@ -1562,34 +1474,168 @@ SortStrategy plan_multi_pass_strategy(
|
||||||
uint64_t num_records = file_size / ctx.record_size;
|
uint64_t num_records = file_size / ctx.record_size;
|
||||||
uint64_t num_sort_blocks = 1 + (num_records - 1) / records_per_sort_block;
|
uint64_t num_sort_blocks = 1 + (num_records - 1) / records_per_sort_block;
|
||||||
|
|
||||||
|
// A list of blocks is constructed in memory for each merge pass.
|
||||||
|
// There are several of these lists. Together they will consume
|
||||||
|
// about 40 bytes per block. If we get an insanely large input file
|
||||||
|
// with a small memory limit, this metadata could by itself consume
|
||||||
|
// too much memory.
|
||||||
|
// Let's do a sanity check to ensure that the metadata uses less than
|
||||||
|
// 25% of the memory limit.
|
||||||
|
if (num_sort_blocks >= ctx.memory_size / 4 / 40) {
|
||||||
|
throw std::logic_error(
|
||||||
|
"Not enough memory to manage the list of blocks");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Plan the merge passes.
|
||||||
|
//
|
||||||
|
// In prinicple, every pass merges groups of "branch_factor" input blocks
|
||||||
|
// into one output block per group, thus reducing the number of remaining
|
||||||
|
// blocks by a factor "branch_factor".
|
||||||
|
//
|
||||||
|
// However, this gets more complicated if the merge tree is not perfectly
|
||||||
|
// balanced, which happens if the number of sort blocks is not a power
|
||||||
|
// of "branch_factor". In that case, the first merge pass will have
|
||||||
|
// to make things right by handling only a subset of the data.
|
||||||
|
//
|
||||||
|
// The first merge pass processes a subset of the sort blocks.
|
||||||
|
// It merges groups of "branch_factor" sort blocks into one output block
|
||||||
|
// per group. The last group in this pass may contain fewer than
|
||||||
|
// "branch_factor" sort blocks. After the first merge pass, the number
|
||||||
|
// of remaining blocks is an exact power of "branch_factor".
|
||||||
|
// The remaining blocks are in general not all the same size.
|
||||||
|
//
|
||||||
|
// After the first merge pass, each subsequent pass (if any) merges
|
||||||
|
// groups of exactly "branch_factor" blocks into one output block per
|
||||||
|
// group. These blocks are in general not all the same size.
|
||||||
|
//
|
||||||
|
// Example:
|
||||||
|
//
|
||||||
|
// branch_factor = 3
|
||||||
|
// num_sort_blocks = 12
|
||||||
|
//
|
||||||
|
// Sorted blocks:
|
||||||
|
// [S00] [S01] [S02] [S03] [S04] [S05] [S06] [S07] [S08] [S09] [S10] [S11]
|
||||||
|
//
|
||||||
|
// There are 3 merge passes.
|
||||||
|
// The first merge pass handles only sort blocks S00 - S04.
|
||||||
|
// The next two merge passes handle groups of exactly 3 blocks.
|
||||||
|
//
|
||||||
|
// [S00] [S01] [S02] [S03] [S04] [S05] [S06] [S07] [S08] [S09] [S10] [S11]
|
||||||
|
// | | | | |
|
||||||
|
// +-----+-----+ +--+--+
|
||||||
|
// | |
|
||||||
|
// [S00-S02] [S03-S04] [S05] [S06] [S07] [S08] [S09] [S10] [S11]
|
||||||
|
// | | | | | | | | |
|
||||||
|
// +-----------+--------+ +-----+-----+ +-----+-----+
|
||||||
|
// | | |
|
||||||
|
// [S00-S05] [S06-S08] [S09-S11]
|
||||||
|
// | | |
|
||||||
|
// +--------------------+-----------------+
|
||||||
|
// |
|
||||||
|
// [S00-S11]
|
||||||
|
//
|
||||||
|
// Note that a subset of sort blocks enter into the first merge pass
|
||||||
|
// while the remaining sort blocks go directly into the second merge pass.
|
||||||
|
// (It is also possible that all sort blocks go into the first pass,
|
||||||
|
// if the merge tree is perfectly balanced.)
|
||||||
|
//
|
||||||
|
|
||||||
|
// Determine the number of full merge passes (2nd pass and later).
|
||||||
|
unsigned int num_merge_pass = 0;
|
||||||
|
uint64_t num_merge_blocks = 1;
|
||||||
|
while (num_merge_blocks * ctx.branch_factor < num_sort_blocks) {
|
||||||
|
num_merge_blocks *= ctx.branch_factor;
|
||||||
|
num_merge_pass++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a first merge pass.
|
||||||
|
num_merge_pass++;
|
||||||
|
|
||||||
|
// Determine the number of merge groups in the first merge pass.
|
||||||
|
// The last group may have fewer than "branch_factor" input blocks.
|
||||||
|
uint64_t num_merge_ops_first_pass =
|
||||||
|
(num_sort_blocks - num_merge_blocks + (ctx.branch_factor - 1) - 1)
|
||||||
|
/ (ctx.branch_factor - 1);
|
||||||
|
|
||||||
|
// Determine the number of sort blocks to process in the first merge pass.
|
||||||
|
uint64_t num_sort_blocks_first_merge =
|
||||||
|
num_sort_blocks - num_merge_blocks + num_merge_ops_first_pass;
|
||||||
|
|
||||||
|
assert(num_sort_blocks_first_merge <= num_sort_blocks);
|
||||||
|
|
||||||
SortStrategy strategy;
|
SortStrategy strategy;
|
||||||
strategy.records_per_sort_block = records_per_sort_block;
|
strategy.records_per_sort_block = records_per_sort_block;
|
||||||
strategy.num_sort_blocks = num_sort_blocks;
|
strategy.num_sort_blocks = num_sort_blocks;
|
||||||
|
strategy.num_sort_blocks_first_merge = num_sort_blocks_first_merge;
|
||||||
|
|
||||||
// Plan the merge passes.
|
// Plan the details of each merge pass.
|
||||||
// Start with the result of the initial sort pass.
|
//
|
||||||
uint64_t records_per_block = records_per_sort_block;
|
// The first merge pass handles a subset of the sort blocks.
|
||||||
uint64_t num_blocks = num_sort_blocks;
|
// All of these sort blocks are the same size, except possibly the
|
||||||
|
// last block if it runs to the end of the file.
|
||||||
// Keep merging until there is only one block left.
|
{
|
||||||
while (num_blocks > 1) {
|
strategy.merge_pass.emplace_back();
|
||||||
// Calculate the number of blocks out of this merge pass.
|
SortStrategy::MergePass& merge_pass = strategy.merge_pass.back();
|
||||||
uint64_t num_merged_blocks = 1 + (num_blocks - 1) / ctx.branch_factor;
|
for (size_t i = 0; i < num_sort_blocks_first_merge; i++) {
|
||||||
|
uint64_t records_this_block =
|
||||||
// Choose the smallest branch factor that produces this nr of blocks.
|
std::min(records_per_sort_block,
|
||||||
unsigned int branch_factor = 1 + (num_blocks - 1) / num_merged_blocks;
|
num_records - i * records_per_sort_block);
|
||||||
|
merge_pass.records_per_block.push_back(records_this_block);
|
||||||
SortStrategy::MergePass merge_pass;
|
}
|
||||||
merge_pass.records_per_input_block = records_per_block;
|
|
||||||
merge_pass.num_input_blocks = num_blocks;
|
|
||||||
merge_pass.branch_factor = branch_factor;
|
|
||||||
strategy.merge_pass.push_back(merge_pass);
|
|
||||||
|
|
||||||
// Result of this merge pass will go into the next pass.
|
|
||||||
records_per_block *= branch_factor;
|
|
||||||
num_blocks = num_merged_blocks;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Plan the rest of the passes.
|
||||||
|
for (unsigned int mp = 1; mp < num_merge_pass; mp++) {
|
||||||
|
|
||||||
|
strategy.merge_pass.emplace_back();
|
||||||
|
SortStrategy::MergePass& merge_pass = strategy.merge_pass.back();
|
||||||
|
SortStrategy::MergePass& prev_pass = *(strategy.merge_pass.end() - 2);
|
||||||
|
|
||||||
|
uint64_t records_this_pass = 0;
|
||||||
|
|
||||||
|
// Output from the previous pass is input into this pass.
|
||||||
|
uint64_t records_this_block = 0;
|
||||||
|
for (size_t i = 0; i < prev_pass.records_per_block.size(); i++) {
|
||||||
|
records_this_block += prev_pass.records_per_block[i];
|
||||||
|
if ((i + 1) % ctx.branch_factor == 0) {
|
||||||
|
merge_pass.records_per_block.push_back(records_this_block);
|
||||||
|
records_this_pass += records_this_block;
|
||||||
|
records_this_block = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The last group of the first pass may merge fewer than
|
||||||
|
// "branch_factor" blocks.
|
||||||
|
if (records_this_block > 0) {
|
||||||
|
merge_pass.records_per_block.push_back(records_this_block);
|
||||||
|
records_this_pass += records_this_block;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mp == 1) {
|
||||||
|
// The second pass handles sort blocks that were skipped
|
||||||
|
// during the first pass.
|
||||||
|
for (size_t i = num_sort_blocks_first_merge;
|
||||||
|
i < num_sort_blocks;
|
||||||
|
i++) {
|
||||||
|
records_this_block = std::min(
|
||||||
|
records_per_sort_block,
|
||||||
|
num_records - i * records_per_sort_block);
|
||||||
|
merge_pass.records_per_block.push_back(records_this_block);
|
||||||
|
records_this_pass += records_this_block;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that number of input blocks is divisible by branch_factor.
|
||||||
|
assert(merge_pass.records_per_block.size() % ctx.branch_factor == 0);
|
||||||
|
|
||||||
|
// Check that all records are accounted for.
|
||||||
|
assert(records_this_pass == num_records);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Double-check that the last pass produces a single output block.
|
||||||
|
SortStrategy::MergePass& last_pass = strategy.merge_pass.back();
|
||||||
|
assert(last_pass.records_per_block.size() <= ctx.branch_factor);
|
||||||
|
|
||||||
return strategy;
|
return strategy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1608,6 +1654,10 @@ void sortbin(
|
||||||
{
|
{
|
||||||
log(ctx, "using memory_size = %" PRIu64 " bytes\n", ctx.memory_size);
|
log(ctx, "using memory_size = %" PRIu64 " bytes\n", ctx.memory_size);
|
||||||
|
|
||||||
|
if (ctx.branch_factor < 2) {
|
||||||
|
throw std::logic_error("Invalid branch factor (must be >= 2)");
|
||||||
|
}
|
||||||
|
|
||||||
// We want file I/O to occur on 4096-byte boundaries.
|
// We want file I/O to occur on 4096-byte boundaries.
|
||||||
// To ensure this, we want to do I/O on multiples of 4096 records.
|
// To ensure this, we want to do I/O on multiples of 4096 records.
|
||||||
// To ensure this is possible, we need room for ~ 32k records per branch.
|
// To ensure this is possible, we need room for ~ 32k records per branch.
|
||||||
|
@ -1659,30 +1709,48 @@ void sortbin(
|
||||||
|
|
||||||
// The merge passes alternate between tempfile-to-outputfile and
|
// The merge passes alternate between tempfile-to-outputfile and
|
||||||
// outputfile-to-tempfile.
|
// outputfile-to-tempfile.
|
||||||
// The final merge pass will be tempfile-to-outputfile.
|
|
||||||
// Depending on the number of merge passes, the initial sort pass
|
|
||||||
// will either be inputfile-to-tempfile or inputfile-to-outputfile.
|
|
||||||
BinaryFile * output_or_temp_file[2] = { &output_file, &temp_file };
|
BinaryFile * output_or_temp_file[2] = { &output_file, &temp_file };
|
||||||
BinaryFile * sort_output_file =
|
|
||||||
output_or_temp_file[num_merge_pass % 2];
|
|
||||||
|
|
||||||
// Execute the initial sort pass.
|
// The final merge pass will be tempfile-to-outputfile.
|
||||||
sort_pass(
|
// Depending on the number of merge passes, the first merge pass
|
||||||
input_file,
|
// reads either from the output file or from the tempfile.
|
||||||
*sort_output_file,
|
//
|
||||||
strategy.records_per_sort_block,
|
// The sort pass feeds blocks into the first merge pass,
|
||||||
strategy.num_sort_blocks,
|
// but may also feed blocks into the second merge pass if the
|
||||||
ctx);
|
// merge tree is unbalanced.
|
||||||
|
{
|
||||||
|
BinaryFile * sort_output_first_merge_pass =
|
||||||
|
output_or_temp_file[num_merge_pass % 2];
|
||||||
|
BinaryFile * sort_output_second_merge_pass =
|
||||||
|
output_or_temp_file[(num_merge_pass - 1) % 2];
|
||||||
|
|
||||||
|
// Execute the initial sort pass.
|
||||||
|
sort_pass(
|
||||||
|
input_file,
|
||||||
|
*sort_output_first_merge_pass,
|
||||||
|
*sort_output_second_merge_pass,
|
||||||
|
strategy.records_per_sort_block,
|
||||||
|
strategy.num_sort_blocks,
|
||||||
|
strategy.num_sort_blocks_first_merge,
|
||||||
|
ctx);
|
||||||
|
}
|
||||||
|
|
||||||
// Execute the merge passes.
|
// Execute the merge passes.
|
||||||
for (unsigned int mp = 0; mp < num_merge_pass; mp++) {
|
for (unsigned int mp = 0; mp < num_merge_pass; mp++) {
|
||||||
|
const SortStrategy::MergePass& mpass = strategy.merge_pass[mp];
|
||||||
|
size_t num_blocks = mpass.records_per_block.size();
|
||||||
|
uint64_t num_records = std::accumulate(
|
||||||
|
mpass.records_per_block.begin(),
|
||||||
|
mpass.records_per_block.end(),
|
||||||
|
uint64_t(0));
|
||||||
|
|
||||||
log(ctx,
|
log(ctx,
|
||||||
"running merge pass %u / %u: "
|
"running merge pass %u / %u:"
|
||||||
"%" PRIu64 " blocks, branch factor %u\n",
|
" %zu blocks, %" PRIu64 " records\n",
|
||||||
mp,
|
mp,
|
||||||
num_merge_pass,
|
num_merge_pass,
|
||||||
strategy.merge_pass[mp].num_input_blocks,
|
num_blocks,
|
||||||
strategy.merge_pass[mp].branch_factor);
|
num_records);
|
||||||
|
|
||||||
// Filter duplicate records only on the last pass.
|
// Filter duplicate records only on the last pass.
|
||||||
bool filter_dupl = ctx.flag_unique && (mp + 1 == num_merge_pass);
|
bool filter_dupl = ctx.flag_unique && (mp + 1 == num_merge_pass);
|
||||||
|
@ -1697,9 +1765,7 @@ void sortbin(
|
||||||
merge_pass(
|
merge_pass(
|
||||||
*pass_input_file,
|
*pass_input_file,
|
||||||
*pass_output_file,
|
*pass_output_file,
|
||||||
strategy.merge_pass[mp].records_per_input_block,
|
strategy.merge_pass[mp],
|
||||||
strategy.merge_pass[mp].num_input_blocks,
|
|
||||||
strategy.merge_pass[mp].branch_factor,
|
|
||||||
filter_dupl,
|
filter_dupl,
|
||||||
ctx);
|
ctx);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue