Shortcuts

Source code for exir.backend.backend_api

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import copy
import logging
from contextlib import contextmanager
from functools import singledispatch
from typing import Generator, List, Type

import torch

from executorch.exir.backend.backend_details import BackendDetails, PreprocessResult
from executorch.exir.backend.compile_spec_schema import CompileSpec

from executorch.exir.backend.partitioner import (
    Partitioner,
    PartitionResult,
    TPartitioner,
)
from executorch.exir.backend.utils import is_identical_graph

from executorch.exir.delegate import executorch_call_delegate, get_lowered_module_name

from executorch.exir.graph_module import get_control_flow_submodules
from executorch.exir.lowered_backend_module import (
    _get_new_signature,
    create_exported_program_from_submodule,
    create_submodule_from_nodes,
    LoweredBackendModule,
)
from executorch.exir.pass_base import ExportPass
from torch.export import ExportedProgram


[docs]@singledispatch def to_backend(args): """ A generic function the dispatch happens on the type of the first argument. There are currently to overloaded to_backend function: Note: Python is dynamically-typed language and therefore cannot have proper method overloading as that requires the language to be able to discriminate between types at compile-time. @to_backend.register will attach the function to to_backend() base on the type of the first argument (type annotation is required). However, it can't take multiple types as arguments. :: def to_backend( backend_id: str, edge_graph_module: ExportedProgram, compile_specs: List[CompileSpec], ) -> LoweredBackendModule: def to_backend( graph_module: torch.fx.GraphModule, partitioner: Type[TPartitioner], ) -> torch.fx.GraphModule """ pass
@to_backend.register def _( backend_id: str, edge_program: ExportedProgram, compile_specs: List[CompileSpec], ) -> LoweredBackendModule: """ Add overloaded implementations for to_backend: :: def to_backend( backend_id: str, edge_program: ExportedProgram, compile_specs: List[CompileSpec], ) -> LoweredBackendModule: Requires the passed in exported program in Edge dialect to be executed in the backend identified by backend_id. The forward method of the given edge_graph_module will be targeted for execution. Args: backend_id: The backend identifier. exported_program: An exported program in Edge dialect to target for lowering to the backend. compile_specs: A list of backend-specific objects with static metadata to configure the "compilation" process (e.g. it could be another dictionary itself). Returns: LoweredBackendModule: A Module that has been lowered to the target backend. Internally, the lowered Module contains these special attributes: backend_id (str: backend id), __processed_module__ (str: a compiled module) compile_spec, original_module (original exported program) Raises: NotImplementedError: The backend is not implemented (e.g. it was not found). This exception is derived from RuntimeError and should be caught accordingly. RuntimeError: The module cannot be processed by the backend. """ assert isinstance(edge_program, ExportedProgram) # All backend implementation are final, so we don't need to consider nested subclasses. for cls in BackendDetails.__subclasses__(): if backend_id == cls.__name__: copied_edge_program = copy.deepcopy(edge_program) preprocess_result: PreprocessResult = cls.preprocess( copied_edge_program, compile_specs, ) lowered_module = LoweredBackendModule( edge_program=edge_program, backend_id=backend_id, processed_bytes=preprocess_result.processed_bytes, compile_specs=compile_specs, ) lowered_module.meta = { "debug_handle_map": preprocess_result.debug_handle_map } return lowered_module raise NotImplementedError(f"Backend {backend_id} was not found.") _ENABLE_VALIDATION: bool = True def disable_validation() -> None: """Disables validation""" global _ENABLE_VALIDATION _ENABLE_VALIDATION = False @contextmanager def validation_disabled() -> Generator[None, None, None]: """ Disables checking functions (ex. if the partitioned graph is identical to the original graph). This context manager should only be used in certain scenarios (such as when it has been profiled that checks are taking too long, and are not necessarily needed) """ global _ENABLE_VALIDATION existing_setting = _ENABLE_VALIDATION disable_validation() try: yield finally: _ENABLE_VALIDATION = existing_setting def _partition_and_lower( tagged_graph_module: torch.fx.GraphModule, partition_result: PartitionResult, owning_program: ExportedProgram, ) -> torch.fx.GraphModule: for tag, delegation_spec in partition_result.partition_tags.items(): # Create partition with nodes containing this tag. There should only be # one contained submodule per tag node_list = [] for node in tagged_graph_module.graph.nodes: if node.meta.get("delegation_tag", "") == tag: node_list.append(node) if len(node_list) == 0: logging.debug(f"Did not find any nodes for tag {tag}") continue logging.debug(f"For tag {tag}, found nodes {node_list}") # Tag the nodes that are params as buffers, so we can order the submodule as (Parms + Buffers) (User Inputs) submodule, call_module_node = create_submodule_from_nodes( tagged_graph_module, node_list, tag ) tagged_graph_module_output_node = [ node for node in tagged_graph_module.graph.nodes if node.op == "output" ] submodule_output_node = [ node for node in submodule.graph.nodes if node.op == "output" ] # Copy the output node meta from the original output node, because create_submodule_from_nodes doesn't cover the meta field submodule_output_node[0].meta = tagged_graph_module_output_node[0].meta logging.debug(f"Partitioned graph module: {tagged_graph_module}") submodule_program = create_exported_program_from_submodule( submodule, owning_program ) lowered_submodule = to_backend( delegation_spec.backend_id, submodule_program, delegation_spec.compile_specs, ) # call delegate args should only use user_inputs call_delegate_args = [] for inp in call_module_node.all_input_nodes: if inp.name in submodule_program.graph_signature.user_inputs: call_delegate_args.append(inp) # Replace the partitioned submodule with a lowered submodule # Add call_method node with function "forward" with tagged_graph_module.graph.inserting_before(call_module_node): lowered_name = get_lowered_module_name( tagged_graph_module, lowered_submodule ) lowered_node = tagged_graph_module.graph.get_attr(lowered_name) call_delegate_node = tagged_graph_module.graph.call_function( executorch_call_delegate, (lowered_node,) + tuple(call_delegate_args), call_module_node.kwargs, ) call_delegate_node.meta["debug_handle"] = len( tagged_graph_module.graph.nodes ) call_module_node.replace_all_uses_with(call_delegate_node) tagged_graph_module.graph.erase_node(call_module_node) # Delete all parameters/buffers consumed by the created exported program toplevel_signature = owning_program.graph_signature for node in tagged_graph_module.graph.nodes: # Find placeholders consumed by the delegate if node.op != "placeholder" or len(node.users) != 0: continue if node.name in toplevel_signature.inputs_to_buffers: # Delete the consumed buffers buffer_name = toplevel_signature.inputs_to_buffers.pop(node.name) toplevel_signature.buffers.remove(buffer_name) owning_program.state_dict.pop(buffer_name) tagged_graph_module.graph.erase_node(node) elif node.name in toplevel_signature.inputs_to_parameters: # Delete the consumed parameters param_name = toplevel_signature.inputs_to_parameters.pop(node.name) toplevel_signature.parameters.remove(param_name) owning_program.state_dict.pop(param_name) tagged_graph_module.graph.erase_node(node) tagged_graph_module.recompile() # Recursively partition and lower for submodules for name, submod, _node in get_control_flow_submodules(tagged_graph_module): partitioned_submodule = _partition_and_lower( submod, partition_result, owning_program ) tagged_graph_module.add_module(name, partitioned_submodule) # Run the export pass over the graph module so that the call delegate # nodes will match Edge dialect # TODO(angelayi): ExportPass will rerun the graph, however all we need # here is to add metadata to the call delegate nodes to preserve Edge # dialect. There's work going on to generate a random tensor from a # fake tensor and possibly it can help to address the issue. res = ExportPass()(tagged_graph_module) assert res is not None tagged_graph_module = res.graph_module return tagged_graph_module @to_backend.register def _( edge_program: ExportedProgram, partitioner: Type[TPartitioner], ) -> ExportedProgram: """ Add overloaded implementations for to_backend: :: def to_backend( edge_program: ExportedProgram, partitioner: Type[TPartitioner], ) -> ExportedProgram: Returns a semantically-equivalent program to the one given as input (represented as a graph module in Edge dialect), but with portions of the program targeted for delegation as determined by the partitioner. Args: ExportedProgram: Program in Edge dialect. partitioner: An instance of the Partitioner class type, in charge with tagging portions of the input program for delegation. A valid partitioner must have partition_tags: Dict[str, DelegationSpec], where each key is a tag name and the nodes with same tag will be fused a one subgraph and delegated to backend specififed in delegation spec. Returns: ExportedProgram: The input program, with some portions targeted for delegation. """ copied_edge_program = copy.deepcopy(edge_program) # Call the partitioner on the given graph module partitioner_instance: Partitioner = partitioner() partitioner_result = partitioner_instance(copied_edge_program) tagged_exported_program = partitioner_result.tagged_exported_program # Check that the partitioner did not modify the original graph if _ENABLE_VALIDATION: assert is_identical_graph( tagged_exported_program.graph_module, edge_program.graph_module, ), f"The partitioner {partitioner} should not modify the graph module" else: logging.warning("Disabled validating the partitioner.") assert ( partitioner_result.partition_tags is not None ), f"Partitioner {partitioner} needs a `partition_tags` field containing a mapping of tags to delegate spec" tagged_graph_module = _partition_and_lower( tagged_exported_program.graph_module, partitioner_result, edge_program ) # TODO(angelayi): Update this signature in a less manual way (maybe through # retracing) new_signature, new_state_dict = _get_new_signature( edge_program, tagged_graph_module ) return ExportedProgram( tagged_graph_module, tagged_graph_module.graph, new_signature, new_state_dict, copy.deepcopy(edge_program.range_constraints), copy.deepcopy(edge_program.equality_constraints), copy.deepcopy(edge_program.module_call_graph), )

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources