Source code for graphqomb.focus_flow

"""Focus flow algorithm.

This module provides:

- `is_focused`: Check if a flowlike object is focused.
- `focus_gflow`: Focus a flowlike object.
"""

from __future__ import annotations

from graphlib import TopologicalSorter
from typing import TYPE_CHECKING

from graphqomb.common import Plane
from graphqomb.feedforward import _is_flow, _is_gflow, check_flow, dag_from_flow
from graphqomb.graphstate import odd_neighbors

if TYPE_CHECKING:
    from collections.abc import Iterable, Mapping, Sequence
    from collections.abc import Set as AbstractSet

    from graphqomb.graphstate import BaseGraphState


[docs] def is_focused(flowlike: Mapping[int, int] | Mapping[int, AbstractSet[int]], graph: BaseGraphState) -> bool: r"""Check if a flowlike object is focused. Parameters ---------- flowlike : `collections.abc.Mapping`\[`int`, `int`\] | `collections.abc.Mapping`\[`int`, `collections.abc.Set`\[`int`\]\]` flowlike object graph : `BaseGraphState` graph state Returns ------- `bool` True if the flowlike object is focused, False otherwise Raises ------ TypeError If the flowlike object is not a Flow or GFlow """ # noqa: E501 meas_bases = graph.meas_bases outputs = set(graph.output_node_indices) focused = True for node in set(flowlike) - outputs: if _is_flow(flowlike): for child in graph.neighbors(flowlike[node]) - outputs: focused &= node == child elif _is_gflow(flowlike): for child in flowlike[node]: if child in outputs: continue focused &= (meas_bases[child].plane == Plane.XY) or (node == child) for child in odd_neighbors(flowlike[node], graph): if child in outputs: continue focused &= (meas_bases[child].plane != Plane.XY) or (node == child) else: msg = "Invalid flowlike object" raise TypeError(msg) return focused
[docs] def focus_gflow( flowlike: Mapping[int, int] | Mapping[int, AbstractSet[int]], graph: BaseGraphState ) -> dict[int, set[int]]: r"""Focus a flowlike object. Parameters ---------- flowlike : `collections.abc.Mapping`\[`int`, `int`\] | `collections.abc.Mapping`\[`int`, `collections.abc.Set`\[`int`\]\] flowlike object graph : `BaseGraphState` graph state Returns ------- `dict`\[`int`, `set`\[`int`\]\] focused flowlike object Raises ------ TypeError If the flowlike object is not a Flow or GFlow """ # noqa: E501 if _is_flow(flowlike): flowlike = {key: {value} for key, value in flowlike.items()} elif _is_gflow(flowlike): flowlike = {key: set(value) for key, value in flowlike.items()} else: msg = "Invalid flowlike object" raise TypeError(msg) check_flow(graph, flowlike) outputs = graph.physical_nodes - set(flowlike) dag = dag_from_flow(graph, flowlike) topo_order = list(TopologicalSorter(dag).static_order()) topo_order.reverse() # children first for output in outputs: topo_order.remove(output) for target in topo_order: flowlike = _focus(target, flowlike, graph, topo_order) return flowlike
def _focus( target: int, gflow: dict[int, set[int]], graph: BaseGraphState, topo_order: Sequence[int] ) -> dict[int, set[int]]: r"""Subroutine of the focus_gflow function. Parameters ---------- target : `int` target node to be focused gflow : `dict`\[`int`, `set`\[`int`\]\] gflow object graph : `BaseGraphState` graph state topo_order : `collections.abc.Sequence`\[`int`\] topological order of the graph state Returns ------- `dict`\[`int`, `set`\[`int`\] flowlike object after focusing the target node """ k = 0 s_k = _find_unfocused_corrections(target, gflow, graph) while s_k: gflow = _update_gflow(target, gflow, s_k, topo_order) s_k = _find_unfocused_corrections(target, gflow, graph) k += 1 return gflow def _find_unfocused_corrections(target: int, gflow: dict[int, set[int]], graph: BaseGraphState) -> set[int]: r"""Subroutine of the _focus function. Parameters ---------- target : `int` target node gflow : `dict`\[`int`, `set`\[`int`\] flowlike object graph : `BaseGraphState` graph state Returns ------- `set`\[`int`\] set of unfocused corrections """ meas_bases = graph.meas_bases non_outputs = set(gflow) - set(graph.output_node_indices) s_xy_candidate = odd_neighbors(gflow[target], graph) & non_outputs - {target} s_xz_candidate = gflow[target] & non_outputs - {target} s_yz_candidate = gflow[target] & non_outputs - {target} s_xy = {node for node in s_xy_candidate if meas_bases[node].plane == Plane.XY} s_xz = {node for node in s_xz_candidate if meas_bases[node].plane == Plane.XZ} s_yz = {node for node in s_yz_candidate if meas_bases[node].plane == Plane.YZ} return s_xy | s_xz | s_yz def _update_gflow( target: int, gflow: dict[int, set[int]], s_k: Iterable[int], topo_order: Sequence[int] ) -> dict[int, set[int]]: r"""Subroutine of the _focus function. Parameters ---------- target : `int` target node gflow : `dict`\[`int`, `set`\[`int`\] flowlike object s_k : `Iterable`\[`int`\] unfocused correction topo_order : `collections.abc.Sequence`\[`int`\] topological order of the graph state Returns ------- `dict`\[`int`, `set`\[`int`\] gflow object after updating the target node """ minimal_in_s_k = min(s_k, key=topo_order.index) gflow[target] ^= gflow[minimal_in_s_k] return gflow