diff --git a/python/run_matching.py b/python/run_matching.py new file mode 100644 index 0000000..5870dad --- /dev/null +++ b/python/run_matching.py @@ -0,0 +1,385 @@ +#!/usr/bin/env python3 + +""" +Calculate maximum weighted matching of graphs in DIMACS format. +""" + +from __future__ import annotations + +import sys +import argparse +import math +import os.path +from typing import Optional, TextIO + +from max_weight_matching import ( + maximum_weight_matching, + adjust_weights_for_maximum_cardinality_matching) + + +def parse_int_or_float(s: str) -> int|float: + """Convert a string to integer or float value.""" + try: + return int(s) + except ValueError: + pass + return float(s) + + +def read_dimacs_graph(f: TextIO) -> list[tuple[int, int, int|float]]: + """Read a graph in DIMACS edge list format.""" + + edges: list[tuple[int, int, float]] = [] + + for s in f: + words = s.strip().split() + + if not words[0]: + # Skip empty line. + continue + + if words[0].startswith("c"): + # Skip comment line. + pass + + elif words[0] == "p": + # Handle "problem" line. + if len(words) != 4: + raise ValueError( + f"Expecting DIMACS edge format but got {s.strip()!r}") + if words[1] != "edge": + raise ValueError( + f"Expecting DIMACS edge format but got {words[1]!r}") + + elif words[0] == "e": + # Handle "edge" line. + if len(words) != 4: + raise ValueError(f"Expecting edge but got {s.strip()!r}") + i = int(words[1]) + j = int(words[2]) + w = parse_int_or_float(words[3]) + edges.append((i, j, w)) + + else: + raise ValueError(f"Unknown line type {words[0]!r}") + + return edges + + +def read_dimacs_graph_file(filename: str) -> list[tuple[int, int, int|float]]: + """Read a graph from file or stdin.""" + if filename: + with open(filename, "r") as f: + try: + return read_dimacs_graph(f) + except ValueError as exc: + raise ValueError(f"{exc} in {filename!r}") from None + else: + try: + return read_dimacs_graph(sys.stdin) + except ValueError as exc: + raise ValueError(f"{exc} in (stdin)") from None + + +def read_dimacs_matching( + f: TextIO + ) -> tuple[int|float, list[tuple[int, int]]]: + """Read a matching solution in DIMACS format.""" + + have_weight = False + weight: int|float = 0 + pairs: list[tuple[int, int]] = [] + + for s in f: + words = s.strip().split() + + if not words[0]: + # Skip empty line. + continue + + if words[0].startswith("c"): + # Skip comment line. + pass + + elif words[0] == "s": + # Handle "solution" line. + if len(words) != 2: + raise ValueError( + f"Expecting solution line but got {s.strip()}") + if have_weight: + raise ValueError("Duplicate solution line") + have_weight = True + weight = parse_int_or_float(words[1]) + + elif words[0] == "m": + # Handle "matching" line. + if len(words) != 3: + raise ValueError( + f"Expecting matched edge but got {s.strip()}") + i = int(words[1]) + j = int(words[2]) + pairs.append((i, j)) + + else: + raise ValueError(f"Unknown line type {words[0]!r}") + + if not have_weight: + raise ValueError("Missing solution line") + + return (weight, pairs) + + +def read_dimacs_matching_file( + filename: str + ) -> tuple[int|float, list[tuple[int, int]]]: + """Read a matching from file.""" + with open(filename, "r") as f: + try: + return read_dimacs_matching(f) + except ValueError as exc: + raise ValueError(f"{exc} in {filename!r}") from None + + +def write_dimacs_matching( + f: TextIO, + weight: int|float, + pairs: list[tuple[int, int]] + ) -> None: + """Write a matching solution in DIMACS format.""" + + if isinstance(weight, int): + print("s", weight, file=f) + else: + print("s", f"{weight:.12g}", file=f) + + for (i, j) in pairs: + print("m", i, j, file=f) + + +def write_dimacs_matching_file( + filename: str, + weight: int|float, + pairs: list[tuple[int, int]] + ) -> None: + """Write a matching to file or stdout.""" + if filename: + with open(filename, "x") as f: + write_dimacs_matching(f, weight, pairs) + else: + write_dimacs_matching(sys.stdout, weight, pairs) + + +def calc_matching_weight( + edges: list[tuple[int, int, int|float]], + pairs: list[tuple[int, int]] + ) -> int|float: + """Verify that the matching is valid and calculate its weight. + + Matched pairs are assumed to be in the same order as edges. + """ + + weight: int|float = 0 + + edge_pos = 0 + for pair in pairs: + while edge_pos < len(edges): + if edges[edge_pos][0:2] == pair: + break + edge_pos += 1 + assert edge_pos <= len(edges) + (i, j, w) = edges[edge_pos] + assert pair == (i, j) + weight += w + edge_pos += 1 + + return weight + + +def generate_matching( + input_filename: str, + output_filename: str, + maxcard: bool + ) -> None: + """Calculate matching of one graph instance.""" + + edges = read_dimacs_graph_file(input_filename) + + if maxcard: + edges_adj = adjust_weights_for_maximum_cardinality_matching(edges) + pairs = maximum_weight_matching(edges_adj) + else: + pairs = maximum_weight_matching(edges) + + weight = calc_matching_weight(edges, pairs) + + write_dimacs_matching_file(output_filename, weight, pairs) + + +def run_generate( + filenames: list[str], + outdir: Optional[str], + maxcard: bool + ) -> int: + """Calculate matching(s) and write output to disk or stdout.""" + + if len(filenames) == 0: + # Read from stdin; write to stdout. + generate_matching("", "", maxcard) + + elif not outdir: + # Read from file, write to stdout. + assert len(filenames) == 1 + generate_matching(filenames[0], "", maxcard) + + else: + # Read from file, write to file. + for filename in filenames: + output_filename = os.path.join( + outdir, + os.path.splitext(os.path.basename(filename))[0] + ".out") + print(f"Processing {filename!r} -> {output_filename!r} ...", + end=" ") + sys.stdout.flush() + + generate_matching(filename, output_filename, maxcard) + + print(" OK") + sys.stdout.flush() + + return 0 + + +def verify_matching(filename: str, maxcard: bool, wfactor: float) -> bool: + """Verify matching of one graph instance.""" + + print("Verifying", repr(filename), "...", end=" ") + sys.stdout.flush() + + matching_filename = os.path.splitext(filename)[0] + ".out" + + edges = read_dimacs_graph_file(filename) + (gold_weight, gold_pairs) = read_dimacs_matching_file(matching_filename) + + edges_adj = edges + + if wfactor != 1.0: + if wfactor.is_integer(): + wfactor = round(wfactor) + edges_adj = [(i, j, w * wfactor) for (i, j, w) in edges_adj] + + if maxcard: + edges_adj = adjust_weights_for_maximum_cardinality_matching(edges_adj) + + pairs = maximum_weight_matching(edges_adj) + weight = calc_matching_weight(edges, pairs) + + if maxcard: + if len(pairs) != len(gold_pairs): + print("FAILED", + f"(got {len(pairs)} pairs, expected {len(gold_pairs)}") + return False + + if isinstance(weight, int) and isinstance(gold_weight, int): + good = (weight == gold_weight) + else: + good = math.isclose(weight, gold_weight, rel_tol=1e-9, abs_tol=1e-9) + + if not good: + print(f"FAILED (got weight {weight}, expected {gold_weight}") + return False + + print("OK") + return True + + +def run_verify(filenames: list[str], maxcard: bool, wfactor: float) -> int: + """Verify matching(s).""" + + num_passed = 0 + failed_tests: list[str] = [] + + for filename in filenames: + if verify_matching(filename, maxcard, wfactor): + num_passed += 1 + else: + failed_tests.append(filename) + sys.stdout.flush() + + print("done.") + print(num_passed, "tests passed") + if failed_tests: + print(len(failed_tests), "tests failed:") + for filename in failed_tests: + print(" ", filename, "FAILED") + else: + print("All tests passed") + sys.stdout.flush() + + return 1 if failed_tests else 0 + + +def main() -> int: + """Main program.""" + + parser = argparse.ArgumentParser() + parser.description = ( + "Calculate maximum weighted matching of graphs in DIMACS format.") + + parser.add_argument("--stdin", + action="store_true", + help="read graph from stdin") + parser.add_argument("--verify", + action="store_true", + help="verify existing output file(s)") + parser.add_argument("--maxcard", + action="store_true", + help="calculate maximum-cardinality matching") + parser.add_argument("--wfactor", + action="store", + type=float, + default=1.0, + help="adjust weights by specified factor") + parser.add_argument("--outdir", + action="store", + type=str, + help="directory to write output") + parser.add_argument("input", + nargs="*", + help="input file(s)") + + args = parser.parse_args() + + if args.stdin and args.verify: + print("ERROR: Can not verify when reading from stdin", + file=sys.stderr) + return 1 + + if (not args.stdin) and (not args.input): + parser.print_help(sys.stderr) + print(file=sys.stderr) + print("ERROR: Specify either --stdin or at least one input file", + file=sys.stderr) + return 1 + + if args.stdin and args.input: + print("ERROR: Specify either --stdin or input files, not both", + file=sys.stderr) + return 1 + + if len(args.input) > 1 and (not args.verify) and (not args.outdir): + print("ERROR: Need --outdir or --verify to process multiple inputs", + file=sys.stderr) + return 1 + + try: + if args.verify: + return run_verify(args.input, args.maxcard, args.wfactor) + else: + return run_generate(args.input, args.outdir, args.maxcard) + except (OSError, ValueError) as exc: + print("ERROR:", exc, file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main())