1
0
Fork 0
maximum-weight-matching/python/mwmatching.py

1904 lines
71 KiB
Python

"""
Algorithm for finding a maximum weight matching in general graphs.
"""
from __future__ import annotations
import sys
import math
from collections.abc import Sequence
from typing import NamedTuple, Optional
from datastruct import UnionFindQueue, PriorityQueue
def maximum_weight_matching(
edges: Sequence[tuple[int, int, float]]
) -> list[tuple[int, int]]:
"""Compute a maximum-weighted matching in the general undirected weighted
graph given by "edges".
The graph is specified as a list of edges, each edge specified as a tuple
of its two vertices and the edge weight.
There may be at most one edge between any pair of vertices.
No vertex may have an edge to itself.
The graph may be non-connected (i.e. contain multiple components).
Vertices are indexed by consecutive, non-negative integers, such that
the first vertex has index 0 and the last vertex has index (n-1).
Edge weights may be integers or floating point numbers.
Isolated vertices (not incident to any edge) are allowed, but not
recommended since such vertices consume time and memory but have
no effect on the maximum-weight matching.
Edges with negative weight are ignored.
This function takes time O(n * (n + m) * log(n)),
where "n" is the number of vertices and "m" is the number of edges.
This function uses O(n + m) memory.
Parameters:
edges: List of edges, each edge specified as a tuple "(x, y, w)"
where "x" and "y" are vertex indices and "w" is the edge weight.
Returns:
List of pairs of matched vertex indices.
This is a subset of the edges in the graph.
It contains a tuple "(x, y)" if vertex "x" is matched to vertex "y".
Raises:
ValueError: If the input does not satisfy the constraints.
TypeError: If the input contains invalid data types.
MatchingError: If the matching algorithm fails.
This can only happen if there is a bug in the algorithm.
"""
# Check that the input meets all constraints.
_check_input_types(edges)
_check_input_graph(edges)
# Remove edges with negative weight.
edges = _remove_negative_weight_edges(edges)
# Special case for empty graphs.
if not edges:
return []
# Initialize graph representation.
graph = _GraphInfo(edges)
# Initialize the matching algorithm.
ctx = _MatchingContext(graph)
# Improve the solution until no further improvement is possible.
#
# Each successful pass through this loop increases the number
# of matched edges by 1.
#
# This loop runs through at most (n/2 + 1) iterations.
# Each iteration takes time O(n**2).
while ctx.run_stage():
pass
# Extract the final solution.
pairs: list[tuple[int, int]] = [
(x, y) for (x, y, _w) in edges if ctx.vertex_mate[x] == y]
# Verify that the matching is optimal.
# This is just a safeguard; the verification will always pass unless
# there is a bug in the matching algorithm.
# Verification only works reliably for integer weights.
if graph.integer_weights:
_verify_optimum(ctx)
return pairs
def adjust_weights_for_maximum_cardinality_matching(
edges: Sequence[tuple[int, int, float]]
) -> Sequence[tuple[int, int, float]]:
"""Adjust edge weights such that the maximum-weight matching of
the adjusted graph is a maximum-cardinality matching, equal to
a matching in the original graph that has maximum weight out of all
matchings with maximum cardinality.
The graph is specified as a list of edges, each edge specified as a tuple
of its two vertices and the edge weight.
Edge weights may be integers or floating point numbers.
Negative edge weights are allowed.
This function increases all edge weights by an equal amount such that
the adjusted weights satisfy the following conditions:
- All edge weights are positive;
- The minimum edge weight is at least "n" times the difference between
maximum and minimum edge weight.
These conditions ensure that a maximum-cardinality matching will be found.
Proof: The weight of any non-maximum-cardinality matching can be increased
by matching an additional edge, even if the new edge has minimum edge
weight and causes all other matched edges to degrade from maximum to
minimum edge weight.
Since we are only considering maximum-cardinality matchings, increasing
all edge weights by an equal amount will not change the set of edges
that makes up the maximum-weight matching.
This function increases edge weights by an amount that is proportional
to the product of the unadjusted weight range and the number of vertices
in the graph. In case of a big graph with floating point weights, this
may introduce rounding errors in the weights.
This function takes time O(m), where "m" is the number of edges.
Parameters:
edges: List of edges, each edge specified as a tuple "(x, y, w)"
where "x" and "y" are vertex indices and "w" is the edge weight.
Returns:
List of edges with adjusted weights. If no adjustments are necessary,
the input list instance may be returned.
Raises:
ValueError: If the input does not satisfy the constraints.
TypeError: If the input contains invalid data types.
"""
_check_input_types(edges)
# Don't worry about empty graphs:
if not edges:
return edges
num_vertex = 1 + max(max(x, y) for (x, y, _w) in edges)
min_weight = min(w for (_x, _y, w) in edges)
max_weight = max(w for (_x, _y, w) in edges)
weight_range = max_weight - min_weight
# Do nothing if the weights already ensure a maximum-cardinality matching.
if min_weight > 0 and min_weight >= num_vertex * weight_range:
return edges
delta: float
if weight_range > 0:
# Increase weights to make minimum edge weight large enough
# to improve any non-maximum-cardinality matching.
delta = num_vertex * weight_range - min_weight
else:
# All weights are the same. Increase weights to make them positive.
delta = 1 - min_weight
assert delta >= 0
# Increase all edge weights by "delta".
return [(x, y, w + delta) for (x, y, w) in edges]
class MatchingError(Exception):
"""Raised when verification of the matching fails.
This can only happen if there is a bug in the algorithm.
"""
pass
def _check_input_types(edges: Sequence[tuple[int, int, float]]) -> None:
"""Check that the input consists of valid data types and valid
numerical ranges.
This function takes time O(m).
Parameters:
edges: List of edges, each edge specified as a tuple "(x, y, w)"
where "x" and "y" are edge indices and "w" is the edge weight.
Raises:
ValueError: If the input does not satisfy the constraints.
TypeError: If the input contains invalid data types.
"""
float_limit = sys.float_info.max / 4
if not isinstance(edges, list):
raise TypeError('"edges" must be a list')
for e in edges:
if (not isinstance(e, tuple)) or (len(e) != 3):
raise TypeError("Each edge must be specified as a 3-tuple")
(x, y, w) = e
if (not isinstance(x, int)) or (not isinstance(y, int)):
raise TypeError("Edge endpoints must be integers")
if (x < 0) or (y < 0):
raise ValueError("Edge endpoints must be non-negative integers")
if not isinstance(w, (int, float)):
raise TypeError(
"Edge weights must be integers or floating point numbers")
if isinstance(w, float):
if not math.isfinite(w):
raise ValueError("Edge weights must be finite numbers")
# Check that this edge weight will not cause our dual variable
# calculations to exceed the valid floating point range.
if w > float_limit:
raise ValueError("Floating point edge weights must be"
f" less than {float_limit:g}")
def _check_input_graph(edges: Sequence[tuple[int, int, float]]) -> None:
"""Check that the input is a valid graph, without any multi-edges and
without any self-edges.
This function takes time O(m * log(m)).
Parameters:
edges: List of edges, each edge specified as a tuple "(x, y, w)"
where "x" and "y" are edge indices and "w" is the edge weight.
Raises:
ValueError: If the input does not satisfy the constraints.
"""
# Check that the graph has no self-edges.
for (x, y, _w) in edges:
if x == y:
raise ValueError("Self-edges are not supported")
# Check that the graph does not have multi-edges.
# Using a set() would be more straightforward, but the runtime bounds
# of the Python set type are not clearly specified.
# Sorting provides guaranteed O(m * log(m)) run time.
edge_endpoints = [((x, y) if (x < y) else (y, x)) for (x, y, _w) in edges]
edge_endpoints.sort()
for i in range(len(edge_endpoints) - 1):
if edge_endpoints[i] == edge_endpoints[i+1]:
raise ValueError(f"Duplicate edge {edge_endpoints[i]}")
def _remove_negative_weight_edges(
edges: Sequence[tuple[int, int, float]]
) -> Sequence[tuple[int, int, float]]:
"""Remove edges with negative weight.
This does not change the solution of the maximum-weight matching problem,
but prevents complications in the algorithm.
"""
if any(e[2] < 0 for e in edges):
return [e for e in edges if e[2] >= 0]
else:
return edges
class _GraphInfo:
"""Representation of the input graph.
These data remain unchanged while the algorithm runs.
"""
def __init__(self, edges: Sequence[tuple[int, int, float]]) -> None:
"""Initialize the graph representation and prepare an adjacency list.
This function takes time O(n + m).
"""
# Vertices are indexed by integers in range 0 .. n-1.
# Edges are indexed by integers in range 0 .. m-1.
#
# Each edge is incident on two vertices.
# Each edge also has a weight.
#
# "edges[e] = (x, y, w)" where
# "e" is an edge index;
# "x" and "y" are vertex indices of the incident vertices;
# "w" is the edge weight.
#
# These data remain unchanged while the algorithm runs.
self.edges: Sequence[tuple[int, int, float]] = edges
# num_vertex = the number of vertices.
if edges:
self.num_vertex = 1 + max(max(x, y) for (x, y, _w) in edges)
else:
self.num_vertex = 0
# Each vertex is incident to zero or more edges.
#
# "adjacent_edges[x]" is the list of edge indices of edges incident
# to the vertex with index "x".
#
# These data remain unchanged while the algorithm runs.
self.adjacent_edges: list[list[int]] = [
[] for v in range(self.num_vertex)]
for (e, (x, y, _w)) in enumerate(edges):
self.adjacent_edges[x].append(e)
self.adjacent_edges[y].append(e)
# Determine whether _all_ weights are integers.
# In this case we can avoid floating point computations entirely.
self.integer_weights: bool = all(isinstance(w, int)
for (_x, _y, w) in edges)
# Each vertex may be labeled "S" (outer) or "T" (inner) or be unlabeled.
_LABEL_NONE = 0
_LABEL_S = 1
_LABEL_T = 2
class _Blossom:
"""Represents a blossom in a partially matched graph.
A blossom is an odd-length alternating cycle over sub-blossoms.
An alternating path consists of alternating matched and unmatched edges.
An alternating cycle is an alternating path that starts and ends in
the same sub-blossom.
A single vertex by itself is also a blossom: a "trivial blossom".
An instance of this class represents either a trivial blossom,
or a non-trivial blossom which consists of multiple sub-blossoms.
Blossoms are recursive structures: A non-trivial blossoms contains
sub-blossoms, which may themselves contain sub-blossoms etc.
Each blossom contains exactly one vertex that is not matched to another
vertex in the same blossom. This is the "base vertex" of the blossom.
"""
def __init__(self, base_vertex: int) -> None:
"""Initialize a new blossom."""
# If this is not a top-level blossom,
# "parent" is the blossom in which this blossom is a sub-blossom.
#
# If this is a top-level blossom,
# "parent = None".
self.parent: Optional[_NonTrivialBlossom] = None
# "base_vertex" is the vertex index of the base of the blossom.
# This is the unique vertex which is contained in the blossom
# but not matched to another vertex in the same blossom.
#
# For trivial blossoms, the base vertex is simply the only
# vertex in the blossom.
self.base_vertex: int = base_vertex
# A top-level blossom that are part of an alternating tree,
# has labels S or T. Unlabeled top-level blossoms are not (yet)
# part of any alternating tree.
self.label: int = _LABEL_NONE
# Labeled top-level blossoms keep track of the edge through which
# they are attached to an alternating tree.
#
# "tree_edge = (x, y)" if the blossom is attached to an alternating
# tree via edge "(x, y)" and vertex "y" is contained in the blossom.
#
# "tree_edge = None" if the blossom is the root of an alternating tree.
self.tree_edge: Optional[tuple[int, int]] = None
# For a labeled top-level blossom,
# "alternating_tree" is the set of all top-level blossoms that belong
# to the same alternating tree. The same set instance is shared by
# all top-level blossoms in the tree.
self.tree_blossoms: "Optional[set[_Blossom]]" = None
# Each top-level blossom maintains a union-find datastructure
# containing all vertices in the blossom.
self.vertex_set: "UnionFindQueue[_Blossom, int]"
self.vertex_set = UnionFindQueue(self)
# If this is a top-level unlabeled blossom with an edge to an
# S-blossom, "delta2_node" is the corresponding node in the delta2
# queue.
self.delta2_node: Optional[PriorityQueue.Node] = None
# Support variable for lazy updating of vertex dual variables.
self.vertex_dual_offset: float = 0
# "marker" is a temporary variable used to discover common
# ancestors in the blossom tree. It is normally False, except
# when used by "trace_alternating_paths()".
self.marker: bool = False
def vertices(self) -> list[int]:
"""Return a list of vertex indices contained in the blossom."""
return [self.base_vertex]
class _NonTrivialBlossom(_Blossom):
"""Represents a non-trivial blossom in a partially matched graph.
A non-trivial blossom is a blossom that contains multiple sub-blossoms
(at least 3 sub-blossoms, since all blossoms have odd length).
Non-trivial blossoms maintain a list of their sub-blossoms and the edges
between their subblossoms.
Unlike trivial blossoms, each non-trivial blossom is associated with
a variable in the dual LPP problem.
Non-trivial blossoms are created and destroyed by the matching algorithm.
This implies that not every odd-length alternating cycle is a blossom;
it only becomes a blossom through an explicit action of the algorithm.
An existing blossom may change when the matching is augmented along
a path that runs through the blossom.
"""
def __init__(
self,
subblossoms: list[_Blossom],
edges: list[tuple[int, int]]
) -> None:
"""Initialize a new blossom."""
super().__init__(subblossoms[0].base_vertex)
# Sanity check.
n = len(subblossoms)
assert len(edges) == n
assert n >= 3
assert n % 2 == 1
# "subblossoms" is a list of the sub-blossoms of the blossom,
# ordered by their appearance in the alternating cycle.
#
# "subblossoms[0]" is the start and end of the alternating cycle.
# "subblossoms[0]" contains the base vertex of the blossom.
self.subblossoms: list[_Blossom] = subblossoms
# "edges" is a list of edges linking the sub-blossoms.
# Each edge is represented as an ordered pair "(x, y)" where "x"
# and "y" are vertex indices.
#
# "edges[0] = (x, y)" where vertex "x" in "subblossoms[0]" is
# adjacent to vertex "y" in "subblossoms[1]", etc.
self.edges: list[tuple[int, int]] = edges
# Every non-trivial blossom has a variable in the dual LPP.
# New blossoms start with dual variable 0.
#
# The value of the dual variable changes through delta steps,
# but these changes are implemented as lazy updates.
#
# The true dual value of a top-level S-blossom is
# blossom.dual_var + ctx.delta_sum_2x
#
# The true dual value of a top-level T-blossom is
# blossom.dual_var - ctx.delta_sum_2x
#
# The true dual value of any other type of blossom is simply
# blossom.dual_var
#
# Note that "dual_var" is invariant under delta steps.
self.dual_var: float = 0
# If this is a top-level T-blossom,
# "delta4_node" is the corresponding node in the delta4 queue.
# Otherwise "delta4_node" is None.
self.delta4_node: Optional[PriorityQueue.Node] = None
def vertices(self) -> list[int]:
"""Return a list of vertex indices contained in the blossom."""
# Use an explicit stack to avoid deep recursion.
stack: list[_NonTrivialBlossom] = [self]
nodes: list[int] = []
while stack:
b = stack.pop()
for sub in b.subblossoms:
if isinstance(sub, _NonTrivialBlossom):
stack.append(sub)
else:
nodes.append(sub.base_vertex)
return nodes
class _AlternatingPath(NamedTuple):
"""Represents a list of edges forming an alternating path or an
alternating cycle."""
edges: list[tuple[int, int]]
class _MatchingContext:
"""Holds all data used by the matching algorithm.
It contains a partial solution of the matching problem and several
auxiliary data structures.
"""
def __init__(self, graph: _GraphInfo) -> None:
"""Set up the initial state of the matching algorithm."""
num_vertex = graph.num_vertex
# Reference to the input graph.
# The graph does not change while the algorithm runs.
self.graph = graph
# Each vertex is either single (unmatched) or matched to
# another vertex.
#
# If vertex "x" is matched to vertex "y",
# "vertex_mate[x] == y" and "vertex_mate[y] == x".
#
# If vertex "x" is unmatched, "vertex_mate[x] == -1".
#
# Initially all vertices are unmatched.
self.vertex_mate: list[int] = num_vertex * [-1]
# Each vertex is associated with a trivial blossom.
# In addition, non-trivial blossoms may be created and destroyed
# during the course of the matching algorithm.
#
# "trivial_blossom[x]" is the trivial blossom that contains only
# vertex "x".
self.trivial_blossom: list[_Blossom] = [_Blossom(x)
for x in range(num_vertex)]
# Non-trivial blossoms may be created and destroyed during
# the course of the algorithm.
#
# Initially there are no non-trivial blossoms.
self.nontrivial_blossom: list[_NonTrivialBlossom] = []
# "vertex_set_node[x]" represents the vertex "x" inside the
# union-find datastructure of its top-level blossom.
self.vertex_set_node = [b.vertex_set.insert(i, math.inf)
for (i, b) in enumerate(self.trivial_blossom)]
# All vertex duals are initialized to half the maximum edge weight.
#
# "start_vertex_dual_2x" is 2 times the initial vertex dual value.
#
# Pre-multiplication by 2 ensures that the values are integers
# if all edge weights are integers.
self.start_vertex_dual_2x = max(w for (_x, _y, w) in graph.edges)
# Every vertex has a variable in the dual LPP.
#
# The value of the dual variable changes through delta steps,
# but these changes are implemented as lazy updates.
#
# The true dual value of an S-vertex is
# (vertex_dual_2x[x] - delta_sum_2x) / 2
#
# The true dual value of a T-vertex is
# (vertex_dual_2x[x] + delta_sum_2x + B(x).vertex_dual_offset) / 2
#
# The true dual value of an unlabeled vertex is
# (vertex_dual_2x[x] + B(x).vertex_dual_offset) / 2
#
# Note that "vertex_dual_2x" is invariant under delta steps.
self.vertex_dual_2x: list[float]
self.vertex_dual_2x = num_vertex * [self.start_vertex_dual_2x]
# Running sum of applied delta steps times 2.
self.delta_sum_2x: float = 0
# Queue containing unlabeled top-level blossoms that have an edge to
# an S-blossom. The priority of a blossom is 2 times the least slack
# to an S blossom, plus 2 times the running sum of delta steps.
self.delta2_queue: PriorityQueue[_Blossom] = PriorityQueue()
# Queue containing edges between S-vertices in different top-level
# blossoms. The priority of an edge is its slack plus 2 times the
# running sum of delta steps.
self.delta3_queue: PriorityQueue[int] = PriorityQueue()
self.delta3_set: set[int] = set()
# Queue containing top-level non-trivial T-blossoms.
# The priority of a blossom is its dual plus 2 times the running
# sum of delta steps.
self.delta4_queue: PriorityQueue[_NonTrivialBlossom] = PriorityQueue()
# For each T-vertex or unlabeled vertex "x",
# "vertex_best_edge[x]" is the edge index of the least-slack edge
# between "x" and any S-vertex, or -1 if no such edge has been found.
self.vertex_best_edge: list[int] = num_vertex * [-1]
# Queue of S-vertices to be scanned.
self.scan_queue: list[int] = []
def __del__(self) -> None:
"""Delete reference cycles during cleanup of the matching context."""
for blossom in self.trivial_blossom:
blossom.vertex_set.clear()
del blossom.vertex_set
blossom.tree_blossoms = None
for blossom in self.nontrivial_blossom:
blossom.vertex_set.clear()
del blossom.vertex_set
blossom.tree_blossoms = None
def edge_pseudo_slack_2x(self, e: int) -> float:
"""Return 2 times the pseudo-slack of the specified edge.
The pseudo-slack of an edge is related to its true slack, but
distorted in a way that makes it invariant under delta steps.
If the edge connects two S-vertices in different top-level blossoms,
the true slack is the pseudo-slack minus 2 times the running sum
of delta steps.
If the edge connects an S-vertex to an unlabeled vertex,
the true slack is the pseudo-slack minus the running sum of delta
steps, plus the pending offset of the top-level blossom that contains
the unlabeled vertex.
"""
(x, y, w) = self.graph.edges[e]
return self.vertex_dual_2x[x] + self.vertex_dual_2x[y] - 2 * w
#
# Least-slack edge tracking:
#
# To calculate delta steps, the matching algorithm needs to find
# - the least-slack edge between any S-vertex and an unlabeled vertex;
# - the least-slack edge between any pair of top-level S-blossoms.
#
# For each unlabeled vertex and each T-vertex, we keep track of the
# least-slack edge to any S-vertex. Tracking for unlabeled vertices
# serves to provide the least-slack edge for the delta step.
# Tracking for T-vertices is done because such vertices can turn into
# unlabeled vertices if they are part of a T-blossom that gets expanded.
#
# Note: For a given vertex or blossom, the identity of the least-slack
# edge to any S-blossom remains unchanged during a delta step.
# Although the delta step changes edge slacks, it changes the slack
# of every edge to an S-vertex by the same amount. Therefore the edge
# that had least slack before the delta step, will still have least slack
# after the delta step.
#
def lset_reset(self) -> None:
"""Reset least-slack edge tracking.
This function takes time O(n * log(n)).
"""
num_vertex = self.graph.num_vertex
for x in range(num_vertex):
self.vertex_best_edge[x] = -1
self.vertex_set_node[x].set_prio(math.inf)
self.delta2_queue.clear()
for blossom in self.trivial_blossom + self.nontrivial_blossom:
blossom.delta2_node = None
def lset_add_vertex_edge(self, y: int, by: _Blossom, e: int) -> None:
"""Add edge "e" from an S-vertex to unlabeled vertex or T-vertex "y".
This function takes time O(log(n)).
"""
prio = self.edge_pseudo_slack_2x(e)
best_edge = self.vertex_best_edge[y]
if best_edge != -1:
best_prio = self.edge_pseudo_slack_2x(best_edge)
if prio >= best_prio:
return
self.vertex_best_edge[y] = e
prev_min = by.vertex_set.min_prio()
self.vertex_set_node[y].set_prio(prio)
if (by.label == _LABEL_NONE) and (prio < prev_min):
prio += by.vertex_dual_offset
if by.delta2_node is None:
by.delta2_node = self.delta2_queue.insert(prio, by)
elif prio < by.delta2_node.prio:
self.delta2_queue.decrease_prio(by.delta2_node, prio)
def lset_get_best_vertex_edge(self) -> tuple[int, float]:
"""Return the index and slack of the least-slack edge between
any S-vertex and unlabeled vertex.
This function takes time O(log(n)).
Returns:
Tuple (edge_index, slack_2x) if there is a least-slack edge,
or (-1, 0) if there is no suitable edge.
"""
if self.delta2_queue.empty():
return (-1, 0)
delta2_node = self.delta2_queue.find_min()
blossom = delta2_node.data
prio = delta2_node.prio
slack_2x = prio - self.delta_sum_2x
assert blossom.parent is None
assert blossom.label == _LABEL_NONE
x = blossom.vertex_set.min_elem()
e = self.vertex_best_edge[x]
assert e >= 0
return (e, slack_2x)
#
# General support routines:
#
def assign_blossom_label_s(self, blossom: _Blossom) -> None:
"""Assign label S to an unlabeled top-level blossom."""
assert blossom.parent is None
assert blossom.label == _LABEL_NONE
blossom.label = _LABEL_S
# Delete blossom from delta2 queue.
if blossom.delta2_node is not None:
self.delta2_queue.delete(blossom.delta2_node)
blossom.delta2_node = None
# Prepare for lazy updating of S-blossom dual variable.
if isinstance(blossom, _NonTrivialBlossom):
blossom.dual_var -= self.delta_sum_2x
def remove_blossom_label_s(self, blossom: _Blossom) -> None:
"""Remove label S from a top-level S-blossom."""
assert blossom.parent is None
assert blossom.label == _LABEL_S
blossom.label = _LABEL_NONE
# Catch up with lazy updates to S-blossom dual variable.
if isinstance(blossom, _NonTrivialBlossom):
blossom.dual_var += self.delta_sum_2x
def assign_vertex_label_s(self, blossom: _Blossom) -> None:
"""Adjust after assigning label S to previously unlabeled vertices."""
# Add the new S-vertices to the scan queue.
vertices = blossom.vertices()
self.scan_queue.extend(vertices)
# Prepare for lazy updating of S-vertex dual variables.
vertex_dual_fixup = self.delta_sum_2x + blossom.vertex_dual_offset
blossom.vertex_dual_offset = 0
for x in vertices:
self.vertex_dual_2x[x] += vertex_dual_fixup
def assign_blossom_label_t(self, blossom: _Blossom) -> None:
"""Assign label T to an unlabeled top-level blossom."""
assert blossom.parent is None
assert blossom.label == _LABEL_NONE
blossom.label = _LABEL_T
# Delete blossom from delta2 queue.
if blossom.delta2_node is not None:
self.delta2_queue.delete(blossom.delta2_node)
blossom.delta2_node = None
if isinstance(blossom, _NonTrivialBlossom):
# Prepare for lazy updating of T-blossom dual variables.
blossom.dual_var += self.delta_sum_2x
# Insert blossom into the delta4 queue.
assert blossom.delta4_node is None
blossom.delta4_node = self.delta4_queue.insert(blossom.dual_var,
blossom)
# Prepare for lazy updating of T-vertex dual variables.
blossom.vertex_dual_offset -= self.delta_sum_2x
def remove_blossom_label_t(self, blossom: _Blossom) -> None:
"""Remove label T from a top-level T-blossom."""
assert blossom.parent is None
assert blossom.label == _LABEL_T
blossom.label = _LABEL_NONE
if isinstance(blossom, _NonTrivialBlossom):
# Remove blossom from delta4 queue.
assert blossom.delta4_node is not None
self.delta4_queue.delete(blossom.delta4_node)
blossom.delta4_node = None
# Unwind lazy updates to T-blossom dual variable.
blossom.dual_var -= self.delta_sum_2x
# Unwind lazy updates of T-vertex dual variables.
blossom.vertex_dual_offset += self.delta_sum_2x
def reset_blossom_label(self, blossom: _Blossom) -> None:
"""Remove blossom label and calculate true dual variables."""
assert blossom.parent is None
if blossom.label == _LABEL_S:
# Remove label.
blossom.label = _LABEL_NONE
# Unwind lazy delta updates to S-blossom dual variable.
if isinstance(blossom, _NonTrivialBlossom):
blossom.dual_var += self.delta_sum_2x
# Unwind lazy delta updates to S-vertex dual variables.
assert blossom.vertex_dual_offset == 0
vertex_dual_fixup = -self.delta_sum_2x
for x in blossom.vertices():
self.vertex_dual_2x[x] += vertex_dual_fixup
elif blossom.label == _LABEL_T:
# Remove label.
blossom.label = _LABEL_NONE
# Unwind lazy delta updates to T-blossom dual variable.
if isinstance(blossom, _NonTrivialBlossom):
blossom.dual_var -= self.delta_sum_2x
# Unwind lazy delta updates to T-vertex dual variables.
vertex_dual_fixup = self.delta_sum_2x + blossom.vertex_dual_offset
blossom.vertex_dual_offset = 0
for x in blossom.vertices():
self.vertex_dual_2x[x] += vertex_dual_fixup
else:
# Unwind lazy delta updates to vertex dual variables.
vertex_dual_fixup = blossom.vertex_dual_offset
blossom.vertex_dual_offset = 0
for x in blossom.vertices():
self.vertex_dual_2x[x] += vertex_dual_fixup
def reset_stage(self) -> None:
"""Reset data which are only valid during a stage.
Marks all blossoms as unlabeled, clears the queue,
and resets tracking of least-slack edges.
This function takes time O(n * log(n)).
"""
assert not self.scan_queue
# Check consistency of alternating tree.
for blossom in self.trivial_blossom + self.nontrivial_blossom:
if (blossom.parent is None) and (blossom.label != _LABEL_NONE):
assert blossom.tree_blossoms is not None
assert blossom in blossom.tree_blossoms
if blossom.tree_edge is not None:
bx = self.vertex_set_node[blossom.tree_edge[0]].find()
by = self.vertex_set_node[blossom.tree_edge[1]].find()
assert bx.tree_blossoms is blossom.tree_blossoms
assert by.tree_blossoms is blossom.tree_blossoms
else:
assert blossom.tree_blossoms is None
# Remove blossom labels and unwind lazy dual updates.
for blossom in self.trivial_blossom + self.nontrivial_blossom:
if blossom.parent is None:
self.reset_blossom_label(blossom)
if isinstance(blossom, _NonTrivialBlossom):
blossom.delta4_node = None
assert blossom.label == _LABEL_NONE
blossom.tree_edge = None
blossom.tree_blossoms = None
# Reset least-slack edge tracking.
self.lset_reset()
# Reset delta queues.
self.delta3_queue.clear()
self.delta3_set.clear()
self.delta4_queue.clear()
def trace_alternating_paths(self, x: int, y: int) -> _AlternatingPath:
"""Trace back through the alternating trees from vertices "x" and "y".
If both vertices are part of the same alternating tree, this function
discovers a new blossom. In this case it returns an alternating path
through the blossom that starts and ends in the same sub-blossom.
If the vertices are part of different alternating trees, this function
discovers an augmenting path. In this case it returns an alternating
path that starts and ends in an unmatched vertex.
This function takes time O(k*log(n)) to discover a blossom,
where "k" is the number of sub-blossoms,
or time O(n*log(n)) to discover an augmenting path.
Returns:
Alternating path as an ordered list of edges between top-level
blossoms.
"""
marked_blossoms: list[_Blossom] = []
# "xedges" is a list of edges used while tracing from "x".
# "yedges" is a list of edges used while tracing from "y".
# Pre-load the edge (x, y) on both lists.
xedges: list[tuple[int, int]] = [(x, y)]
yedges: list[tuple[int, int]] = [(y, x)]
# "first_common" is the first common ancestor of "x" and "y"
# in the alternating tree, or None if there is no common ancestor.
first_common: Optional[_Blossom] = None
# Alternate between tracing the path from "x" and the path from "y".
# This ensures that the search time is bounded by the size of the
# newly found blossom.
while x != -1 or y != -1:
# Check if we found a common ancestor.
bx = self.vertex_set_node[x].find()
if bx.marker:
first_common = bx
break
# Mark blossom as a potential common ancestor.
bx.marker = True
marked_blossoms.append(bx)
# Track back through the link in the alternating tree.
if bx.tree_edge is None:
# Reached the root of this alternating tree.
x = -1
else:
xedges.append(bx.tree_edge)
x = bx.tree_edge[0]
# Swap "x" and "y" to alternate between paths.
if y != -1:
(x, y) = (y, x)
(xedges, yedges) = (yedges, xedges)
# Remove all markers we placed.
for b in marked_blossoms:
b.marker = False
# If we found a common ancestor, trim the paths so they end there.
if first_common is not None:
assert self.vertex_set_node[xedges[-1][0]].find() is first_common
while (self.vertex_set_node[yedges[-1][0]].find()
is not first_common):
yedges.pop()
# Fuse the two paths.
# Flip the order of one path, and flip the edge tuples in the other
# path to obtain a continuous path with correctly ordered edge tuples.
# Skip the duplicate edge in one of the paths.
path_edges = xedges[::-1] + [(y, x) for (x, y) in yedges[1:]]
# Any S-to-S alternating path must have odd length.
assert len(path_edges) % 2 == 1
return _AlternatingPath(path_edges)
#
# Merge and expand blossoms:
#
def make_blossom(self, path: _AlternatingPath) -> None:
"""Create a new blossom from an alternating cycle.
Assign label S to the new blossom.
Relabel all T-sub-blossoms as S and add their vertices to the queue.
This function takes total time O(n*log(n)) per stage.
"""
# Check that the path is odd-length.
assert len(path.edges) % 2 == 1
assert len(path.edges) >= 3
# Construct the list of sub-blossoms (current top-level blossoms).
subblossoms = [self.vertex_set_node[x].find() for (x, y) in path.edges]
# Check that the path is cyclic.
# Note the path will not always start and end with the same _vertex_,
# but it must start and end in the same _blossom_.
subblossoms_next = [self.vertex_set_node[y].find()
for (x, y) in path.edges]
assert subblossoms[0] == subblossoms_next[-1]
assert subblossoms[1:] == subblossoms_next[:-1]
# Blossom must start and end with an S-sub-blossom.
assert subblossoms[0].label == _LABEL_S
# Remove blossom labels.
# Mark vertices inside former T-blossoms as S-vertices.
tree_blossoms = subblossoms[0].tree_blossoms
assert tree_blossoms is not None
for sub in subblossoms:
if sub.label == _LABEL_S:
self.remove_blossom_label_s(sub)
elif sub.label == _LABEL_T:
self.remove_blossom_label_t(sub)
self.assign_vertex_label_s(sub)
sub.tree_blossoms = None
tree_blossoms.remove(sub)
# Create the new blossom object.
blossom = _NonTrivialBlossom(subblossoms, path.edges)
# Assign label S to the new blossom and link it to the tree.
self.assign_blossom_label_s(blossom)
blossom.tree_edge = subblossoms[0].tree_edge
blossom.tree_blossoms = tree_blossoms
tree_blossoms.add(blossom)
# Insert into the blossom array.
self.nontrivial_blossom.append(blossom)
# Link the subblossoms to the their new parent.
for sub in subblossoms:
sub.parent = blossom
# Merge union-find structures.
blossom.vertex_set.merge([sub.vertex_set for sub in subblossoms])
@staticmethod
def find_path_through_blossom(
blossom: _NonTrivialBlossom,
sub: _Blossom
) -> tuple[list[_Blossom], list[tuple[int, int]]]:
"""Construct a path through the specified blossom,
from sub-blossom "sub" to the base of the blossom.
Return:
Tuple (nodes, edges).
"""
# Walk around the blossom from "sub" to its base.
p = blossom.subblossoms.index(sub)
if p % 2 == 0:
# Walk backwards around the blossom.
# Flip edges from (i,j) to (j,i) to make them fit
# in the path from "sub" to base.
nodes = blossom.subblossoms[p::-1]
edges = [(j, i) for (i, j) in blossom.edges[:p][::-1]]
else:
# Walk forward around the blossom.
nodes = blossom.subblossoms[p:] + blossom.subblossoms[0:1]
edges = blossom.edges[p:]
return (nodes, edges)
def expand_t_blossom(self, blossom: _NonTrivialBlossom) -> None:
"""Expand the specified T-blossom.
This function takes total time O(n*log(n)) per stage.
"""
assert blossom.parent is None
assert blossom.label == _LABEL_T
assert blossom.delta2_node is None
# Remove expanded blossom from the delta4 queue.
assert blossom.delta4_node is not None
self.delta4_queue.delete(blossom.delta4_node)
blossom.delta4_node = None
# Remove blossom from its alternating tree.
tree_blossoms = blossom.tree_blossoms
assert tree_blossoms is not None
tree_blossoms.remove(blossom)
# Split union-find structure.
blossom.vertex_set.split()
# Prepare to push lazy delta updates down to the sub-blossoms.
vertex_dual_fixup = self.delta_sum_2x + blossom.vertex_dual_offset
blossom.vertex_dual_offset = 0
# Convert sub-blossoms into top-level blossoms.
for sub in blossom.subblossoms:
assert sub.label == _LABEL_NONE
sub.parent = None
assert sub.vertex_dual_offset == 0
sub.vertex_dual_offset = vertex_dual_fixup
# Insert blossom in delta2_queue if necessary.
prio = sub.vertex_set.min_prio()
if prio < math.inf:
assert sub.delta2_node is None
prio += sub.vertex_dual_offset
sub.delta2_node = self.delta2_queue.insert(prio, sub)
# The expanding blossom was part of an alternating tree, linked to
# a parent node in the tree via one of its subblossoms, and linked to
# a child node of the tree via the base vertex.
# We must reconstruct this part of the alternating tree, which will
# now run via sub-blossoms of the expanded blossom.
# Find the sub-blossom that is attached to the parent node in
# the alternating tree.
assert blossom.tree_edge is not None
(x, y) = blossom.tree_edge
sub = self.vertex_set_node[y].find()
# Assign label T to that sub-blossom.
self.assign_blossom_label_t(sub)
sub.tree_edge = blossom.tree_edge
sub.tree_blossoms = tree_blossoms
tree_blossoms.add(sub)
# Walk through the expanded blossom from "sub" to the base vertex.
# Assign alternating S and T labels to the sub-blossoms and attach
# them to the alternating tree.
(path_nodes, path_edges) = self.find_path_through_blossom(blossom,
sub)
for p in range(0, len(path_edges), 2):
#
# (p) ==(y,x)== (p+1) ----- (p+2)
# T S T
#
# path_nodes[p] has already been labeled T.
# We now assign labels to path_nodes[p+1] and path_nodes[p+2].
# Assign label S to path_nodes[p+1].
(y, x) = path_edges[p]
self.extend_tree_s(x)
# Assign label T to path_nodes[i+2] and attach it
# to path_nodes[p+1].
sub = path_nodes[p+2]
self.assign_blossom_label_t(sub)
sub.tree_edge = path_edges[p+1]
sub.tree_blossoms = tree_blossoms
tree_blossoms.add(sub)
# Delete the expanded blossom.
# TODO -- list manipulation is too slow
self.nontrivial_blossom.remove(blossom)
def expand_unlabeled_blossom(self, blossom: _NonTrivialBlossom) -> None:
"""Expand the specified unlabeled blossom.
This function takes total time O(n*log(n)) per stage.
"""
assert blossom.parent is None
assert blossom.label == _LABEL_NONE
# Remove blossom from delta2 heap.
assert blossom.delta2_node is not None
self.delta2_queue.delete(blossom.delta2_node)
blossom.delta2_node = None
# Split union-find structure.
blossom.vertex_set.split()
# Prepare to push lazy delta updates down to the sub-blossoms.
vertex_dual_offset = blossom.vertex_dual_offset
blossom.vertex_dual_offset = 0
# Convert sub-blossoms into top-level blossoms.
for sub in blossom.subblossoms:
assert sub.label == _LABEL_NONE
sub.parent = None
assert sub.vertex_dual_offset == 0
sub.vertex_dual_offset = vertex_dual_offset
# Insert blossom in delta2_queue if necessary.
prio = sub.vertex_set.min_prio()
if prio < math.inf:
assert sub.delta2_node is None
prio += sub.vertex_dual_offset
sub.delta2_node = self.delta2_queue.insert(prio, sub)
# Delete the expanded blossom.
# TODO -- list manipulation is too slow
self.nontrivial_blossom.remove(blossom)
#
# Augmenting:
#
def augment_blossom_rec(
self,
blossom: _NonTrivialBlossom,
sub: _Blossom,
stack: list[tuple[_NonTrivialBlossom, _Blossom]]
) -> None:
"""Augment along an alternating path through the specified blossom,
from sub-blossom "sub" to the base vertex of the blossom.
Modify the blossom to reflect that sub-blossom "sub" contains
the base vertex after augmenting.
Mark any sub-blossoms on the alternating path for recursive
augmentation, except for sub-blossom "sub" which has already been
augmented. Use the stack instead of making direct recursive calls.
"""
# Walk through the blossom from "sub" to the base vertex.
(path_nodes, path_edges) = self.find_path_through_blossom(blossom,
sub)
for p in range(0, len(path_edges), 2):
# Before augmentation:
# path_nodes[p] is matched to path_nodes[p+1]
#
# (p) ===== (p+1) ---(x,y)--- (p+2)
#
# After augmentation:
# path_nodes[p+1] matched to path_nodes[p+2] via edge (i,j)
#
# (p) ----- (p+1) ===(x,y)=== (p+2)
#
# Pull the edge (x, y) into the matching.
(x, y) = path_edges[p+1]
self.vertex_mate[x] = y
self.vertex_mate[y] = x
# Augment through the subblossoms touching the edge (x, y).
# Nothing needs to be done for trivial subblossoms.
bx = path_nodes[p+1]
if isinstance(bx, _NonTrivialBlossom):
stack.append((bx, self.trivial_blossom[x]))
by = path_nodes[p+2]
if isinstance(by, _NonTrivialBlossom):
stack.append((by, self.trivial_blossom[y]))
# Rotate the subblossom list so the new base ends up in position 0.
p = blossom.subblossoms.index(sub)
blossom.subblossoms = (
blossom.subblossoms[p:] + blossom.subblossoms[:p])
blossom.edges = blossom.edges[p:] + blossom.edges[:p]
# Update the base vertex.
# We can pull this from the sub-blossom where we started since
# its augmentation has already finished.
blossom.base_vertex = sub.base_vertex
def augment_blossom(
self,
blossom: _NonTrivialBlossom,
sub: _Blossom
) -> None:
"""Augment along an alternating path through the specified blossom,
from sub-blossom "sub" to the base vertex of the blossom.
Recursively augment any sub-blossoms on the alternating path.
This function takes time O(n).
"""
# Use an explicit stack to avoid deep recursion.
stack = [(blossom, sub)]
while stack:
(outer_blossom, sub) = stack.pop()
assert sub.parent is not None
blossom = sub.parent
if blossom != outer_blossom:
# Sub-blossom "sub" is an indirect (nested) child of
# the "outer_blossom" we are supposed to be augmenting.
#
# "blossom" is the direct parent of "sub".
# Let's first augment "blossom" from "sub" to its base vertex.
# Then continue by augmenting the parent of "blossom",
# from "blossom" to its base vertex, and so on until we
# get to the "outer_blossom".
#
# Set up to continue augmenting through the parent of
# "blossom".
stack.append((outer_blossom, blossom))
# Augment "blossom" from "sub" to the base vertex.
self.augment_blossom_rec(blossom, sub, stack)
def augment_matching(self, path: _AlternatingPath) -> None:
"""Augment the matching through the specified augmenting path.
This function takes time O(n).
"""
# Check that the augmenting path starts and ends in
# an unmatched vertex or a blossom with unmatched base.
assert len(path.edges) % 2 == 1
for x in (path.edges[0][0], path.edges[-1][1]):
b = self.vertex_set_node[x].find()
assert self.vertex_mate[b.base_vertex] == -1
# The augmenting path looks like this:
#
# (unmatched) ---- (B) ==== (B) ---- (B) ==== (B) ---- (unmatched)
#
# The first and last vertex (or blossom) of the path are unmatched
# (or have unmatched base vertex). After augmenting, those vertices
# will be matched. All matched edges on the path become unmatched,
# and unmatched edges become matched.
#
# This loop walks along the edges of this path that were not matched
# before augmenting.
for (x, y) in path.edges[0::2]:
# Augment the non-trivial blossoms on either side of this edge.
# No action is necessary for trivial blossoms.
bx = self.vertex_set_node[x].find()
if isinstance(bx, _NonTrivialBlossom):
self.augment_blossom(bx, self.trivial_blossom[x])
by = self.vertex_set_node[y].find()
if isinstance(by, _NonTrivialBlossom):
self.augment_blossom(by, self.trivial_blossom[y])
# Pull the edge into the matching.
self.vertex_mate[x] = y
self.vertex_mate[y] = x
#
# Labeling and alternating tree expansion:
#
def extend_tree_s(self, x: int) -> None:
"""Assign label S to the unlabeled blossom that contains vertex "x".
If vertex "x" is matched, it is attached to the alternating tree
via its matched edge. If vertex "x" is unmatched, it becomes the root
of an alternating tree.
All vertices in the newly labeled blossom are added to the scan queue.
Precondition:
"x" is an unlabeled vertex, either unmatched or matched to
a T-vertex via a tight edge.
"""
# Assign label S to the blossom that contains vertex "x".
bx = self.vertex_set_node[x].find()
self.assign_blossom_label_s(bx)
self.assign_vertex_label_s(bx)
y = self.vertex_mate[x]
if y == -1:
# Vertex "x" is unmatched.
# It must be either a top-level vertex or the base vertex of
# a top-level blossom.
assert bx.base_vertex == x
# Mark the blossom as root of an alternating tree.
bx.tree_edge = None
bx.tree_blossoms = {bx}
else:
# Vertex "x" is matched to T-vertex "y".
by = self.vertex_set_node[y].find()
assert by.label == _LABEL_T
# Attach the blossom that contains "x" to the alternating tree.
bx.tree_edge = (y, x)
bx.tree_blossoms = by.tree_blossoms
assert bx.tree_blossoms is not None
bx.tree_blossoms.add(bx)
def extend_tree_t(self, x: int, y: int) -> None:
"""Assign label T to the unlabeled blossom that contains vertex "y".
Attach it to the alternating tree via edge (x, y).
Then immediately assign label S to the mate of vertex "y".
Preconditions:
- "x" is an S-vertex.
- "y" is an unlabeled, matched vertex.
- There is a tight edge between vertices "x" and "y".
"""
bx = self.vertex_set_node[x].find()
by = self.vertex_set_node[y].find()
assert bx.label == _LABEL_S
# Expand zero-dual blossoms before assigning label T.
while isinstance(by, _NonTrivialBlossom) and (by.dual_var == 0):
self.expand_unlabeled_blossom(by)
by = self.vertex_set_node[y].find()
# Assign label T to the unlabeled blossom.
self.assign_blossom_label_t(by)
by.tree_edge = (x, y)
by.tree_blossoms = bx.tree_blossoms
assert by.tree_blossoms is not None
by.tree_blossoms.add(by)
# Assign label S to the blossom that is mated to the T-blossom.
z = self.vertex_mate[by.base_vertex]
assert z != -1
self.extend_tree_s(z)
def add_s_to_s_edge(self, x: int, y: int) -> Optional[_AlternatingPath]:
"""Add the edge between S-vertices "x" and "y".
If the edge connects blossoms that are part of the same alternating
tree, this function creates a new S-blossom and returns None.
If the edge connects two different alternating trees, an augmenting
path has been discovered. In this case the function changes nothing
and returns the augmenting path.
Returns:
Augmenting path if found; otherwise None.
"""
# Trace back through the alternating trees from "x" and "y".
path = self.trace_alternating_paths(x, y)
# If the path is a cycle, create a new blossom.
# Otherwise the path is an augmenting path.
# Note that an alternating starts and ends in the same blossom,
# but not necessarily in the same vertex within that blossom.
p = path.edges[0][0]
q = path.edges[-1][1]
if self.vertex_set_node[p].find() is self.vertex_set_node[q].find():
self.make_blossom(path)
return None
else:
return path
def substage_scan(self) -> None:
"""Scan queued S-vertices and consider their incident edges.
Edges are inserted in delta2 and delta3 tracking.
This function does not yet use the edges to extend the alternating
tree or find blossoms or augmenting paths, even if the edges
are tight. An edge that is already tight may be used later through
a zero-delta step.
"""
edges = self.graph.edges
adjacent_edges = self.graph.adjacent_edges
# Process S-vertices waiting to be scanned.
# This loop runs through O(n) iterations per stage.
for x in self.scan_queue:
# Double-check that "x" is an S-vertex.
bx = self.vertex_set_node[x].find()
assert bx.label == _LABEL_S
# Scan the edges that are incident on "x".
# This loop runs through O(m) iterations per stage.
for e in adjacent_edges[x]:
(p, q, _w) = edges[e]
y = p if p != x else q
# Consider the edge between vertices "x" and "y".
# Update delta2 or delta3 tracking accordingly.
#
# We don't actually use the edge right now to extend
# the alternating tree or create a blossom or alternating path.
# If appropriate, insert this edge into delta2 or delta3
# tracking.
# Insert this edge into delta2 or delta3 tracking
# Try to pull this edge into an alternating tree.
# Ignore edges that are internal to a blossom.
by = self.vertex_set_node[y].find()
if bx is by:
continue
if by.label == _LABEL_S:
# Update tracking of least-slack edges between S-blossoms.
# Priority is edge slack plus 2 times the running sum of
# delta steps.
if e not in self.delta3_set:
prio_2x = self.edge_pseudo_slack_2x(e)
if self.graph.integer_weights:
# If all edge weights are integers, the slack of
# any edge between S-vertices is also an integer.
assert prio_2x % 2 == 0
prio = prio_2x // 2
else:
prio = prio_2x / 2
self.delta3_set.add(e)
self.delta3_queue.insert(prio, e)
else:
# Update tracking of least-slack edges from vertex "y" to
# any S-vertex. We do this for T-vertices and unlabeled
# vertices. Edges which already have zero slack are still
# tracked.
self.lset_add_vertex_edge(y, by, e)
self.scan_queue.clear()
#
# Delta steps:
#
def substage_calc_dual_delta(
self
) -> tuple[int, float, int, Optional[_NonTrivialBlossom]]:
"""Calculate a delta step in the dual LPP problem.
This function returns the minimum of the 4 types of delta values,
and the type of delta which obtain the minimum, and the edge or
blossom that produces the minimum delta, if applicable.
The returned value is 2 times the actual delta value.
Multiplication by 2 ensures that the result is an integer if all edge
weights are integers.
This function assumes that there is at least one S-vertex.
This function takes total time O(m * log(n)) for all calls
within a stage.
Returns:
Tuple (delta_type, delta_2x, delta_edge, delta_blossom).
"""
delta_edge = -1
delta_blossom: Optional[_NonTrivialBlossom] = None
# Compute delta1: minimum dual variable of any S-vertex.
# All unmatched vertices have the same dual value, and this is
# the minimum value among all S-vertices.
delta_type = 1
delta_2x = self.start_vertex_dual_2x - self.delta_sum_2x
# Compute delta2: minimum slack of any edge between an S-vertex and
# an unlabeled vertex.
# This takes time O(log(n)).
(e, slack) = self.lset_get_best_vertex_edge()
if (e != -1) and (slack <= delta_2x):
delta_type = 2
delta_2x = slack
delta_edge = e
# Compute delta3: half minimum slack of any edge between two top-level
# S-blossoms.
#
# This loop iterates O(m) times per stage.
# Each iteration takes time O(log(n)).
while not self.delta3_queue.empty():
delta3_node = self.delta3_queue.find_min()
e = delta3_node.data
(x, y, _w) = self.graph.edges[e]
bx = self.vertex_set_node[x].find()
by = self.vertex_set_node[y].find()
assert (bx.label == _LABEL_S) and (by.label == _LABEL_S)
if bx is not by:
# Found edge between different top-level S-blossoms.
slack = delta3_node.prio - self.delta_sum_2x
if slack <= delta_2x:
delta_type = 3
delta_2x = slack
delta_edge = e
break
# Reject edges between vertices within the same top-level blossom.
# Although intra-blossom edges are never inserted into the queue,
# existing edges in the queue may become intra-blossom when
# a new blossom is formed.
self.delta3_queue.delete(delta3_node)
self.delta3_set.remove(e)
# Compute delta4: half minimum dual variable of a top-level T-blossom.
# This takes time O(log(n)).
if not self.delta4_queue.empty():
blossom = self.delta4_queue.find_min().data
assert blossom.label == _LABEL_T
assert blossom.parent is None
blossom_dual = blossom.dual_var - self.delta_sum_2x
if blossom_dual <= delta_2x:
delta_type = 4
delta_2x = blossom_dual
delta_blossom = blossom
return (delta_type, delta_2x, delta_edge, delta_blossom)
#
# Main stage function:
#
def run_stage(self) -> bool:
"""Run one stage of the matching algorithm.
The stage searches a maximum-weight augmenting path.
If this path is found, it is used to augment the matching,
thereby increasing the number of matched edges by 1.
If no such path is found, the matching must already be optimal.
This function takes time O((n + m) * log(n)).
Returns:
True if the matching was successfully augmented.
False if no further improvement is possible.
"""
num_vertex = self.graph.num_vertex
# Assign label S to all unmatched vertices and put them in the queue.
for x in range(num_vertex):
if self.vertex_mate[x] == -1:
self.extend_tree_s(x)
# Stop if all vertices are matched.
# No further improvement is possible in that case.
# This avoids messy calculations of delta steps without any S-vertex.
if not self.scan_queue:
return False
# Each pass through the following loop is a "substage".
# The substage tries to find an augmenting path.
# If an augmenting path is found, we augment the matching and end
# the stage. Otherwise we update the dual LPP problem and enter the
# next substage, or stop if no further improvement is possible.
#
# This loop runs through at most O(n) iterations per stage.
augmenting_path = None
while True:
# Consider the incident edges of newly labeled S-vertices.
self.substage_scan()
# Calculate delta step in the dual LPP problem.
(delta_type, delta_2x, delta_edge, delta_blossom
) = self.substage_calc_dual_delta()
# Update the running sum of delta steps.
# This implicitly updates the dual variables as needed, because
# the running delta sum is taken into account when calculating
# dual values.
self.delta_sum_2x += delta_2x
if delta_type == 2:
# Use the edge from S-vertex to unlabeled vertex that got
# unlocked through the delta update.
(x, y, _w) = self.graph.edges[delta_edge]
if self.vertex_set_node[x].find().label != _LABEL_S:
(x, y) = (y, x)
self.extend_tree_t(x, y)
elif delta_type == 3:
# Use the S-to-S edge that got unlocked by the delta update.
# This may reveal an augmenting path.
(x, y, _w) = self.graph.edges[delta_edge]
augmenting_path = self.add_s_to_s_edge(x, y)
if augmenting_path is not None:
break
elif delta_type == 4:
# Expand the T-blossom that reached dual value 0 through
# the delta update.
assert delta_blossom is not None
self.expand_t_blossom(delta_blossom)
else:
# No further improvement possible. End the stage.
assert delta_type == 1
break
# Augment the matching if an augmenting path was found.
if augmenting_path is not None:
self.augment_matching(augmenting_path)
# Remove all labels, clear queue.
self.reset_stage()
# Return True if the matching was augmented.
return (augmenting_path is not None)
def _verify_blossom_edges(
ctx: _MatchingContext,
blossom: _NonTrivialBlossom,
edge_slack_2x: list[float]
) -> None:
"""Descend down the blossom tree to find edges that are contained
in blossoms.
Adjust the slack of all contained edges to account for the dual variables
of its containing blossoms.
On the way down, keep track of the sum of dual variables of
the containing blossoms.
On the way up, keep track of the total number of matched edges
in the subblossoms. Then check that all blossoms with non-zero
dual variable are "full".
Raises:
MatchingError: If a blossom with non-zero dual is not full.
"""
num_vertex = ctx.graph.num_vertex
# For each vertex "x",
# "vertex_depth[x]" is the depth of the smallest blossom on
# the current descent path that contains "x".
vertex_depth: list[int] = num_vertex * [0]
# Keep track of the sum of blossom duals at each depth along
# the current descent path.
path_sum_dual: list[float] = [0]
# Keep track of the number of matched edges at each depth along
# the current descent path.
path_num_matched: list[int] = [0]
# Use an explicit stack to avoid deep recursion.
stack: list[tuple[_NonTrivialBlossom, int]] = [(blossom, -1)]
while stack:
(blossom, p) = stack[-1]
depth = len(stack)
if p == -1:
# We just entered this sub-blossom.
# Update the depth of all vertices in this sub-blossom.
for x in blossom.vertices():
vertex_depth[x] = depth
# Calculate the sub of blossoms at the current depth.
path_sum_dual.append(path_sum_dual[-1] + blossom.dual_var)
# Initialize the number of matched edges at the current depth.
path_num_matched.append(0)
p += 1
if p < len(blossom.subblossoms):
# Update the sub-blossom pointer at the current level.
stack[-1] = (blossom, p + 1)
# Examine the next sub-blossom at the current level.
sub = blossom.subblossoms[p]
if isinstance(sub, _NonTrivialBlossom):
# Prepare to descent into the selected sub-blossom and
# scan it recursively.
stack.append((sub, -1))
else:
# Handle this trivial sub-blossom.
# Scan its adjacent edges and find the smallest blossom
# that contains each edge.
for e in ctx.graph.adjacent_edges[sub.base_vertex]:
(x, y, _w) = ctx.graph.edges[e]
# Only process edges that are ordered out from this
# sub-blossom. This ensures that we process each edge in
# the blossom only once.
if x == sub.base_vertex:
edge_depth = vertex_depth[y]
if edge_depth > 0:
# This edge is contained in an ancestor blossom.
# Update its slack.
edge_slack_2x[e] += 2 * path_sum_dual[edge_depth]
# Update the number of matched edges in ancestor.
if ctx.vertex_mate[x] == y:
path_num_matched[edge_depth] += 1
else:
# We are now leaving the current sub-blossom.
# Count the number of vertices inside this blossom.
blossom_vertices = blossom.vertices()
blossom_num_vertex = len(blossom_vertices)
# Check that all blossoms are "full".
# A blossom is full if all except one of its vertices are
# matched to another vertex in the blossom.
blossom_num_matched = path_num_matched[depth]
if blossom_num_vertex != 2 * blossom_num_matched + 1:
raise MatchingError(
"Verification failed: blossom non-full"
f" dual={blossom.dual_var}"
f" nvertex={blossom_num_vertex}"
f" nmatched={blossom_num_matched}")
# Update the number of matched edges in the parent blossom to
# take into account the matched edges in this blossom.
path_num_matched[depth - 1] += path_num_matched[depth]
# Revert the depth of the vertices in this sub-blossom.
for x in blossom_vertices:
vertex_depth[x] = depth - 1
# Trim the descending path.
path_sum_dual.pop()
path_num_matched.pop()
# Remove the current blossom from the stack.
# We thus continue our scan of the parent blossom.
stack.pop()
def _verify_optimum(ctx: _MatchingContext) -> None:
"""Verify that the optimum solution has been found.
This function takes time O(n**2).
Raises:
MatchingError: If the solution is not optimal.
"""
num_vertex = ctx.graph.num_vertex
num_edge = len(ctx.graph.edges)
# Check that each matched edge actually exists in the graph.
num_matched_vertex = 0
for x in range(num_vertex):
y = ctx.vertex_mate[x]
if y != -1:
if ctx.vertex_mate[y] != x:
raise MatchingError(
"Verification failed:"
f" asymmetric match of vertex {x} and {y}")
num_matched_vertex += 1
num_matched_edge = 0
for (x, y, _w) in ctx.graph.edges:
if ctx.vertex_mate[x] == y:
num_matched_edge += 1
if num_matched_vertex != 2 * num_matched_edge:
raise MatchingError(
f"Verification failed: {num_matched_vertex} matched vertices"
f" inconsistent with {num_matched_edge} matched edges")
# Check that all dual variables are non-negative.
for x in range(num_vertex):
if ctx.vertex_dual_2x[x] < 0:
raise MatchingError(
"Verification failed:"
f" vertex {x} has negative dual {ctx.vertex_dual_2x[x]/2}")
for blossom in ctx.nontrivial_blossom:
if blossom.dual_var < 0:
raise MatchingError("Verification failed:"
f" negative blossom dual {blossom.dual_var}")
# Check that all unmatched vertices have zero dual.
for x in range(num_vertex):
if ctx.vertex_mate[x] == -1 and ctx.vertex_dual_2x[x] != 0:
raise MatchingError(
f"Verification failed: Unmatched vertex {x}"
f" has non-zero dual {ctx.vertex_dual_2x[x]/2}")
# Calculate the slack of each edge.
# A correction will be needed for edges inside blossoms.
edge_slack_2x: list[float] = [
ctx.vertex_dual_2x[x] + ctx.vertex_dual_2x[y] - 2 * w
for (x, y, w) in ctx.graph.edges]
# Descend down each top-level blossom.
# Adjust edge slacks to account for the duals of its containing blossoms.
# And check that all blossoms are full.
# This takes total time O(n**2).
for blossom in ctx.nontrivial_blossom:
if blossom.parent is None:
_verify_blossom_edges(ctx, blossom, edge_slack_2x)
# We now know the correct slack of each edge.
# Check that all edges have non-negative slack.
min_edge_slack = min(edge_slack_2x)
if min_edge_slack < 0:
raise MatchingError(
f"Verification failed: negative edge slack {min_edge_slack/2}")
# Check that all matched edges have zero slack.
for e in range(num_edge):
(x, y, _w) = ctx.graph.edges[e]
if ctx.vertex_mate[x] == y and edge_slack_2x[e] != 0:
raise MatchingError(
"Verification failed:"
f" matched edge ({x}, {y}) has slack {edge_slack_2x[e]/2}")
# Optimum solution confirmed.