/* * Sort arrays of binary data records. * * Input and output files contain plain, raw arrays of fixed-length * binary data records. * * Records are interpreted as fixed-length strings of 8-bit unsigned integers. * The program sorts these records in lexicographic order. * * This program is optimized for short records, e.g. up to 20 bytes. */ /* * TESTING in-memory sort: * * Input: 10**8 records of 10 bytes * Storage: SSD * * GNU sort: 101.7 seconds * GNU sort -S 2G: 110.2 seconds * sortbin, qsort_r(): 31.1, 31.2, 31.1 seconds (correct output) * sortbin, heapsort: 57.4, 57.1, 58.5 seconds (correct output) * sortbin, quicksort: 24.5, 24.4, 24.4 seconds (correct output) * sortbin, quicksort, depth_limit=8: 31.6 seconds (correct output) * * * Input: 10**8 records of 10 bytes, 70502908 unique records * * GNU sort -u: 120.2 seconds * sortbin -u: 26.2 seconds (correct output) * */ // (already defined by g++) #define _GNU_SOURCE #define _FILE_OFFSET_BITS 64 #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // TODO : use a background thread for file I/O /* Maximum amount of RAM to use (in MBytes). */ #define DEFAULT_MEMORY_SIZE_MBYTE 1024 /* Default branch factor while merging. */ #define DEFAULT_BRANCH_FACTOR 16 /* Default number of sorting threads. */ #define DEFAULT_THREADS 1 namespace { // anonymous namespace /** Information about the sort job. */ struct SortContext { /** Record length in bytes. */ unsigned int record_size; /** Maximum memory to use (bytes). */ uint64_t memory_size; /** Maximum number of arrays to merge in one step. */ unsigned int branch_factor; /** True to eliminate duplicate records. */ bool flag_unique; /** True to write progress messages to stderr. */ bool flag_verbose; /** Directory where temporary files are created. */ std::string temporary_directory; }; /** Strategy for multi-pass external sorting. */ struct SortStrategy { /** Strategy for a single merge pass. */ struct MergePass { /** Number of records per input block into this merge pass. */ uint64_t records_per_input_block; /** Total number of input blocks into this merge pass. */ uint64_t num_input_blocks; /** Number of input blocks to merge into each output block. */ unsigned int branch_factor; }; /** Number of records per block during the initial sort pass. */ uint64_t records_per_sort_block; /** Number of blocks for the initial sort pass. */ uint64_t num_sort_blocks; /** List of merge passes. */ std::vector merge_pass; }; /** Show a progress message. */ void log(const SortContext& ctx, const char *format, ...) __attribute__ ((format (printf, 2, 3))) ; void log(const SortContext& ctx, const char *format, ...) { va_list ap; va_start(ap, format); if (ctx.flag_verbose) { vfprintf(stderr, format, ap); } va_end(ap); } /** Time measurements. */ class Timer { public: Timer() : m_value(0.0), m_running(false) { } double value() const { double v = m_value; if (m_running) { struct timespec tnow; clock_gettime(CLOCK_REALTIME, &tnow); v += tnow.tv_sec - m_tstart.tv_sec; v += 1.0e-9 * (tnow.tv_nsec - m_tstart.tv_nsec); } return m_value; } void start() { m_value = 0.0; m_running = true; clock_gettime(CLOCK_REALTIME, &m_tstart); } void resume() { if (!m_running) { m_running = true; clock_gettime(CLOCK_REALTIME, &m_tstart); } } void stop() { if (m_running) { struct timespec tnow; clock_gettime(CLOCK_REALTIME, &tnow); m_value += tnow.tv_sec - m_tstart.tv_sec; m_value += 1.0e-9 * (tnow.tv_nsec - m_tstart.tv_nsec); m_running = false; } } private: double m_value; bool m_running; struct timespec m_tstart; }; /** * Binary file access. * * This is a base class which can not be directly constructed. * Subclasses implement constructors that open files in various ways. */ class BinaryFile { public: // Prevent copying and assignment. BinaryFile(const BinaryFile&) = delete; BinaryFile& operator=(const BinaryFile&) = delete; /** Return file size in bytes. */ uint64_t size() const { return m_file_size; } /** Truncate file to reduce its size. */ void truncate_file(uint64_t new_size); /** Read raw bytes from file. */ void read( unsigned char * buf, uint64_t file_offset, size_t length); /** Write raw bytes to file. */ void write( const unsigned char * buf, uint64_t file_offset, size_t length); protected: // Prevent constructing and destructing via this base class. BinaryFile() : m_fd(-1), m_file_size(0) { } /** Destructor: close file. */ ~BinaryFile() { if (m_fd >= 0) { close(m_fd); } } /** File descriptor. */ int m_fd; /** File size (bytes). */ uint64_t m_file_size; }; // Truncate file to reduce its size. void BinaryFile::truncate_file(uint64_t new_size) { int ret = ftruncate(m_fd, new_size); if (ret != 0) { throw std::system_error( errno, std::system_category(), "Can not truncate file"); } m_file_size = new_size; } // Read raw bytes from file. void BinaryFile::read( unsigned char * buf, uint64_t file_offset, size_t length) { // Note that pread() may read fewer bytes than requested. while (length > 0) { ssize_t ret = pread(m_fd, buf, length, file_offset); if (ret <= 0) { throw std::system_error( errno, std::system_category(), "Can not read from file"); } buf += ret; file_offset += ret; length -= ret; } } // Write raw bytes to file. void BinaryFile::write( const unsigned char * buf, uint64_t file_offset, size_t length) { // Note that pwrite() may write fewer bytes than requested. while (length > 0) { ssize_t ret = pwrite(m_fd, buf, length, file_offset); if (ret <= 0) { throw std::system_error( errno, std::system_category(), "Can not write to file"); } buf += ret; file_offset += ret; length -= ret; } } /** Binary input file. */ class BinaryInputFile : public BinaryFile { public: /** Open existing file for read-only access. */ explicit BinaryInputFile(const std::string& filename) { m_fd = open(filename.c_str(), O_RDONLY); if (m_fd < 0) { throw std::system_error( errno, std::system_category(), "Can not open input file"); } off_t fsz = lseek(m_fd, 0, SEEK_END); if (fsz == (off_t)(-1)) { throw std::system_error( errno, std::system_category(), "Can not seek input file"); } m_file_size = fsz; } }; /** Binary output file. */ class BinaryOutputFile : public BinaryFile { public: /** Create output file and pre-allocate space. */ BinaryOutputFile(const std::string& filename, uint64_t new_size) { m_fd = open(filename.c_str(), O_RDWR | O_CREAT | O_EXCL, 0666); if (m_fd < 0) { throw std::system_error( errno, std::system_category(), "Can not create output file"); } int ret = posix_fallocate(m_fd, 0, new_size); if (ret != 0) { int errnum = errno; // Delete empty output file. unlink(filename.c_str()); throw std::system_error( errnum, std::system_category(), "Can not allocate space in output file"); } m_file_size = new_size; } }; /** Temporary binary file. */ class BinaryTempFile : public BinaryFile { public: /** Create temporary file and pre-allocate space. */ BinaryTempFile(const std::string& tempdir, uint64_t new_size) { // O_TMPFILE creates a new file without a name. // Effectively the file is created and immediately unlinked. // As a result, the file will be automatically deleted when closed. m_fd = open(tempdir.c_str(), O_RDWR | O_TMPFILE, 0600); if (m_fd < 0) { throw std::system_error( errno, std::system_category(), "Can not create temporary file"); } int ret = posix_fallocate(m_fd, 0, new_size); if (ret != 0) { throw std::system_error( errno, std::system_category(), "Can not allocate space in temporary file"); } m_file_size = new_size; } }; /** * Read binary records from an input file with buffering. * * The input stream reads from a sequence of discontinuous, equally spaced * sections in the input file. All sections have the same size, except for * the last section which may be shorter if it runs to the end of the file. * Each section contains a flat array of binary records. * * The input stream starts in the "end_of_stream" state. * The first call to "next_section()" enables reading records from the * first input section. When the input stream reaches the end of the * current section, it again enters the "end_of_stream" state until * the next call to "next_section()". */ class RecordInputStream { public: /** * Construct a record input stream. * * The stream will initially contain no input sections. * * @param input_file Input file where records read from. * @param record_size Record size in bytes. * @param section_offset Offset in input file of first input section. * @param section_size Size of each input section in bytes. * Must be a multiple of "record_size". * The last input section may be shorter if it * runs to the end of the file. * @param section_stride Distance between start of sections in bytes. * @param buffer_size Buffer size in bytes. * Must be a multiple of "record_size". * Note: Each RecordInputStream creates two buffers * of the specified size. */ RecordInputStream( BinaryFile& input_file, unsigned int record_size, uint64_t section_offset, uint64_t section_size, uint64_t section_stride, size_t buffer_size) : m_input_file(input_file), m_record_size(record_size), m_section_offset(section_offset), m_section_size(section_size), m_section_stride(section_stride), m_section_remaining(0), m_file_offset(0), m_bufpos(NULL), m_bufend(NULL), m_buffer(buffer_size) { assert(section_size % record_size == 0); assert(section_size > record_size); assert(section_size <= section_stride); assert(buffer_size % record_size == 0); assert(buffer_size > record_size); } // Prevent copying and assignment. RecordInputStream(const RecordInputStream&) = delete; RecordInputStream& operator=(const RecordInputStream&) = delete; /** Return true if the end of the current input section is reached. */ inline bool end_of_stream() const { return (m_bufpos == m_bufend); } /** * Return a pointer to the current binary record. * * This function must only be used if "end_of_stream()" returns false. * The returned pointer becomes invalid after a call to "next_record()". */ inline const unsigned char * current_record() const { return m_bufpos; } /** * Move to the next record of the current section. * * This function must only be used if "end_of_stream()" returns false. * Calling this function invalidates all pointers previously returned * by "current_record()". */ inline void next_record() { assert(m_bufpos != m_bufend); m_bufpos += m_record_size; if (m_bufpos == m_bufend) { m_bufpos = m_buffer.data(); m_bufend = m_buffer.data(); refill_buffer(); } } /** * Start reading from the next input section. * * This function may only be called if "end_of_stream()" returns true. */ void next_section() { assert(m_bufpos == m_bufend); uint64_t file_size = m_input_file.size(); assert(m_section_stride < file_size - m_section_offset); m_section_offset += m_section_stride; m_file_offset = m_section_offset; m_section_remaining = std::min(m_section_size, file_size - m_section_offset); refill_buffer(); } private: /** Refill the buffer from the current input section. */ void refill_buffer() { if (m_section_remaining > 0) { size_t block_size = (m_buffer.size() < m_section_remaining) ? m_buffer.size() : m_section_remaining; m_input_file.read(m_buffer.data(), m_file_offset, block_size); m_file_offset += block_size; m_section_remaining -= block_size; m_bufend = m_buffer.data() + block_size; } } BinaryFile& m_input_file; const unsigned int m_record_size; uint64_t m_section_offset; uint64_t m_section_size; uint64_t m_section_stride; uint64_t m_section_remaining; uint64_t m_file_offset; unsigned char * m_bufpos; unsigned char * m_bufend; std::vector m_buffer; // TODO : double-buffering with delayed I/O via background thread }; /** * Write binary records to an output file with buffering. */ class RecordOutputStream { public: /** * Construct a record output stream. * * @param output_file Output file where records are written. * @param record_size Record size in bytes. * @param file_offset Start offset in the output file. * @param buffer_size Buffer size in bytes. * Note: Each RecordOutputStream creates two buffers * of the specified size. */ RecordOutputStream( BinaryFile& output_file, unsigned int record_size, uint64_t file_offset, size_t buffer_size) : m_output_file(output_file), m_record_size(record_size), m_file_offset(file_offset), m_bufpos(0), m_buffer(buffer_size) { } // Prevent copying and assignment. RecordOutputStream(const RecordOutputStream&) = delete; RecordOutputStream& operator=(const RecordOutputStream&) = delete; /** Append a record to the output stream. */ inline void put(const unsigned char *record) { if (m_record_size > m_buffer.size() - m_bufpos) { flush(); } memcpy(m_buffer.data() + m_bufpos, record, m_record_size); } /** Flush buffered records to the output file. */ void flush() { if (m_bufpos > 0) { m_output_file.write(m_buffer.data(), m_file_offset, m_bufpos); m_file_offset += m_bufpos; m_bufpos = 0; } } private: BinaryFile& m_output_file; const unsigned int m_record_size; uint64_t m_file_offset; size_t m_bufpos; std::vector m_buffer; // TODO : double-buffering with delayed I/O via background thread }; /** Compare two records. */ #define record_compare(_a, _b, _n) (memcmp(_a, _b, _n)) /** Copy a record. */ #define record_copy(_dst, _src, _n) (memcpy(_dst, _src, _n)) /** * Swap two records. */ inline void record_swap( unsigned char *a, unsigned char *b, size_t record_size) { while (record_size > 0) { unsigned char aa = *a; unsigned char bb = *b; *b = aa; *a = bb; a++; b++; record_size--; } } /** Return the index of the parent of the specified heap node. */ inline size_t heap_parent_index(size_t node_index) { return (node_index - 1) / 2; } /** Return the index of the left child of the specified heap node. */ inline size_t heap_left_child_index(size_t node_index) { return node_index * 2 + 1; } /** * Insert a node into an empty spot in the heap, then repair the sub-heap * rooted at that position. */ void heap_sift_down_records( unsigned char * buffer, size_t record_size, size_t num_records, size_t insert_index, const unsigned char * insert_value) { // Find the first node index which does not have two child nodes. size_t parent_end = heap_parent_index(num_records); // Move the empty spot all the way down through the sub-heap. size_t cur_idx = insert_index; while (cur_idx < parent_end) { // Find the left child node of the current node. size_t child_idx = heap_left_child_index(cur_idx); unsigned char * child_ptr = buffer + child_idx * record_size; // Compare the two child nodes. if (record_compare(child_ptr, child_ptr + record_size, record_size) < 0) { // Right child is greater, choose that one. child_idx += 1; child_ptr += record_size; } // Move the chosen child to the empty spot. unsigned char * cur_ptr = buffer + cur_idx * record_size; record_copy(cur_ptr, child_ptr, record_size); // Continue the scan from the (now empty) child spot. cur_idx = child_idx; } // If the empty spot has a left child, swap it with the empty spot. if (num_records > 1 && cur_idx <= heap_parent_index(num_records - 1)) { size_t child_idx = heap_left_child_index(cur_idx); unsigned char * cur_ptr = buffer + cur_idx * record_size; unsigned char * child_ptr = buffer + child_idx * record_size; record_copy(cur_ptr, child_ptr, record_size); cur_idx = child_idx; } // The empty spot is now in a leaf node of the heap. // Scan back up to find the right place to insert the new node. // // Going all the way down and then back up may seem wasteful, // but it is faster in practice because the correct insertion spot // is likely to be in the bottom of the heap. while (cur_idx > insert_index) { // Find parent of the empty spot. size_t parent_idx = heap_parent_index(cur_idx); unsigned char * parent_ptr = buffer + parent_idx * record_size; // Compare the new value to the parent value. if (record_compare(parent_ptr, insert_value, record_size) >= 0) { // We found the right spot to insert the new value. break; } // Move the parent back to the empty spot. unsigned char * cur_ptr = buffer + cur_idx * record_size; record_copy(cur_ptr, parent_ptr, record_size); // Move to te parent node. cur_idx = parent_idx; } // Insert the new node at the empty position. unsigned char * cur_ptr = buffer + cur_idx * record_size; record_copy(cur_ptr, insert_value, record_size); } /** * Sort an array of records using in-place heap sort. * * Run time: O(N * log(N)) * Extra space: O(1) */ void heap_sort_records( unsigned char * buffer, size_t record_size, size_t num_records) { // Skip trivial cases. if (num_records < 2) { return; } // Allocate temporary space for one record. std::vector temp(record_size); // // Phase 1: Transform the unordered array into a max-heap. // // Bottom-up loop over all non-trivial sub-heaps. // Start with the parent of the last node. size_t cur_idx = heap_parent_index(num_records - 1); while (true) { // Remove the current node from the heap. unsigned char * cur_ptr = buffer + cur_idx * record_size; record_copy(temp.data(), cur_ptr, record_size); // Re-insert the node and repair the sub-heap rooted at this node. heap_sift_down_records( buffer, record_size, num_records, cur_idx, temp.data()); // Stop after processing the root node. if (cur_idx == 0) { break; } // Go do the next sub-heap. cur_idx--; } // // Phase 2: Transform the max-heap into a sorted array. // // Loop over the record array from back to front. cur_idx = num_records - 1; while (cur_idx > 0) { // Remove the root node from the heap. // This is the largest remaining element, which belongs at index CUR // in the sorted array. record_copy(temp.data(), buffer, record_size); // Remove the node at index CUR from the heap. // This reduces the size of the heap by 1. // Re-insert the removed node as the root node and repair the heap. unsigned char * cur_ptr = buffer + cur_idx * record_size; heap_sift_down_records(buffer, record_size, cur_idx, 0, cur_ptr); // Copy the former root node to its position in the sorted array. record_copy(cur_ptr, temp.data(), record_size); // Go to next element. cur_idx--; } } /** * Sort an array of records using in-place insertion sort. * * This is a helper function for quicksort_records(). */ void insertion_sort_records( unsigned char * buffer, size_t record_size, size_t num_records) { // Allocate temporary space for one record. std::vector temp(record_size); for (size_t cur_idx = 1; cur_idx < num_records; cur_idx++) { // The partial array 0 .. (cur_idx - 1) is already sorted. // We will now insert cur_idx into the sorted array. // Quick check whether the new record is already in the right place. unsigned char * cur_ptr = buffer + cur_idx * record_size; unsigned char * prev_ptr = cur_ptr - record_size; if (record_compare(cur_ptr, prev_ptr, record_size) >= 0) { continue; } // Scan backwards through the sorted array to find the right place // to insert the new record. unsigned char * insert_ptr = prev_ptr; while (insert_ptr > buffer) { prev_ptr = insert_ptr - record_size; if (record_compare(cur_ptr, prev_ptr, record_size) >= 0) { // Found the right place to insert the new record. break; } // Continue backwards scan. insert_ptr = prev_ptr; } // Copy the new record to temporary storage. record_copy(temp.data(), cur_ptr, record_size); // Move sorted records to make space for the new record. memmove(insert_ptr + record_size, insert_ptr, cur_ptr - insert_ptr); // Copy the new record to its place. record_copy(insert_ptr, temp.data(), record_size); } } /** * Sort an array of records using in-place quicksort. * * Run time: O(N * log(N)) * Extra space: O(log(N)) * * Plain quicksort is known to have quadratic worst-case behaviour. * This implementation uses median-of-three partitioning to reduce * the probability of worst-case performance. If bad performance does * occur, this implementation detects it and switches to heap sort. */ void quicksort_records( unsigned char * buffer, size_t record_size, size_t num_records) { // Recursive partitioning is only applied to fragments larger than // this threshold. The rest of the work will be done by insertion sort. const size_t insertion_sort_threshold = 12; // Skip trivial cases. if (num_records < 2) { return; } // Determine maximum acceptable recursion depth. unsigned int depth_limit = 1; for (size_t nremain = num_records; nremain > 1; nremain >>= 1) { depth_limit += 2; } // Allocate stack. std::vector> stack; stack.reserve(depth_limit); // Prepare recursive partitioning of the entire array. if (num_records > insertion_sort_threshold) { stack.emplace_back(buffer, num_records, depth_limit); } // Execute recursive partitioning. while (!stack.empty()) { // Pop a range from the stack. unsigned char * range_begin; size_t range_num_records; std::tie(range_begin, range_num_records, depth_limit) = stack.back(); stack.pop_back(); // Check recursion depth. Switch to heap sort if we get to deep. if (depth_limit == 0) { heap_sort_records(range_begin, record_size, range_num_records); continue; } // Initialize pointers to start, end and middle of range. unsigned char * left_ptr = range_begin; unsigned char * right_ptr = range_begin + (range_num_records - 1) * record_size; unsigned char * pivot_ptr = range_begin + (range_num_records / 2) * record_size; // Sort the first, middle and last records such that they are // in proper order with respect to each other. if (record_compare(pivot_ptr, left_ptr, record_size) < 0) { record_swap(left_ptr, pivot_ptr, record_size); } if (record_compare(right_ptr, pivot_ptr, record_size) < 0) { record_swap(pivot_ptr, right_ptr, record_size); if (record_compare(pivot_ptr, left_ptr, record_size) < 0) { record_swap(left_ptr, pivot_ptr, record_size); } } // The median of the three records we examined is now in the // middle of the range, pointed to by pivot_ptr. // This is not necessarily the final location of that element. // The first and last record of the range are now on the proper // side of the partition. No need to examine them again. left_ptr += record_size; right_ptr -= record_size; // Partition the rest of the array based on comparing to the pivot. while (true) { // Skip left-side records that are less than the pivot. while (record_compare(left_ptr, pivot_ptr, record_size) < 0) { left_ptr += record_size; } // Skip right-side records that are greater than the pivot. while (record_compare(pivot_ptr, right_ptr, record_size) < 0) { right_ptr -= record_size; } // Stop when the pointers meet. if (left_ptr >= right_ptr) { break; } // Swap the records that are on the wrong sides. record_swap(left_ptr, right_ptr, record_size); // If we moved the pivot, update its pointer so it keeps // pointing to the pivot value. if (pivot_ptr == left_ptr) { pivot_ptr = right_ptr; } else if (pivot_ptr == right_ptr) { pivot_ptr = left_ptr; } // Do not compare the swapped elements again. left_ptr += record_size; right_ptr -= record_size; // Stop when pointers cross. // (Pointers equal is not good enough at this point, because // we won't know on which side the pointed record belongs.) if (left_ptr > right_ptr) { break; } } // If pointers are equal, they must both be pointing to a pivot. // Bump both pointers so they correctly delineate the new // subranges. The record where the pointers meet is already in // its final position. if (left_ptr == right_ptr) { left_ptr += record_size; right_ptr -= record_size; } // Push left subrange on the stack, if it meets the size threshold. size_t num_left = (right_ptr + record_size - range_begin) / record_size; if (num_left > insertion_sort_threshold) { stack.emplace_back(range_begin, num_left, depth_limit - 1); } // Push right subrange on the stack, if it meets the size threshold. size_t num_right = range_num_records - (left_ptr - range_begin) / record_size; if (num_right > insertion_sort_threshold) { stack.emplace_back(left_ptr, num_right, depth_limit - 1); } } // Recursive partitining finished. // The array is now roughly sorted, except for subranges that were // skipped because they are within the threshold. // Finish with insertion sort to get a fully sorted array. insertion_sort_records(buffer, record_size, num_records); } /** Sort the specified block of records (in-place). */ void sort_records( unsigned char * buffer, size_t record_size, size_t num_records) { // TODO : multi-threaded quicksort // heap_sort_records(buffer, record_size, num_records); quicksort_records(buffer, record_size, num_records); } /** * Remove duplicate records from an already sorted array. * * @return the number of unique records */ size_t filter_duplicate_records( unsigned char * buffer, unsigned int record_size, size_t num_records) { // Special case for 0 or 1 records. if (num_records < 2) { return num_records; } // Find the first duplicate record. unsigned char * last_unique = buffer; unsigned char * next_record = buffer + record_size; size_t next_pos = 1; while (next_pos < num_records) { if (memcmp(last_unique, next_record, record_size) == 0) { break; } last_unique = next_record; next_record += record_size; next_pos++; } // Scan the rest of the records and copy unique records. size_t num_unique = next_pos; while (next_pos < num_records) { if (memcmp(last_unique, next_record, record_size) != 0) { num_unique++; last_unique += record_size; memcpy(last_unique, next_record, record_size); } next_record += record_size; next_pos++; } return num_unique; } /** * Sort the whole file in a single pass. * * This function is used only when the input file fits in memory. */ void single_pass( BinaryFile& input_file, BinaryFile& output_file, const SortContext& ctx) { assert(input_file.size() < SIZE_MAX); size_t input_size = input_file.size(); size_t num_records = input_size / ctx.record_size; log(ctx, "sorting %zu records in a single pass\n", num_records); // Allocate memory. log(ctx, "allocating memory\n"); std::vector buffer(input_size); Timer timer; // Read input file into memory. log(ctx, "reading input file\n"); timer.start(); input_file.read(buffer.data(), 0, input_size); timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); // TODO : multi-threaded sorting with thread pool // Sort records in memory buffer. log(ctx, "sorting records\n"); timer.start(); sort_records(buffer.data(), ctx.record_size, num_records); timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); if (ctx.flag_unique) { // Eliminate duplicate records. log(ctx, "filtering duplicate records\n"); timer.start(); num_records = filter_duplicate_records( buffer.data(), ctx.record_size, num_records); timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); log(ctx, "found %zu unique records\n", num_records); } log(ctx, "writing output file\n"); timer.start(); // Shrink output file if duplicate records were removed. uint64_t output_size = num_records * ctx.record_size; if (output_size < input_size) { output_file.truncate_file(output_size); } // Write memory buffer to output file. output_file.write(buffer.data(), 0, output_size); timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); } /** * Perform the initial sort pass of multi-pass external sorting. * * All blocks will have the specified number of records, except for * the last block in the file which may be smaller. * * @param input_file Input file. * @param output_file Output file for this pass. * @param records_per_block Number of records per sort block. * @param num_blocks Number of sort blocks. * @param ctx Reference to context structure. */ void sort_pass( BinaryFile& input_file, BinaryFile& output_file, uint64_t records_per_block, uint64_t num_blocks, const SortContext& ctx) { unsigned int record_size = ctx.record_size; uint64_t file_size = input_file.size(); uint64_t num_records = file_size / record_size; log(ctx, "running initial sort pass\n"); Timer timer; timer.start(); // TODO : double-buffer with I/O in separate thread // Allocate sort buffer. assert(records_per_block < SIZE_MAX / record_size); std::vector buffer(records_per_block * record_size); // TODO : multi-threaded sorting with thread pool // Loop over blocks to be sorted. for (uint64_t block_index = 0; block_index < num_blocks; block_index++) { uint64_t first_record_idx = block_index * records_per_block; size_t block_num_records = std::min(records_per_block, num_records - first_record_idx); log(ctx, "sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n", block_index, num_blocks, block_num_records); // Read block. input_file.read( buffer.data(), first_record_idx * record_size, block_num_records * record_size); // Sort records in this block. sort_records( buffer.data(), record_size, block_num_records); // Write block. output_file.write( buffer.data(), first_record_idx * record_size, block_num_records * record_size); } timer.stop(); log(ctx, "initial sort pass finished\n"); log(ctx, " t = %.3f seconds\n", timer.value()); } /** * Perform a merge pass of multi-pass external sorting. * * All input blocks will have the specified number of records, except for * the last block in the file which may be smaller. * * All output blocks will have (branch_factor * records_per_block) records, * except for the last output block in the file which may be smaller. * * If "filter_duplicates" is specified, the number of output blocks MUST be 1. * In this case duplicate elements will be removed from the output. * * @param input_file Input file for this pass. * @param output_file Output file for this pass. * @param records_per_block Number of records per input block. * @param num_blocks Number of input blocks. * @param branch_factor Number of blocks to merge per an output block. * @param filter_duplicates true to eliminate duplicate elements. * @param ctx Reference to context structure. */ void merge_pass( BinaryFile& input_file, BinaryFile& output_file, uint64_t records_per_block, uint64_t num_blocks, unsigned int branch_factor, bool filter_duplicates, const SortContext& ctx) { // TODO TODO TODO TODO #if 0 // Eliminating duplicates is only supported for a single output block. assert((!filter_duplicates) || (num_blocks <= branch_factor)); Timer timer; timer.start(); // TODO : double-buffer with I/O in separate thread // Allocate sort buffer. assert(records_per_block < SIZE_MAX / record_size); std::vector buffer(records_per_block * record_size); // Loop over groups of blocks to be sorted. uint64_t block_index = 0; while (block_index < num_blocks) { // Determine how many blocks will be merged in this group. unsigned int this_branch_factor = branch_factor; if (branch_factor > num_blocks - block_index) { branch_factor = num_blocks - block_index; } } // Loop over blocks to be sorted. for (uint64_t block_index = 0; block_index < num_blocks; block_index++) { uint64_t block_start_idx = block_index * records_per_block; size_t block_num_records = std::min(records_per_block, num_records - block_start_idx); log(ctx, "sorting block %" PRIu64 " / %" PRIu64 ": %" PRIu64 " records\n", block_index, num_blocks, block_num_records); // Read block. input_file.read( buffer.data(), block_start_idx * record_size, block_num_records * record_size); // Sort records in this block. sort_records( buffer.data(), record_size, block_num_records); // Write block. output_file.write( buffer.data(), block_start_idx * record_size, block_num_records * record_size); } timer.stop(); log(ctx, " t = %.3f seconds\n", timer.value()); #endif } /** * Prepare a strategy for multi-pass external sorting. */ SortStrategy plan_multi_pass_strategy( uint64_t file_size, const SortContext& ctx) { // Plan the initial sort pass. // Use blocks that are at most half of available memory // (so we can use two buffers to overlap I/O and sorting). uint64_t max_sort_block_size = ctx.memory_size / 2; // Calculate number of records per block. uint64_t records_per_sort_block = max_sort_block_size / ctx.record_size; // Make sure the block size is a multiple of 4096 bytes. records_per_sort_block -= records_per_sort_block % 4096; // Calculate the number of blocks during the initial sort pass. uint64_t num_records = file_size / ctx.record_size; uint64_t num_sort_blocks = 1 + (num_records - 1) / records_per_sort_block; SortStrategy strategy; strategy.records_per_sort_block = records_per_sort_block; strategy.num_sort_blocks = num_sort_blocks; // Plan the merge passes. // Start with the result of the initial sort pass. uint64_t records_per_block = records_per_sort_block; uint64_t num_blocks = num_sort_blocks; // Keep merging until there is only one block left. while (num_blocks > 1) { // Calculate the number of blocks out of this merge pass. uint64_t num_merged_blocks = 1 + (num_blocks - 1) / ctx.branch_factor; // Choose the smallest branch factor that produces this nr of blocks. unsigned int branch_factor = 1 + (num_blocks - 1) / num_merged_blocks; SortStrategy::MergePass merge_pass; merge_pass.records_per_input_block = records_per_block; merge_pass.num_input_blocks = num_blocks; merge_pass.branch_factor = branch_factor; strategy.merge_pass.push_back(merge_pass); // Result of this merge pass will go into the next pass. records_per_block *= branch_factor; num_blocks = num_merged_blocks; } return strategy; } /** * Sort a binary data file. * * @param input_file Path name of binary input file. * @param output_file Path name of binary output file. * @param ctx Reference to context structure. */ void sortbin( const std::string& input_name, const std::string& output_name, const SortContext& ctx) { // We want file I/O to occur on 4096-byte boundaries. // To ensure this, we want to do I/O on multiples of 4096 records. // To ensure this is possible, we need room for ~ 32k records per branch. if (ctx.branch_factor > ctx.memory_size / ctx.record_size / 32768) { throw std::logic_error( "Not enough memory for this combination of record size" " and branch factor"); } // Open input file. log(ctx, "opening input file\n"); BinaryInputFile input_file(input_name); uint64_t file_size = input_file.size(); // Check that input file contains an integer number of records. if (file_size % ctx.record_size != 0) { throw std::logic_error( "Input file does not contain an integer number of records"); } // Create output file. log(ctx, "creating output file\n"); BinaryOutputFile output_file(output_name, file_size); if (file_size <= ctx.memory_size) { // Data fits in memory. // Sort in a single pass. single_pass(input_file, output_file, ctx); } else { // Data does not fit in memory. // Plan a multi-pass strategy. SortStrategy strategy = plan_multi_pass_strategy(file_size, ctx); unsigned int num_merge_pass = strategy.merge_pass.size(); log(ctx, "sorting %" PRIu64 " records in %" PRIu64 " blocks" " followed by %u merge passes\n", file_size / ctx.record_size, strategy.num_sort_blocks, num_merge_pass); // Create a temporary file. log(ctx, "creating temporary file\n"); BinaryTempFile temp_file(ctx.temporary_directory, file_size); // The merge passes alternate between tempfile-to-outputfile and // outputfile-to-tempfile. // The final merge pass will be tempfile-to-outputfile. // Depending on the number of merge passes, the initial sort pass // will either be inputfile-to-tempfile or inputfile-to-outputfile. BinaryFile * output_or_temp_file[2] = { &output_file, &temp_file }; BinaryFile * sort_output_file = output_or_temp_file[num_merge_pass % 2]; // Execute the initial sort pass. sort_pass( input_file, *sort_output_file, strategy.records_per_sort_block, strategy.num_sort_blocks, ctx); // Execute the merge passes. for (unsigned int mp = 0; mp < num_merge_pass; mp++) { log(ctx, "running merge pass %u / %u: " "%" PRIu64 " blocks, branch factor %u\n", mp, num_merge_pass, strategy.merge_pass[mp].num_input_blocks, strategy.merge_pass[mp].branch_factor); // Filter duplicate records only on the last pass. bool filter_dupl = ctx.flag_unique && (mp + 1 == num_merge_pass); // Alternate between temp_file and output_file. BinaryFile * pass_input_file = output_or_temp_file[(num_merge_pass - mp) % 2]; BinaryFile * pass_output_file = output_or_temp_file[(num_merge_pass - mp - 1) % 2]; // Execute the merge pass. merge_pass( *pass_input_file, *pass_output_file, strategy.merge_pass[mp].records_per_input_block, strategy.merge_pass[mp].num_input_blocks, strategy.merge_pass[mp].branch_factor, filter_dupl, ctx); } } log(ctx, "finished\n"); } std::string get_default_tmpdir(void) { const char *tmpdir = getenv("TMPDIR"); if (tmpdir == NULL) { tmpdir = "/tmp"; } return std::string(tmpdir); } void usage() { fprintf(stderr, "\n" "Usage: sortbin [options] inputfile outputfile\n" "\n" "Options:\n" "\n" " -s, --size=N specify record size of N bytes (required)\n" " -u, --unique eleminate duplicates after sorting\n" " --memory=M use at most M MByte RAM (default: %d)\n" " --branch=B merge at most B arrays in one step (default: %d)\n" " --temporary-directory=DIR write temporary file to the specified\n" " directory (default: $TMPDIR)\n" "\n" "The output file must not yet exist.\n" "If the data does not fit in memory, a temporary file will be\n" "created with the same size as the input/output files.\n" "\n", DEFAULT_MEMORY_SIZE_MBYTE, DEFAULT_BRANCH_FACTOR); } } // anonymous namespace int main(int argc, char **argv) { const struct option longopts[] = { { "size", 1, NULL, 's' }, { "unique", 0, NULL, 'u' }, { "memory", 1, NULL, 'M' }, { "branch", 1, NULL, 'B' }, { "temporary-directory", 1, NULL, 'T' }, { "verbose", 0, NULL, 'v' }, { "help", 0, NULL, 'h' }, { NULL, 0, NULL, 0 } }; bool flag_unique = false; bool flag_verbose = false; int record_size = 0; int memory_size = DEFAULT_MEMORY_SIZE_MBYTE; int branch_factor = DEFAULT_BRANCH_FACTOR; std::string tempdir = get_default_tmpdir(); int opt; while ((opt = getopt_long(argc, argv, "uv", longopts, NULL)) != -1) { switch (opt) { case 's': record_size = atoi(optarg); if (record_size < 1) { fprintf(stderr, "ERROR: Invalid record size (must be at least 1)\n"); return EXIT_FAILURE; } break; case 'u': flag_unique = true; break; case 'M': memory_size = atoi(optarg); if (memory_size <= 0) { fprintf(stderr, "ERROR: Invalid memory size\n"); return EXIT_FAILURE; } break; case 'B': branch_factor = atoi(optarg); if (branch_factor < 2) { fprintf(stderr, "ERROR: Invalid radix value, must be at least 2\n"); return EXIT_FAILURE; } break; case 'T': tempdir = optarg; break; case 'v': flag_verbose = true; break; case 'h': usage(); return EXIT_SUCCESS; default: usage(); return EXIT_FAILURE; } } if (record_size < 1) { fprintf(stderr, "ERROR: Missing required parameter --size\n"); usage(); return EXIT_FAILURE; } if (argc < optind + 2) { fprintf(stderr, "ERROR: Input and output file names must be specified\n"); usage(); return EXIT_FAILURE; } if (argc > optind + 2) { fprintf(stderr, "ERROR: Unexpected command-line parameters\n"); usage(); return EXIT_FAILURE; } if ((unsigned int)memory_size >= SIZE_MAX / 1024 / 1024) { fprintf( stderr, "ERROR: This system can allocate at most %zu MB memory\n", SIZE_MAX / 1024 / 1024 - 1); return EXIT_FAILURE; } std::string input_name(argv[optind]); std::string output_name(argv[optind+1]); SortContext ctx; ctx.record_size = record_size; ctx.memory_size = uint64_t(memory_size) * 1024 * 1024; ctx.branch_factor = branch_factor; ctx.flag_unique = flag_unique; ctx.flag_verbose = flag_verbose; ctx.temporary_directory = tempdir; try { sortbin(input_name, output_name, ctx); } catch (const std::exception& ex) { fprintf(stderr, "ERROR: %s\n", ex.what()); return EXIT_FAILURE; } return EXIT_SUCCESS; }