commit 5b5c090bc573305ecf3ef2ecd244b53f2207c1be Author: Joris van Rantwijk Date: Tue Jun 21 07:50:19 2022 +0200 Current snapshot, work-in-progress diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..17f68a4 --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ +# +# Makefile for sortbin utility. +# + +CXX = g++ +CXXFLAGS = -Wall -O2 + # -fsanitize=address -fsanitize=undefined + +sortbin: sortbin.cpp + +.PHONY: clean +clean: + $(RM) sortbin diff --git a/sortbin.cpp b/sortbin.cpp new file mode 100644 index 0000000..9081bc7 --- /dev/null +++ b/sortbin.cpp @@ -0,0 +1,1620 @@ +/* + * Sort arrays of binary data records. + * + * Input and output files contain plain, raw arrays of fixed-length + * binary data records. + * + * Records are interpreted as fixed-length strings of 8-bit unsigned integers. + * The program sorts these records in lexicographic order. + * + * This program is optimized for short records, e.g. up to 20 bytes. + */ + + +/* + * TESTING in-memory sort: + * + * Input: 10**8 records of 10 bytes + * Storage: SSD + * + * GNU sort: 101.7 seconds + * GNU sort -S 2G: 110.2 seconds + * sortbin, qsort_r(): 31.1, 31.2, 31.1 seconds (correct output) + * sortbin, heapsort: 57.4, 57.1, 58.5 seconds (correct output) + * sortbin, quicksort: 24.5, 24.4, 24.4 seconds (correct output) + * sortbin, quicksort, depth_limit=8: 31.6 seconds (correct output) + * + * + * Input: 10**8 records of 10 bytes, 70502908 unique records + * + * GNU sort -u: 120.2 seconds + * sortbin -u: 26.2 seconds (correct output) + * + */ + + +// (already defined by g++) #define _GNU_SOURCE +#define _FILE_OFFSET_BITS 64 + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + + +// TODO : use a background thread for file I/O + + +/* Maximum amount of RAM to use (in MBytes). 
*/ +#define DEFAULT_MEMORY_SIZE_MBYTE 1024 + +/* Default branch factor while merging. */ +#define DEFAULT_BRANCH_FACTOR 16 + +/* Default number of sorting threads. */ +#define DEFAULT_THREADS 1 + + +namespace { // anonymous namespace + + +/** Information about the sort job. */ +struct SortContext +{ + /** Record length in bytes. */ + unsigned int record_size; + + /** Maximum memory to use (bytes). */ + uint64_t memory_size; + + /** Maximum number of arrays to merge in one step. */ + unsigned int branch_factor; + + /** True to eliminate duplicate records. */ + bool flag_unique; + + /** True to write progress messages to stderr. */ + bool flag_verbose; + + /** Directory where temporary files are created. */ + std::string temporary_directory; +}; + + +/** Strategy for multi-pass external sorting. */ +struct SortStrategy +{ + /** Strategy for a single merge pass. */ + struct MergePass { + + /** Number of records per input block into this merge pass. */ + uint64_t records_per_input_block; + + /** Total number of input blocks into this merge pass. */ + uint64_t num_input_blocks; + + /** Number of input blocks to merge into each output block. */ + unsigned int branch_factor; + }; + + /** Number of records per block during the initial sort pass. */ + uint64_t records_per_sort_block; + + /** Number of blocks for the initial sort pass. */ + uint64_t num_sort_blocks; + + /** List of merge passes. */ + std::vector merge_pass; +}; + + +/** Show a progress message. */ +void log(const SortContext& ctx, const char *format, ...) + __attribute__ ((format (printf, 2, 3))) ; + +void log(const SortContext& ctx, const char *format, ...) +{ + va_list ap; + va_start(ap, format); + if (ctx.flag_verbose) { + vfprintf(stderr, format, ap); + } + va_end(ap); +} + + +/** Time measurements. 
*/ +class Timer +{ +public: + Timer() + : m_value(0.0), m_running(false) + { } + + double value() const + { + double v = m_value; + if (m_running) { + struct timespec tnow; + clock_gettime(CLOCK_REALTIME, &tnow); + v += tnow.tv_sec - m_tstart.tv_sec; + v += 1.0e-9 * (tnow.tv_nsec - m_tstart.tv_nsec); + } + return m_value; + } + + void start() + { + m_value = 0.0; + m_running = true; + clock_gettime(CLOCK_REALTIME, &m_tstart); + } + + void resume() + { + if (!m_running) { + m_running = true; + clock_gettime(CLOCK_REALTIME, &m_tstart); + } + } + + void stop() + { + if (m_running) { + struct timespec tnow; + clock_gettime(CLOCK_REALTIME, &tnow); + m_value += tnow.tv_sec - m_tstart.tv_sec; + m_value += 1.0e-9 * (tnow.tv_nsec - m_tstart.tv_nsec); + m_running = false; + } + } + +private: + double m_value; + bool m_running; + struct timespec m_tstart; +}; + + +/** + * Binary file access. + * + * This is a base class which can not be directly constructed. + * Subclasses implement constructors that open files in various ways. + */ +class BinaryFile +{ +public: + // Prevent copying and assignment. + BinaryFile(const BinaryFile&) = delete; + BinaryFile& operator=(const BinaryFile&) = delete; + + /** Return file size in bytes. */ + uint64_t size() const + { + return m_file_size; + } + + /** Truncate file to reduce its size. */ + void truncate_file(uint64_t new_size); + + /** Read raw bytes from file. */ + void read( + unsigned char * buf, + uint64_t file_offset, + size_t length); + + /** Write raw bytes to file. */ + void write( + const unsigned char * buf, + uint64_t file_offset, + size_t length); + +protected: + // Prevent constructing and destructing via this base class. + BinaryFile() + : m_fd(-1), m_file_size(0) + { } + + /** Destructor: close file. */ + ~BinaryFile() + { + if (m_fd >= 0) { + close(m_fd); + } + } + + /** File descriptor. */ + int m_fd; + + /** File size (bytes). */ + uint64_t m_file_size; +}; + + +// Truncate file to reduce its size. 
+void BinaryFile::truncate_file(uint64_t new_size) +{ + int ret = ftruncate(m_fd, new_size); + if (ret != 0) { + throw std::system_error( + errno, + std::system_category(), + "Can not truncate file"); + } + m_file_size = new_size; +} + +// Read raw bytes from file. +void BinaryFile::read( + unsigned char * buf, + uint64_t file_offset, + size_t length) +{ + // Note that pread() may read fewer bytes than requested. + while (length > 0) { + ssize_t ret = pread(m_fd, buf, length, file_offset); + if (ret <= 0) { + throw std::system_error( + errno, + std::system_category(), + "Can not read from file"); + } + buf += ret; + file_offset += ret; + length -= ret; + } +} + +// Write raw bytes to file. +void BinaryFile::write( + const unsigned char * buf, + uint64_t file_offset, + size_t length) +{ + // Note that pwrite() may write fewer bytes than requested. + while (length > 0) { + ssize_t ret = pwrite(m_fd, buf, length, file_offset); + if (ret <= 0) { + throw std::system_error( + errno, + std::system_category(), + "Can not write to file"); + } + buf += ret; + file_offset += ret; + length -= ret; + } +} + + +/** Binary input file. */ +class BinaryInputFile : public BinaryFile +{ +public: + /** Open existing file for read-only access. */ + explicit BinaryInputFile(const std::string& filename) + { + m_fd = open(filename.c_str(), O_RDONLY); + if (m_fd < 0) { + throw std::system_error( + errno, + std::system_category(), + "Can not open input file"); + } + + off_t fsz = lseek(m_fd, 0, SEEK_END); + if (fsz == (off_t)(-1)) { + throw std::system_error( + errno, + std::system_category(), + "Can not seek input file"); + } + + m_file_size = fsz; + } +}; + + +/** Binary output file. */ +class BinaryOutputFile : public BinaryFile +{ +public: + /** Create output file and pre-allocate space. 
*/ + BinaryOutputFile(const std::string& filename, uint64_t new_size) + { + m_fd = open(filename.c_str(), O_RDWR | O_CREAT | O_EXCL, 0666); + if (m_fd < 0) { + throw std::system_error( + errno, + std::system_category(), + "Can not create output file"); + } + + int ret = posix_fallocate(m_fd, 0, new_size); + if (ret != 0) { + int errnum = errno; + + // Delete empty output file. + unlink(filename.c_str()); + + throw std::system_error( + errnum, + std::system_category(), + "Can not allocate space in output file"); + } + + m_file_size = new_size; + } +}; + + +/** Temporary binary file. */ +class BinaryTempFile : public BinaryFile +{ +public: + /** Create temporary file and pre-allocate space. */ + BinaryTempFile(const std::string& tempdir, uint64_t new_size) + { + // O_TMPFILE creates a new file without a name. + // Effectively the file is created and immediately unlinked. + // As a result, the file will be automatically deleted when closed. + m_fd = open(tempdir.c_str(), O_RDWR | O_TMPFILE, 0600); + if (m_fd < 0) { + throw std::system_error( + errno, + std::system_category(), + "Can not create temporary file"); + } + + int ret = posix_fallocate(m_fd, 0, new_size); + if (ret != 0) { + throw std::system_error( + errno, + std::system_category(), + "Can not allocate space in temporary file"); + } + + m_file_size = new_size; + } +}; + + +/** + * Read binary records from an input file with buffering. + * + * The input stream reads from a sequence of discontinuous, equally spaced + * sections in the input file. All sections have the same size, except for + * the last section which may be shorter if it runs to the end of the file. + * Each section contains a flat array of binary records. + * + * The input stream starts in the "end_of_stream" state. + * The first call to "next_section()" enables reading records from the + * first input section. 
When the input stream reaches the end of the + * current section, it again enters the "end_of_stream" state until + * the next call to "next_section()". + */ +class RecordInputStream +{ +public: + /** + * Construct a record input stream. + * + * The stream will initially contain no input sections. + * + * @param input_file Input file where records read from. + * @param record_size Record size in bytes. + * @param section_offset Offset in input file of first input section. + * @param section_size Size of each input section in bytes. + * Must be a multiple of "record_size". + * The last input section may be shorter if it + * runs to the end of the file. + * @param section_stride Distance between start of sections in bytes. + * @param buffer_size Buffer size in bytes. + * Must be a multiple of "record_size". + * Note: Each RecordInputStream creates two buffers + * of the specified size. + */ + RecordInputStream( + BinaryFile& input_file, + unsigned int record_size, + uint64_t section_offset, + uint64_t section_size, + uint64_t section_stride, + size_t buffer_size) + : m_input_file(input_file), + m_record_size(record_size), + m_section_offset(section_offset), + m_section_size(section_size), + m_section_stride(section_stride), + m_section_remaining(0), + m_file_offset(0), + m_bufpos(NULL), + m_bufend(NULL), + m_buffer(buffer_size) + { + assert(section_size % record_size == 0); + assert(section_size > record_size); + assert(section_size <= section_stride); + assert(buffer_size % record_size == 0); + assert(buffer_size > record_size); + } + + // Prevent copying and assignment. + RecordInputStream(const RecordInputStream&) = delete; + RecordInputStream& operator=(const RecordInputStream&) = delete; + + /** Return true if the end of the current input section is reached. */ + inline bool end_of_stream() const + { + return (m_bufpos == m_bufend); + } + + /** + * Return a pointer to the current binary record. 
+ * + * This function must only be used if "end_of_stream()" returns false. + * The returned pointer becomes invalid after a call to "next_record()". + */ + inline const unsigned char * current_record() const + { + return m_bufpos; + } + + /** + * Move to the next record of the current section. + * + * This function must only be used if "end_of_stream()" returns false. + * Calling this function invalidates all pointers previously returned + * by "current_record()". + */ + inline void next_record() + { + assert(m_bufpos != m_bufend); + m_bufpos += m_record_size; + + if (m_bufpos == m_bufend) { + m_bufpos = m_buffer.data(); + m_bufend = m_buffer.data(); + refill_buffer(); + } + } + + /** + * Start reading from the next input section. + * + * This function may only be called if "end_of_stream()" returns true. + */ + void next_section() + { + assert(m_bufpos == m_bufend); + + uint64_t file_size = m_input_file.size(); + assert(m_section_stride < file_size - m_section_offset); + + m_section_offset += m_section_stride; + m_file_offset = m_section_offset; + m_section_remaining = + std::min(m_section_size, file_size - m_section_offset); + + refill_buffer(); + } + +private: + /** Refill the buffer from the current input section. */ + void refill_buffer() + { + if (m_section_remaining > 0) { + size_t block_size = + (m_buffer.size() < m_section_remaining) ? 
+ m_buffer.size() : m_section_remaining; + m_input_file.read(m_buffer.data(), m_file_offset, block_size); + m_file_offset += block_size; + m_section_remaining -= block_size; + m_bufend = m_buffer.data() + block_size; + } + } + + BinaryFile& m_input_file; + const unsigned int m_record_size; + uint64_t m_section_offset; + uint64_t m_section_size; + uint64_t m_section_stride; + uint64_t m_section_remaining; + uint64_t m_file_offset; + unsigned char * m_bufpos; + unsigned char * m_bufend; + std::vector m_buffer; +// TODO : double-buffering with delayed I/O via background thread +}; + + +/** + * Write binary records to an output file with buffering. + */ +class RecordOutputStream +{ +public: + /** + * Construct a record output stream. + * + * @param output_file Output file where records are written. + * @param record_size Record size in bytes. + * @param file_offset Start offset in the output file. + * @param buffer_size Buffer size in bytes. + * Note: Each RecordOutputStream creates two buffers + * of the specified size. + */ + RecordOutputStream( + BinaryFile& output_file, + unsigned int record_size, + uint64_t file_offset, + size_t buffer_size) + : m_output_file(output_file), + m_record_size(record_size), + m_file_offset(file_offset), + m_bufpos(0), + m_buffer(buffer_size) + { } + + // Prevent copying and assignment. + RecordOutputStream(const RecordOutputStream&) = delete; + RecordOutputStream& operator=(const RecordOutputStream&) = delete; + + /** Append a record to the output stream. */ + inline void put(const unsigned char *record) + { + if (m_record_size > m_buffer.size() - m_bufpos) { + flush(); + } + memcpy(m_buffer.data() + m_bufpos, record, m_record_size); + } + + /** Flush buffered records to the output file. 
*/ + void flush() + { + if (m_bufpos > 0) { + m_output_file.write(m_buffer.data(), m_file_offset, m_bufpos); + m_file_offset += m_bufpos; + m_bufpos = 0; + } + } + +private: + BinaryFile& m_output_file; + const unsigned int m_record_size; + uint64_t m_file_offset; + size_t m_bufpos; + std::vector m_buffer; +// TODO : double-buffering with delayed I/O via background thread +}; + + +/** Compare two records. */ +#define record_compare(_a, _b, _n) (memcmp(_a, _b, _n)) + +/** Copy a record. */ +#define record_copy(_dst, _src, _n) (memcpy(_dst, _src, _n)) + + +/** + * Swap two records. + */ +inline void record_swap( + unsigned char *a, + unsigned char *b, + size_t record_size) +{ + while (record_size > 0) { + unsigned char aa = *a; + unsigned char bb = *b; + *b = aa; + *a = bb; + a++; + b++; + record_size--; + } +} + + +/** Return the index of the parent of the specified heap node. */ +inline size_t heap_parent_index(size_t node_index) +{ + return (node_index - 1) / 2; +} + + +/** Return the index of the left child of the specified heap node. */ +inline size_t heap_left_child_index(size_t node_index) +{ + return node_index * 2 + 1; +} + + +/** + * Insert a node into an empty spot in the heap, then repair the sub-heap + * rooted at that position. + */ +void heap_sift_down_records( + unsigned char * buffer, + size_t record_size, + size_t num_records, + size_t insert_index, + const unsigned char * insert_value) +{ + // Find the first node index which does not have two child nodes. + size_t parent_end = heap_parent_index(num_records); + + // Move the empty spot all the way down through the sub-heap. + size_t cur_idx = insert_index; + while (cur_idx < parent_end) { + + // Find the left child node of the current node. + size_t child_idx = heap_left_child_index(cur_idx); + unsigned char * child_ptr = buffer + child_idx * record_size; + + // Compare the two child nodes. 
+ if (record_compare(child_ptr, child_ptr + record_size, record_size) + < 0) { + // Right child is greater, choose that one. + child_idx += 1; + child_ptr += record_size; + } + + // Move the chosen child to the empty spot. + unsigned char * cur_ptr = buffer + cur_idx * record_size; + record_copy(cur_ptr, child_ptr, record_size); + + // Continue the scan from the (now empty) child spot. + cur_idx = child_idx; + } + + // If the empty spot has a left child, swap it with the empty spot. + if (num_records > 1 && cur_idx <= heap_parent_index(num_records - 1)) { + size_t child_idx = heap_left_child_index(cur_idx); + unsigned char * cur_ptr = buffer + cur_idx * record_size; + unsigned char * child_ptr = buffer + child_idx * record_size; + record_copy(cur_ptr, child_ptr, record_size); + cur_idx = child_idx; + } + + // The empty spot is now in a leaf node of the heap. + // Scan back up to find the right place to insert the new node. + // + // Going all the way down and then back up may seem wasteful, + // but it is faster in practice because the correct insertion spot + // is likely to be in the bottom of the heap. + + while (cur_idx > insert_index) { + + // Find parent of the empty spot. + size_t parent_idx = heap_parent_index(cur_idx); + unsigned char * parent_ptr = buffer + parent_idx * record_size; + + // Compare the new value to the parent value. + if (record_compare(parent_ptr, insert_value, record_size) >= 0) { + // We found the right spot to insert the new value. + break; + } + + // Move the parent back to the empty spot. + unsigned char * cur_ptr = buffer + cur_idx * record_size; + record_copy(cur_ptr, parent_ptr, record_size); + + // Move to te parent node. + cur_idx = parent_idx; + } + + // Insert the new node at the empty position. + unsigned char * cur_ptr = buffer + cur_idx * record_size; + record_copy(cur_ptr, insert_value, record_size); +} + + +/** + * Sort an array of records using in-place heap sort. 
+ * + * Run time: O(N * log(N)) + * Extra space: O(1) + */ +void heap_sort_records( + unsigned char * buffer, + size_t record_size, + size_t num_records) +{ + // Skip trivial cases. + if (num_records < 2) { + return; + } + + // Allocate temporary space for one record. + std::vector temp(record_size); + + // + // Phase 1: Transform the unordered array into a max-heap. + // + + // Bottom-up loop over all non-trivial sub-heaps. + // Start with the parent of the last node. + size_t cur_idx = heap_parent_index(num_records - 1); + + while (true) { + + // Remove the current node from the heap. + unsigned char * cur_ptr = buffer + cur_idx * record_size; + record_copy(temp.data(), cur_ptr, record_size); + + // Re-insert the node and repair the sub-heap rooted at this node. + heap_sift_down_records( + buffer, + record_size, + num_records, + cur_idx, + temp.data()); + + // Stop after processing the root node. + if (cur_idx == 0) { + break; + } + + // Go do the next sub-heap. + cur_idx--; + } + + // + // Phase 2: Transform the max-heap into a sorted array. + // + + // Loop over the record array from back to front. + cur_idx = num_records - 1; + while (cur_idx > 0) { + + // Remove the root node from the heap. + // This is the largest remaining element, which belongs at index CUR + // in the sorted array. + record_copy(temp.data(), buffer, record_size); + + // Remove the node at index CUR from the heap. + // This reduces the size of the heap by 1. + // Re-insert the removed node as the root node and repair the heap. + unsigned char * cur_ptr = buffer + cur_idx * record_size; + heap_sift_down_records(buffer, record_size, cur_idx, 0, cur_ptr); + + // Copy the former root node to its position in the sorted array. + record_copy(cur_ptr, temp.data(), record_size); + + // Go to next element. + cur_idx--; + } +} + + +/** + * Sort an array of records using in-place insertion sort. + * + * This is a helper function for quicksort_records(). 
+ */ +void insertion_sort_records( + unsigned char * buffer, + size_t record_size, + size_t num_records) +{ + // Allocate temporary space for one record. + std::vector temp(record_size); + + for (size_t cur_idx = 1; cur_idx < num_records; cur_idx++) { + // The partial array 0 .. (cur_idx - 1) is already sorted. + // We will now insert cur_idx into the sorted array. + + // Quick check whether the new record is already in the right place. + unsigned char * cur_ptr = buffer + cur_idx * record_size; + unsigned char * prev_ptr = cur_ptr - record_size; + if (record_compare(cur_ptr, prev_ptr, record_size) >= 0) { + continue; + } + + // Scan backwards through the sorted array to find the right place + // to insert the new record. + unsigned char * insert_ptr = prev_ptr; + while (insert_ptr > buffer) { + + prev_ptr = insert_ptr - record_size; + if (record_compare(cur_ptr, prev_ptr, record_size) >= 0) { + // Found the right place to insert the new record. + break; + } + + // Continue backwards scan. + insert_ptr = prev_ptr; + } + + // Copy the new record to temporary storage. + record_copy(temp.data(), cur_ptr, record_size); + + // Move sorted records to make space for the new record. + memmove(insert_ptr + record_size, insert_ptr, cur_ptr - insert_ptr); + + // Copy the new record to its place. + record_copy(insert_ptr, temp.data(), record_size); + } +} + + + + +/** + * Sort an array of records using in-place quicksort. + * + * Run time: O(N * log(N)) + * Extra space: O(log(N)) + * + * Plain quicksort is known to have quadratic worst-case behaviour. + * This implementation uses median-of-three partitioning to reduce + * the probability of worst-case performance. If bad performance does + * occur, this implementation detects it and switches to heap sort. + */ +void quicksort_records( + unsigned char * buffer, + size_t record_size, + size_t num_records) +{ + // Recursive partitioning is only applied to fragments larger than + // this threshold. 
The rest of the work will be done by insertion sort. + const size_t insertion_sort_threshold = 12; + + // Skip trivial cases. + if (num_records < 2) { + return; + } + + // Determine maximum acceptable recursion depth. + unsigned int depth_limit = 1; + for (size_t nremain = num_records; nremain > 1; nremain >>= 1) { + depth_limit += 2; + } + + // Allocate stack. + std::vector> stack; + stack.reserve(depth_limit); + + // Prepare recursive partitioning of the entire array. + if (num_records > insertion_sort_threshold) { + stack.emplace_back(buffer, num_records, depth_limit); + } + + // Execute recursive partitioning. + while (!stack.empty()) { + + // Pop a range from the stack. + unsigned char * range_begin; + size_t range_num_records; + std::tie(range_begin, range_num_records, depth_limit) = stack.back(); + stack.pop_back(); + + // Check recursion depth. Switch to heap sort if we get to deep. + if (depth_limit == 0) { + heap_sort_records(range_begin, record_size, range_num_records); + continue; + } + + // Initialize pointers to start, end and middle of range. + unsigned char * left_ptr = range_begin; + unsigned char * right_ptr = + range_begin + (range_num_records - 1) * record_size; + unsigned char * pivot_ptr = + range_begin + (range_num_records / 2) * record_size; + + // Sort the first, middle and last records such that they are + // in proper order with respect to each other. + if (record_compare(pivot_ptr, left_ptr, record_size) < 0) { + record_swap(left_ptr, pivot_ptr, record_size); + } + if (record_compare(right_ptr, pivot_ptr, record_size) < 0) { + record_swap(pivot_ptr, right_ptr, record_size); + if (record_compare(pivot_ptr, left_ptr, record_size) < 0) { + record_swap(left_ptr, pivot_ptr, record_size); + } + } + + // The median of the three records we examined is now in the + // middle of the range, pointed to by pivot_ptr. + // This is not necessarily the final location of that element. 
+ + // The first and last record of the range are now on the proper + // side of the partition. No need to examine them again. + left_ptr += record_size; + right_ptr -= record_size; + + // Partition the rest of the array based on comparing to the pivot. + while (true) { + + // Skip left-side records that are less than the pivot. + while (record_compare(left_ptr, pivot_ptr, record_size) < 0) { + left_ptr += record_size; + } + + // Skip right-side records that are greater than the pivot. + while (record_compare(pivot_ptr, right_ptr, record_size) < 0) { + right_ptr -= record_size; + } + + // Stop when the pointers meet. + if (left_ptr >= right_ptr) { + break; + } + + // Swap the records that are on the wrong sides. + record_swap(left_ptr, right_ptr, record_size); + + // If we moved the pivot, update its pointer so it keeps + // pointing to the pivot value. + if (pivot_ptr == left_ptr) { + pivot_ptr = right_ptr; + } else if (pivot_ptr == right_ptr) { + pivot_ptr = left_ptr; + } + + // Do not compare the swapped elements again. + left_ptr += record_size; + right_ptr -= record_size; + + // Stop when pointers cross. + // (Pointers equal is not good enough at this point, because + // we won't know on which side the pointed record belongs.) + if (left_ptr > right_ptr) { + break; + } + } + + // If pointers are equal, they must both be pointing to a pivot. + // Bump both pointers so they correctly delineate the new + // subranges. The record where the pointers meet is already in + // its final position. + if (left_ptr == right_ptr) { + left_ptr += record_size; + right_ptr -= record_size; + } + + // Push left subrange on the stack, if it meets the size threshold. + size_t num_left = + (right_ptr + record_size - range_begin) / record_size; + if (num_left > insertion_sort_threshold) { + stack.emplace_back(range_begin, num_left, depth_limit - 1); + } + + // Push right subrange on the stack, if it meets the size threshold. 
+ size_t num_right = + range_num_records - (left_ptr - range_begin) / record_size; + if (num_right > insertion_sort_threshold) { + stack.emplace_back(left_ptr, num_right, depth_limit - 1); + } + } + + // Recursive partitining finished. + // The array is now roughly sorted, except for subranges that were + // skipped because they are within the threshold. + + // Finish with insertion sort to get a fully sorted array. + insertion_sort_records(buffer, record_size, num_records); +} + + +/** Sort the specified block of records (in-place). */ +void sort_records( + unsigned char * buffer, + size_t record_size, + size_t num_records) +{ + // TODO : multi-threaded quicksort + +// heap_sort_records(buffer, record_size, num_records); + quicksort_records(buffer, record_size, num_records); +} + + +/** + * Remove duplicate records from an already sorted array. + * + * @return the number of unique records + */ +size_t filter_duplicate_records( + unsigned char * buffer, + unsigned int record_size, + size_t num_records) +{ + // Special case for 0 or 1 records. + if (num_records < 2) { + return num_records; + } + + // Find the first duplicate record. + unsigned char * last_unique = buffer; + unsigned char * next_record = buffer + record_size; + size_t next_pos = 1; + + while (next_pos < num_records) { + if (memcmp(last_unique, next_record, record_size) == 0) { + break; + } + last_unique = next_record; + next_record += record_size; + next_pos++; + } + + // Scan the rest of the records and copy unique records. + size_t num_unique = next_pos; + while (next_pos < num_records) { + if (memcmp(last_unique, next_record, record_size) != 0) { + num_unique++; + last_unique += record_size; + memcpy(last_unique, next_record, record_size); + } + next_record += record_size; + next_pos++; + } + + return num_unique; +} + + +/** + * Sort the whole file in a single pass. + * + * This function is used only when the input file fits in memory. 
+ */ +void single_pass( + BinaryFile& input_file, + BinaryFile& output_file, + const SortContext& ctx) +{ + assert(input_file.size() < SIZE_MAX); + + size_t input_size = input_file.size(); + size_t num_records = input_size / ctx.record_size; + + log(ctx, "sorting %zu records in a single pass\n", num_records); + + // Allocate memory. + log(ctx, "allocating memory\n"); + std::vector buffer(input_size); + + Timer timer; + + // Read input file into memory. + log(ctx, "reading input file\n"); + timer.start(); + input_file.read(buffer.data(), 0, input_size); + timer.stop(); + log(ctx, " t = %.3f seconds\n", timer.value()); + +// TODO : multi-threaded sorting with thread pool + + // Sort records in memory buffer. + log(ctx, "sorting records\n"); + timer.start(); + sort_records(buffer.data(), ctx.record_size, num_records); + timer.stop(); + log(ctx, " t = %.3f seconds\n", timer.value()); + + if (ctx.flag_unique) { + // Eliminate duplicate records. + log(ctx, "filtering duplicate records\n"); + timer.start(); + num_records = filter_duplicate_records( + buffer.data(), ctx.record_size, num_records); + timer.stop(); + log(ctx, " t = %.3f seconds\n", timer.value()); + log(ctx, "found %zu unique records\n", num_records); + } + + log(ctx, "writing output file\n"); + timer.start(); + + // Shrink output file if duplicate records were removed. + uint64_t output_size = num_records * ctx.record_size; + if (output_size < input_size) { + output_file.truncate_file(output_size); + } + + // Write memory buffer to output file. + output_file.write(buffer.data(), 0, output_size); + timer.stop(); + log(ctx, " t = %.3f seconds\n", timer.value()); +} + + +/** + * Perform the initial sort pass of multi-pass external sorting. + * + * All blocks will have the specified number of records, except for + * the last block in the file which may be smaller. + * + * @param input_file Input file. + * @param output_file Output file for this pass. + * @param records_per_block Number of records per sort block. 
+ * @param num_blocks Number of sort blocks. + * @param ctx Reference to context structure. + */ +void sort_pass( + BinaryFile& input_file, + BinaryFile& output_file, + uint64_t records_per_block, + uint64_t num_blocks, + const SortContext& ctx) +{ + unsigned int record_size = ctx.record_size; + uint64_t file_size = input_file.size(); + uint64_t num_records = file_size / record_size; + + log(ctx, "running initial sort pass\n"); + + Timer timer; + timer.start(); + +// TODO : double-buffer with I/O in separate thread + // Allocate sort buffer. + assert(records_per_block < SIZE_MAX / record_size); + std::vector buffer(records_per_block * record_size); + +// TODO : multi-threaded sorting with thread pool + + // Loop over blocks to be sorted. + for (uint64_t block_index = 0; block_index < num_blocks; block_index++) { + + uint64_t first_record_idx = block_index * records_per_block; + size_t block_num_records = + std::min(records_per_block, num_records - first_record_idx); + + log(ctx, + "sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n", + block_index, + num_blocks, + block_num_records); + + // Read block. + input_file.read( + buffer.data(), + first_record_idx * record_size, + block_num_records * record_size); + + // Sort records in this block. + sort_records( + buffer.data(), + record_size, + block_num_records); + + // Write block. + output_file.write( + buffer.data(), + first_record_idx * record_size, + block_num_records * record_size); + } + + timer.stop(); + log(ctx, "initial sort pass finished\n"); + log(ctx, " t = %.3f seconds\n", timer.value()); +} + + +/** + * Perform a merge pass of multi-pass external sorting. + * + * All input blocks will have the specified number of records, except for + * the last block in the file which may be smaller. + * + * All output blocks will have (branch_factor * records_per_block) records, + * except for the last output block in the file which may be smaller. 
+ * + * If "filter_duplicates" is specified, the number of output blocks MUST be 1. + * In this case duplicate elements will be removed from the output. + * + * @param input_file Input file for this pass. + * @param output_file Output file for this pass. + * @param records_per_block Number of records per input block. + * @param num_blocks Number of input blocks. + * @param branch_factor Number of blocks to merge per an output block. + * @param filter_duplicates true to eliminate duplicate elements. + * @param ctx Reference to context structure. + */ +void merge_pass( + BinaryFile& input_file, + BinaryFile& output_file, + uint64_t records_per_block, + uint64_t num_blocks, + unsigned int branch_factor, + bool filter_duplicates, + const SortContext& ctx) +{ +// TODO TODO TODO TODO +#if 0 + // Eliminating duplicates is only supported for a single output block. + assert((!filter_duplicates) || (num_blocks <= branch_factor)); + + Timer timer; + timer.start(); + +// TODO : double-buffer with I/O in separate thread + // Allocate sort buffer. + assert(records_per_block < SIZE_MAX / record_size); + std::vector buffer(records_per_block * record_size); + + // Loop over groups of blocks to be sorted. + uint64_t block_index = 0; + while (block_index < num_blocks) { + + // Determine how many blocks will be merged in this group. + unsigned int this_branch_factor = branch_factor; + if (branch_factor > num_blocks - block_index) { + branch_factor = num_blocks - block_index; + } + + + } + + // Loop over blocks to be sorted. + for (uint64_t block_index = 0; block_index < num_blocks; block_index++) { + + uint64_t block_start_idx = block_index * records_per_block; + size_t block_num_records = + std::min(records_per_block, num_records - block_start_idx); + + log(ctx, + "sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n", + block_index, + num_blocks, + block_num_records); + + // Read block. 
+        input_file.read(
+            buffer.data(),
+            block_start_idx * record_size,
+            block_num_records * record_size);
+
+        // Sort records in this block.
+        sort_records(
+            buffer.data(),
+            record_size,
+            block_num_records);
+
+        // Write block.
+        output_file.write(
+            buffer.data(),
+            block_start_idx * record_size,
+            block_num_records * record_size);
+    }
+
+    timer.stop();
+    log(ctx, "  t = %.3f seconds\n", timer.value());
+#endif
+}
+
+
+/**
+ * Prepare a strategy for multi-pass external sorting.
+ *
+ * Decides the block size for the initial sort pass and the sequence of
+ * merge passes needed to reduce the sorted blocks to a single block.
+ *
+ * Assumes file_size > ctx.memory_size (multi-pass case) and that
+ * ctx.memory_size / ctx.record_size is large enough that the 4096-record
+ * rounding below cannot produce zero records per block — presumably
+ * guaranteed by the branch-factor check in sortbin(); verify if callers
+ * change.
+ */
+SortStrategy plan_multi_pass_strategy(
+    uint64_t file_size,
+    const SortContext& ctx)
+{
+    // Plan the initial sort pass.
+    // Use blocks that are at most half of available memory
+    // (so we can use two buffers to overlap I/O and sorting).
+    uint64_t max_sort_block_size = ctx.memory_size / 2;
+
+    // Calculate number of records per block.
+    uint64_t records_per_sort_block = max_sort_block_size / ctx.record_size;
+
+    // Round down to a multiple of 4096 records, so that every block
+    // starts at a multiple of 4096 records (and hence at a 4096-byte
+    // aligned file offset, since offsets are whole records).
+    records_per_sort_block -= records_per_sort_block % 4096;
+
+    // Calculate the number of blocks during the initial sort pass.
+    // (ceil division; num_records > 0 in the multi-pass case)
+    uint64_t num_records = file_size / ctx.record_size;
+    uint64_t num_sort_blocks = 1 + (num_records - 1) / records_per_sort_block;
+
+    SortStrategy strategy;
+    strategy.records_per_sort_block = records_per_sort_block;
+    strategy.num_sort_blocks = num_sort_blocks;
+
+    // Plan the merge passes.
+    // Start with the result of the initial sort pass.
+    uint64_t records_per_block = records_per_sort_block;
+    uint64_t num_blocks = num_sort_blocks;
+
+    // Keep merging until there is only one block left.
+    while (num_blocks > 1) {
+        // Calculate the number of blocks out of this merge pass.
+        uint64_t num_merged_blocks = 1 + (num_blocks - 1) / ctx.branch_factor;
+
+        // Choose the smallest branch factor that produces this nr of blocks.
+        // (Spreads the work evenly: merging N blocks into K outputs needs
+        // only ceil(N/K) inputs per output, which may be less than the
+        // maximum branch factor.)
+        unsigned int branch_factor = 1 + (num_blocks - 1) / num_merged_blocks;
+
+        SortStrategy::MergePass merge_pass;
+        merge_pass.records_per_input_block = records_per_block;
+        merge_pass.num_input_blocks = num_blocks;
+        merge_pass.branch_factor = branch_factor;
+        strategy.merge_pass.push_back(merge_pass);
+
+        // Result of this merge pass will go into the next pass.
+        records_per_block *= branch_factor;
+        num_blocks = num_merged_blocks;
+    }
+
+    return strategy;
+}
+
+
+/**
+ * Sort a binary data file.
+ *
+ * Chooses between a single in-memory pass and a multi-pass external sort
+ * depending on whether the input fits in ctx.memory_size.
+ *
+ * @param input_name    Path name of binary input file.
+ * @param output_name   Path name of binary output file.
+ * @param ctx           Reference to context structure.
+ */
+void sortbin(
+    const std::string& input_name,
+    const std::string& output_name,
+    const SortContext& ctx)
+{
+    // We want file I/O to occur on 4096-byte boundaries.
+    // To ensure this, we want to do I/O on multiples of 4096 records.
+    // To ensure this is possible, we need room for ~ 32k records per branch.
+    if (ctx.branch_factor > ctx.memory_size / ctx.record_size / 32768) {
+        throw std::logic_error(
+            "Not enough memory for this combination of record size"
+            " and branch factor");
+    }
+
+    // Open input file.
+    log(ctx, "opening input file\n");
+    BinaryInputFile input_file(input_name);
+    uint64_t file_size = input_file.size();
+
+    // Check that input file contains an integer number of records.
+    if (file_size % ctx.record_size != 0) {
+        throw std::logic_error(
+            "Input file does not contain an integer number of records");
+    }
+
+    // Create output file.
+    log(ctx, "creating output file\n");
+    BinaryOutputFile output_file(output_name, file_size);
+
+    if (file_size <= ctx.memory_size) {
+        // Data fits in memory.
+
+        // Sort in a single pass.
+        single_pass(input_file, output_file, ctx);
+
+    } else {
+        // Data does not fit in memory.
+
+        // Plan a multi-pass strategy.
+        SortStrategy strategy = plan_multi_pass_strategy(file_size, ctx);
+        unsigned int num_merge_pass = strategy.merge_pass.size();
+
+        log(ctx,
+            "sorting %" PRIu64 " records in %" PRIu64 " blocks"
+            " followed by %u merge passes\n",
+            file_size / ctx.record_size,
+            strategy.num_sort_blocks,
+            num_merge_pass);
+
+        // Create a temporary file.
+        log(ctx, "creating temporary file\n");
+        BinaryTempFile temp_file(ctx.temporary_directory, file_size);
+
+        // The merge passes alternate between tempfile-to-outputfile and
+        // outputfile-to-tempfile.
+        // The final merge pass will be tempfile-to-outputfile.
+        // Depending on the number of merge passes, the initial sort pass
+        // will either be inputfile-to-tempfile or inputfile-to-outputfile.
+        // Index parity does the bookkeeping: element 0 is the output file,
+        // element 1 the temp file, and pass k reads from index
+        // (num_merge_pass - k) % 2, so pass num_merge_pass - 1 always
+        // writes index 0 (the real output file).
+        BinaryFile * output_or_temp_file[2] = { &output_file, &temp_file };
+        BinaryFile * sort_output_file =
+            output_or_temp_file[num_merge_pass % 2];
+
+        // Execute the initial sort pass.
+        sort_pass(
+            input_file,
+            *sort_output_file,
+            strategy.records_per_sort_block,
+            strategy.num_sort_blocks,
+            ctx);
+
+        // Execute the merge passes.
+        for (unsigned int mp = 0; mp < num_merge_pass; mp++) {
+            // NOTE: mp is logged zero-based, matching the zero-based block
+            // numbers logged by sort_pass().
+            log(ctx,
+                "running merge pass %u / %u: "
+                "%" PRIu64 " blocks, branch factor %u\n",
+                mp,
+                num_merge_pass,
+                strategy.merge_pass[mp].num_input_blocks,
+                strategy.merge_pass[mp].branch_factor);
+
+            // Filter duplicate records only on the last pass.
+            bool filter_dupl = ctx.flag_unique && (mp + 1 == num_merge_pass);
+
+            // Alternate between temp_file and output_file.
+            BinaryFile * pass_input_file =
+                output_or_temp_file[(num_merge_pass - mp) % 2];
+            BinaryFile * pass_output_file =
+                output_or_temp_file[(num_merge_pass - mp - 1) % 2];
+
+            // Execute the merge pass.
+            merge_pass(
+                *pass_input_file,
+                *pass_output_file,
+                strategy.merge_pass[mp].records_per_input_block,
+                strategy.merge_pass[mp].num_input_blocks,
+                strategy.merge_pass[mp].branch_factor,
+                filter_dupl,
+                ctx);
+        }
+    }
+
+    log(ctx, "finished\n");
+}
+
+
+/** Return $TMPDIR, falling back to "/tmp" when it is unset. */
+std::string get_default_tmpdir(void)
+{
+    const char *tmpdir = getenv("TMPDIR");
+    if (tmpdir == NULL) {
+        tmpdir = "/tmp";
+    }
+    return std::string(tmpdir);
+}
+
+
+/** Print command-line usage information to stderr. */
+void usage()
+{
+    fprintf(stderr,
+        "\n"
+        "Usage: sortbin [options] inputfile outputfile\n"
+        "\n"
+        "Options:\n"
+        "\n"
+        "  -s, --size=N    specify record size of N bytes (required)\n"
+        "  -u, --unique    eliminate duplicates after sorting\n"
+        "  --memory=M      use at most M MByte RAM (default: %d)\n"
+        "  --branch=B      merge at most B arrays in one step (default: %d)\n"
+        "  --temporary-directory=DIR    write temporary file to the specified\n"
+        "                               directory (default: $TMPDIR)\n"
+        "\n"
+        "The output file must not yet exist.\n"
+        "If the data does not fit in memory, a temporary file will be\n"
+        "created with the same size as the input/output files.\n"
+        "\n",
+        DEFAULT_MEMORY_SIZE_MBYTE,
+        DEFAULT_BRANCH_FACTOR);
+}
+
+
+} // anonymous namespace
+
+
+int main(int argc, char **argv)
+{
+    const struct option longopts[] = {
+        { "size", 1, NULL, 's' },
+        { "unique", 0, NULL, 'u' },
+        { "memory", 1, NULL, 'M' },
+        { "branch", 1, NULL, 'B' },
+        { "temporary-directory", 1, NULL, 'T' },
+        { "verbose", 0, NULL, 'v' },
+        { "help", 0, NULL, 'h' },
+        { NULL, 0, NULL, 0 }
+    };
+    bool flag_unique = false;
+    bool flag_verbose = false;
+    int record_size = 0;
+    int memory_size = DEFAULT_MEMORY_SIZE_MBYTE;
+    int branch_factor = DEFAULT_BRANCH_FACTOR;
+    std::string tempdir = get_default_tmpdir();
+    int opt;
+
+    // "s:" is required so that the short form "-s N" advertised by
+    // usage() is accepted; without it getopt_long() rejects "-s".
+    while ((opt = getopt_long(argc, argv, "s:uv", longopts, NULL)) != -1) {
+        switch (opt) {
+        case 's':
+            record_size = atoi(optarg);
+            if (record_size < 1) {
+                fprintf(stderr,
+                    "ERROR: Invalid record size (must be at least 1)\n");
+                return EXIT_FAILURE;
+            }
+            break;
+        case 'u':
+            flag_unique = true;
+            break;
+        case 'M':
+            // atoi() yields 0 on malformed input; rejected by the check.
+            memory_size = atoi(optarg);
+            if (memory_size <= 0) {
+                fprintf(stderr, "ERROR: Invalid memory size\n");
+                return EXIT_FAILURE;
+            }
+            break;
+        case 'B':
+            branch_factor = atoi(optarg);
+            if (branch_factor < 2) {
+                fprintf(stderr,
+                    "ERROR: Invalid radix value, must be at least 2\n");
+                return EXIT_FAILURE;
+            }
+            break;
+        case 'T':
+            tempdir = optarg;
+            break;
+        case 'v':
+            flag_verbose = true;
+            break;
+        case 'h':
+            usage();
+            return EXIT_SUCCESS;
+        default:
+            usage();
+            return EXIT_FAILURE;
+        }
+    }
+
+    // --size is mandatory; record_size is still 0 if it was never given.
+    if (record_size < 1) {
+        fprintf(stderr, "ERROR: Missing required parameter --size\n");
+        usage();
+        return EXIT_FAILURE;
+    }
+
+    // Exactly two positional arguments: input file and output file.
+    if (argc < optind + 2) {
+        fprintf(stderr,
+            "ERROR: Input and output file names must be specified\n");
+        usage();
+        return EXIT_FAILURE;
+    }
+
+    if (argc > optind + 2) {
+        fprintf(stderr, "ERROR: Unexpected command-line parameters\n");
+        usage();
+        return EXIT_FAILURE;
+    }
+
+    // Reject memory sizes whose byte count would not fit in size_t
+    // (relevant on 32-bit systems; memory_size is known > 0 here).
+    if ((unsigned int)memory_size >= SIZE_MAX / 1024 / 1024) {
+        fprintf(
+            stderr,
+            "ERROR: This system can allocate at most %zu MB memory\n",
+            SIZE_MAX / 1024 / 1024 - 1);
+        return EXIT_FAILURE;
+    }
+
+    std::string input_name(argv[optind]);
+    std::string output_name(argv[optind+1]);
+
+    // Collect all settings into the sort context.
+    SortContext ctx;
+    ctx.record_size = record_size;
+    ctx.memory_size = uint64_t(memory_size) * 1024 * 1024;
+    ctx.branch_factor = branch_factor;
+    ctx.flag_unique = flag_unique;
+    ctx.flag_verbose = flag_verbose;
+    ctx.temporary_directory = tempdir;
+
+    // All failures are reported as exceptions; map them to a message
+    // on stderr and a non-zero exit status.
+    try {
+        sortbin(input_name, output_name, ctx);
+    } catch (const std::exception& ex) {
+        fprintf(stderr, "ERROR: %s\n", ex.what());
+        return EXIT_FAILURE;
+    }
+
+    return EXIT_SUCCESS;
+}