1
0
Fork 0

Add command-line option for I/O thread

Actual background I/O not yet implemented.
This commit is contained in:
Joris van Rantwijk 2022-06-27 22:35:21 +02:00
parent eb3b06eede
commit 57c20dbca3
1 changed files with 72 additions and 42 deletions

View File

@ -57,6 +57,9 @@
#define DEFAULT_THREADS 1 #define DEFAULT_THREADS 1
#define MAX_THREADS 128 #define MAX_THREADS 128
/* By default use background threads for I/O. */
#define DEFAULT_IO_THREAD 1
/* Align buffer sizes and I/O on this number of records. /* Align buffer sizes and I/O on this number of records.
For efficiency, I/O should be done in multiples of 4096 bytes. */ For efficiency, I/O should be done in multiples of 4096 bytes. */
#define TRANSFER_ALIGNMENT 4096 #define TRANSFER_ALIGNMENT 4096
@ -86,6 +89,9 @@ struct SortContext
/** True to eliminate duplicate records. */ /** True to eliminate duplicate records. */
bool flag_unique; bool flag_unique;
/** True to do I/O in background threads. */
bool flag_io_thread;
/** True to write progress messages to stderr. */ /** True to write progress messages to stderr. */
bool flag_verbose; bool flag_verbose;
@ -2206,22 +2212,42 @@ void usage()
"\n" "\n"
"Options:\n" "Options:\n"
"\n" "\n"
" -s, --size=N specify record size of N bytes (required)\n" " -s, --size=N\n"
" -u, --unique eliminate duplicates after sorting\n" " Specify record size of N bytes (required)\n"
" --memory=<n>M use at most <n> MiByte RAM (default: %d)\n" "\n"
" --memory=<n>G use at most <n> GiByte RAM\n" " -u, --unique\n"
" --branch=N merge N subarrays in one step (default: %d)\n" " Eliminate duplicates after sorting\n"
" --threads=N use N threads for parallel sorting (default: %d)\n" "\n"
" --temporary-directory=DIR write temporary file to the specified\n" " --memory=<n>M, --memory=<n>G\n"
" directory (default: $TMPDIR)\n" " Specify the amount of RAM that may be used.\n"
" Use suffix 'M' for MiByte, or 'G' for GiByte."
" (default: %d MiB)\n"
"\n"
" --branch=N\n"
" Merge at most N subarrays in one step. (default: %d)\n"
"\n"
" --parallel=N\n"
" Use N threads for parallel sorting. (default: %d)\n"
"\n"
" --iothread / --no-iothread\n"
" Enable or disable use of background threads for I/O.\n"
" (default: %s)\n"
"\n"
" -T, --temporary-directory=DIR\n"
" Write temporary file to the specified directory."
" (default: $TMPDIR)\n"
"\n"
" -v, --verbose\n"
" Write progress messages to STDERR.\n"
"\n" "\n"
"The output file must not yet exist.\n" "The output file must not yet exist.\n"
"If the data does not fit in memory, a temporary file will be\n" "If the file does not fit in memory, a temporary file will be\n"
"created with the same size as the input/output files.\n" "created with the same size as the input/output files.\n"
"\n", "\n",
DEFAULT_MEMORY_SIZE_MBYTE, DEFAULT_MEMORY_SIZE_MBYTE,
DEFAULT_BRANCH_FACTOR, DEFAULT_BRANCH_FACTOR,
DEFAULT_THREADS); DEFAULT_THREADS,
DEFAULT_IO_THREAD ? "enable" : "disable");
} }
@ -2235,59 +2261,72 @@ int main(int argc, char **argv)
{ "unique", 0, NULL, 'u' }, { "unique", 0, NULL, 'u' },
{ "memory", 1, NULL, 'M' }, { "memory", 1, NULL, 'M' },
{ "branch", 1, NULL, 'B' }, { "branch", 1, NULL, 'B' },
{ "threads", 1, NULL, 'J' }, { "parallel", 1, NULL, 'P' },
{ "temporary-directory", 1, NULL, 'T' }, { "temporary-directory", 1, NULL, 'T' },
{ "iothread", 0, NULL, 'X' },
{ "no-iothread", 0, NULL, 'x' },
{ "verbose", 0, NULL, 'v' }, { "verbose", 0, NULL, 'v' },
{ "help", 0, NULL, 'h' }, { "help", 0, NULL, 'h' },
{ NULL, 0, NULL, 0 } { NULL, 0, NULL, 0 }
}; };
bool flag_unique = false;
bool flag_verbose = false;
unsigned int record_size = 0;
unsigned int branch_factor = DEFAULT_BRANCH_FACTOR;
unsigned int num_threads = DEFAULT_THREADS;
uint64_t memory_size = uint64_t(DEFAULT_MEMORY_SIZE_MBYTE) * 1024 * 1024;
std::string tempdir = get_default_tmpdir();
int opt;
SortContext ctx;
ctx.record_size = 0;
ctx.memory_size = uint64_t(DEFAULT_MEMORY_SIZE_MBYTE) * 1024 * 1024;
ctx.branch_factor = DEFAULT_BRANCH_FACTOR;
ctx.num_threads = DEFAULT_THREADS;
ctx.flag_unique = false;
ctx.flag_io_thread = DEFAULT_IO_THREAD;
ctx.flag_verbose = false;
ctx.temporary_directory = get_default_tmpdir();
int opt;
while ((opt = getopt_long(argc, argv, "s:T:uvh", longopts, NULL)) != -1) { while ((opt = getopt_long(argc, argv, "s:T:uvh", longopts, NULL)) != -1) {
switch (opt) { switch (opt) {
case 's': case 's':
if (!parse_uint(optarg, record_size) || record_size < 1) { if (!parse_uint(optarg, ctx.record_size)
|| ctx.record_size < 1) {
fprintf(stderr, fprintf(stderr,
"ERROR: Invalid record size (must be at least 1)\n"); "ERROR: Invalid record size (must be at least 1)\n");
return EXIT_FAILURE; return EXIT_FAILURE;
} }
break; break;
case 'u': case 'u':
flag_unique = true; ctx.flag_unique = true;
break; break;
case 'M': case 'M':
memory_size = parse_memory_size(optarg); ctx.memory_size = parse_memory_size(optarg);
if (memory_size == 0) { if (ctx.memory_size == 0) {
return EXIT_FAILURE; return EXIT_FAILURE;
} }
break; break;
case 'B': case 'B':
if (!parse_uint(optarg, branch_factor) || branch_factor < 2) { if (!parse_uint(optarg, ctx.branch_factor)
|| ctx.branch_factor < 2) {
fprintf(stderr, fprintf(stderr,
"ERROR: Invalid radix value, must be at least 2\n"); "ERROR: Invalid branch factor, must be at least 2\n");
return EXIT_FAILURE; return EXIT_FAILURE;
} }
break; break;
case 'J': case 'P':
if (!parse_uint(optarg, num_threads) if (!parse_uint(optarg, ctx.num_threads)
|| num_threads < 1 || ctx.num_threads < 1
|| num_threads > MAX_THREADS) { || ctx.num_threads > MAX_THREADS) {
fprintf(stderr, "ERROR: Invalid number of threads\n"); fprintf(stderr, "ERROR: Invalid number of threads\n");
return EXIT_FAILURE; return EXIT_FAILURE;
} }
break; break;
case 'T': case 'T':
tempdir = optarg; ctx.temporary_directory = optarg;
break;
case 'X':
ctx.flag_io_thread = true;
break;
case 'x':
ctx.flag_io_thread = false;
break; break;
case 'v': case 'v':
flag_verbose = true; ctx.flag_verbose = true;
break; break;
case 'h': case 'h':
usage(); usage();
@ -2298,7 +2337,7 @@ int main(int argc, char **argv)
} }
} }
if (record_size < 1) { if (ctx.record_size < 1) {
fprintf(stderr, "ERROR: Missing required parameter --size\n"); fprintf(stderr, "ERROR: Missing required parameter --size\n");
usage(); usage();
return EXIT_FAILURE; return EXIT_FAILURE;
@ -2317,7 +2356,7 @@ int main(int argc, char **argv)
return EXIT_FAILURE; return EXIT_FAILURE;
} }
if (memory_size >= SIZE_MAX) { if (ctx.memory_size >= SIZE_MAX) {
fprintf( fprintf(
stderr, stderr,
"ERROR: This system supports at most %zu MB memory\n", "ERROR: This system supports at most %zu MB memory\n",
@ -2328,15 +2367,6 @@ int main(int argc, char **argv)
std::string input_name(argv[optind]); std::string input_name(argv[optind]);
std::string output_name(argv[optind+1]); std::string output_name(argv[optind+1]);
SortContext ctx;
ctx.record_size = record_size;
ctx.memory_size = memory_size;
ctx.branch_factor = branch_factor;
ctx.num_threads = num_threads;
ctx.flag_unique = flag_unique;
ctx.flag_verbose = flag_verbose;
ctx.temporary_directory = tempdir;
try { try {
sortbin(input_name, output_name, ctx); sortbin(input_name, output_name, ctx);
} catch (const std::exception& ex) { } catch (const std::exception& ex) {