Skip to content

0 总体概览

总体目标:使用310P算力,实现机械臂抓取。本文档提供了 ACT,DP 两种较典型分层模型的离线部署参考。



1 环境搭建

支持操作系统:Ubuntu22.04;OpenEuler

NPU:Ascend 310P

  表1 组件安装顺序表

组件安装顺序

版本

安装指导

1. Ubuntu

22.04

OS安装

2. Firmware和Driver

24.RC1

驱动安装

3. CANN

8.2.RC1

CANN安装

4. Python

3.10.xx

Ubuntu22.04 自带

5. Pytorch & torch_npu

2.5.1

Pytorch&torch_npu安装指导

6. aie_bench

0.0.2

ais_bench安装指导

7. ACL Lite

/

ACLLite安装指导

  表1 版本配套表

  

2 ACT: Action Chunking with Transformers

2.0 实验目的


本章描述了如何在 310P 开发板上对训练好的 ACT 模型进行离线模型转换及推理。


其中:

2.1 节简短的介绍了ACT模型及架构细节;

2.2 节定义了模型详细的参数化输入及输出格式;

2.3 节详细描述了模型转换及推理部署的流程。


2.1 模型概述

ACT(Action Chunking with Transformers)是一种“视觉→动作”的端到端模仿学习模型:用多视角RGB图像及机器人本体状态作为输入,经过卷积特征提取+Transformer 编码,再用带条件VAE的解码头一次性输出接下来 k 步的动作序列(如关节/末端位姿轨迹),并在执行时做时间集成来平滑控制。


训练时:ACT 整体被建模为一个 Conditional VAE。训练时有个CVAE 编码器把“真实动作序列 + 关节状态”等压到潜变量 Z;策略部分则由 CVAE 解码器负责,其具体实现是 ResNet + Transformer Encoder + Decoder,以多视角图像的Transformer Encoder特征 + 潜变量 z 为条件,一次解出接下来 k 步动作。

推理时:丢弃 CVAE 编码器,只保留 CVAE 解码器的部分(见上图),Z 来自先验(常用均值/0)。随后再做时间集成平滑控制。


两个关键创新是:

① 动作分块(Action Chunking),把长时序决策改为预测长度为k的短块,减少误差累积;

② 时间集成(Temporal Ensembling),多次预测同一时刻动作并加权平均,提升稳健性。


实证表明在少量示范下就能在(双臂)操作任务中实现稳定、高成功率的精细操作,如插拔、装配、开合与贴合等。


2.2 模型输入输出

输入数据:

Nb = Batch 大小,默认为1

Ni = 图片数量,训练时决定,可根据具体任务及机器人构型决策

Ns = 状态维度,机器人选型决定

输入数据名(name)

数据类型(dtype)

数据大小(shape)

rgb

float32

[Nb , Ni , 3 , 480 , 640]

state

float32

[Nb , Ns]

输出数据:

Nk = 一个chunk包含的步数k

输入数据名(name)

数据类型(dtype)

数据大小(shape)

joint_states

float32

[Nb , Nk , Ns]


注:输入及输出的机械臂状态,state,均为机械臂关节的绝对位置。


2.3 推理部署

本节提供模型离线推理模式(通过OM文件, Offline Model)部署参考。

2.3.1 模型文件

原ACT代码仓:github.com

LeRobot代码仓:github.com

两个仓均支持ACT模型训练,训练细节本文不做阐述,详见代码仓。


2.3.2 Onnx 模型转换

利用 torch.onnx.export (API 参考:docs.pytorch.ac.cn),将训练/加载完成的 torch model 转化为 onnx 格式并保存。

此步操作在用于训练的电脑上即可完成,不强制 310P。

详见代码:已适配ACT模型,image_shape 及 state_dim 根据训练时 config 传入。

python
import torch
import torch.onnx
import os

