/* * Sort arrays of binary data records. * * Input and output files contain plain, raw arrays of fixed-length * binary data records. * * Records are interpreted as fixed-length strings of 8-bit unsigned integers. * The program sorts these records in lexicographic order. * * This program is optimized for short records, e.g. up to 20 bytes. */ /* * TESTING in-memory sort: * * Input: 10**8 records of 10 bytes * Storage: SSD * * GNU sort: 101.7 seconds * GNU sort -S 2G: 110.2 seconds * sortbin, qsort_r(): 31.1, 31.2, 31.1 seconds (correct output) * sortbin, heapsort: 57.4, 57.1, 58.5 seconds (correct output) * sortbin, quicksort: 24.5, 24.4, 24.4 seconds (correct output) * sortbin, quicksort, depth_limit=8: 31.6 seconds (correct output) * * * Input: 10**8 records of 10 bytes, 70502908 unique records * * GNU sort -u: 120.2 seconds * sortbin -u: 26.2 seconds (correct output) * */ // (already defined by g++) #define _GNU_SOURCE #define _FILE_OFFSET_BITS 64 #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // TODO : use a background thread for file I/O /* Maximum amount of RAM to use (in MBytes). */ #define DEFAULT_MEMORY_SIZE_MBYTE 1024 /* Default branch factor while merging. */ #define DEFAULT_BRANCH_FACTOR 16 /* Default number of sorting threads. */ #define DEFAULT_THREADS 1 /* Align buffer sizes and I/O on this number of records. For efficiency, I/O should be done in multiples of 4096 bytes. */ #define TRANSFER_ALIGNMENT 4096 namespace { // anonymous namespace /** Information about the sort job. */ struct SortContext { /** Record length in bytes. */ unsigned int record_size; /** Maximum memory to use (bytes). */ uint64_t memory_size; /** Maximum number of arrays to merge in one step. */ unsigned int branch_factor; /** True to eliminate duplicate records. */ bool flag_unique; /** True to write progress messages to stderr. */ bool flag_verbose; /** Directory where temporary files are created. */ std::string temporary_directory; }; /** Strategy for multi-pass external sorting. */ struct SortStrategy { /** Strategy for a single merge pass. */ struct MergePass { /** Number of records per input block into this merge pass. */ uint64_t records_per_input_block; /** Total number of input blocks into this merge pass. */ uint64_t num_input_blocks; /** Number of input blocks to merge into each output block. */ unsigned int branch_factor; }; /** Number of records per block during the initial sort pass. */ uint64_t records_per_sort_block; /** Number of blocks for the initial sort pass. */ uint64_t num_sort_blocks; /** List of merge passes. */ std::vector merge_pass; }; /** Show a progress message. */ void log(const SortContext& ctx, const char *format, ...) __attribute__ ((format (printf, 2, 3))) ; void log(const SortContext& ctx, const char *format, ...) { va_list ap; va_start(ap, format); if (ctx.flag_verbose) { vfprintf(stderr, format, ap); } va_end(ap); } /** Time measurements. */ class Timer { public: Timer() : m_value(0.0), m_running(false) { } double value() const { double v = m_value; if (m_running) { struct timespec tnow; clock_gettime(CLOCK_REALTIME, &tnow); v += tnow.tv_sec - m_tstart.tv_sec; v += 1.0e-9 * (tnow.tv_nsec - m_tstart.tv_nsec); } return m_value; } void start() { m_value = 0.0; m_running = true; clock_gettime(CLOCK_REALTIME, &m_tstart); } void resume() { if (!m_running) { m_running = true; clock_gettime(CLOCK_REALTIME, &m_tstart); } } void stop() { if (m_running) { struct timespec tnow; clock_gettime(CLOCK_REALTIME, &tnow); m_value += tnow.tv_sec - m_tstart.tv_sec; m_value += 1.0e-9 * (tnow.tv_nsec - m_tstart.tv_nsec); m_running = false; } } private: double m_value; bool m_running; struct timespec m_tstart; }; /** * Binary file access. * * This is a base class which can not be directly constructed. * Subclasses implement constructors that open files in various ways. */ class BinaryFile { public: // Prevent copying and assignment. BinaryFile(const BinaryFile&) = delete; BinaryFile& operator=(const BinaryFile&) = delete; /** Return file size in bytes. */ uint64_t size() const { return m_file_size; } /** Truncate file to reduce its size. */ void truncate_file(uint64_t new_size); /** Read raw bytes from file. */ void read( unsigned char * buf, uint64_t file_offset, size_t length); /** Write raw bytes to file. */ void write( const unsigned char * buf, uint64_t file_offset, size_t length); protected: // Prevent constructing and destructing via this base class. BinaryFile() : m_fd(-1), m_file_size(0) { } /** Destructor: close file. */ ~BinaryFile() { if (m_fd >= 0) { close(m_fd); } } /** File descriptor. */ int m_fd; /** File size (bytes). */ uint64_t m_file_size; }; // Truncate file to reduce its size. void BinaryFile::truncate_file(uint64_t new_size) { int ret = ftruncate(m_fd, new_size); if (ret != 0) { throw std::system_error( errno, std::system_category(), "Can not truncate file"); } m_file_size = new_size; } // Read raw bytes from file. void BinaryFile::read( unsigned char * buf, uint64_t file_offset, size_t length) { // Note that pread() may read fewer bytes than requested. while (length > 0) { ssize_t ret = pread(m_fd, buf, length, file_offset); if (ret <= 0) { throw std::system_error( errno, std::system_category(), "Can not read from file"); } buf += ret; file_offset += ret; length -= ret; } } // Write raw bytes to file. void BinaryFile::write( const unsigned char * buf, uint64_t file_offset, size_t length) { // Note that pwrite() may write fewer bytes than requested. while (length > 0) { ssize_t ret = pwrite(m_fd, buf, length, file_offset); if (ret <= 0) { throw std::system_error( errno, std::system_category(), "Can not write to file"); } buf += ret; file_offset += ret; length -= ret; } } /** Binary input file. */ class BinaryInputFile : public BinaryFile { public: /** Open existing file for read-only access. */ explicit BinaryInputFile(const std::string& filename) { m_fd = open(filename.c_str(), O_RDONLY); if (m_fd < 0) { throw std::system_error( errno, std::system_category(), "Can not open input file"); } off_t fsz = lseek(m_fd, 0, SEEK_END); if (fsz == (off_t)(-1)) { throw std::system_error( errno, std::system_category(), "Can not seek input file"); } m_file_size = fsz; } }; /** Binary output file. */ class BinaryOutputFile : public BinaryFile { public: /** Create output file and pre-allocate space. */ BinaryOutputFile(const std::string& filename, uint64_t new_size) { m_fd = open(filename.c_str(), O_RDWR | O_CREAT | O_EXCL, 0666); if (m_fd < 0) { throw std::system_error( errno, std::system_category(), "Can not create output file"); } int ret = posix_fallocate(m_fd, 0, new_size); if (ret != 0) { int errnum = errno; // Delete empty output file. unlink(filename.c_str()); throw std::system_error( errnum, std::system_category(), "Can not allocate space in output file"); } m_file_size = new_size; } }; /** Temporary binary file. */ class BinaryTempFile : public BinaryFile { public: /** Create temporary file and pre-allocate space. */ BinaryTempFile(const std::string& tempdir, uint64_t new_size) { // O_TMPFILE creates a new file without a name. // Effectively the file is created and immediately unlinked. // As a result, the file will be automatically deleted when closed. m_fd = open(tempdir.c_str(), O_RDWR | O_TMPFILE, 0600); if (m_fd < 0) { throw std::system_error( errno, std::system_category(), "Can not create temporary file"); } int ret = posix_fallocate(m_fd, 0, new_size); if (ret != 0) { throw std::system_error( errno, std::system_category(), "Can not allocate space in temporary file"); } m_file_size = new_size; } }; /** * Read binary records from an input file with buffering. * * The input stream reads from a sequence of discontinuous, equally spaced * blocks in the input file. All blocks have the same size, except for * the last block which may be shorter if it runs to the end of the file. * Each block contains a flat array of binary records. * * The input stream starts in the "empty" state. * The first call to "next_block()" enables reading records from the * first input block. When the input stream reaches the end of the * current block, it again enters the "empty" state until the following * call to "next_block()". */ class RecordInputStream { // TODO : double-buffering with delayed I/O via background thread public: /** * Construct a record input stream. * * The stream will initially contain no input sections. * * @param input_file Input file where records read from. * @param record_size Record size in bytes. * @param start_offset Offset in input file of first input section. * @param block_size Size of each input block in bytes. * Must be a multiple of "record_size". * The last input block may be shorter if it runs * to the end of the file. * @param block_stride Distance between start of blocks in bytes. * @param buffer_size Buffer size in bytes. * Must be a multiple of "record_size". * Note: Each RecordInputStream creates two buffers * of the specified size. */ RecordInputStream( BinaryFile& input_file, unsigned int record_size, uint64_t start_offset, uint64_t block_size, uint64_t block_stride, size_t buffer_size) : m_input_file(input_file), m_record_size(record_size), m_block_offset(start_offset), m_block_size(block_size), m_block_stride(block_stride), m_block_remaining(0), m_file_offset(0), m_bufpos(NULL), m_bufend(NULL), m_buffer(buffer_size) { assert(start_offset <= input_file.size()); assert(block_size % record_size == 0); assert(block_size <= block_stride); assert(buffer_size % record_size == 0); assert(buffer_size > record_size); } // Prevent copying and assignment. RecordInputStream(const RecordInputStream&) = delete; RecordInputStream& operator=(const RecordInputStream&) = delete; /** Return true if the end of the current input block is reached. */ inline bool empty() const { return (m_bufpos == m_bufend); } /** * Return a pointer to the current binary record. * * This function must only be used if "end_of_stream()" returns false. * The returned pointer becomes invalid after a call to "next_record()". */ inline const unsigned char * record() const { return m_bufpos; } /** * Move to the next record of the current section. * * This function must only be used if "empty()" returns false. * Calling this function invalidates all pointers previously returned * by "current_record()". */ inline void next_record() { assert(m_bufpos != m_bufend); m_bufpos += m_record_size; if (m_bufpos == m_bufend) { m_bufpos = m_buffer.data(); m_bufend = m_buffer.data(); refill_buffer(); } } /** * Start reading from the next input section. * * This function may only be called if "empty()" returns true. */ void next_block() { assert(m_bufpos == m_bufend); uint64_t file_size = m_input_file.size(); assert(m_block_stride < file_size - m_block_offset); m_block_offset += m_block_stride; m_file_offset = m_block_offset; m_block_remaining = std::min(m_block_size, file_size - m_block_offset); refill_buffer(); } private: /** Refill the buffer from the current input section. */ void refill_buffer() { if (m_block_remaining > 0) { size_t transfer_size = (m_buffer.size() < m_block_remaining) ? m_buffer.size() : m_block_remaining; m_input_file.read(m_buffer.data(), m_file_offset, transfer_size); m_file_offset += transfer_size; m_block_remaining -= transfer_size; m_bufend = m_buffer.data() + transfer_size; } } BinaryFile& m_input_file; const unsigned int m_record_size; uint64_t m_block_offset; uint64_t m_block_size; uint64_t m_block_stride; uint64_t m_block_remaining; uint64_t m_file_offset; unsigned char * m_bufpos; unsigned char * m_bufend; std::vector m_buffer; }; /** * Write binary records to an output file with buffering. */ class RecordOutputStream { // TODO : double-buffering with delayed I/O via background thread public: /** * Construct a record output stream. * * @param output_file Output file where records are written. * @param record_size Record size in bytes. * @param file_offset Start offset in the output file. * @param buffer_size Buffer size in bytes. * Note: Each RecordOutputStream creates two buffers * of the specified size. */ RecordOutputStream( BinaryFile& output_file, unsigned int record_size, uint64_t file_offset, size_t buffer_size) : m_output_file(output_file), m_record_size(record_size), m_file_offset(file_offset), m_bufpos(0), m_buffer(buffer_size) { } // Prevent copying and assignment. RecordOutputStream(const RecordOutputStream&) = delete; RecordOutputStream& operator=(const RecordOutputStream&) = delete; /** Append a record to the output stream. */ inline void put(const unsigned char *record) { if (m_record_size > m_buffer.size() - m_bufpos) { flush(); } memcpy(m_buffer.data() + m_bufpos, record, m_record_size); } /** Flush buffered records to the output file. */ void flush() { if (m_bufpos > 0) { m_output_file.write(m_buffer.data(), m_file_offset, m_bufpos); m_file_offset += m_bufpos; m_bufpos = 0; } } private: BinaryFile& m_output_file; const unsigned int m_record_size; uint64_t m_file_offset; size_t m_bufpos; std::vector m_buffer; }; /** Compare two records. */ #define record_compare(_a, _b, _n) (memcmp(_a, _b, _n)) /** Copy a record. */ #define record_copy(_dst, _src, _n) (memcpy(_dst, _src, _n)) /** * Swap two records. */ inline void record_swap( unsigned char *a, unsigned char *b, size_t record_size) { while (record_size > 0) { unsigned char aa = *a; unsigned char bb = *b; *b = aa; *a = bb; a++; b++; record_size--; } } /** Return the index of the parent of the specified heap node. */ inline size_t heap_parent_index(size_t node_index) { return (node_index - 1) / 2; } /** Return the index of the left child of the specified heap node. */ inline size_t heap_left_child_index(size_t node_index) { return node_index * 2 + 1; } /** * Insert a node into an empty spot in the heap, then repair the sub-heap * rooted at that position. */ void heap_sift_down_records( unsigned char * buffer, size_t record_size, size_t num_records, size_t insert_index, const unsigned char * insert_value) { // Find the first node index which does not have two child nodes. size_t parent_end = heap_parent_index(num_records); // Move the empty spot all the way down through the sub-heap. size_t cur_idx = insert_index; while (cur_idx < parent_end) { // Find the left child node of the current node. size_t child_idx = heap_left_child_index(cur_idx); unsigned char * child_ptr = buffer + child_idx * record_size; // Compare the two child nodes. if (record_compare(child_ptr, child_ptr + record_size, record_size) < 0) { // Right child is greater, choose that one. child_idx += 1; child_ptr += record_size; } // Move the chosen child to the empty spot. unsigned char * cur_ptr = buffer + cur_idx * record_size; record_copy(cur_ptr, child_ptr, record_size); // Continue the scan from the (now empty) child spot. cur_idx = child_idx; } // If the empty spot has a left child, swap it with the empty spot. if (num_records > 1 && cur_idx <= heap_parent_index(num_records - 1)) { size_t child_idx = heap_left_child_index(cur_idx); unsigned char * cur_ptr = buffer + cur_idx * record_size; unsigned char * child_ptr = buffer + child_idx * record_size; record_copy(cur_ptr, child_ptr, record_size); cur_idx = child_idx; } // The empty spot is now in a leaf node of the heap. // Scan back up to find the right place to insert the new node. // // Going all the way down and then back up may seem wasteful, // but it is faster in practice because the correct insertion spot // is likely to be in the bottom of the heap. while (cur_idx > insert_index) { // Find parent of the empty spot. size_t parent_idx = heap_parent_index(cur_idx); unsigned char * parent_ptr = buffer + parent_idx * record_size; // Compare the new value to the parent value. if (record_compare(parent_ptr, insert_value, record_size) >= 0) { // We found the right spot to insert the new value. break; } // Move the parent back to the empty spot. unsigned char * cur_ptr = buffer + cur_idx * record_size; record_copy(cur_ptr, parent_ptr, record_size); // Move to te parent node. cur_idx = parent_idx; } // Insert the new node at the empty position. unsigned char * cur_ptr = buffer + cur_idx * record_size; record_copy(cur_ptr, insert_value, record_size); } /** * Sort an array of records using in-place heap sort. * * Run time: O(N * log(N)) * Extra space: O(1) */ void heap_sort_records( unsigned char * buffer, size_t record_size, size_t num_records) { // Skip trivial cases. if (num_records < 2) { return; } // Allocate temporary space for one record. std::vector temp(record_size); // // Phase 1: Transform the unordered array into a max-heap. // // Bottom-up loop over all non-trivial sub-heaps. // Start with the parent of the last node. size_t cur_idx = heap_parent_index(num_records - 1); while (true) { // Remove the current node from the heap. unsigned char * cur_ptr = buffer + cur_idx * record_size; record_copy(temp.data(), cur_ptr, record_size); // Re-insert the node and repair the sub-heap rooted at this node. heap_sift_down_records( buffer, record_size, num_records, cur_idx, temp.data()); // Stop after processing the root node. if (cur_idx == 0) { break; } // Go do the next sub-heap. cur_idx--; } // // Phase 2: Transform the max-heap into a sorted array. // // Loop over the record array from back to front. cur_idx = num_records - 1; while (cur_idx > 0) { // Remove the root node from the heap. // This is the largest remaining element, which belongs at index CUR // in the sorted array. record_copy(temp.data(), buffer, record_size); // Remove the node at index CUR from the heap. // This reduces the size of the heap by 1. // Re-insert the removed node as the root node and repair the heap. unsigned char * cur_ptr = buffer + cur_idx * record_size; heap_sift_down_records(buffer, record_size, cur_idx, 0, cur_ptr); // Copy the former root node to its position in the sorted array. record_copy(cur_ptr, temp.data(), record_size); // Go to next element. cur_idx--; } } /** * Sort an array of records using in-place insertion sort. * * This is a helper function for quicksort_records(). */ void insertion_sort_records( unsigned char * buffer, size_t record_size, size_t num_records) { // Allocate temporary space for one record. std::vector temp(record_size); for (size_t cur_idx = 1; cur_idx < num_records; cur_idx++) { // The partial array 0 .. (cur_idx - 1) is already sorted. // We will now insert cur_idx into the sorted array. // Quick check whether the new record is already in the right place. unsigned char * cur_ptr = buffer + cur_idx * record_size; unsigned char * prev_ptr = cur_ptr - record_size; if (record_compare(cur_ptr, prev_ptr, record_size) >= 0) { continue; } // Scan backwards through the sorted array to find the right place // to insert the new record. unsigned char * insert_ptr = prev_ptr; while (insert_ptr > buffer) { prev_ptr = insert_ptr - record_size; if (record_compare(cur_ptr, prev_ptr, record_size) >= 0) { // Found the right place to insert the new record. break; } // Continue backwards scan. insert_ptr = prev_ptr; } // Copy the new record to temporary storage. record_copy(temp.data(), cur_ptr, record_size); // Move sorted records to make space for the new record. memmove(insert_ptr + record_size, insert_ptr, cur_ptr - insert_ptr); // Copy the new record to its place. record_copy(insert_ptr, temp.data(), record_size); } } /** * Sort an array of records using in-place quicksort. * * Run time: O(N * log(N)) * Extra space: O(log(N)) * * Plain quicksort is known to have quadratic worst-case behaviour. * This implementation uses median-of-three partitioning to reduce * the probability of worst-case performance. If bad performance does * occur, this implementation detects it and switches to heap sort. */ void quicksort_records( unsigned char * buffer, size_t record_size, size_t num_records) { // Recursive partitioning is only applied to fragments larger than // this threshold. The rest of the work will be done by insertion sort. const size_t insertion_sort_threshold = 12; // Skip trivial cases. if (num_records < 2) { return; } // Determine maximum acceptable recursion depth. unsigned int depth_limit = 1; for (size_t nremain = num_records; nremain > 1; nremain >>= 1) { depth_limit += 2; } // Allocate stack. std::vector> stack; stack.reserve(depth_limit); // Prepare recursive partitioning of the entire array. if (num_records > insertion_sort_threshold) { stack.emplace_back(buffer, num_records, depth_limit); } // Execute recursive partitioning. while (!stack.empty()) { // Pop a range from the stack. unsigned char * range_begin; size_t range_num_records; std::tie(range_begin, range_num_records, depth_limit) = stack.back(); stack.pop_back(); // Check recursion depth. Switch to heap sort if we get to deep. if (depth_limit == 0) { heap_sort_records(range_begin, record_size, range_num_records); continue; } // Initialize pointers to start, end and middle of range. unsigned char * left_ptr = range_begin; unsigned char * right_ptr = range_begin + (range_num_records - 1) * record_size; unsigned char * pivot_ptr = range_begin + (range_num_records / 2) * record_size; // Sort the first, middle and last records such that they are // in proper order with respect to each other. if (record_compare(pivot_ptr, left_ptr, record_size) < 0) { record_swap(left_ptr, pivot_ptr, record_size); } if (record_compare(right_ptr, pivot_ptr, record_size) < 0) { record_swap(pivot_ptr, right_ptr, record_size); if (record_compare(pivot_ptr, left_ptr, record_size) < 0) { record_swap(left_ptr, pivot_ptr, record_size); } } // The median of the three records we examined is now in the // middle of the range, pointed to by pivot_ptr. // This is not necessarily the final location of that element. // The first and last record of the range are now on the proper // side of the partition. No need to examine them again. left_ptr += record_size; right_ptr -= record_size; // Partition the rest of the array based on comparing to the pivot. while (true) { // Skip left-side records that are less than the pivot. while (record_compare(left_ptr, pivot_ptr, record_size) < 0) { left_ptr += record_size; } // Skip right-side records that are greater than the pivot. while (record_compare(pivot_ptr, right_ptr, record_size) < 0) { right_ptr -= record_size; } // Stop when the pointers meet. if (left_ptr >= right_ptr) { break; } // Swap the records that are on the wrong sides. record_swap(left_ptr, right_ptr, record_size); // If we moved the pivot, update its pointer so it keeps // pointing to the pivot value. if (pivot_ptr == left_ptr) { pivot_ptr = right_ptr; } else if (pivot_ptr == right_ptr) { pivot_ptr = left_ptr; } // Do not compare the swapped elements again. left_ptr += record_size; right_ptr -= record_size; // Stop when pointers cross. // (Pointers equal is not good enough at this point, because // we won't know on which side the pointed record belongs.) if (left_ptr > right_ptr) { break; } } // If pointers are equal, they must both be pointing to a pivot. // Bump both pointers so they correctly delineate the new // subranges. The record where the pointers meet is already in // its final position. if (left_ptr == right_ptr) { left_ptr += record_size; right_ptr -= record_size; } // Push left subrange on the stack, if it meets the size threshold. size_t num_left = (right_ptr + record_size - range_begin) / record_size; if (num_left > insertion_sort_threshold) { stack.emplace_back(range_begin, num_left, depth_limit - 1); } // Push right subrange on the stack, if it meets the size threshold. size_t num_right = range_num_records - (left_ptr - range_begin) / record_size; if (num_right > insertion_sort_threshold) { stack.emplace_back(left_ptr, num_right, depth_limit - 1); } } // Recursive partitining finished. // The array is now roughly sorted, except for subranges that were // skipped because they are within the threshold. // Finish with insertion sort to get a fully sorted array. insertion_sort_records(buffer, record_size, num_records); } /** Sort the specified block of records (in-place). */ void sort_records( unsigned char * buffer, size_t record_size, size_t num_records) { // TODO : multi-threaded quicksort // heap_sort_records(buffer, record_size, num_records); quicksort_records(buffer, record_size, num_records); } /** * Remove duplicate records from an already sorted array. * * @return the number of unique records */ size_t filter_duplicate_records( unsigned char * buffer, unsigned int record_size, size_t num_records) { // Special case for 0 or 1 records. if (num_records < 2) { return num_records; } // Find the first duplicate record. unsigned char * last_unique = buffer; unsigned char * next_record = buffer + record_size; size_t next_pos = 1; while (next_pos < num_records) { if (memcmp(last_unique, next_record, record_size) == 0) { break; } last_unique = next_record; next_record += record_size; next_pos++; } // Scan the rest of the records and copy unique records. size_t num_unique = next_pos; while (next_pos < num_records) { if (memcmp(last_unique, next_record, record_size) != 0) { num_unique++; last_unique += record_size; memcpy(last_unique, next_record, record_size); } next_record += record_size; next_pos++; } return num_unique; } /** * Sort the whole file in a single pass. * * This function is used only when the input file fits in memory. */ void single_pass( BinaryFile& input_file, BinaryFile& output_file, const SortContext& ctx) { assert(input_file.size() < SIZE_MAX); size_t input_size = input_file.size(); size_t num_records = input_size / ctx.record_size; log(ctx, "sorting %zu records in a single pass\n", num_records); // Allocate memory. log(ctx, "allocating memory\n"); std::vector buffer(input_size); Timer timer; // Read input file into memory. log(ctx, "reading input file\n"); timer.start(); input_file.read(buffer.data(), 0, input_size); timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); // TODO : multi-threaded sorting with thread pool // Sort records in memory buffer. log(ctx, "sorting records\n"); timer.start(); sort_records(buffer.data(), ctx.record_size, num_records); timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); if (ctx.flag_unique) { // Eliminate duplicate records. log(ctx, "filtering duplicate records\n"); timer.start(); num_records = filter_duplicate_records( buffer.data(), ctx.record_size, num_records); timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); log(ctx, "found %zu unique records\n", num_records); } log(ctx, "writing output file\n"); timer.start(); // Shrink output file if duplicate records were removed. uint64_t output_size = num_records * ctx.record_size; if (output_size < input_size) { output_file.truncate_file(output_size); } // Write memory buffer to output file. output_file.write(buffer.data(), 0, output_size); timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); } /** * Perform the initial sort pass of multi-pass external sorting. * * All blocks will have the specified number of records, except for * the last block in the file which may be smaller. * * @param input_file Input file. * @param output_file Output file for this pass. * @param records_per_block Number of records per sort block. * @param num_blocks Number of sort blocks. * @param ctx Reference to context structure. */ void sort_pass( BinaryFile& input_file, BinaryFile& output_file, uint64_t records_per_block, uint64_t num_blocks, const SortContext& ctx) { unsigned int record_size = ctx.record_size; uint64_t file_size = input_file.size(); uint64_t num_records = file_size / record_size; log(ctx, "running initial sort pass\n"); Timer timer; timer.start(); // TODO : double-buffer with I/O in separate thread // Allocate sort buffer. assert(records_per_block < SIZE_MAX / record_size); std::vector buffer(records_per_block * record_size); // TODO : multi-threaded sorting with thread pool // Loop over blocks to be sorted. for (uint64_t block_index = 0; block_index < num_blocks; block_index++) { uint64_t first_record_idx = block_index * records_per_block; size_t block_num_records = std::min(records_per_block, num_records - first_record_idx); log(ctx, "sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n", block_index, num_blocks, block_num_records); // Read block. input_file.read( buffer.data(), first_record_idx * record_size, block_num_records * record_size); // Sort records in this block. sort_records( buffer.data(), record_size, block_num_records); // Write block. output_file.write( buffer.data(), first_record_idx * record_size, block_num_records * record_size); } timer.stop(); log(ctx, "initial sort pass finished\n"); log(ctx, " t = %.3f seconds\n", timer.value()); } /** * Merge 2 sorted blocks of records into a single sorted block. * * @param instream1 Input stream containing block 1. * @param instream2 Input stream containing block 2. * @param output_stream Output stream for the merged block. * @param record_size Record size in bytes. */ void merge_2_blocks( RecordInputStream& instream1, RecordInputStream& instream2, RecordOutputStream& output_stream, size_t record_size) { // Input blocks should not be empty. assert(!instream1.empty()); assert(!instream2.empty()); const unsigned char * rec1 = instream1.record(); const unsigned char * rec2 = instream2.record(); // Merge until one stream runs empty. while (true) { // Choose which record should go first. if (record_compare(rec1, rec2, record_size) < 0) { // Push record from stream 1 and load next record. output_stream.put(rec1); instream1.next_record(); if (instream1.empty()) { rec1 = NULL; break; } rec1 = instream1.record(); } else { // Push record from stream 2 and load next record. output_stream.put(rec2); instream2.next_record(); if (instream2.empty()) { rec2 = NULL; break; } rec2 = instream2.record(); } } // At most one of the streams still has records left. // Copy those records to the output. while (!instream1.empty()) { output_stream.put(instream1.record()); instream1.next_record(); } while (!instream2.empty()) { output_stream.put(instream2.record()); instream2.next_record(); } } /** * Merge sorted blocks of records into a single sorted block. * * @param input_streams One input stream for each input blocks. * @param output_stream Output stream for the merged block. * @param record_size Record size in bytes. * @param branch_factor Number of input blocks. * May be less than the length of input_streams. * @param filter_dupl True to eliminate duplicate records. */ void merge_n_blocks( std::vector>& input_streams, RecordOutputStream& output_stream, size_t record_size, unsigned int branch_factor, bool filter_dupl) { assert(branch_factor > 1); assert(branch_factor <= input_streams.size()); // Put the head element of each block into a heap. // The heap will determine which block contains the element that // should go first in the merged block. typedef std::tuple HeapElement; // Function which compares records and returns true if // record A comes after record B in sort order. // If this function is used as the compare operator of a max-heap, // the record that comes first in sort order will be at the top // of the heap. auto cmp_heap_elem = [record_size](const HeapElement& a, const HeapElement& b) { const unsigned char *reca = std::get<0>(a); const unsigned char *recb = std::get<0>(b); return record_compare(reca, recb, record_size) > 0; }; // Initialize empty heap. std::vector heap; // Get the first element of each block. for (unsigned int i = 0; i < branch_factor; i++) { // Input blocks should not be empty. assert(!input_streams[i]->empty()); heap.emplace_back(input_streams[i]->record(), input_streams[i].get()); } // Make a heap of the first blocks. std::make_heap(heap.begin(), heap.end(), cmp_heap_elem); // Allocate a temporary record for duplicate filtering. std::vector temp_record(record_size); // The very first record can not be filtered out. bool filter_first_pass = true; // Keep merging until the heap runs empty. while (!heap.empty()) { // Extract the first element from the heap. const unsigned char * rec; RecordInputStream * instream; std::tie(rec, instream) = heap[0]; std::pop_heap(heap.begin(), heap.end()); if (filter_dupl) { // Compare against previous record, only output if different. if (filter_first_pass || record_compare(temp_record.data(), rec, record_size) != 0) { output_stream.put(rec); record_copy(temp_record.data(), rec, record_size); } filter_first_pass = false; } else { // No filtering, just push record to the output block. output_stream.put(rec); } // Try to pull the next record from this input stream. instream->next_record(); if (instream->empty()) { // Stream is empty. This stream is now out of the game. // The heap shrinks by 1 element. heap.pop_back(); } else { // Push next record from the stream into the heap. heap.back() = std::make_tuple(instream->record(), instream); std::push_heap(heap.begin(), heap.end()); } } } /** * Perform a merge pass of multi-pass external sorting. * * All input blocks will have the specified number of records, except for * the last block in the file which may be smaller. * * All output blocks will have (branch_factor * records_per_block) records, * except for the last output block in the file which may be smaller. * * If "filter_duplicates" is specified, the number of output blocks MUST be 1. * In this case duplicate elements will be removed from the output. * * @param input_file Input file for this pass. * @param output_file Output file for this pass. * @param records_per_block Number of records per input block. * @param num_blocks Number of input blocks. * @param branch_factor Number of blocks to merge per output block. * @param filter_dupl True to eliminate duplicate records. * @param ctx Reference to context structure. */ void merge_pass( BinaryFile& input_file, BinaryFile& output_file, uint64_t records_per_block, uint64_t num_blocks, unsigned int branch_factor, bool filter_dupl, const SortContext& ctx) { assert(branch_factor > 1); assert(branch_factor <= num_blocks); // Only filter duplicates when the output is a single block. assert((!filter_dupl) || (branch_factor == num_blocks)); Timer timer; timer.start(); // Calculate number of buffers: // 2 buffers per input stream + buffers for output 1 stream. size_t num_output_buffers = 2 + (branch_factor - 1) / 2; size_t num_buffers = 2 * branch_factor + num_output_buffers; // Calculate buffer size. // Must be a multiple of the record size and the transfer alignment. size_t buffer_size = ctx.memory_size / num_buffers; buffer_size -= buffer_size % (TRANSFER_ALIGNMENT * ctx.record_size); // TODO : double-buffering with I/O in separate thread // Initialize input streams. std::vector> input_streams; for (unsigned int i = 0; i < branch_factor; i++) { uint64_t block_size = records_per_block * ctx.record_size; uint64_t start_offset = i * block_size; uint64_t block_stride = branch_factor * block_size; if (start_offset >= input_file.size()) { break; } input_streams.emplace_back(new RecordInputStream( input_file, ctx.record_size, start_offset, block_size, block_stride, buffer_size)); } // Initialize output stream. RecordOutputStream output_stream( output_file, ctx.record_size, 0, buffer_size); // Loop over groups of blocks to be sorted. // Every group consists of "branch_factor" blocks, except the last // group which may contain fewer blocks. // Each group produces one output block. uint64_t block_index = 0; while (block_index < num_blocks) { // Determine how many blocks will be merged in this group. unsigned int this_branch_factor = branch_factor; if (branch_factor > num_blocks - block_index) { branch_factor = num_blocks - block_index; } // Skip to the next section of each active input stream. for (unsigned int i = 0; i < this_branch_factor; i++) { input_streams[i]->next_block(); } if (this_branch_factor == 1) { // Last group contains just 1 block. // Copy it to the output. assert(!filter_dupl); RecordInputStream * instream = input_streams[0].get(); while (!instream->empty()) { output_stream.put(instream->record()); instream->next_record(); } } else if (this_branch_factor == 2 && !filter_dupl) { // Special case for merging 2 blocks. merge_2_blocks( *input_streams[0], *input_streams[1], output_stream, ctx.record_size); } else { // Merge more than 2 blocks or filter duplicates. merge_n_blocks( input_streams, output_stream, ctx.record_size, this_branch_factor, filter_dupl); } // Skip to the start of the next block group. block_index += this_branch_factor; } // Flush output stream buffers. output_stream.flush(); timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); } /** * Prepare a strategy for multi-pass external sorting. */ SortStrategy plan_multi_pass_strategy( uint64_t file_size, const SortContext& ctx) { // Plan the initial sort pass. // Use blocks that are at most half of available memory, // so we can use two buffers to overlap I/O and sorting. uint64_t max_sort_block_size = ctx.memory_size / 2; // Calculate number of records per block. // Make sure this is a multiple of the transfer alignment size. uint64_t records_per_sort_block = max_sort_block_size / ctx.record_size; records_per_sort_block -= records_per_sort_block % TRANSFER_ALIGNMENT; // Calculate number of blocks during the initial sort pass. uint64_t num_records = file_size / ctx.record_size; uint64_t num_sort_blocks = 1 + (num_records - 1) / records_per_sort_block; SortStrategy strategy; strategy.records_per_sort_block = records_per_sort_block; strategy.num_sort_blocks = num_sort_blocks; // Plan the merge passes. // Start with the result of the initial sort pass. uint64_t records_per_block = records_per_sort_block; uint64_t num_blocks = num_sort_blocks; // Keep merging until there is only one block left. while (num_blocks > 1) { // Calculate the number of blocks out of this merge pass. uint64_t num_merged_blocks = 1 + (num_blocks - 1) / ctx.branch_factor; // Choose the smallest branch factor that produces this nr of blocks. unsigned int branch_factor = 1 + (num_blocks - 1) / num_merged_blocks; SortStrategy::MergePass merge_pass; merge_pass.records_per_input_block = records_per_block; merge_pass.num_input_blocks = num_blocks; merge_pass.branch_factor = branch_factor; strategy.merge_pass.push_back(merge_pass); // Result of this merge pass will go into the next pass. records_per_block *= branch_factor; num_blocks = num_merged_blocks; } return strategy; } /** * Sort a binary data file. * * @param input_file Path name of binary input file. * @param output_file Path name of binary output file. * @param ctx Reference to context structure. */ void sortbin( const std::string& input_name, const std::string& output_name, const SortContext& ctx) { // We want file I/O to occur on 4096-byte boundaries. // To ensure this, we want to do I/O on multiples of 4096 records. // To ensure this is possible, we need room for ~ 32k records per branch. if (ctx.memory_size / ctx.record_size / ctx.branch_factor < 8 * TRANSFER_ALIGNMENT) { throw std::logic_error( "Not enough memory for this combination of record size" " and branch factor"); } // Open input file. log(ctx, "opening input file\n"); BinaryInputFile input_file(input_name); uint64_t file_size = input_file.size(); // Check that input file contains an integer number of records. if (file_size % ctx.record_size != 0) { throw std::logic_error( "Input file does not contain an integer number of records"); } // Create output file. log(ctx, "creating output file\n"); BinaryOutputFile output_file(output_name, file_size); if (file_size <= ctx.memory_size) { // Data fits in memory. // Sort in a single pass. single_pass(input_file, output_file, ctx); } else { // Data does not fit in memory. // Plan a multi-pass strategy. SortStrategy strategy = plan_multi_pass_strategy(file_size, ctx); unsigned int num_merge_pass = strategy.merge_pass.size(); log(ctx, "sorting %" PRIu64 " records in %" PRIu64 " blocks" " followed by %u merge passes\n", file_size / ctx.record_size, strategy.num_sort_blocks, num_merge_pass); // Create a temporary file. log(ctx, "creating temporary file\n"); BinaryTempFile temp_file(ctx.temporary_directory, file_size); // The merge passes alternate between tempfile-to-outputfile and // outputfile-to-tempfile. // The final merge pass will be tempfile-to-outputfile. // Depending on the number of merge passes, the initial sort pass // will either be inputfile-to-tempfile or inputfile-to-outputfile. BinaryFile * output_or_temp_file[2] = { &output_file, &temp_file }; BinaryFile * sort_output_file = output_or_temp_file[num_merge_pass % 2]; // Execute the initial sort pass. sort_pass( input_file, *sort_output_file, strategy.records_per_sort_block, strategy.num_sort_blocks, ctx); // Execute the merge passes. for (unsigned int mp = 0; mp < num_merge_pass; mp++) { log(ctx, "running merge pass %u / %u: " "%" PRIu64 " blocks, branch factor %u\n", mp, num_merge_pass, strategy.merge_pass[mp].num_input_blocks, strategy.merge_pass[mp].branch_factor); // Filter duplicate records only on the last pass. bool filter_dupl = ctx.flag_unique && (mp + 1 == num_merge_pass); // Alternate between temp_file and output_file. BinaryFile * pass_input_file = output_or_temp_file[(num_merge_pass - mp) % 2]; BinaryFile * pass_output_file = output_or_temp_file[(num_merge_pass - mp - 1) % 2]; // Execute the merge pass. merge_pass( *pass_input_file, *pass_output_file, strategy.merge_pass[mp].records_per_input_block, strategy.merge_pass[mp].num_input_blocks, strategy.merge_pass[mp].branch_factor, filter_dupl, ctx); } } log(ctx, "finished\n"); } std::string get_default_tmpdir(void) { const char *tmpdir = getenv("TMPDIR"); if (tmpdir == NULL) { tmpdir = "/tmp"; } return std::string(tmpdir); } void usage() { fprintf(stderr, "\n" "Usage: sortbin [options] inputfile outputfile\n" "\n" "Options:\n" "\n" " -s, --size=N specify record size of N bytes (required)\n" " -u, --unique eleminate duplicates after sorting\n" " --memory=M use at most M MByte RAM (default: %d)\n" " --branch=B merge at most B arrays in one step (default: %d)\n" " --temporary-directory=DIR write temporary file to the specified\n" " directory (default: $TMPDIR)\n" "\n" "The output file must not yet exist.\n" "If the data does not fit in memory, a temporary file will be\n" "created with the same size as the input/output files.\n" "\n", DEFAULT_MEMORY_SIZE_MBYTE, DEFAULT_BRANCH_FACTOR); } } // anonymous namespace int main(int argc, char **argv) { const struct option longopts[] = { { "size", 1, NULL, 's' }, { "unique", 0, NULL, 'u' }, { "memory", 1, NULL, 'M' }, { "branch", 1, NULL, 'B' }, { "temporary-directory", 1, NULL, 'T' }, { "verbose", 0, NULL, 'v' }, { "help", 0, NULL, 'h' }, { NULL, 0, NULL, 0 } }; bool flag_unique = false; bool flag_verbose = false; int record_size = 0; int memory_size = DEFAULT_MEMORY_SIZE_MBYTE; int branch_factor = DEFAULT_BRANCH_FACTOR; std::string tempdir = get_default_tmpdir(); int opt; while ((opt = getopt_long(argc, argv, "uv", longopts, NULL)) != -1) { switch (opt) { case 's': record_size = atoi(optarg); if (record_size < 1) { fprintf(stderr, "ERROR: Invalid record size (must be at least 1)\n"); return EXIT_FAILURE; } break; case 'u': flag_unique = true; break; case 'M': memory_size = atoi(optarg); if (memory_size <= 0) { fprintf(stderr, "ERROR: Invalid memory size\n"); return EXIT_FAILURE; } break; case 'B': branch_factor = atoi(optarg); if (branch_factor < 2) { fprintf(stderr, "ERROR: Invalid radix value, must be at least 2\n"); return EXIT_FAILURE; } break; case 'T': tempdir = optarg; break; case 'v': flag_verbose = true; break; case 'h': usage(); return EXIT_SUCCESS; default: usage(); return EXIT_FAILURE; } } if (record_size < 1) { fprintf(stderr, "ERROR: Missing required parameter --size\n"); usage(); return EXIT_FAILURE; } if (argc < optind + 2) { fprintf(stderr, "ERROR: Input and output file names must be specified\n"); usage(); return EXIT_FAILURE; } if (argc > optind + 2) { fprintf(stderr, "ERROR: Unexpected command-line parameters\n"); usage(); return EXIT_FAILURE; } if ((unsigned int)memory_size >= SIZE_MAX / 1024 / 1024) { fprintf( stderr, "ERROR: This system can allocate at most %zu MB memory\n", SIZE_MAX / 1024 / 1024 - 1); return EXIT_FAILURE; } std::string input_name(argv[optind]); std::string output_name(argv[optind+1]); SortContext ctx; ctx.record_size = record_size; ctx.memory_size = size_t(memory_size) * 1024 * 1024; ctx.branch_factor = branch_factor; ctx.flag_unique = flag_unique; ctx.flag_verbose = flag_verbose; ctx.temporary_directory = tempdir; try { sortbin(input_name, output_name, ctx); } catch (const std::exception& ex) { fprintf(stderr, "ERROR: %s\n", ex.what()); return EXIT_FAILURE; } return EXIT_SUCCESS; }