diff --git a/src/sortbin.cpp b/src/sortbin.cpp index 2c32806..d06a128 100644 --- a/src/sortbin.cpp +++ b/src/sortbin.cpp @@ -57,6 +57,9 @@ #define DEFAULT_THREADS 1 #define MAX_THREADS 128 +/* By default use background threads for I/O. */ +#define DEFAULT_IO_THREAD 1 + /* Align buffer sizes and I/O on this number of records. For efficiency, I/O should be done in multiples of 4096 bytes. */ #define TRANSFER_ALIGNMENT 4096 @@ -86,6 +89,9 @@ struct SortContext /** True to eliminate duplicate records. */ bool flag_unique; + /** True to do I/O in background threads. */ + bool flag_io_thread; + /** True to write progress messages to stderr. */ bool flag_verbose; @@ -2206,22 +2212,42 @@ void usage() "\n" "Options:\n" "\n" - " -s, --size=N specify record size of N bytes (required)\n" - " -u, --unique eliminate duplicates after sorting\n" - " --memory=M use at most MiByte RAM (default: %d)\n" - " --memory=G use at most GiByte RAM\n" - " --branch=N merge N subarrays in one step (default: %d)\n" - " --threads=N use N threads for parallel sorting (default: %d)\n" - " --temporary-directory=DIR write temporary file to the specified\n" - " directory (default: $TMPDIR)\n" + " -s, --size=N\n" + " Specify record size of N bytes (required)\n" + "\n" + " -u, --unique\n" + " Eliminate duplicates after sorting\n" + "\n" + " --memory=M, --memory=G\n" + " Specify the amount of RAM that may be used.\n" + " Use suffix 'M' for MiByte, or 'G' for GiByte." + " (default: %d MiB)\n" + "\n" + " --branch=N\n" + " Merge at most N subarrays in one step. (default: %d)\n" + "\n" + " --parallel=N\n" + " Use N threads for parallel sorting. (default: %d)\n" + "\n" + " --iothread / --no-iothread\n" + " Enable or disable use of background threads for I/O.\n" + " (default: %s)\n" + "\n" + " -T, --temporary-directory=DIR\n" + " Write temporary file to the specified directory." + " (default: $TMPDIR)\n" + "\n" + " -v, --verbose\n" + " Write progress messages to STDERR.\n" "\n" "The output file must not yet exist.\n" - "If the data does not fit in memory, a temporary file will be\n" + "If the file does not fit in memory, a temporary file will be\n" "created with the same size as the input/output files.\n" "\n", DEFAULT_MEMORY_SIZE_MBYTE, DEFAULT_BRANCH_FACTOR, - DEFAULT_THREADS); + DEFAULT_THREADS, + DEFAULT_IO_THREAD ? "enable" : "disable"); } @@ -2235,59 +2261,72 @@ int main(int argc, char **argv) { "unique", 0, NULL, 'u' }, { "memory", 1, NULL, 'M' }, { "branch", 1, NULL, 'B' }, - { "threads", 1, NULL, 'J' }, + { "parallel", 1, NULL, 'P' }, { "temporary-directory", 1, NULL, 'T' }, + { "iothread", 0, NULL, 'X' }, + { "no-iothread", 0, NULL, 'x' }, { "verbose", 0, NULL, 'v' }, { "help", 0, NULL, 'h' }, { NULL, 0, NULL, 0 } }; - bool flag_unique = false; - bool flag_verbose = false; - unsigned int record_size = 0; - unsigned int branch_factor = DEFAULT_BRANCH_FACTOR; - unsigned int num_threads = DEFAULT_THREADS; - uint64_t memory_size = uint64_t(DEFAULT_MEMORY_SIZE_MBYTE) * 1024 * 1024; - std::string tempdir = get_default_tmpdir(); - int opt; + SortContext ctx; + ctx.record_size = 0; + ctx.memory_size = uint64_t(DEFAULT_MEMORY_SIZE_MBYTE) * 1024 * 1024; + ctx.branch_factor = DEFAULT_BRANCH_FACTOR; + ctx.num_threads = DEFAULT_THREADS; + ctx.flag_unique = false; + ctx.flag_io_thread = DEFAULT_IO_THREAD; + ctx.flag_verbose = false; + ctx.temporary_directory = get_default_tmpdir(); + + int opt; while ((opt = getopt_long(argc, argv, "s:T:uvh", longopts, NULL)) != -1) { switch (opt) { case 's': - if (!parse_uint(optarg, record_size) || record_size < 1) { + if (!parse_uint(optarg, ctx.record_size) + || ctx.record_size < 1) { fprintf(stderr, "ERROR: Invalid record size (must be at least 1)\n"); return EXIT_FAILURE; } break; case 'u': - flag_unique = true; + ctx.flag_unique = true; break; case 'M': - memory_size = parse_memory_size(optarg); - if (memory_size == 0) { + ctx.memory_size = parse_memory_size(optarg); + if (ctx.memory_size == 0) { return EXIT_FAILURE; } break; case 'B': - if (!parse_uint(optarg, branch_factor) || branch_factor < 2) { + if (!parse_uint(optarg, ctx.branch_factor) + || ctx.branch_factor < 2) { fprintf(stderr, - "ERROR: Invalid radix value, must be at least 2\n"); + "ERROR: Invalid branch factor, must be at least 2\n"); return EXIT_FAILURE; } break; - case 'J': - if (!parse_uint(optarg, num_threads) - || num_threads < 1 - || num_threads > MAX_THREADS) { + case 'P': + if (!parse_uint(optarg, ctx.num_threads) + || ctx.num_threads < 1 + || ctx.num_threads > MAX_THREADS) { fprintf(stderr, "ERROR: Invalid number of threads\n"); return EXIT_FAILURE; } break; case 'T': - tempdir = optarg; + ctx.temporary_directory = optarg; + break; + case 'X': + ctx.flag_io_thread = true; + break; + case 'x': + ctx.flag_io_thread = false; break; case 'v': - flag_verbose = true; + ctx.flag_verbose = true; break; case 'h': usage(); @@ -2298,7 +2337,7 @@ int main(int argc, char **argv) } } - if (record_size < 1) { + if (ctx.record_size < 1) { fprintf(stderr, "ERROR: Missing required parameter --size\n"); usage(); return EXIT_FAILURE; @@ -2317,7 +2356,7 @@ int main(int argc, char **argv) return EXIT_FAILURE; } - if (memory_size >= SIZE_MAX) { + if (ctx.memory_size >= SIZE_MAX) { fprintf( stderr, "ERROR: This system supports at most %zu MB memory\n", @@ -2328,15 +2367,6 @@ int main(int argc, char **argv) std::string input_name(argv[optind]); std::string output_name(argv[optind+1]); - SortContext ctx; - ctx.record_size = record_size; - ctx.memory_size = memory_size; - ctx.branch_factor = branch_factor; - ctx.num_threads = num_threads; - ctx.flag_unique = flag_unique; - ctx.flag_verbose = flag_verbose; - ctx.temporary_directory = tempdir; - try { sortbin(input_name, output_name, ctx); } catch (const std::exception& ex) {