def convert_to_onnx_act(model, om_name="output", output_dir="./", image_shape=None, state_dim=None):
    output_path = os.path.join(output_dir, om_name)
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    print("Saving Onnx Model file to:", output_path)

    device = next(model.parameters()).device # 获取model所在设备,确保dummy输入与model在同一设备上

    # 图像输入
    image_input = torch.randn(1, image_shape, dtype=torch.float32).to(device)  # [batch_size==1, num_cam, channels, height, width]

    # 机器人状态输入
    robot_state_input = torch.randn(1, state_dim, dtype=torch.float32).to(device)  # [batch_size==1, state_dim]

    # 模型设置为推理模式
    model.eval()

    torch.onnx.export(model,
                      (robot_state_input,image_input),  #随机输入
                      os.path.join(output_path,om_name+".onnx"),
                      input_names=["state","rgb"],  # 构造输入名
                      output_names=["joint_state"],  # 构造输出名
                      opset_version=12,  # CANN 8.0.0 支持opset v11~v15版本的算子
                      dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}}, verbose=True)  # 支持输出动态轴

    inputs_info = f"state:{robot_state_input.shape[0]},{robot_state_input.shape[1]};rgb:{image_input.shape[0]},{image_input.shape[1]},{image_input.shape[2]},{image_input.shape[3]},{image_input.shape[4]}"
    print("inputs_info:", inputs_info)


2.3.3 OM 模型转换

使用ATC工具将 .onnx 模型文件转为 .om 模型文件(昇腾支持)。

注:2.3.3 - 2.3.5 节的操作需在310P上执行。

1. 执行命令查看芯片名称(${chip_name})
r
npu-smi info
#该设备芯片名为Ascend310P1 (自行替换)
回显如下:
+--------------------------------------------------------------------------------------------------------+
| npu-smi 23.0.1                                   Version: 24.1.t35                                     |
+-------------------------------+-----------------+------------------------------------------------------+
| NPU     Name                  | Health          | Power(W)     Temp(C)           Hugepages-Usage(page) |
| Chip    Device                | Bus-Id          | AICore(%)    Memory-Usage(MB)                        |
+===============================+=================+======================================================+
| 0       310P1                 | OK              | NA           49                0     / 0             |
| 0       0                     | NA              | 0            NA / 44252                              |
+===============================+=================+======================================================+
2. 执行ATC命令
shell
# ${output_dir} 及 ${om_name} 为上一步中设置的输出路径以及文件名
# ${inputs_info} 为上一步中打印的inputs_info, 例:"state:1,14;rbg:1,3,3,480,640", 加引号!
# ${chip_name}根据芯片名称替换,这里为 Ascend310P1

cd ${output_dir}/${om_name}
atc --model=${om_name}.onnx \
    --framework=5 \
    --output=${om_name} \
    --input_shape="${inputs_info}" \
    --soc_version=${chip_name}

  参数说明:

--model:为ONNX模型文件。
--framework:5代表ONNX模型。
--output:输出的OM模型名字。
--input_shape:输入数据的shape。
--soc_version:处理器型号。


3. 执行成功后获得<span style="font-weight: normal"> </span><span style="font-weight: bold"><b>.om 文件</b></span>

  预期输出:

2.3.4 功能/性能测试

作为om模型的快速验证(可选),此步骤需要用到 ais_bench 推理工具。安装见环境搭建中提供的链接。

ais_bench 会根据上一步指定的的输入形状,随机生成输入数据。

shell
cd ${output_dir}/${om_name}
python3 -m ais_bench --model ./${om_name}.om --loop 10 --debug 1

运行输出包含模型的输入输出参数信息及详细的推理耗时信息。

若无报错,证明推理功能上没问题。预期输出:

在310P1设备上测得 DP 模型单次推理 NPU 计算时间约 10 ms。


2.3.5 端到端推理/功能验证

训练脚本中会同时生成一个 dataset_stats.pkl 文件存放在 ckpt_dir 下,推理时需要将此文件复制至包含 .om 文件夹中。推理脚本会自动根据此文件对输入输出的机械臂状态进行预处理及后处理。


期待的文件夹格式为:

${output_dir}/${om_name}

  |-- ${om_name}.onnx (仅生成om文件时需要)

  |-- ${om_name}.om (离线模型)

  |-- dataset_stats.pkl (从ckpt目录复制过来,用于状态预/后处理)

  

推理逻辑:acl_inference.py

