1
0
Fork 0

Implement background I/O for merge passes

This commit is contained in:
Joris van Rantwijk 2022-07-02 17:50:28 +02:00
parent 4f6b76e785
commit 5b2a06aabe
1 changed files with 217 additions and 43 deletions

View File

@ -53,6 +53,7 @@
/* Default branch factor while merging. */ /* Default branch factor while merging. */
#define DEFAULT_BRANCH_FACTOR 16 #define DEFAULT_BRANCH_FACTOR 16
#define MAX_BRANCH_FACTOR 16000
/* Default number of sorting threads. */ /* Default number of sorting threads. */
#define DEFAULT_THREADS 1 #define DEFAULT_THREADS 1
@ -65,6 +66,10 @@
For efficiency, I/O should be done in multiples of 4096 bytes. */ For efficiency, I/O should be done in multiples of 4096 bytes. */
#define TRANSFER_ALIGNMENT 4096 #define TRANSFER_ALIGNMENT 4096
/* Maximum size of I/O buffer during merge passes.
Big chunks are efficient, but more than 32 MB is probably not helpful. */
#define MAX_MERGE_BUFFER_SIZE (32 * 1024 * 1024)
/* Template for temporary file name. Must end in 6 'X' characters. */ /* Template for temporary file name. Must end in 6 'X' characters. */
#define TEMPFILE_TEMPLATE "sortbin_tmpXXXXXX" #define TEMPFILE_TEMPLATE "sortbin_tmpXXXXXX"
@ -584,7 +589,6 @@ public:
*/ */
class RecordInputStream class RecordInputStream
{ {
// TODO : double-buffering with delayed I/O via background thread
public: public:
typedef std::vector<std::tuple<uint64_t, uint64_t>> BlockList; typedef std::vector<std::tuple<uint64_t, uint64_t>> BlockList;
@ -599,14 +603,17 @@ public:
* (file_offset, number_of_records). * (file_offset, number_of_records).
* @param buffer_size Buffer size in bytes. * @param buffer_size Buffer size in bytes.
* Must be a multiple of "record_size". * Must be a multiple of "record_size".
* Note: Each RecordInputStream creates two buffers * Note: If an "io_pool" is available, each input
* of the specified size. * stream allocates 2 buffers of the specified size,
* otherwise just 1 buffer.
* @param io_pool Optional thread pool for background I/O.
*/ */
RecordInputStream( RecordInputStream(
BinaryFile& input_file, BinaryFile& input_file,
unsigned int record_size, unsigned int record_size,
BlockList&& blocks, BlockList&& blocks,
size_t buffer_size) size_t buffer_size,
ThreadPool * io_pool)
: m_input_file(input_file), : m_input_file(input_file),
m_record_size(record_size), m_record_size(record_size),
m_next_block(0), m_next_block(0),
@ -615,10 +622,22 @@ public:
m_bufpos(NULL), m_bufpos(NULL),
m_bufend(NULL), m_bufend(NULL),
m_blocks(blocks), m_blocks(blocks),
m_buffer(buffer_size) m_buffer_size(buffer_size),
m_buffer_index(0),
m_io_pool(io_pool)
{ {
assert(buffer_size % record_size == 0); assert(buffer_size % record_size == 0);
assert(buffer_size > record_size); assert(buffer_size > record_size);
// Allocate buffer(s).
m_buffers[0].resize(buffer_size);
if (io_pool != NULL) {
// Background I/O: allocate extra buffer.
m_buffers[1].resize(buffer_size);
// Start reading in background thread.
pre_read_buffer();
}
} }
// Prevent copying and assignment. // Prevent copying and assignment.
@ -686,17 +705,86 @@ private:
void refill_buffer() void refill_buffer()
{ {
if (m_block_remaining > 0) { if (m_block_remaining > 0) {
size_t transfer_size = size_t transfer_size = std::min(uint64_t(m_buffer_size),
(m_buffer.size() < m_block_remaining) ? m_block_remaining);
m_buffer.size() : m_block_remaining;
m_input_file.read(m_buffer.data(), m_file_offset, transfer_size); if (m_io_pool == NULL) {
// Use blocking I/O.
m_input_file.read(
m_buffers[0].data(),
m_file_offset,
transfer_size);
} else {
// The data we need is already on its way.
// Wait until the background transfer completes.
assert(m_future.valid());
m_future.get();
// Flip to the refilled buffer.
m_buffer_index = (m_buffer_index + 1) & 1;
}
// Update position administration.
m_file_offset += transfer_size; m_file_offset += transfer_size;
m_block_remaining -= transfer_size; m_block_remaining -= transfer_size;
m_bufpos = m_buffer.data();
m_bufend = m_buffer.data() + transfer_size; // Update buffer pointers.
m_bufpos = m_buffers[m_buffer_index].data();
m_bufend = m_bufpos + transfer_size;
// Start reading next buffer in background thread.
if (m_io_pool != NULL) {
pre_read_buffer();
}
} }
} }
/** Start reading into the next buffer. */
void pre_read_buffer()
{
assert(m_io_pool != NULL);
uint64_t pre_file_offset = m_file_offset;
uint64_t pre_block_remaining = m_block_remaining;
size_t pre_block_index = m_next_block;
// If we are at the end of the current block,
// look ahead to the next block.
while (pre_block_remaining == 0) {
if (pre_block_index >= m_blocks.size()) {
// Reached the end of the last block.
// There is nothing more to read in this stream.
return;
}
// Look ahead to the next block.
uint64_t num_records;
std::tie(pre_file_offset, num_records) = m_blocks[pre_block_index];
pre_block_remaining = num_records * m_record_size;
pre_block_index++;
uint64_t file_size = m_input_file.size();
assert(pre_file_offset <= file_size);
assert(pre_block_remaining <= file_size - pre_file_offset);
}
// Read into the currently non-active buffer.
unsigned int pre_buffer_index = (m_buffer_index + 1) & 1;
size_t transfer_size = std::min(uint64_t(m_buffer_size),
pre_block_remaining);
// Start background I/O.
m_future = m_io_pool->submit(
std::bind(
&BinaryFile::read,
&m_input_file,
m_buffers[pre_buffer_index].data(),
pre_file_offset,
transfer_size));
}
BinaryFile& m_input_file; BinaryFile& m_input_file;
const unsigned int m_record_size; const unsigned int m_record_size;
size_t m_next_block; size_t m_next_block;
@ -705,7 +793,11 @@ private:
unsigned char * m_bufpos; unsigned char * m_bufpos;
unsigned char * m_bufend; unsigned char * m_bufend;
BlockList m_blocks; BlockList m_blocks;
std::vector<unsigned char> m_buffer; size_t m_buffer_size;
unsigned int m_buffer_index;
std::array<std::vector<unsigned char>, 2> m_buffers;
ThreadPool::FutureType m_future;
ThreadPool * m_io_pool;
}; };
@ -714,7 +806,6 @@ private:
*/ */
class RecordOutputStream class RecordOutputStream
{ {
// TODO : double-buffering with delayed I/O via background thread
public: public:
/** /**
* Construct a record output stream. * Construct a record output stream.
@ -723,21 +814,40 @@ public:
* @param record_size Record size in bytes. * @param record_size Record size in bytes.
* @param file_offset Start offset in the output file. * @param file_offset Start offset in the output file.
* @param buffer_size Buffer size in bytes. * @param buffer_size Buffer size in bytes.
* Note: Each RecordOutputStream creates two buffers * @param num_buffers Number of buffers of the specified size.
* of the specified size. * Note: Without "io_pool", the output stream
* uses only 1 buffer.
* @param io_pool Optional thread pool for background I/O.
*/ */
RecordOutputStream( RecordOutputStream(
BinaryFile& output_file, BinaryFile& output_file,
unsigned int record_size, unsigned int record_size,
uint64_t file_offset, uint64_t file_offset,
size_t buffer_size) size_t buffer_size,
unsigned int num_buffers,
ThreadPool * io_pool)
: m_output_file(output_file), : m_output_file(output_file),
m_record_size(record_size), m_record_size(record_size),
m_file_offset(file_offset), m_file_offset(file_offset),
m_buffer(buffer_size) m_buffer_index(0),
m_io_pool(io_pool)
{ {
m_bufpos = m_buffer.data(); assert(buffer_size > record_size);
m_bufend = m_buffer.data() + m_buffer.size(); assert(num_buffers > 0);
if (io_pool == NULL) {
// No background I/O: Use just 1 buffer.
m_buffers.emplace_back(buffer_size);
} else {
// Allocate buffers and futures.
for (unsigned int b = 0; b < num_buffers; b++) {
m_buffers.emplace_back(buffer_size);
m_futures.emplace_back();
}
}
m_bufpos = m_buffers[0].data();
m_bufend = m_bufpos + m_buffers[0].size();
} }
// Prevent copying and assignment. // Prevent copying and assignment.
@ -748,7 +858,7 @@ public:
inline void put(const unsigned char *record) inline void put(const unsigned char *record)
{ {
if (m_record_size > m_bufend - m_bufpos) { if (m_record_size > m_bufend - m_bufpos) {
flush(); flush_block();
} }
memcpy(m_bufpos, record, m_record_size); memcpy(m_bufpos, record, m_record_size);
m_bufpos += m_record_size; m_bufpos += m_record_size;
@ -757,28 +867,77 @@ public:
/** Return the current file offset. Flush before calling this function. */ /** Return the current file offset. Flush before calling this function. */
inline uint64_t file_offset() const inline uint64_t file_offset() const
{ {
assert(m_bufpos == m_buffer.data()); // File offset is only accurate when the current buffer is empty.
assert(m_bufpos == m_buffers[m_buffer_index].data());
return m_file_offset; return m_file_offset;
} }
/** Flush buffered records to the output file. */ /** Flush buffered records to the output file. */
void flush() void flush()
{ {
size_t flush_size = m_bufpos - m_buffer.data(); // Write the current block to file.
if (flush_size > 0) { flush_block();
m_output_file.write(m_buffer.data(), m_file_offset, flush_size);
m_file_offset += flush_size; // Wait until all background I/O complete.
m_bufpos = m_buffer.data(); if (m_io_pool != NULL) {
for (ThreadPool::FutureType& fut : m_futures) {
if (fut.valid()) {
fut.get();
}
}
} }
} }
private: private:
/** Write the current block to file and obtain an empty buffer. */
void flush_block()
{
unsigned char * buf_start = m_buffers[m_buffer_index].data();
size_t flush_size = m_bufpos - buf_start;
if (flush_size > 0) {
// Write the current block to file.
if (m_io_pool == NULL) {
// Use blocking I/O.
m_output_file.write(buf_start, m_file_offset, flush_size);
} else {
// Use background I/O.
m_futures[m_buffer_index] = m_io_pool->submit(
std::bind(
&BinaryFile::write,
&m_output_file,
buf_start,
m_file_offset,
flush_size));
// Move on to the next buffer.
m_buffer_index = (m_buffer_index + 1) % m_buffers.size();
// Wait until the new buffer is available.
if (m_futures[m_buffer_index].valid()) {
m_futures[m_buffer_index].get();
}
}
// Update file offset.
m_file_offset += flush_size;
// Update buffer pointers.
m_bufpos = m_buffers[m_buffer_index].data();
m_bufend = m_bufpos + m_buffers[m_buffer_index].size();
}
}
BinaryFile& m_output_file; BinaryFile& m_output_file;
const unsigned int m_record_size; const unsigned int m_record_size;
uint64_t m_file_offset; uint64_t m_file_offset;
unsigned char * m_bufpos; unsigned char * m_bufpos;
unsigned char * m_bufend; unsigned char * m_bufend;
std::vector<unsigned char> m_buffer; unsigned int m_buffer_index;
std::vector<std::vector<unsigned char>> m_buffers;
std::vector<ThreadPool::FutureType> m_futures;
ThreadPool * m_io_pool;
}; };
@ -1395,7 +1554,7 @@ size_t filter_duplicate_records(
size_t next_pos = 1; size_t next_pos = 1;
while (next_pos < num_records) { while (next_pos < num_records) {
if (memcmp(last_unique, next_record, record_size) == 0) { if (record_compare(last_unique, next_record, record_size) == 0) {
break; break;
} }
last_unique = next_record; last_unique = next_record;
@ -1406,10 +1565,10 @@ size_t filter_duplicate_records(
// Scan the rest of the records and copy unique records. // Scan the rest of the records and copy unique records.
size_t num_unique = next_pos; size_t num_unique = next_pos;
while (next_pos < num_records) { while (next_pos < num_records) {
if (memcmp(last_unique, next_record, record_size) != 0) { if (record_compare(last_unique, next_record, record_size) != 0) {
num_unique++; num_unique++;
last_unique += record_size; last_unique += record_size;
memcpy(last_unique, next_record, record_size); record_copy(last_unique, next_record, record_size);
} }
next_record += record_size; next_record += record_size;
next_pos++; next_pos++;
@ -1850,17 +2009,23 @@ void merge_pass(
Timer timer; Timer timer;
timer.start(); timer.start();
// Calculate number of buffers: // Choose number of buffers.
// 2 buffers per input stream + more buffers for output stream. // Without background I/O: 1 input buffer per stream + 1 output buffer.
size_t num_output_buffers = 2 + (ctx.branch_factor - 1) / 2; // With background I/O: 2 input buffers per stream + 4 output buffers.
size_t num_buffers = 2 * ctx.branch_factor + num_output_buffers; unsigned int num_input_buf = (read_thread_pool != NULL) ? 2 : 1;
unsigned int num_output_buf = (write_thread_pool != NULL) ? 4 : 1;
unsigned int num_buffers =
ctx.branch_factor * num_input_buf + num_output_buf;
// Calculate buffer size. // Calculate buffer size.
// Must not exceed the available memory.
// Must be a multiple of the record size and the transfer alignment. // Must be a multiple of the record size and the transfer alignment.
size_t buffer_size = ctx.memory_size / num_buffers; // Should not exceed MAX_MERGE_BUFFER_SIZE.
buffer_size -= buffer_size % (TRANSFER_ALIGNMENT * ctx.record_size); size_t buffer_alignment = TRANSFER_ALIGNMENT * size_t(ctx.record_size);
size_t buffer_size =
// TODO : double-buffering with I/O in separate thread std::min(ctx.memory_size / num_buffers,
std::max(buffer_alignment, size_t(MAX_MERGE_BUFFER_SIZE)));
buffer_size -= buffer_size % buffer_alignment;
// Prepare a list of blocks for each input stream. // Prepare a list of blocks for each input stream.
std::vector<RecordInputStream::BlockList> stream_blocks(ctx.branch_factor); std::vector<RecordInputStream::BlockList> stream_blocks(ctx.branch_factor);
@ -1879,7 +2044,8 @@ void merge_pass(
input_file, input_file,
ctx.record_size, ctx.record_size,
std::move(stream_blocks[i]), std::move(stream_blocks[i]),
buffer_size)); buffer_size,
read_thread_pool));
} }
// Initialize output stream. // Initialize output stream.
@ -1887,7 +2053,9 @@ void merge_pass(
output_file, output_file,
ctx.record_size, ctx.record_size,
0, 0,
buffer_size); buffer_size,
num_output_buf,
write_thread_pool);
// Loop over groups of blocks to be sorted. // Loop over groups of blocks to be sorted.
// Every group consists of "branch_factor" blocks, except the last // Every group consists of "branch_factor" blocks, except the last
@ -2010,9 +2178,9 @@ SortStrategy plan_multi_pass_strategy(
// The next two merge passes handle groups of exactly 3 blocks. // The next two merge passes handle groups of exactly 3 blocks.
// //
// [S00] [S01] [S02] [S03] [S04] [S05] [S06] [S07] [S08] [S09] [S10] [S11] // [S00] [S01] [S02] [S03] [S04] [S05] [S06] [S07] [S08] [S09] [S10] [S11]
// | | | | | // | | | | | | | | | | | |
// +-----+-----+ +--+--+ // +-----+-----+ +--+--+ | | | | | | |
// | | // | | | | | | | | |
// [S00-S02] [S03-S04] [S05] [S06] [S07] [S08] [S09] [S10] [S11] // [S00-S02] [S03-S04] [S05] [S06] [S07] [S08] [S09] [S10] [S11]
// | | | | | | | | | // | | | | | | | | |
// +-----------+--------+ +-----+-----+ +-----+-----+ // +-----------+--------+ +-----+-----+ +-----+-----+
@ -2146,6 +2314,12 @@ void sortbin(
if (ctx.branch_factor < 2) { if (ctx.branch_factor < 2) {
throw std::logic_error("Invalid branch factor (must be >= 2)"); throw std::logic_error("Invalid branch factor (must be >= 2)");
} }
if (ctx.branch_factor > MAX_BRANCH_FACTOR) {
throw std::logic_error(
"Invalid branch factor (must be <= "
+ std::to_string(MAX_BRANCH_FACTOR)
+ ")");
}
// We want file I/O to occur on 4096-byte boundaries. // We want file I/O to occur on 4096-byte boundaries.
// To ensure this, we want to do I/O on multiples of 4096 records. // To ensure this, we want to do I/O on multiples of 4096 records.