
Graphs

  1. GraphConfig
  2. Subgraph
  3. Cycles
    1. Back Edge Annotation
    2. Initial Packet
    3. Delay in a Loop
    4. Early Termination of a Calculator When One Input Stream is Done
    5. Relevant Source Code
      1. Delay Calculator
      2. Graph Config

GraphConfig

A GraphConfig is a specification that describes the topology and functionality of a MediaPipe graph. In the specification, each node in the graph represents an instance of a particular calculator. All the necessary configuration of a node, such as its type, inputs, and outputs, must be described in the specification. A node description can also include several optional fields, such as node-specific options, input policy, and executor, discussed in Synchronization.

GraphConfig has several other fields to configure global graph-level settings, e.g., graph executor configs, number of threads, and maximum queue size of input streams. Several graph-level settings are useful for tuning the performance of the graph on different platforms (e.g., desktop vs. mobile). For instance, on mobile, attaching a heavy model-inference calculator to a separate executor can improve the performance of a real-time application since this enables thread locality.

Below is a trivial GraphConfig example with a series of four passthrough calculators:

# This graph named main_pass_throughcals_nosubgraph.pbtxt contains 4
# passthrough calculators.
input_stream: "in"
node {
    calculator: "PassThroughCalculator"
    input_stream: "in"
    output_stream: "out1"
}
node {
    calculator: "PassThroughCalculator"
    input_stream: "out1"
    output_stream: "out2"
}
node {
    calculator: "PassThroughCalculator"
    input_stream: "out2"
    output_stream: "out3"
}
node {
    calculator: "PassThroughCalculator"
    input_stream: "out3"
    output_stream: "out4"
}

Subgraph

To modularize a CalculatorGraphConfig into sub-modules and assist with re-use of perception solutions, a MediaPipe graph can be defined as a Subgraph. The public interface of a subgraph consists of a set of input and output streams, similar to a calculator’s public interface. The subgraph can then be included in a CalculatorGraphConfig as if it were a calculator. When a MediaPipe graph is loaded from a CalculatorGraphConfig, each subgraph node is replaced by the corresponding graph of calculators. As a result, the semantics and performance of the subgraph are identical to those of the corresponding graph of calculators.

Below is an example of how to create a subgraph named TwoPassThroughSubgraph.

  1. Define the subgraph.

    # This subgraph is defined in twopassthrough_subgraph.pbtxt
    # and is registered as "TwoPassThroughSubgraph"
    
    type: "TwoPassThroughSubgraph"
    input_stream: "out1"
    output_stream: "out3"
    
    node {
        calculator: "PassThroughCalculator"
        input_stream: "out1"
        output_stream: "out2"
    }
    node {
        calculator: "PassThroughCalculator"
        input_stream: "out2"
        output_stream: "out3"
    }
    

    The public interface to the subgraph consists of:

    • Graph input streams
    • Graph output streams
    • Graph input side packets
    • Graph output side packets
  2. Register the subgraph using the BUILD rule mediapipe_simple_subgraph. The parameter register_as defines the component name for the new subgraph.

    # Small section of BUILD file for registering the "TwoPassThroughSubgraph"
    # subgraph for use by main graph main_pass_throughcals.pbtxt
    
    mediapipe_simple_subgraph(
        name = "twopassthrough_subgraph",
        graph = "twopassthrough_subgraph.pbtxt",
        register_as = "TwoPassThroughSubgraph",
        deps = [
            "//mediapipe/calculators/core:pass_through_calculator",
            "//mediapipe/framework:calculator_graph",
        ],
    )
    
  3. Use the subgraph in the main graph.

    # This main graph is defined in main_pass_throughcals.pbtxt
    # using subgraph called "TwoPassThroughSubgraph"
    
    input_stream: "in"
    node {
        calculator: "PassThroughCalculator"
        input_stream: "in"
        output_stream: "out1"
    }
    node {
        calculator: "TwoPassThroughSubgraph"
        input_stream: "out1"
        output_stream: "out3"
    }
    node {
        calculator: "PassThroughCalculator"
        input_stream: "out3"
        output_stream: "out4"
    }
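
The replacement of a subgraph node by its calculators can be sketched in plain C++. This is a minimal illustration and not MediaPipe's implementation: the Node struct and the functions ExpandSubgraphs and ExpandExampleGraph are hypothetical names, and the sketch assumes the subgraph's stream names already match those of the main graph (as they do in the example above), so no stream renaming is modeled.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for one node entry of a graph config.
struct Node {
  std::string calculator;
  std::string input_stream;
  std::string output_stream;
};

// Replaces every node whose calculator name is registered as a subgraph
// with that subgraph's own nodes. Ordinary calculators are kept as is.
std::vector<Node> ExpandSubgraphs(
    const std::vector<Node>& main_graph,
    const std::map<std::string, std::vector<Node>>& registry) {
  std::vector<Node> expanded;
  for (const Node& node : main_graph) {
    auto it = registry.find(node.calculator);
    if (it == registry.end()) {
      expanded.push_back(node);
    } else {
      expanded.insert(expanded.end(), it->second.begin(), it->second.end());
    }
  }
  return expanded;
}

// Builds the main graph and the subgraph from the example and expands them.
std::vector<Node> ExpandExampleGraph() {
  std::map<std::string, std::vector<Node>> registry;
  registry["TwoPassThroughSubgraph"] = {
      {"PassThroughCalculator", "out1", "out2"},
      {"PassThroughCalculator", "out2", "out3"}};
  std::vector<Node> main_graph = {
      {"PassThroughCalculator", "in", "out1"},
      {"TwoPassThroughSubgraph", "out1", "out3"},
      {"PassThroughCalculator", "out3", "out4"}};
  return ExpandSubgraphs(main_graph, registry);
}
```

Under these assumptions, ExpandExampleGraph() returns four PassThroughCalculator nodes chained from "in" to "out4", which is the same topology as the graph without a subgraph shown in the GraphConfig section.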
    

Cycles

By default, MediaPipe requires calculator graphs to be acyclic and treats cycles in a graph as errors. If a graph is intended to have cycles, the cycles need to be annotated in the graph config. This page describes how to do that.

NOTE: The current approach is experimental and subject to change. We welcome your feedback.

Please use the CalculatorGraphTest.Cycle unit test in mediapipe/framework/calculator_graph_test.cc as sample code. Shown below is the cyclic graph in the test. The sum output of the adder is the sum of the integers generated by the integer source calculator.

[Figure: a cyclic graph that adds a stream of integers]

This simple graph illustrates all the issues in supporting cyclic graphs.

Back Edge Annotation

We require that an edge in each cycle be annotated as a back edge. This allows MediaPipe’s topological sort to work, after removing all the back edges.

There are usually multiple ways to select the back edges. Which edges are marked as back edges affects which nodes are considered as upstream and which nodes are considered as downstream, which in turn affects the priorities MediaPipe assigns to the nodes.

For example, the CalculatorGraphTest.Cycle test marks the old_sum edge as a back edge, so the delay node is considered a downstream node of the adder node and is given a higher priority. Alternatively, we could mark the sum input to the delay node as the back edge, in which case the delay node would be considered an upstream node of the adder node and would be given a lower priority.
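
The effect of the back edge choice on node ordering can be sketched with a generic topological sort (Kahn's algorithm) that skips annotated edges. This is an illustration only and not MediaPipe's scheduler code: the Edge struct, the node constants, and the functions TopologicalOrder and ExampleEdges are hypothetical, and priorities are not modeled, only the resulting order.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <vector>

// Hypothetical node numbering for the example graph.
constexpr int kSource = 0;
constexpr int kAdder = 1;
constexpr int kDelay = 2;

struct Edge {
  int from;
  int to;
  bool back_edge;
};

// Kahn's algorithm over the edges that are not marked as back edges.
// Returns the node order, or an empty vector if a cycle remains.
std::vector<int> TopologicalOrder(int num_nodes,
                                  const std::vector<Edge>& edges) {
  std::vector<std::vector<int>> successors(num_nodes);
  std::vector<int> in_degree(num_nodes, 0);
  for (const Edge& e : edges) {
    if (e.back_edge) continue;  // Back edges are ignored by the sort.
    successors[e.from].push_back(e.to);
    ++in_degree[e.to];
  }
  std::queue<int> ready;
  for (int n = 0; n < num_nodes; ++n) {
    if (in_degree[n] == 0) ready.push(n);
  }
  std::vector<int> order;
  while (!ready.empty()) {
    int n = ready.front();
    ready.pop();
    order.push_back(n);
    for (int s : successors[n]) {
      if (--in_degree[s] == 0) ready.push(s);
    }
  }
  if (order.size() != static_cast<std::size_t>(num_nodes)) order.clear();
  return order;
}

// The three edges of the example graph, with a selectable back edge.
std::vector<Edge> ExampleEdges(bool mark_old_sum, bool mark_sum) {
  std::vector<Edge> edges;
  edges.push_back(Edge{kSource, kAdder, false});        // integers
  edges.push_back(Edge{kAdder, kDelay, mark_sum});      // sum
  edges.push_back(Edge{kDelay, kAdder, mark_old_sum});  // old_sum
  return edges;
}
```

With no edge marked, the sort fails because the cycle remains. Marking old_sum places the delay node after the adder; marking sum instead places the delay node before the adder.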

Initial Packet

For the adder calculator to be runnable when the first integer from the integer source arrives, we need an initial packet, with value 0 and with the same timestamp as that first integer, on the old_sum input stream to the adder. This initial packet should be output by the delay calculator in the Open() method.

Delay in a Loop

Each loop should incur a delay to align the previous sum output with the next integer input. This is also done by the delay node. So the delay node needs to know the following about the timestamps of the integer source calculator:

  • The timestamp of the first output.

  • The timestamp delta between successive outputs.

We plan to add an alternative scheduling policy that only cares about packet ordering and ignores packet timestamps, which will eliminate this inconvenience.
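
The interplay of the initial packet and the per-loop delay can be sketched as a small stand-alone simulation. It is not MediaPipe code: the Packet struct here is a hypothetical two-field stand-in, RunAdderLoop is an illustrative name, and the first timestamp and the timestamp delta are passed in explicitly to show why the delay node needs both.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a timestamped integer packet.
struct Packet {
  long long timestamp;
  int value;
};

// Simulates the adder/delay loop. The integer source is assumed to emit
// integers[i] at timestamp first_timestamp + i * delta.
std::vector<Packet> RunAdderLoop(const std::vector<int>& integers,
                                 long long first_timestamp, long long delta) {
  std::vector<Packet> sums;
  // Initial packet from the delay node: value 0 at the first timestamp.
  Packet old_sum{first_timestamp, 0};
  for (std::size_t i = 0; i < integers.size(); ++i) {
    Packet input{first_timestamp + static_cast<long long>(i) * delta,
                 integers[i]};
    // The adder needs both inputs at the same timestamp to run.
    assert(input.timestamp == old_sum.timestamp);
    Packet sum{input.timestamp, input.value + old_sum.value};
    sums.push_back(sum);
    // The delay node re-emits the sum one delta later, which aligns it
    // with the next integer from the source.
    old_sum = Packet{sum.timestamp + delta, sum.value};
  }
  return sums;
}
```

Calling RunAdderLoop with a first timestamp of 0 and a delta of 1 corresponds to the unit delay used in the test graph; other values only work if the delay node is told about them, which is the inconvenience mentioned above.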

Early Termination of a Calculator When One Input Stream is Done

By default, MediaPipe calls the Close() method of a non-source calculator when all of its input streams are done. In the example graph, we want to stop the adder node as soon as the integer source is done. This is accomplished by configuring the adder node with an alternative input stream handler, EarlyCloseInputStreamHandler.
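
The difference between the two closing policies can be sketched as two predicates over the "done" state of a node's input streams. This is a simplified model rather than the handlers' actual code: ShouldCloseDefault and ShouldCloseEarly are hypothetical names, and the early-close rule is assumed to be "close as soon as any input stream is done", as the section title suggests.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Default policy described above: close once all input streams are done.
bool ShouldCloseDefault(const std::vector<bool>& input_done) {
  return std::all_of(input_done.begin(), input_done.end(),
                     [](bool done) { return done; });
}

// Assumed early-close policy: close as soon as any input stream is done.
bool ShouldCloseEarly(const std::vector<bool>& input_done) {
  return std::any_of(input_done.begin(), input_done.end(),
                     [](bool done) { return done; });
}
```

For the adder, the inputs are integers and old_sum. Once the integer source finishes, the state is {true, false}: the default policy keeps the adder open, and because old_sum only finishes after the adder itself has closed, the loop would never terminate on its own. The early-close policy closes the adder at that point.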

Relevant Source Code

Delay Calculator

Note the code in Open() that outputs the initial packet and the code in Process() that adds a (unit) delay to input packets. As noted above, this delay node assumes that its output stream is used alongside an input stream with packet timestamps 0, 1, 2, 3, …

class UnitDelayCalculator : public Calculator {
 public:
  static absl::Status FillExpectations(
      const CalculatorOptions& extendable_options, PacketTypeSet* inputs,
      PacketTypeSet* outputs, PacketTypeSet* input_side_packets) {
    inputs->Index(0)->Set<int>("An integer.");
    outputs->Index(0)->Set<int>("The input delayed by one time unit.");
    return absl::OkStatus();
  }

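  absl::Status Open() final {
    // Emit the initial packet (value 0 at timestamp 0) so that the adder can
    // run as soon as the first integer arrives.
    Output()->Add(new int(0), Timestamp(0));
    return absl::OkStatus();
  }

  absl::Status Process() final {
    // Forward the input packet with its timestamp advanced by one unit.
    const Packet& packet = Input()->Value();
    Output()->AddPacket(packet.At(packet.Timestamp().NextAllowedInStream()));
    return absl::OkStatus();
  }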
};

Graph Config

Note the back_edge annotation and the alternative input_stream_handler.

node {
  calculator: 'GlobalCountSourceCalculator'
  input_side_packet: 'global_counter'
  output_stream: 'integers'
}
node {
  calculator: 'IntAdderCalculator'
  input_stream: 'integers'
  input_stream: 'old_sum'
  input_stream_info: {
    tag_index: ':1'  # 'old_sum'
    back_edge: true
  }
  output_stream: 'sum'
  input_stream_handler {
    input_stream_handler: 'EarlyCloseInputStreamHandler'
  }
}
node {
  calculator: 'UnitDelayCalculator'
  input_stream: 'sum'
  output_stream: 'old_sum'
}