调用AclInference类时, 设model_dir = ${output_dir}/${om_name},task_name = ${om_name}

python
import acl
import numpy as np
import pickle
import os
import time

class AclInference:
    def __init__(self,model_dir,task_name):
        ret = acl.init()
        self.check_ret(ret, "Failed to init")
        self.device_id = acl.rt.get_device_count()[1]
        ret = acl.rt.set_device(self.device_id)
        self.check_ret(ret, "Failed to set device")
        self.stream, ret = acl.rt.create_stream()
        self.check_ret(ret, "Failed to create stream")

        # 初始化变量
        model_path = os.path.join(model_dir,task_name+".om")

        # 加载离线模型文件
        self.model_id, ret = acl.mdl.load_from_file(model_path)
        self.check_ret(ret, "Failed to load model from file.")

        # 初始化变量
        self.ACL_MEM_MALLOC_HUGE_FIRST = 0
        self.ACL_MEMCPY_HOST_TO_DEVICE = 1
        self.ACL_MEMCPY_DEVICE_TO_HOST = 2

        # 获取模型描述
        self.model_desc = acl.mdl.create_desc()
        ret = acl.mdl.get_desc(self.model_desc, self.model_id)
        self.check_ret(ret, "Failed to get model description.")

        # 准备输入数据集
        self.input_dataset = acl.mdl.create_dataset()
        self.input_size = acl.mdl.get_num_inputs(self.model_desc)
        self.input_data = []
        for i in range(self.input_size):
            buffer_size = acl.mdl.get_input_size_by_index(self.model_desc, i)
            buffer, ret = acl.rt.malloc(buffer_size, self.ACL_MEM_MALLOC_HUGE_FIRST)
            self.check_ret(ret, f"Failed to allocate memory for input {i}.")
            data = acl.create_data_buffer(buffer, buffer_size)
            _, ret = acl.mdl.add_dataset_buffer(self.input_dataset, data)
            self.check_ret(ret, f"Failed to add buffer to input dataset for input {i}.")
            self.input_data.append({"buffer": buffer, "size": buffer_size})

        # 准备输出数据集
        self.output_dataset = acl.mdl.create_dataset()
        self.output_size = acl.mdl.get_num_outputs(self.model_desc)
        self.output_data = []
        for i in range(self.output_size):
            buffer_size = acl.mdl.get_output_size_by_index(self.model_desc, i)
            buffer, ret = acl.rt.malloc(buffer_size, self.ACL_MEM_MALLOC_HUGE_FIRST)
            self.check_ret(ret, f"Failed to allocate memory for output {i}.")
            data = acl.create_data_buffer(buffer, buffer_size)
            _, ret = acl.mdl.add_dataset_buffer(self.output_dataset, data)
            self.check_ret(ret, f"Failed to add buffer to output dataset for output {i}.")
            self.output_data.append({"buffer": buffer, "size": buffer_size})

    def check_ret(self, ret, message):
        if ret != 0:
            raise Exception(f"{message} Error code: {ret}")

    def inference_acl(self, image_input, robot_state_input):
        # 将输入数据从Host传输到Device
        ret = acl.rt.memcpy(self.input_data[0]["buffer"], self.input_data[0]["size"], 
                            robot_state_input.ctypes.data, robot_state_input.nbytes, 
                            self.ACL_MEMCPY_HOST_TO_DEVICE)
        self.check_ret(ret, "Failed to copy robot state input data from host to device.")
        ret = acl.rt.memcpy(self.input_data[1]["buffer"], self.input_data[1]["size"], 
                            image_input.ctypes.data, image_input.nbytes, 
                            self.ACL_MEMCPY_HOST_TO_DEVICE)
        self.check_ret(ret, "Failed to copy image input data from host to device.")

        # 执行模型推理
        t = time.time()
        ret = acl.mdl.execute(self.model_id, self.input_dataset, self.output_dataset)
        print("Inference time:", time.time() - t)
        self.check_ret(ret, "Model execution failed.")

        # 处理模型推理的输出数据
        inference_result = []
        for i, item in enumerate(self.output_data):
            buffer_host, ret = acl.rt.malloc_host(item["size"])
            self.check_ret(ret, f"Failed to allocate host memory for output {i}.")
            ret = acl.rt.memcpy(buffer_host, item["size"], item["buffer"], item["size"], 
                                self.ACL_MEMCPY_DEVICE_TO_HOST)
            self.check_ret(ret, f"Failed to copy output data from device to host for output {i}.")
            bytes_out = acl.util.ptr_to_bytes(buffer_host, item["size"])
            data = np.frombuffer(bytes_out, dtype=np.float32)
            inference_result.append(data)
            acl.rt.free_host(buffer_host)

        return inference_result

    def release_resources(self):
        # 释放资源
        for item in self.input_data:
            acl.rt.free(item["buffer"])
        for item in self.output_data:
            acl.rt.free(item["buffer"])
        acl.mdl.destroy_dataset(self.input_dataset)
        acl.mdl.destroy_dataset(self.output_dataset)
        acl.mdl.destroy_desc(self.model_desc)
        acl.mdl.unload(self.model_id)
        acl.rt.destroy_stream(self.stream)
        acl.finalize()


为方便对接ROS或其它系统,将ACT相关的数据处理以及推理调用封装在一个 wrapper 类中:act_model_wrapper.py

python
import time
import pickle
import os
from contextlib import contextmanager

import numpy as np
from acl_inference import AclInference

NUMBER_OF_IMAGES = 3 # Ni, 按需修改!
STATE_DIM = 14 # Ns, 按需修改!

@contextmanager
def timer(name="Block"):
    start = time.perf_counter()
    yield
    end = time.perf_counter()
    print(f"[ACT model] [TIMER] {name} took {(end-start)*1000:.3f} ms")


class ACTModelWrapper:
    """
        ACT MODEL WRAPPER
        
        predict:
        INPUT (single batch): 
            1. [Ni, 3, 480, 640] (rgb, np.float32)
            2. [Ns] (state, np.float32)
        OUTPUT:
            1. [Nk, Ns] (joint_states, np.float32)
    """

    def __init__(self):
        task_name = "sim_insertion_scripted_policy_best" # 按需修改!
        model_dir = "/opt/data/sim_insertion_scripted_policy_best" # 按需修改!
        stats_path = os.path.join(model_dir,"dataset_stats.pkl")
        with open(stats_path, 'rb') as f:
            self.stats = pickle.load(f)

        self.model = AclInference(model_dir, task_name)

        self.pre_pos_process = lambda s_qpos: (s_qpos - self.stats['qpos_mean']) / self.stats['qpos_std']
        self.post_process = lambda a: a * self.stats['qpos_std'] + self.stats['qpos_mean']

        self._dummy_run()

    
    def _dummy_run(self):
        rgb_data = np.random.uniform(0, 255, size=(NUMBER_OF_IMAGES, 3, 480, 640))
        rgb_data = rgb_data / 255.0
        rgb_data = rgb_data.astype(np.float32)
        curr_image = rgb_data[np.newaxis,:]

        in_pos_numpy = np.random.rand(STATE_DIM).astype(np.float32)
        # in_pos = self.pre_pos_process(in_pos_numpy).astype(np.float32)
        in_pos = in_pos_numpy[np.newaxis,:]

        with timer("Dummy Run"):
            for i in range(10):
                self.model.inference_acl(curr_image, in_pos)

    def predict(self, in_dict):
        if "rgb" not in in_dict or "state" not in in_dict:
            print("Required data is not included. Check JSON format")
            return
        
        rgb_data = in_dict["rgb"] # expected shape [Ni, 3, 480, 640]  np array float32
        curr_image = rgb_data[np.newaxis,:]

        in_pos = in_dict["state"] # expected shape [Ns], np array float32
        in_pos = in_pos[np.newaxis,:]
        in_pos = self.pre_pos_process(in_pos)
        #with timer("Inference"):
        output_data = self.model.inference_acl(curr_image, in_pos)
        out_pos = output_data[0].reshape(-1, STATE_DIM)
        joint_states = self.post_process(out_pos)

        response_dict = {
            "status" : "success",
            "joint_states" : joint_states
        }

        return response_dict

if __name__=="__main__":
    m = ACTModelWrapper() # dummy run


wrapper 类仅对外开放一个predict方法,用于触发模型新一轮的推理并返回结果值。输入输出具体格式见 act_model_wrapper.py:22-29。


当用 python 直接执行 act_model_wrapper.py 时,__main__ 函数会实例化 wrapper 类并执行十次随机输入的 dummy run,预期输出:


predict 接口调用示例(随机输入):


3 DP: Diffusion Policy

3.0 实验目的

本章描述了如何在 310P 开发板上对训练好的 DP 模型进行离线模型转换及推理。


其中:

3.1 节简短的介绍了DP模型及架构细节;

3.2 节定义了模型详细的参数化输入及输出格式;

3.3 节详细描述了模型转换及推理部署的流程。


3.1 模型概述

Diffusion Policy(DP)是基于条件扩散模型训练的策略模型:先用视觉编码器把过去 To 帧观测嵌入成上下文,再用时间维度的 1D U-Net 扩散去噪网络在动作序列空间上迭代去噪,一次生成未来 TP 步动作;执行时采用滚动时域控制(每次只执行前 TA 步再重规划)。

典型值:TO=2,TP=16,TA=8

解决的核心问题

① Action Multimodality:对于同一输入,机械臂可能会有多个最优解。扩散模型能够很好的学习这类操作,相比于比单步/自回归模型更灵活,更抗噪。

② Action Space Scalability: 线性的长序列动作预测。长序列会使模型输出/机械臂操控更统一更稳定。

③ Training Stability:扩散模型的训练更稳定。


3.2 模型输入输出

输入数据:

Nb = Batch 大小,默认为1

TO = 观测窗口,典型值取2

Ni = 图片数量,训练时决定,可根据具体任务及机器人构型决策

Ns = 状态维度,机器人选型决定

输入数据名(name)

数据类型(dtype)

数据大小(shape)

rgb

float32

[Nb , TO , Ni , 3 , 480 , 640]

state

float32

[Nb , TO, Ns]

输出数据:

TP = 预测窗口,典型值取16

输入数据名(name)

数据类型(dtype)

数据大小(shape)

joint_states

float32

[Nb , TP , Ns]


3.3 推理部署

3.3.1 模型文件

同ACT模型,LeRobot同样支持DP模型的训练。训练细节本文不做阐述,详见代码仓。

原代码仓:github.com

LeRobot:github.com


3.3.2 Onnx 模型转换

利用 torch.onnx.export (API 参考:docs.pytorch.ac.cn),将训练/加载完成的 torch model 转化为 onnx 格式并保存。

此步操作在用于训练的电脑上即可完成,不强制 310P。

详见代码:已适配 DP 模型,image_shape 及 state_dim 根据训练时 config 传入。

python
import torch
import torch.onnx
import os

def convert_to_onnx_dp(model, om_name="output", output_dir="./", image_shape=None, state_dim=None):
    output_path = os.path.join(output_dir, om_name)
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    print("Saving Onnx Model file to:", output_path)

    device = next(model.parameters()).device # 获取model所在设备,确保dummy输入与model在同一设备上

    # 图像输入
    image_input = torch.randn(1, image_shape, dtype=torch.float32).to(device)  # [batch_size==1, observation_window, num_cam, channels, height, width]

    # 机器人状态输入
    robot_state_input = torch.randn(1, state_dim, dtype=torch.float32).to(device)  # [batch_size==1, observation_window, state_dim]

    # 模型设置为推理模式
    model.eval()

    torch.onnx.export(model,
                      (robot_state_input,image_input),  #随机输入
                      os.path.join(output_path,om_name+".onnx"),
                      input_names=["state","rgb"],  # 构造输入名
                      output_names=["joint_state"],  # 构造输出名
                      opset_version=12,  # CANN 8.0.0 支持opset v11~v15版本的算子
                      dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}}, verbose=True)  # 支持输出动态轴

    
    inputs_info = f"state:{robot_state_input.shape[0]},{robot_state_input.shape[1]},{robot_state_input.shape[2]};rgb:{image_input.shape[0]},{image_input.shape[1]},{image_input.shape[2]},{image_input.shape[3]},{image_input.shape[4]},{image_input.shape[5]}"
    print("inputs_info:", inputs_info)


3.3.3 OM 模型转换

步骤同 2.3.3 小节


3.3.4 功能/性能测试

步骤同 2.3.4 小节

在310P1设备上测得 DP 模型单次推理 NPU 计算时间约 70 ms。


3.3.5 端到端推理/功能验证

DP模型不依赖 dataset_stats.pkl 文件,故期待的文件夹格式(DP)为:

${output_dir}/${om_name}

  |-- ${om_name}.onnx (仅生成om文件时需要)

  |-- ${om_name}.om (离线模型)

  

acl_inference.py 文件可复用。见 2.3.5 小节


DP 模型 wrapper 类,dp_model_wrapper.py:

python
import time
import pickle
import os
from contextlib import contextmanager

import numpy as np
from acl_inference import AclInference

OBSERVATION_WINDOW = 2 # To, 按需修改!
PREDICTION_WINDOW = 16 # Tp, 按需修改!
NUMBER_OF_IMAGES = 2 # Ni, 按需修改!
STATE_DIM = 14 # Ns, 按需修改!

@contextmanager
def timer(name="Block"):
    start = time.perf_counter()
    yield
    end = time.perf_counter()
    print(f"[DP model] [TIMER] {name} took {(end-start)*1000:.3f} ms")


class DPModelWrapper:
    """
        DP MODEL WRAPPER

        predict:
        INPUT (single batch): 
            1. [To, Ni, 3, 480, 640] (rgb, np.float32)
            2. [To, Ns] (state, np.float32)
        OUTPUT:
            1. [Tp, Ns] (joint_states, np.float32)
    """

    def __init__(self):
        task_name = "sim_insertion_scripted_policy_best" # 按需修改!
        model_dir = "/opt/data/sim_insertion_scripted_policy_best" # 按需修改!

        self.model = AclInference(model_dir, task_name)

        self.dummy_run()

    
    def dummy_run(self):
        rgb_data = np.random.uniform(0, 255, size=(OBSERVATION_WINDOW, NUMBER_OF_IMAGES, 3, 480, 640))
        rgb_data = rgb_data / 255.0
        rgb_data = rgb_data.astype(np.float32)
        curr_image = rgb_data[np.newaxis,:]

        in_pos_numpy = np.random.rand(OBSERVATION_WINDOW, STATE_DIM).astype(np.float32)
        in_pos = in_pos_numpy[np.newaxis,:]

        with timer("Dummy Run"):
            for i in range(10):
                self.model.inference_acl(curr_image, in_pos)

    def predict(self, in_dict):
        if "rgb" not in in_dict or "state" not in in_dict:
            print("Required data is not included. Check JSON format")
            return
        
        rgb_data = in_dict["rgb"] # expected shape [To, Ni, 3, 480, 640]  np array float32
        curr_image = rgb_data[np.newaxis,:]

        in_pos = in_dict["state"] # expected shape [To, Ns], np array float32

        #with timer("Inference"):
        output_data = self.model.inference_acl(curr_image, in_pos)
        joint_states = output_data[0].reshape(-1, STATE_DIM)

        response_dict = {
            "status" : "success",
            "joint_states" : joint_states
        }

        return response_dict

if __name__=="__main__":
    m = DPModelWrapper() # dummy run

wrapper 类仅对外开放一个 predict 方法,用于触发模型新一轮的推理并返回结果值。输入输出具体格式见 dp_model_wrapper.py:24-31。


predict 接口调用示例(随机输入):

4 ROS2 对接

详细的对接ROS2的步骤本文不做阐述。ROS2 教程(中文,供参考):fishros.com

在 310P 侧的 ROS2 程序通过 import 模型对应的 wrapper 类并实例化以实现对模型的调用。

ROS2 程序主要负责与机器人的通讯及动作下发逻辑:当获取到机器人当前的位姿状态及传感器数据后,调用 predict 接口并获取到机器人的下 Nk 步动作,并通过最优策略(自定义)逐步下发至机器人。