ROS2 Master Guide
// NODE · TOPIC · SERVICE · ACTION · TF
DDS LINK STABLE
Robotics Engineer's Field Manual · v2026

ROS2, 밑바닥부터
딥러닝 로봇팔까지.

C++·Python·Linux·네트워크를 이미 다루는 개발자라면, ROS2의 진짜 진입장벽은 "언어"가 아니라 분산 통신 모델좌표계의 감각입니다. 이 문서는 그 두 축을 중심으로, 설치에서 시작해 MoveIt2·PointCloud2· 온디바이스 딥러닝 추론까지 실제로 굴려 볼 수 있는 실습 경로를 한 번에 관통합니다.

PART 1
기반
DDS 통신 모델 · colcon 빌드 시스템 · 워크스페이스 감각 잡기
PART 2
핵심
Node/Topic/Service/Action/Parameter — 5대 메커니즘
PART 3
중급
TF2 · URDF · 시뮬레이션 · QoS — "움직이는 로봇"의 언어
PART 4
고급
Lifecycle · Component · 실시간 실행 모델
PART 5
타겟
MoveIt2 · PointCloud · PyTorch 추론 · 파지 파이프라인
┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅ ╍ ┅

이 문서의 기준 배포판은 ROS 2 Jazzy Jalisco(Ubuntu 24.04 LTS 기준)이며, 일부 장기지원이 필요한 경우 Humble Hawksbill(22.04)도 병기합니다.

PART 01 · FOUNDATIONS

기반 — 세 가지 감각을 먼저 잡는다

"ROS2는 라이브러리가 아니다. 프로세스 간 통신
프로토콜과 빌드 시스템을 포함한 런타임이다."
CH.01

ROS2 란 무엇인가— ROS1을 알 필요는 없다. 단, "왜 바꿨는지"는 알아야 한다.

ROS2의 본질은 분산 컴퓨팅 미들웨어다. 여러 프로세스(노드)가 이름으로 서로를 찾아 메시지를 주고받는 PubSub 시스템이고, 그 아래에 DDS(Data Distribution Service)라는 산업 표준 통신 계층이 있다.

ROS1 vs ROS2 — 결정적인 차이

관점ROS1ROS2
마스터 노드roscore 필수 (단일장애점)없음 — DDS가 peer-to-peer 발견
통신 계층TCPROS 자체 구현DDS (OMG 표준) — RTPS over UDP
실시간성사실상 불가가능 (executor / DDS QoS)
OS 지원Linux 중심Linux · Windows · macOS · RTOS
QoS빈약Reliability · Durability · History 등 세분화
보안거의 없음SROS2 (DDS-Security)
멀티로봇별도 구성 필요도메인 ID로 기본 지원
CONCEPT · DDS를 한 문장으로

DDS는 주소가 아니라 "토픽 이름"으로 구독하는 UDP 멀티캐스트 기반 실시간 데이터버스다. 같은 네트워크의 모든 DDS 참여자가 SPDP(Simple Participant Discovery Protocol)로 서로를 발견하고, 서로가 발행하는 토픽 목록구독하는 토픽 목록을 교환한 뒤, 일치하는 쌍끼리만 데이터 전송을 시작한다. 그래서 "서버"가 필요 없다.

ROS2 스택의 전체 그림

▸ ROS2 LAYER STACK
Application Layer Nav2 · MoveIt2 · perception_pcl · rclpy/rclcpp 기반 사용자 노드 Client Libraries rclpy (Python) · rclcpp (C++) · rclrs (Rust) — 모두 rcl(C)을 감싼다 rcl — ROS Client Library (C) 언어 중립적 ROS 로직 · QoS · 토픽 이름 규칙 · 수명주기 rmw — ROS Middleware Interface DDS 구현체로 가는 얇은 어댑터. 구현체 교체 시 런타임 환경변수만 바꾸면 됨 DDS Vendor Fast DDS (기본) · Cyclone DDS · Connext — 실제 UDP/TCP/Shared Memory 전송

기억할 점은 단 하나 — 상단의 내 코드가 어떤 식이든 결국 rmw → DDS로 내려간다는 것. 성능·네트워크 문제의 90%는 이 아래 두 계층에서 발생한다.

TIP · 배포판 선택 기준

장기 프로젝트라면 Humble(2022년 5월 릴리스, 5년 LTS — 2027년 5월까지) 또는 Jazzy(2024년 5월 릴리스, LTS). 실험·최신 기능은 Rolling을 쓰되, 현업 장비에 Rolling을 절대 올리지 말 것. 본 가이드는 Jazzy 기준으로 작성하되 문법이 달라지는 부분만 Humble을 병기한다.

CH.02

환경 구축 & 워크스페이스— colcon 감각이 생기면 반은 끝났다.

1) 설치 — Ubuntu 24.04 기준

우분투 24.04 LTS 위에 Jazzy를 올린다. 핵심 단계만 추려서 보여주면 다음과 같다. (자세한 키 등록은 공식 문서 참고 — 주기적으로 서명 키가 갱신된다.)

bashinstall-ros2-jazzy.sh
# Locale 설정 (UTF-8 필수) / Locale setup
sudo apt update && sudo apt install -y locales
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
export LANG=en_US.UTF-8

# ROS2 apt 저장소 등록 / Add ROS2 apt repo
sudo apt install -y software-properties-common curl
sudo add-apt-repository universe
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key \
    -o /usr/share/keyrings/ros-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] \
http://packages.ros.org/ros2/ubuntu $(. /etc/os-release && echo $UBUNTU_CODENAME) main" \
    | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null

# 데스크톱 풀 설치 (rviz2, demo 포함) / Desktop full install
sudo apt update
sudo apt install -y ros-jazzy-desktop python3-colcon-common-extensions \
    python3-rosdep python3-argcomplete

# rosdep 초기화 / Initialize rosdep
sudo rosdep init
rosdep update

# 셸 자동 로드 / Auto-source in shell
echo "source /opt/ros/jazzy/setup.bash" >> ~/.bashrc
source ~/.bashrc
PITFALL · 흔한 설치 실패
  • 서명 키 오류: Ubuntu 24.04부터 apt 키가 .gpg 바이너리여야 한다. 구글에서 나오는 구 버전 커맨드는 apt-key를 쓰는데 deprecated다.
  • locale이 C나 POSIX이면 TF2 메시지에서 이상한 floating point 파싱 오류가 난다.
  • 여러 배포판을 설치해 두면 /opt/ros/humble/opt/ros/jazzy동시에 source하지 말 것. 둘을 한 셸에 올리면 CMake find_package가 충돌한다.

2) 워크스페이스 구조

ROS2 개발의 기본 단위는 워크스페이스다. src/에 소스를, 빌드 결과는 build/ install/ log/에 격리된다.

shelltree ~/ros2_ws
~/ros2_ws
├── src/                  # 소스 패키지가 들어가는 곳 / user packages
│   └── my_first_pkg/
│       ├── package.xml   # 메타데이터 & 의존성 / metadata
│       ├── setup.py      # Python 빌드 (ament_python)
│       ├── resource/
│       └── my_first_pkg/
│           └── node.py
├── build/                # colcon 빌드 아티팩트 (gitignore)
├── install/              # 설치된 결과물 (gitignore) — source 해야 하는 곳
└── log/                  # 빌드 로그 (gitignore)

3) colcon — 빌드의 중심

ROS1의 catkin_make가 사라졌고, colcon이 유일한 빌드 도구다. 메타빌드 시스템이라, 내부적으로 각 패키지의 setup.py(Python)나 CMake(C++)를 호출한다.

bashcolcon 주요 명령
# 워크스페이스 전체 빌드 / build all packages
cd ~/ros2_ws
colcon build --symlink-install

# 특정 패키지만 / single package
colcon build --packages-select my_first_pkg

# 변경된 것만 / incremental
colcon build --packages-up-to my_first_pkg

# 의존성은 빌드하되 해당 패키지만 재빌드
colcon build --packages-select my_first_pkg --cmake-args -DCMAKE_BUILD_TYPE=Release

# 빌드 후에는 반드시 install/ 를 source
source install/setup.bash

# 테스트 / run tests
colcon test --packages-select my_first_pkg
colcon test-result --all --verbose
TIP · --symlink-install

이 옵션을 붙이면 Python 스크립트가 install/에 심볼릭 링크로 깔려서, .py를 수정해도 재빌드 없이 즉시 반영된다. 개발 중엔 거의 항상 켜둘 것. (단 package.xml, setup.py, launch/를 고치면 재빌드 필요.)

4) 패키지 생성

bashros2 pkg create
cd ~/ros2_ws/src

# Python 패키지 / Python package
ros2 pkg create --build-type ament_python \
    --dependencies rclpy std_msgs \
    --license MIT \
    my_py_pkg

# C++ 패키지 / C++ package
ros2 pkg create --build-type ament_cmake \
    --dependencies rclcpp std_msgs \
    --license MIT \
    my_cpp_pkg
LAB 02 · 실습

최소 워크스페이스 구성

  1. ~/ros2_ws를 만들고, src/hello_ros2 Python 패키지를 ros2 pkg create로 생성하라.
  2. colcon build --symlink-install이 성공하는지 확인.
  3. source install/setup.bashros2 pkg list | grep hello가 보이는지.
  4. ros2 pkg prefix hello_ros2로 설치 경로를 확인하라.

검증 포인트: 새 터미널을 열어도 ros2 pkg list에 나오려면 .bashrcsource ~/ros2_ws/install/setup.bash를 추가했거나 매번 source 해야 한다.

CH.03

통신 모델 아키텍처— 5가지 메커니즘의 "언제 무엇을 쓸지" 감각.

ROS2의 모든 상호작용은 다음 다섯 가지 원시 메커니즘 위에서 일어난다. 각각이 정확히 어떤 상황을 위한 것인지 구분할 수 있어야 한다.

▸ FIVE PRIMITIVES — MATRIX
① TOPIC many-to-many · async · no reply publisher subscriber /scan ② SERVICE 1-to-1 · sync · request/reply client server ③ ACTION 1-to-1 · long-running · goal + feedback + result action client action server goal → ← feedback ④ PARAMETER 노드당 KV 저장소 · 런타임 변경 가능 node.declare_parameter("max_speed", 1.5) ⑤ BAG (rosbag2) 토픽 스트림 녹화/재생 — 센서 데이터 ML 학습셋의 원천 ros2 bag record -o mission_01 /scan /tf /camera/image_raw

언제 무엇을 쓸까 — 결정 가이드

시나리오선택이유
카메라 프레임 / IMU / LiDAR 스트림Topic지속 스트림, 다구독자, 비동기
"현재 상태를 주세요" 단발성 질의Service즉답 필요, 1-to-1
"좌표 (x,y)로 이동하라"Action수초~수분 걸림, 진행률 피드백, 취소 가능
PID 게인, 최대 속도, 모델 경로Parameter설정값, 동적 재구성
센서 데이터 덤프 · 재현rosbag2ML 학습 데이터셋 원천
CONCEPT · ROS_DOMAIN_ID

같은 네트워크에 여러 ROS2 시스템을 띄워야 할 때 — 예컨대 한 팀에서 여러 명이 개발할 때 — export ROS_DOMAIN_ID=42처럼 0~101 범위의 정수로 격리할 수 있다. DDS 레벨에서 UDP 포트가 계산식으로 결정되므로, 같은 ID끼리만 서로를 발견한다. 로봇 현장 배포 시 기본값 0을 그대로 쓰지 말 것 — 옆 팀과 섞인다.

LAB 03 · 실습

talker/listener로 감각 잡기

새 터미널 두 개를 열고 각각 실행:

terminal 1
ros2 run demo_nodes_cpp talker
terminal 2
ros2 run demo_nodes_py listener

또 다른 터미널에서 내부 구조를 들여다보기:

inspection
ros2 node list              # 노드 리스트
ros2 topic list             # 토픽 리스트
ros2 topic info /chatter    # 토픽의 타입과 QoS 확인
ros2 topic echo /chatter    # 실시간 수신
ros2 topic hz /chatter      # 발행 주기 측정
ros2 topic bw /chatter      # 대역폭 측정

# 그래프를 시각적으로 (별도 설치 필요: ros-jazzy-rqt-graph)
rqt_graph

확인: C++과 Python 노드가 아무 설정 없이 서로 통신하고 있다. 이게 DDS 발견의 힘이다.

PART 02 · CORE PRIMITIVES

핵심 — 다섯 메커니즘을 손끝에

"처음엔 Publisher·Subscriber만 써라.
나머지는 정말 필요할 때 꺼내 써도 늦지 않다."
CH.04

첫 노드 · Python— rclpy의 핵심은 Node 서브클래싱과 spin().

ROS2의 Python 클라이언트 라이브러리는 rclpy다. 모든 기능(퍼블리시·구독·서비스·타이머·파라미터)은 rclpy.node.Node를 상속한 클래스의 메서드로 구현한다.

가장 작은 노드

pythonmy_py_pkg/my_py_pkg/hello_node.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
최소 ROS2 노드 / Minimal ROS2 node
"""
import rclpy
from rclpy.node import Node


class HelloNode(Node):
    def __init__(self):
        # 노드명은 DDS 네임스페이스 내에서 유니크해야 한다
        # Node name must be unique within DDS namespace
        super().__init__("hello_node")
        self.get_logger().info("안녕, ROS2 — node up")
        # 1Hz 타이머 / 1Hz timer
        self.create_timer(1.0, self.tick)
        self.counter = 0

    def tick(self):
        self.counter += 1
        self.get_logger().info(f"tick #{self.counter}")


def main():
    rclpy.init()                  # DDS 컨텍스트 초기화
    node = HelloNode()
    try:
        rclpy.spin(node)          # 이벤트 루프 — blocking
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()

setup.py 엔트리포인트 등록

ros2 run <pkg> <exe>로 실행하려면 setup.py에 콘솔 스크립트로 등록해야 한다. 이 단계를 빼먹으면 "executable not found" 에러가 나는데, 가장 흔한 초보 함정이다.

pythonmy_py_pkg/setup.py
from setuptools import find_packages, setup

package_name = 'my_py_pkg'

setup(
    name=package_name,
    version='0.0.1',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
         ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='zer0',
    maintainer_email='[email protected]',
    description='ROS2 learning playground',
    license='MIT',
    entry_points={
        'console_scripts': [
            # 'exe_name = pkg.module:main_func' 형태
            'hello = my_py_pkg.hello_node:main',
        ],
    },
)
bashbuild & run
cd ~/ros2_ws
colcon build --packages-select my_py_pkg --symlink-install
source install/setup.bash
ros2 run my_py_pkg hello
PITFALL · 로그가 안 보인다

print()는 안 쓰는 게 좋다. 반드시 self.get_logger().info()를 쓰자. ROS2 로거는 severity(debug/info/warn/error/fatal)와 노드명을 자동으로 태깅해 /rosout 토픽으로도 뿌린다. 디버그 로그는 기본적으로 가려져 있으니 --ros-args --log-level my_node:=debug로 열어서 봐야 한다.

CH.05

토픽 & 메시지 설계— 커스텀 메시지는 별도 패키지에 둔다.

Publisher — Subscriber 기본 쌍

publisherpublisher_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32


class TempPublisher(Node):
    def __init__(self):
        super().__init__("temp_publisher")
        # (타입, 토픽명, QoS depth=10)
        self.pub = self.create_publisher(
            Float32, "sensor/temperature", 10)
        self.create_timer(0.5, self.publish)
        self.t = 20.0

    def publish(self):
        self.t += 0.1
        msg = Float32()
        msg.data = self.t
        self.pub.publish(msg)


def main():
    rclpy.init()
    rclpy.spin(TempPublisher())
subscribersubscriber_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32


class TempSubscriber(Node):
    def __init__(self):
        super().__init__("temp_subscriber")
        # (타입, 토픽명, 콜백, QoS depth=10)
        self.sub = self.create_subscription(
            Float32, "sensor/temperature",
            self.on_msg, 10)

    def on_msg(self, msg: Float32):
        self.get_logger().info(
            f"T = {msg.data:.2f}°C")


def main():
    rclpy.init()
    rclpy.spin(TempSubscriber())

커스텀 메시지 — .msg 파일

커스텀 메시지는 반드시 별도의 ament_cmake 인터페이스 패키지에 둔다. Python 패키지 안에는 rosidl_default_generators를 올릴 수 없기 때문이다. 이걸 지키지 않아 시간을 버리는 사람이 정말 많다.

bashinterface 패키지 만들기
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_cmake my_interfaces \
    --dependencies rosidl_default_generators std_msgs

mkdir -p my_interfaces/msg
cat > my_interfaces/msg/WeldScanPoint.msg << 'EOF'
# 용접 스캔 샘플 / Welding scan sample
std_msgs/Header header
float64 x
float64 y
float64 z
float32 intensity
uint8 class_id       # 0=background, 1..N=defect class
EOF
cmakemy_interfaces/CMakeLists.txt 요지
cmake_minimum_required(VERSION 3.8)
project(my_interfaces)

find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(std_msgs REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/WeldScanPoint.msg"
  DEPENDENCIES std_msgs
)

ament_export_dependencies(rosidl_default_runtime)
ament_package()
xmlmy_interfaces/package.xml 요지
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<build_depend>std_msgs</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

빌드 후 Python에서 다음과 같이 사용한다:

python
from my_interfaces.msg import WeldScanPoint

msg = WeldScanPoint()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = "sensor_link"
msg.x, msg.y, msg.z = 0.12, -0.03, 0.45
msg.intensity = 0.87
msg.class_id = 2
TIP · header를 기본 첨가

주기적으로 샘플링되는 센서성 메시지라면 거의 항상 std_msgs/Header를 첫 필드로 둔다. Header는 타임스탬프와 프레임 ID를 담고 있어 TF2 변환·시간 동기화·rosbag 재생 시 필수가 된다. (TF2는 다음 장에서 자세히 다룬다.)

CH.06

서비스 · 액션— "즉답형"과 "오래 걸리는 형"

Service — 동기 요청/응답

Service는 "지금 상태를 찍어줘", "파일 저장해줘", "모드 전환해줘" 같이 짧고 명확한 왕복이 필요할 때 쓴다. 내부적으로는 DDS의 RPC 패턴이다.

srv 정의my_interfaces/srv/SetMode.srv
# 요청 / request
string mode            # "idle" | "inspect" | "weld"
---
# 응답 / response
bool success
string message
pythonmode_service_server.py
import rclpy
from rclpy.node import Node
from my_interfaces.srv import SetMode


class ModeServer(Node):
    ALLOWED = {"idle", "inspect", "weld"}

    def __init__(self):
        super().__init__("mode_server")
        self.mode = "idle"
        self.srv = self.create_service(
            SetMode, "set_mode", self.on_request)

    def on_request(self, req: SetMode.Request,
                   res: SetMode.Response):
        if req.mode not in self.ALLOWED:
            res.success = False
            res.message = f"unknown mode: {req.mode}"
            return res
        self.mode = req.mode
        res.success = True
        res.message = f"switched to {self.mode}"
        self.get_logger().info(res.message)
        return res
bashCLI로 서비스 호출
ros2 service list
ros2 service type /set_mode
ros2 service call /set_mode my_interfaces/srv/SetMode "{mode: 'weld'}"
PITFALL · 서비스 콜백에서 오래 블로킹하지 마라

기본 SingleThreadedExecutor에서는 서비스 콜백 안에서 다른 토픽·서비스·타이머가 멈춘다. 5초 넘는 작업이면 Service가 아니라 Action을 써야 하고, 어쩔 수 없다면 MultiThreadedExecutor로 콜백 그룹을 분리해야 한다 (CH.12).

Action — 장기 실행 · 피드백 · 취소

Action은 Goal → Feedback → Result 3단 구조다. "경로를 따라가", "물체를 집어 올려", "스캔을 실행해"처럼 수초~수분 걸리는 작업에 쓴다. 내부적으로는 4개의 Service + 2개의 Topic으로 구성된 복합체다.

action 정의my_interfaces/action/Scan.action
# Goal — 클라이언트가 보내는 요청
uint32 num_samples      # 스캔할 샘플 수
---
# Result — 완료 시 반환
uint32 total_captured
string bag_path
---
# Feedback — 진행 중 주기적으로 발행
uint32 current
float32 percentage
pythonscan_action_server.py
import time
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from my_interfaces.action import Scan


class ScanServer(Node):
    def __init__(self):
        super().__init__("scan_server")
        self._server = ActionServer(
            self, Scan, "scan",
            execute_callback=self.execute,
            goal_callback=self.accept_goal,
            cancel_callback=self.accept_cancel,
        )

    def accept_goal(self, goal_request):
        self.get_logger().info(
            f"goal received: N={goal_request.num_samples}")
        return GoalResponse.ACCEPT

    def accept_cancel(self, goal_handle):
        return CancelResponse.ACCEPT

    def execute(self, goal_handle):
        req: Scan.Goal = goal_handle.request
        fb = Scan.Feedback()
        for i in range(req.num_samples):
            # 취소 체크 / cooperative cancellation
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                return Scan.Result()
            # 실제 센서 샘플링 자리 / real acquisition here
            time.sleep(0.05)
            fb.current = i + 1
            fb.percentage = (i + 1) / req.num_samples * 100.0
            goal_handle.publish_feedback(fb)

        goal_handle.succeed()
        result = Scan.Result()
        result.total_captured = req.num_samples
        result.bag_path = "/tmp/scan_001.bag"
        return result
CONCEPT · 언제 Service, 언제 Action?

1초 룰을 쓰자. 응답이 1초 안에 돌아올 거면 Service, 그 이상이면 Action. 추가로 중간에 취소하고 싶거나 진행률이 필요하면 무조건 Action이다. 로봇팔의 "이 좌표로 가라"는 Nav2/MoveIt2 모두 Action이다 — 실행 중 장애물 감지로 멈출 수 있어야 하니까.

CH.07

파라미터 · 런치— 설정과 오케스트레이션.

Parameter — 노드별 설정

Parameter는 노드에 속한 Key-Value 설정 저장소다. 런타임에 ros2 param set으로 바꿀 수 있으며, 콜백을 등록해 "변경되면 자동으로 반영"도 가능하다.

pythoncontroller_node.py
from rcl_interfaces.msg import ParameterDescriptor, FloatingPointRange
from rcl_interfaces.msg import SetParametersResult
import rclpy
from rclpy.node import Node


class Controller(Node):
    def __init__(self):
        super().__init__("controller")

        # 파라미터 선언 (타입 · 기본값 · 범위 · 설명)
        # Declare parameter with type, default, range, description
        self.declare_parameter(
            "max_speed", 1.5,
            ParameterDescriptor(
                description="로봇팔 엔드이펙터 최대 속도 m/s",
                floating_point_range=[FloatingPointRange(
                    from_value=0.0, to_value=3.0, step=0.1)],
            ),
        )
        self.declare_parameter("gains.kp", 2.0)
        self.declare_parameter("gains.ki", 0.1)
        self.declare_parameter("gains.kd", 0.05)

        # 런타임 변경 콜백 / runtime change callback
        self.add_on_set_parameters_callback(self.on_param_change)

    def on_param_change(self, params):
        for p in params:
            self.get_logger().info(f"{p.name} = {p.value}")
        return SetParametersResult(successful=True)
bash파라미터 다루기
ros2 param list /controller
ros2 param get /controller max_speed
ros2 param set /controller max_speed 1.2
ros2 param dump /controller > controller_params.yaml

# YAML로 한꺼번에 로드 / load from yaml
ros2 run my_py_pkg controller \
    --ros-args --params-file ./controller_params.yaml

Launch — 여러 노드 오케스트레이션

로봇 시스템은 단일 노드로 끝나는 경우가 거의 없다. Launch 시스템이 그 "전체 시스템을 한 번에 띄우는" 역할을 한다. ROS2에서는 XML, YAML, Python 세 가지 형식을 지원하지만, 분기·반복·조건부가 들어가면 Python이 가장 낫다.

pythonlaunch/bringup.launch.py
"""
로봇 시스템 통합 런치 / System bring-up
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, GroupAction, LogInfo
from launch.conditions import IfCondition
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node, PushRosNamespace
from launch_ros.substitutions import FindPackageShare


def generate_launch_description():
    # 인자 선언 / declare args
    use_sim = LaunchConfiguration("use_sim_time")
    robot_ns = LaunchConfiguration("namespace")

    params_file = PathJoinSubstitution([
        FindPackageShare("my_py_pkg"),
        "config", "controller.yaml"
    ])

    return LaunchDescription([
        DeclareLaunchArgument(
            "use_sim_time", default_value="false"),
        DeclareLaunchArgument(
            "namespace", default_value="arm0"),

        LogInfo(msg=["Launching system for ns=", robot_ns]),

        GroupAction([
            PushRosNamespace(robot_ns),

            Node(
                package="my_py_pkg",
                executable="controller",
                name="controller",
                parameters=[params_file,
                            {"use_sim_time": use_sim}],
                output="screen",
                emulate_tty=True,
            ),

            Node(
                package="my_py_pkg",
                executable="hello",
                name="hello",
                condition=IfCondition(LaunchConfiguration("debug",
                    default="false")),
            ),
        ]),
    ])
bash
ros2 launch my_py_pkg bringup.launch.py namespace:=arm1 debug:=true
LAB 07 · 실습

센서 ↔ 컨트롤러 2노드 시스템

  1. temp_publisher(0.5초마다 Float32 발행)와 temp_subscriber(수신 로그) 두 노드를 작성.
  2. 파라미터 publish_rate(Hz)와 baseline_temp를 publisher에 추가하고, YAML로 주입.
  3. 두 노드를 하나의 Launch 파일로 띄우고 namespace sensor1로 그룹화.
  4. ros2 topic list/sensor1/sensor/temperature로 보이는지 확인.
  5. ros2 param set으로 런타임에 publish_rate를 바꿔 주기가 달라지는지 검증 (파라미터 콜백 필요).
PART 03 · INTERMEDIATE

중급 — 움직이는 로봇의 언어

"좌표계가 헷갈리기 시작하면 TF2 시간이다.
URDF는 기술서지, 렌더링 모델이 아니다."
CH.08

TF2 좌표 변환— 로보틱스에서 가장 많이 쓰는 단일 도구.

로봇에는 수십 개의 좌표계(프레임)가 있다. 베이스·관절1..n·엔드이펙터·카메라·LiDAR·월드. TF2는 이 프레임들 사이의 상대 변환을 트리 구조로 관리하고, 시간에 따라 보간해 주는 시스템이다.

▸ TYPICAL TF TREE — manipulator + camera
world base_link shoulder_link elbow_link … → tool0 camera_base camera_optical 정적(static) ───── 동적(dynamic) ─ ─ → world → base_link SLAM 결과 → 관절 회전으로 갱신 → 캘리브레이션 상수

핵심 개념 — Static · Dynamic 두 가지 뿐

  • Static Transform: "변하지 않는" 관계. 센서와 마운트 사이, base_link와 world 원점 오프셋 등. 한 번 발행하고 latch하는 형태라 비용이 거의 없다. /tf_static 토픽으로 발행.
  • Dynamic Transform: 시간에 따라 변한다. 로봇 관절이 움직이면 robot_state_publisher/tf로 지속 발행.

변환 조회 — 리스너 코드

pythontf_listener_node.py
import rclpy
from rclpy.node import Node
from rclpy.duration import Duration
from tf2_ros import Buffer, TransformListener
from tf2_ros import LookupException, ConnectivityException, ExtrapolationException


class TFUser(Node):
    def __init__(self):
        super().__init__("tf_user")
        self.buffer = Buffer()
        self.listener = TransformListener(self.buffer, self)
        self.create_timer(0.1, self.tick)   # 10 Hz

    def tick(self):
        try:
            # target_frame ← source_frame at time
            # "source 기준 target의 위치/자세를 알려줘"
            t = self.buffer.lookup_transform(
                target_frame="base_link",
                source_frame="camera_optical",
                time=rclpy.time.Time(),        # 최신
                timeout=Duration(seconds=0.05),
            )
            x = t.transform.translation.x
            y = t.transform.translation.y
            z = t.transform.translation.z
            self.get_logger().info(f"cam in base: ({x:+.3f}, {y:+.3f}, {z:+.3f})")
        except (LookupException, ConnectivityException,
                ExtrapolationException) as e:
            self.get_logger().warn(f"tf not ready: {e}")
PITFALL · "lookup_transform 방향 헷갈림"

lookup_transform(target, source, ...)의 반환 값은 "source 프레임 원점이 target 프레임에서 어디에 있는가"를 나타낸다. 즉, source 좌표의 점을 target 좌표로 변환할 때 그대로 쓸 수 있는 변환이다. 순서가 바뀌면 부호가 전부 뒤집히므로, 매번 "target이 먼저, 그 다음 source"라고 속으로 외우자.

Point를 변환하기 — tf2_geometry_msgs

python
from geometry_msgs.msg import PointStamped
from tf2_geometry_msgs import do_transform_point

# 카메라 프레임의 3D 점을 base_link로
pt_in_cam = PointStamped()
pt_in_cam.header.frame_id = "camera_optical"
pt_in_cam.header.stamp = self.get_clock().now().to_msg()
pt_in_cam.point.x, pt_in_cam.point.y, pt_in_cam.point.z = 0.1, 0.05, 0.8

tf = self.buffer.lookup_transform(
    "base_link", "camera_optical", pt_in_cam.header.stamp,
    timeout=Duration(seconds=0.1))
pt_in_base = do_transform_point(pt_in_cam, tf)

Static Broadcaster — 캘리브레이션 발행

pythonstatic_cam_tf.py
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import TransformStamped
from tf2_ros.static_transform_broadcaster import StaticTransformBroadcaster
from tf_transformations import quaternion_from_euler
import math


class StaticCamTF(Node):
    def __init__(self):
        super().__init__("static_cam_tf")
        self.br = StaticTransformBroadcaster(self)

        t = TransformStamped()
        t.header.stamp = self.get_clock().now().to_msg()
        t.header.frame_id = "tool0"           # parent
        t.child_frame_id = "camera_optical"   # child
        t.transform.translation.x = 0.05
        t.transform.translation.y = 0.00
        t.transform.translation.z = 0.12
        q = quaternion_from_euler(0.0, math.radians(15.0), 0.0)
        t.transform.rotation.x, t.transform.rotation.y, \
            t.transform.rotation.z, t.transform.rotation.w = q

        self.br.sendTransform(t)              # latched on /tf_static
TIP · rviz2로 TF 트리 확인

ros2 run tf2_tools view_frames로 PDF 형태의 TF 트리를 떨구거나, rviz2의 TF 플러그인으로 실시간 확인. 트리가 끊어졌으면 반드시 수정 후 진행할 것. 끊어진 채로 SLAM·MoveIt·PointCloud 처리에 들어가면 원인을 잡기가 매우 어렵다.

LAB 08 · 실습

간이 TF 파이프라인

  1. world → base_link를 SLAM인 척 동적 변환으로 발행 (0.5Hz, 임의의 원운동).
  2. base_link → tool0을 정적 변환(0.3, 0, 0.4)으로 발행.
  3. tool0 → camera_optical을 위 static 예제로 발행.
  4. 리스너 노드에서 worldcamera_optical을 주기적으로 조회해 로그에 찍는다.
  5. rviz2에서 TF 시각화 & tf2_tools view_frames로 트리 덤프.
CH.09

URDF · 로봇 기술— 기하학 + 관성 + 관절 + 충돌.

URDF(Unified Robot Description Format)는 로봇을 링크(link) + 관절(joint) 그래프로 표현하는 XML이다. 같은 기술을 기반으로 rviz2·Gazebo·MoveIt2·ros2_control이 모두 돌아간다. Xacro는 URDF에 매크로·변수·include를 추가한 템플릿 언어로, 실전에서는 거의 항상 Xacro를 쓴다.

최소 URDF

xmlsimple_arm.urdf.xacro
<?xml version="1.0"?>
<robot name="simple_arm" xmlns:xacro="http://www.ros.org/wiki/xacro">

  <xacro:property name="pi" value="3.14159"/>

  <link name="base_link">
    <visual>
      <geometry><cylinder radius="0.08" length="0.1"/></geometry>
      <material name="steel"><color rgba="0.5 0.5 0.6 1"/></material>
    </visual>
    <collision>
      <geometry><cylinder radius="0.08" length="0.1"/></geometry>
    </collision>
    <inertial>
      <mass value="1.0"/>
      <inertia ixx="0.01" iyy="0.01" izz="0.01"
               ixy="0" ixz="0" iyz="0"/>
    </inertial>
  </link>

  <link name="link1">
    <visual>
      <origin xyz="0 0 0.15"/>
      <geometry><box size="0.04 0.04 0.3"/></geometry>
    </visual>
  </link>

  <joint name="joint1" type="revolute">
    <parent link="base_link"/>
    <child link="link1"/>
    <origin xyz="0 0 0.05"/>
    <axis xyz="0 0 1"/>
    <limit lower="${-pi}" upper="${pi}"
           effort="10" velocity="2.0"/>
  </joint>

</robot>

실행 — robot_state_publisher + joint_state_publisher_gui

pythonlaunch/view_urdf.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import ExecuteProcess
from launch.substitutions import Command
from launch_ros.substitutions import FindPackageShare
from launch.substitutions import PathJoinSubstitution

def generate_launch_description():
    urdf_path = PathJoinSubstitution([
        FindPackageShare("my_robot_description"),
        "urdf", "simple_arm.urdf.xacro"
    ])
    robot_description = {"robot_description": Command(["xacro ", urdf_path])}

    return LaunchDescription([
        Node(package="robot_state_publisher",
             executable="robot_state_publisher",
             parameters=[robot_description],
             output="screen"),
        Node(package="joint_state_publisher_gui",
             executable="joint_state_publisher_gui"),
        ExecuteProcess(cmd=["rviz2"], output="screen"),
    ])
CONCEPT · robot_state_publisher의 역할

이 노드는 /joint_states를 구독해서, URDF의 관절 위상과 결합해 모든 링크의 TF를 /tf로 발행해 준다. 즉 당신은 관절값만 보내고, TF 계산은 로봇 기술서와 이 노드가 알아서 한다. ROS2의 아름다운 분리 중 하나다.

TIP · 실제 로봇 제조사 기술서

UR, Franka, Kinova, Doosan, Rainbow 등 주요 제조사들은 공식 URDF/Xacro를 깃허브로 배포한다. 밑바닥부터 모델링할 필요가 없고, 없는 경우도 OnShape · SolidWorks ROS Exporter로 뽑을 수 있다. 본인의 실전에서는 "제조사 기술서를 검증하고 캘리브레이션 보정"이 업무의 95%다.

CH.10

Gazebo · RViz2— 시뮬레이터 vs 비주얼라이저.

둘을 혼동하는 경우가 많은데 정확히 다른 도구다.

도구역할요구
RViz2 데이터 시각화. 토픽의 내용을 3D로 렌더링. 계산 안 함. 경량 · GPU 최소 요구
Gazebo (Harmonic / Classic) 물리 시뮬레이션. 센서 모델·물리 충돌·모터 구동을 진짜로 계산. GPU · 물리 설정 파일(.sdf / .world)

RViz2 — 지정 필수 플러그인

  • TF: 프레임 트리와 각 프레임의 축.
  • RobotModel: /robot_description을 읽어 URDF 렌더링.
  • PointCloud2: LiDAR·RGB-D·LVS 센서 결과.
  • Image, Camera, LaserScan, Marker/MarkerArray, Path.

Gazebo (Harmonic) 연동

ROS 2 Jazzy 기본 짝은 Gazebo Harmonic이다. ros_gz_sim, ros_gz_bridge로 토픽을 Gazebo ↔ ROS2 간 브리지한다.

bash
sudo apt install -y ros-jazzy-ros-gz

# 예: 카메라 토픽 브리지
ros2 run ros_gz_bridge parameter_bridge \
    /camera@sensor_msgs/msg/[email protected] \
    /camera_info@sensor_msgs/msg/[email protected]

# world 실행 + 브리지
ros2 launch ros_gz_sim gz_sim.launch.py gz_args:="empty.sdf"
PITFALL · sim_time

시뮬레이터를 쓸 때는 모든 노드에 use_sim_time:=true를 설정해야 한다. 이걸 빼면 TF 보간이 실시간 시계로 돌아가 버리고, Gazebo 시계와 어긋나서 "TF extrapolation into the future/past" 에러 폭탄을 맞는다.

CH.11

rosbag2 · QoS— 재현 가능한 데이터와 믿을 수 있는 통신.

rosbag2 — 토픽을 디스크에

rosbag2는 ROS2 토픽 스트림을 SQLite/MCAP에 기록하고 재생한다. ML 학습 데이터 파이프라인의 시작점이기도 하다 — 현장에서 센서 데이터를 녹화 → 오프라인에서 라벨링 → 학습.

bashrosbag2 basics
# 녹화 (MCAP 포맷이 권장 — 압축·랜덤액세스 유리)
ros2 bag record -s mcap -o mission_01 \
    /scan /camera/image_raw /tf /tf_static /joint_states

# 정보 조회
ros2 bag info mission_01

# 재생 (루프 / 속도 조절 가능)
ros2 bag play mission_01 --loop --rate 2.0 \
    --clock              # /clock 토픽 발행 (sim_time용)

# 특정 토픽 제외하고 재생
ros2 bag play mission_01 --exclude "/joint_states"
TIP · MCAP 선택 이유

기본 SQLite3 스토리지는 파일 락 때문에 동시 녹화·읽기에 불리하고, 압축 옵션도 제한적이다. MCAP은 Foxglove에서 만든 포맷으로 시간 인덱스 기반 랜덤 액세스, zstd 압축, 다양한 언어의 독립 리더가 강점. ML 데이터셋으로 쓸 거라면 처음부터 MCAP로.

QoS — Quality of Service

DDS는 각 Pub/Sub 쌍마다 QoS 프로파일을 협상한다. 매칭이 어긋나면 통신이 아예 안 된다 — "왜 rqt에선 보이는데 내 노드는 못 받지?"의 90%는 QoS 문제다.

정책의미
ReliabilityRELIABLE · BEST_EFFORT재전송 vs 드롭 허용. 센서 스트림은 후자가 흔함
DurabilityVOLATILE · TRANSIENT_LOCAL늦게 들어온 구독자에게 과거 메시지를 주는지. latched 역할
HistoryKEEP_LAST(n) · KEEP_ALL버퍼 정책
Deadlinems이 주기보다 느리면 경보
LivelinessAUTOMATIC · MANUAL_BY_TOPIC피어가 살아있는지 판정 방식
pythonQoS 커스텀 예
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, HistoryPolicy

sensor_qos = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,  # 센서 데이터
    durability=DurabilityPolicy.VOLATILE,
    history=HistoryPolicy.KEEP_LAST,
    depth=5,
)
self.sub = self.create_subscription(
    LaserScan, "/scan", self.on_scan, sensor_qos)

# latched 스타일 — 나중에 구독해도 마지막 맵을 받음
map_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    durability=DurabilityPolicy.TRANSIENT_LOCAL,
    history=HistoryPolicy.KEEP_LAST,
    depth=1,
)
CONCEPT · 사전 정의된 프로파일

rclpy.qos_eventrclpy.qos에는 자주 쓰이는 조합이 상수로 들어있다. qos_profile_sensor_data, qos_profile_services_default, qos_profile_parameters 등. 시작은 이 프리셋을 쓰고, 필요할 때만 커스텀.

LAB 11 · 실습

QoS 불일치 재현 & 진단

  1. Publisher는 RELIABLE + TRANSIENT_LOCAL, Subscriber는 BEST_EFFORT + VOLATILE로 설정.
  2. 통신이 안 되는 현상을 확인 (Subscriber가 퍼블리셔보다 관대한 요구여야 매칭됨).
  3. ros2 topic info -v /your_topic로 QoS 프로파일과 매칭 여부를 진단.
  4. Subscriber 쪽만 RELIABLE + TRANSIENT_LOCAL로 올려서 매칭되는지 확인.
PART 04 · ADVANCED

고급 — 실행 모델을 조각한다

"여기부터는 '왜 이게 느리지'를 해결하는 영역.
초보에게는 숨겨져 있던 내부 구조가 드러난다."
CH.12

Lifecycle · Executors— 노드의 상태 기계와 콜백 스케줄링.

Managed (Lifecycle) Node — 상태 기계로 관리

그냥 Node는 시작하자마자 돌고, 죽으면 그냥 죽는다. 반면 LifecycleNodeunconfigured → inactive → active → finalized 상태를 갖는다. 이걸로 "로봇 전원 인가는 했지만 아직 제어 루프는 안 돌리고 싶다"는 시나리오를 깔끔히 다룰 수 있다.

▸ LIFECYCLE STATE MACHINE
unconfigured inactive active finalized configure activate shutdown deactivate cleanup · configure: 자원 할당 · 파라미터 로드 · 연결 준비 · activate: 구독/발행 실제로 열림 · 타이머 시작

sensor_driver, controller 같이 의존 순서가 있는 노드들을 Lifecycle로 묶으면, "모두 configure 끝난 후 activate"와 같은 시스템 수준 시퀀싱을 Nav2·MoveIt2·ros2_control이 실제로 하듯 할 수 있다.

Executor — 콜백은 누가 실행하는가

rclpy.spin(node)을 호출하면 내부에서 SingleThreadedExecutor가 돈다. 모든 콜백이 한 스레드에서 순차 실행된다. 그래서:

  • 긴 서비스 콜백이 들어있으면 그 동안 토픽 수신이 막힌다.
  • 타이머 주기가 다른 콜백의 실행 시간 때문에 흔들린다.

해결책은 MultiThreadedExecutor + Reentrant/MutuallyExclusive 콜백 그룹이다.

pythonmulti_threaded.py
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup, MutuallyExclusiveCallbackGroup
from std_srvs.srv import Trigger
import time


class HeavyNode(Node):
    def __init__(self):
        super().__init__("heavy_node")

        # 병렬 허용 / parallel-safe
        self.cb_fast = ReentrantCallbackGroup()
        # 상호배제 / critical section
        self.cb_critical = MutuallyExclusiveCallbackGroup()

        self.create_timer(0.05, self.fast_tick,
                          callback_group=self.cb_fast)  # 20Hz
        self.create_service(Trigger, "slow_work",
                            self.slow_work,
                            callback_group=self.cb_critical)

    def fast_tick(self):
        self.get_logger().debug("tick")

    def slow_work(self, req, res):
        time.sleep(2.0)   # fast_tick가 블록되지 않음
        res.success = True
        res.message = "done"
        return res


def main():
    rclpy.init()
    node = HeavyNode()
    executor = MultiThreadedExecutor(num_threads=4)
    executor.add_node(node)
    try:
        executor.spin()
    finally:
        executor.shutdown()
PITFALL · GIL의 현실

Python에서 MultiThreadedExecutor는 I/O 바운드에는 의미가 있지만 CPU 바운드에는 GIL 때문에 제한적이다. 무거운 연산(추론·PCL 처리)은 C++ 노드로 옮기거나, 별도 프로세스(composable node)로 분리, 또는 NumPy/PyTorch 벡터 연산(C 레벨에서 GIL 해제됨)으로 처리해야 한다.

CH.13

Component · Composition— 한 프로세스에 여러 노드, 제로카피 통신.

프로세스 간 통신은 DDS가 아무리 빨라도 직렬화 + 네트워크 스택 오버헤드가 있다. Composition은 여러 노드를 하나의 프로세스 안에 담아서, 토픽 통신을 메모리 포인터 전달로 치환한다. 대역폭이 큰 이미지·포인트클라우드 파이프라인에 필수.

▸ INTER-PROCESS vs INTRA-PROCESS
Inter-process (default) camera_node filter_node ↯ serialize ↯ UDP stack 4MB 이미지 60Hz → 240 MB/s 직렬화 · 카피 Intra-process (composable) single process · shared memory camera_node filter_node ptr 같은 주소공간 → 포인터 이동만, 복사 없음
pythoncomposable_launch.py
from launch import LaunchDescription
from launch_ros.actions import ComposableNodeContainer
from launch_ros.descriptions import ComposableNode


def generate_launch_description():
    return LaunchDescription([
        ComposableNodeContainer(
            name="perception_container",
            namespace="",
            package="rclcpp_components",
            executable="component_container_mt",  # multi-threaded
            composable_node_descriptions=[
                ComposableNode(
                    package="image_proc",
                    plugin="image_proc::RectifyNode",
                    name="rectify"),
                ComposableNode(
                    package="depth_image_proc",
                    plugin="depth_image_proc::PointCloudXyzNode",
                    name="xyz_cloud",
                    remappings=[
                        ("image_rect", "/camera/depth/image_rect"),
                        ("camera_info", "/camera/depth/camera_info"),
                    ]),
            ],
            output="screen",
        ),
    ])
TIP · Python에도 Composable Node가 있다

Jazzy부터 Python rclpy에도 동적 composition이 들어왔다. 단 제로카피의 진짜 이점은 C++에서 나온다 — 대용량 토픽 파이프라인의 중심 노드는 C++로 짜는 게 여전히 정답이다.

CH.14

실시간 · 성능 최적화— DDS 바닥까지 내려가는 순간.

DDS 구현체 교체

기본 구현체는 Fast DDS다. 안정적이지만 대용량 메시지에서 드롭이 관찰되면 Cyclone DDS로 바꾸는 것이 업계 관행이다. Nav2·MoveIt2 커뮤니티는 Cyclone을 선호한다.

bash
sudo apt install -y ros-jazzy-rmw-cyclonedds-cpp
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp

# 검증 / verify
ros2 doctor --report | grep -A2 "RMW MIDDLEWARE"

Cyclone DDS 튜닝 — 이미지/포인트클라우드용

UDP 버퍼를 키우고 fragmentation 임계를 조정하지 않으면 4K 이미지가 막 도착한다.

xmlcyclonedds.xml
<CycloneDDS xmlns="https://cdds.io/config">
  <Domain Id="any">
    <General>
      <!-- 공유 메모리 전송 활성 (같은 호스트 intra-process용) -->
      <Interfaces><NetworkInterface autodetermine="true"/></Interfaces>
      <AllowMulticast>true</AllowMulticast>
      <MaxMessageSize>65500B</MaxMessageSize>
    </General>
    <Internal>
      <SocketReceiveBufferSize min="10MB"/>
      <SocketSendBufferSize min="10MB"/>
    </Internal>
  </Domain>
</CycloneDDS>
bash
export CYCLONEDDS_URI=file:///$HOME/cyclonedds.xml

커널 UDP 버퍼

bash/etc/sysctl.d/60-ros2.conf
net.core.rmem_max=134217728       # 128MB
net.core.wmem_max=134217728
net.ipv4.udp_mem="102400 873800 16777216"
# 적용 / apply
sudo sysctl --system

프로파일링 도구

  • ros2 topic hz / bw / delay — 주기, 대역폭, 지연 측정.
  • ros2 trace + ros2_tracing — LTTng 기반 커널/유저 공간 트레이싱, 콜백 시간 측정.
  • top, htop, perf — CPU 스파이크 위치 찾기.
  • Foxglove Studio — 시각화 & 성능 디버깅.
TIP · 제로카피를 끝까지 얻는 법

Loaned messages(C++): 같은 노드가 pub/sub이면 publisher->borrow_loaned_message()로 중간 버퍼를 건너뛴다. Composition + Intra-process + Cyclone + SHM까지 조합하면 4K 이미지 60Hz도 여유 있게 흐른다. 최적화 전후 ros2 topic delay로 수치를 반드시 남길 것.

CONCEPT · 실시간(RT) 노드

산업용 안전 루프(1kHz 피드백 제어)는 일반 Linux로 안 된다. PREEMPT_RT 커널로 바꾸고, 노드에 sched_setscheduler(SCHED_FIFO)로 우선순위 부여. C++ 전용 영역이며, Python은 이 영역에 들어가지 않는다. 일반 애플리케이션에선 10~100Hz 루프면 충분하니 RT 커널은 "정말 필요할 때만".

PART 05 · TARGET DOMAIN

타겟 — 포인트클라우드 · 딥러닝 · 로봇팔

"여기부터는 진짜 현장이다.
센서에서 추론까지 한 줄로 꿰는 법."
CH.15

MoveIt2 — 로봇팔 모션 계획— FK/IK · 경로 계획 · 충돌 회피.

MoveIt2는 로봇팔의 "어떻게 움직일까"를 담당하는 프레임워크다. 목표 자세(pose) 또는 조인트 목표를 주면, IK를 풀어 조인트 경로를 만들고, OMPL 플래너로 충돌 없는 궤적을 생성해, 컨트롤러(ros2_control)로 내려보낸다.

▸ MOVEIT2 PIPELINE
Goal pose or joint IK Solver KDL · TRAC-IK Planner OMPL · Pilz · CHOMP Time Param. TOTG FollowJointTrajectory Action → ros2_control → hardware / sim

설치 & MoveIt Setup Assistant

bash
sudo apt install -y ros-jazzy-moveit ros-jazzy-moveit-py

# Setup Assistant — URDF를 먹여 move_group 설정 패키지 생성
ros2 launch moveit_setup_assistant setup_assistant.launch.py

Setup Assistant는 URDF를 읽어 SRDF(Semantic Robot Description)를 만든다. 관절 그룹, 자기 충돌 매트릭스, 기본 자세(home/ready), 엔드이펙터 링크 등을 선언하는 파일이다. 출력물은 하나의 "moveit_config" 패키지 — 이걸 본인 프로젝트에서 include해 쓴다.

Python으로 모션 계획 — moveit_py

pythonmove_arm.py
"""
MoveIt2 Python API로 팔 움직이기 / Move arm with moveit_py
"""
import rclpy
from moveit.planning import MoveItPy
from geometry_msgs.msg import PoseStamped


def main():
    rclpy.init()
    moveit = MoveItPy(node_name="moveit_py_demo")
    arm = moveit.get_planning_component("manipulator")

    # 목표 pose / target pose
    target = PoseStamped()
    target.header.frame_id = "base_link"
    target.pose.position.x = 0.45
    target.pose.position.y = 0.0
    target.pose.position.z = 0.35
    target.pose.orientation.w = 1.0

    arm.set_start_state_to_current_state()
    arm.set_goal_state(pose_stamped_msg=target,
                       pose_link="tool0")

    # 계획 / plan
    plan_result = arm.plan()
    if plan_result:
        moveit.execute(plan_result.trajectory, controllers=[])
    else:
        print("planning failed")

    rclpy.shutdown()
PITFALL · IK가 왜 실패하나
  • 도달 불가능: workspace 밖이거나 관절 한계 넘음. tf2_echo로 현재 tool0 위치 확인.
  • 기본 KDL은 특이점 근처에서 약함 — TRAC-IK로 바꾸면 훨씬 안정적.
  • end-effector 프레임 잘못 지정: SRDF의 end_effector_link가 실제 잡고 싶은 링크인지 확인.
  • Planning frame 불일치: goal의 frame_id와 MoveIt의 planning_frame이 다르면 TF 변환에 실패해 silent하게 깨진다.

ros2_control — 하드웨어 인터페이스

MoveIt이 궤적을 만들어도 이를 실제 모터로 내려보낼 장치가 필요하다. 그것이 ros2_control이다. 추상 계층이 잘 되어 있어, 제조사 드라이버(UR의 ur_robot_driver, Franka의 franka_ros2)를 쓰거나 직접 hardware_interface::SystemInterface를 구현할 수 있다.

CONCEPT · 컨트롤러 종류
  • joint_trajectory_controller: MoveIt이 보내는 궤적을 따라 주는 기본 컨트롤러.
  • joint_state_broadcaster: 현재 관절 상태를 /joint_states로 내보냄.
  • forward_position_controller: "이 위치로 가라" 저수준 명령용.
  • velocity_controllers, effort_controllers: 속도·토크 제어.

산업 로봇은 보통 joint_trajectory_controller + joint_state_broadcaster 조합.

LAB 15 · 실습

가상 6DOF 팔로 첫 모션

  1. UR5e 또는 Panda 데모 패키지 설치 (sudo apt install ros-jazzy-ur ros-jazzy-franka-ros2 등 — 배포 패키지 가용성에 따라 선택).
  2. 해당 로봇의 moveit_config 데모 런치를 fake hardware로 띄운다.
  3. RViz2의 MotionPlanning 패널에서 마우스 드래그로 목표를 잡고 "Plan & Execute" 동작 확인.
  4. move_arm.py를 본인의 그룹명/링크명으로 수정해 같은 동작을 코드로 재현.
  5. 일부러 도달 불가 좌표를 줘서 실패 메시지도 직접 본다 — 실패 케이스를 만나본 사람이 잘 짠다.
CH.16

PointCloud2 처리— 센서 메시지에서 Open3D · PyTorch 텐서까지.

sensor_msgs/PointCloud2 의 구조

ROS2에서 3D 점군은 sensor_msgs/PointCloud2로 전송된다. 임의 필드를 선언할 수 있는 유연한 바이너리 포맷이고, NumPy view로 바로 얹어 제로카피처럼 읽는 것이 핵심이다.

pythonPointCloud2 필드
# PointCloud2 주요 필드 / Key fields
# header           : std_msgs/Header (frame_id, stamp)
# height, width    : 구조화 여부 (organized면 h>1)
# fields[]         : name, offset, datatype, count
# is_bigendian     : 바이트 순서
# point_step       : 한 점의 바이트 크기
# row_step         : 한 행의 바이트 크기
# data[]           : 실제 바이너리 (bytes)
# is_dense         : NaN 제거 여부

PointCloud2 ↔ NumPy 변환

pythonpc2_io.py
import numpy as np
import sensor_msgs_py.point_cloud2 as pc2
from sensor_msgs.msg import PointCloud2, PointField
from std_msgs.msg import Header


def pc2_to_xyz(msg: PointCloud2) -> np.ndarray:
    """PointCloud2 → (N, 3) float32 numpy array."""
    pts = np.array(list(pc2.read_points(
        msg,
        field_names=("x", "y", "z"),
        skip_nans=True,
    )), dtype=np.float32)
    return pts.reshape(-1, 3) if pts.size else np.empty((0, 3), np.float32)


def xyz_to_pc2(points: np.ndarray, frame_id: str, stamp) -> PointCloud2:
    """(N, 3) → PointCloud2."""
    header = Header()
    header.stamp = stamp
    header.frame_id = frame_id
    fields = [
        PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
        PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
        PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
    ]
    return pc2.create_cloud(header, fields, points)
TIP · 대용량 점군의 진짜 빠른 변환

read_points는 Python 이터레이터라서 수십만 점에선 느리다. 실전에서는 msg.datanp.frombuffer로 직접 읽고 dtype을 필드 레이아웃에 맞춰 캐스팅하는 편이 10배 이상 빠르다.

pythonfast_pc2_parse.py
import numpy as np


def pc2_to_xyz_fast(msg) -> np.ndarray:
    """
    빠른 경로 / fast path — point_step이 정확히 12B(XYZ float32)인 일반 케이스.
    intensity 등이 섞이면 structured dtype 경로를 쓴다.
    """
    if msg.point_step == 12 and not msg.is_bigendian:
        return np.frombuffer(msg.data, dtype=np.float32).reshape(-1, 3).copy()

    field_to_dtype = {
        1: np.int8, 2: np.uint8, 3: np.int16, 4: np.uint16,
        5: np.int32, 6: np.uint32, 7: np.float32, 8: np.float64,
    }
    dtype_list, cursor = [], 0
    for f in msg.fields:
        if f.offset > cursor:
            dtype_list.append((f"_pad{cursor}", f"V{f.offset - cursor}"))
            cursor = f.offset
        np_dt = field_to_dtype[f.datatype]
        dtype_list.append((f.name, np_dt))
        cursor += np.dtype(np_dt).itemsize
    if msg.point_step > cursor:
        dtype_list.append(("_tail", f"V{msg.point_step - cursor}"))

    s = np.frombuffer(msg.data, dtype=np.dtype(dtype_list))
    xyz = np.stack([s['x'], s['y'], s['z']], axis=-1)
    return xyz.astype(np.float32)

Open3D로 전처리 — 다운샘플 · 노이즈 · 노멀

pythonpc_preprocess.py
import numpy as np
import open3d as o3d


def preprocess(xyz: np.ndarray, voxel_size: float = 0.005):
    """다운샘플 · outlier 제거 · 노멀 추정 / downsample, denoise, normals."""
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(xyz.astype(np.float64))

    pcd = pcd.voxel_down_sample(voxel_size=voxel_size)
    pcd, _ = pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0)
    pcd.estimate_normals(
        search_param=o3d.geometry.KDTreeSearchParamHybrid(
            radius=voxel_size * 3, max_nn=30))
    pcd.orient_normals_towards_camera_location(np.array([0, 0, 0]))
    return pcd

ROS2 노드로 감싸기

pythonpc_processor_node.py
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from sensor_msgs.msg import PointCloud2


class PCProcessor(Node):
    def __init__(self):
        super().__init__("pc_processor")
        self.declare_parameter("voxel_size", 0.005)

        self.sub = self.create_subscription(
            PointCloud2, "/lvs/points",
            self.on_cloud, qos_profile_sensor_data)
        self.pub = self.create_publisher(
            PointCloud2, "/lvs/points_filtered", 5)

    def on_cloud(self, msg: PointCloud2):
        xyz = pc2_to_xyz_fast(msg)
        if xyz.shape[0] < 100:
            return

        vs = self.get_parameter("voxel_size").value
        pcd = preprocess(xyz, voxel_size=vs)
        out_xyz = np.asarray(pcd.points, dtype=np.float32)

        out_msg = xyz_to_pc2(out_xyz,
                             frame_id=msg.header.frame_id,
                             stamp=msg.header.stamp)
        self.pub.publish(out_msg)
        self.get_logger().debug(f"{xyz.shape[0]} → {out_xyz.shape[0]} pts")
CONCEPT · scanCONTROL · LVS 계열

Micro-Epsilon scanCONTROL 같은 Laser Line 센서는 한 프레임당 일렬의 프로파일(1D 곡선)을 뱉는다. 축 방향으로 이송시키며 시간축으로 누적하면 3D가 된다. 드라이버가 PointCloud2로 전해 준다면 위 파이프라인에 그대로 얹힌다 — frame_id를 sensor optical frame으로 정확히 표기하는 것만 잊지 말 것. 그래야 다음 단계의 TF 변환이 정확해진다.

LAB 16 · 실습

점군 라이브 시각화 파이프라인

  1. RGB-D 카메라(또는 Gazebo의 RGBD sensor) → /camera/depth/points 토픽 확인.
  2. pc_processor 노드로 voxel 0.01m 다운샘플 후 /points_filtered 발행.
  3. RViz2에 두 토픽을 동시에 띄워 색을 다르게 — 점 수 차이를 눈으로 확인.
  4. ros2 topic bw로 대역폭이 얼마나 줄었는지 비교.
CH.17

딥러닝 모델 통합— PyTorch / ONNX를 ROS2 노드에 얹는 법.

ROS2 노드는 결국 Python 프로세스다. PyTorch/ONNXRuntime을 그대로 __init__에서 로드해서 콜백에서 추론하면 끝. 단, 그 단순함 뒤에 배치·비동기·GPU 메모리 관리라는 실전 함정이 있다.

PTv3 · Pointcept 계열 추론 인터페이스

3D 포인트 기반 시맨틱 세그멘테이션 모델(예: Point Transformer v3)은 대체로 (N, C) 형태 텐서를 입력으로 받는다 — C는 최소 3(XYZ), 옵션으로 intensity/normal. ROS2에서 필요한 건: PointCloud2 → 텐서 → 추론 → 점별 라벨 → PointCloud2(라벨 필드 추가).

pythonptv3_infer_node.py
"""
PTv3 추론 노드 스케치 / PTv3 inference node sketch
실제 Pointcept API에 맞게 import / input dict 구성을 조정해야 한다.
"""
import numpy as np
import torch
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from rclpy.callback_groups import ReentrantCallbackGroup
from sensor_msgs.msg import PointCloud2, PointField
from std_msgs.msg import Header
import threading


class SegInferenceNode(Node):
    def __init__(self):
        super().__init__("seg_infer")
        self.declare_parameter("model_ckpt", "")
        self.declare_parameter("device", "cuda:0")

        self.device = torch.device(self.get_parameter("device").value)
        self.model = self._load_model(self.get_parameter("model_ckpt").value)
        self.model.eval()

        # 백프레셔 해결: 최신 프레임만 보관
        # backpressure: keep only latest frame
        self._latest = None
        self._lock = threading.Lock()
        cb = ReentrantCallbackGroup()

        self.sub = self.create_subscription(
            PointCloud2, "/lvs/points_filtered",
            self.on_cloud, qos_profile_sensor_data,
            callback_group=cb)
        self.pub = self.create_publisher(
            PointCloud2, "/lvs/points_labeled", 5)

        # 추론 루프 (sub와 별도 콜백) / inference loop
        self.create_timer(0.0, self._infer_loop, callback_group=cb)

    def _load_model(self, ckpt_path: str):
        # Pointcept의 build_model(cfg) 패턴을 그대로 사용
        # 여기서는 의사 코드 / pseudo
        ...

    def on_cloud(self, msg: PointCloud2):
        with self._lock:
            self._latest = msg     # 최신만 / overwrite

    @torch.inference_mode()
    def _infer_loop(self):
        with self._lock:
            msg = self._latest
            self._latest = None
        if msg is None:
            return

        xyz = pc2_to_xyz_fast(msg)
        if xyz.shape[0] < 1024:
            return

        # PTv3 입력은 보통 dict: coord, feat, offset
        coord = torch.from_numpy(xyz).to(self.device)
        feat = coord.clone()
        offset = torch.tensor([coord.shape[0]], device=self.device)

        out = self.model(dict(coord=coord, feat=feat, offset=offset))
        logits = out["seg_logits"] if isinstance(out, dict) else out
        labels = logits.argmax(dim=-1).to(torch.uint8).cpu().numpy()

        # 라벨을 추가 필드로 담은 PointCloud2 / labeled cloud
        labeled = np.empty(xyz.shape[0], dtype=np.dtype([
            ('x', 'f4'), ('y', 'f4'), ('z', 'f4'), ('label', 'u1')
        ]))
        labeled['x'] = xyz[:, 0]
        labeled['y'] = xyz[:, 1]
        labeled['z'] = xyz[:, 2]
        labeled['label'] = labels

        out_msg = PointCloud2()
        out_msg.header = Header(stamp=msg.header.stamp,
                                frame_id=msg.header.frame_id)
        out_msg.height, out_msg.width = 1, labeled.shape[0]
        out_msg.fields = [
            PointField(name='x',     offset=0,  datatype=PointField.FLOAT32, count=1),
            PointField(name='y',     offset=4,  datatype=PointField.FLOAT32, count=1),
            PointField(name='z',     offset=8,  datatype=PointField.FLOAT32, count=1),
            PointField(name='label', offset=12, datatype=PointField.UINT8,   count=1),
        ]
        out_msg.is_bigendian = False
        out_msg.point_step = 13
        out_msg.row_step = 13 * labeled.shape[0]
        out_msg.is_dense = True
        out_msg.data = labeled.tobytes()
        self.pub.publish(out_msg)


def main():
    rclpy.init()
    node = SegInferenceNode()
    from rclpy.executors import MultiThreadedExecutor
    ex = MultiThreadedExecutor(num_threads=2)
    ex.add_node(node)
    try:
        ex.spin()
    finally:
        ex.shutdown()
PITFALL · 추론이 모든 것을 막는다

on_cloud 콜백에서 200ms짜리 추론을 돌리면 20Hz 센서 스트림은 즉시 무너진다. 해결책 세 가지:

  • MultiThreadedExecutor + Reentrant 그룹으로 수신과 추론을 분리.
  • 수신 콜백은 최신 프레임만 보관하는 버퍼에 쓰고, 별도 타이머/스레드가 추론(백프레셔 해결) — 위 코드의 패턴.
  • 본격적으로는 별도 프로세스 + shared memory로 격리 (Triton, TensorRT 서버).

TensorRT / ONNX 배포

엣지 PC에서 실시간을 확보하려면 PyTorch로 그대로 돌리지 않는다. ONNX export → TensorRT 최적화 → ROS2 노드에선 trt 엔진만 로드. PTv3는 sparse conv·kNN 같은 커스텀 CUDA 연산이 있어 변환이 까다로우니, Pointcept 커뮤니티의 export 스크립트를 먼저 확인하자.

bash
# ONNX export (예시 / example)
python export_ptv3.py --ckpt best.pth --out ptv3.onnx --opset 17

# TensorRT 빌드 — dynamic shape 프로파일
trtexec --onnx=ptv3.onnx --fp16 --saveEngine=ptv3.engine \
        --minShapes=coord:1024x3 \
        --optShapes=coord:65536x3 \
        --maxShapes=coord:262144x3
TIP · 모델 자산은 ROS 패키지 외부로

100MB 넘는 ckpt는 colcon 워크스페이스에 넣지 말 것 — 빌드 시간이 폭발한다. /opt/models 같은 별도 경로에 두고, 노드는 파라미터로 절대 경로를 받도록 설계. 조직 내 공유는 S3·MinIO·MLflow Model Registry로 관리.

CONCEPT · 데이터 → 모델 → 배포 루프

rosbag2로 현장 데이터 수집 → 오프라인 라벨링 → 학습(MLflow 추적) → ONNX/TRT export → 모델 레지스트리 등록 → 엣지 배포의 폐루프를 갖추는 것이 ROS2 + 딥러닝의 진짜 그림이다. Kubeflow가 학습 쪽을, ROS2 노드가 추론 쪽을 맡고, 가운데를 모델 레지스트리가 잇는다.

CH.18

End-to-End 파이프라인— 센서 → 세그멘테이션 → 파지 → 팔 실행.

이 장은 앞의 모든 내용을 하나의 흐름으로 꿰는 캡스톤이다. 최종 목표 — "포인트클라우드에서 딥러닝으로 대상을 식별 → 3D 좌표 산출 → MoveIt2로 팔을 보냄".

▸ FULL PIPELINE — sensor to motion
LVS / RGB-D /lvs/points Preprocess Open3D · voxel · TF PTv3 Segm. /points_labeled Grasp Proposer GPD · 기하 기반 MoveIt2 Plan pose → trajectory ros2_control — FollowJointTrajectory Action robot driver (UR · Franka · Doosan · custom) TF2 트리 — world → base_link → … → tool0 → camera_optical rosbag2 로 전 구간 녹화 → 재현 · 학습데이터화

실행 순서

  1. 부팅: bringup.launch.py로 robot_state_publisher · ros2_control · 센서 드라이버 · 카메라 TF를 한 번에 띄움.
  2. 인지: LVS/RGB-D → pc_processorseg_infer 노드 체인. 결과 /points_labeled.
  3. 파지 제안: 라벨된 점군에서 대상 클래스만 필터 → GPD/ContactGraspNet 또는 직접 구현한 PCA 기반 제안기로 grasp pose 산출.
  4. 좌표 변환: camera_optical 프레임의 pose를 base_link로 TF 변환.
  5. 계획 & 실행: MoveIt2에 pose 전달 → plan → execute. 접근(pre-grasp) → 그립 → 후퇴(post-grasp) 3단계.
  6. 관찰: rosbag2로 전 구간 녹화 (디버깅 + 학습데이터 재수집 동시 효과).

파지 제안자 — 미니멀 예시

GPD 같은 무거운 라이브러리를 붙이기 전에, 간단한 기하 기반 제안자로 파이프라인 전체를 굴려 보는 게 정답이다. 전체 파이프라인이 한번 돌아가야 어디가 약점인지 보인다.

pythonsimple_grasp_proposer.py
"""
간이 파지 제안 / Minimal grasp proposer
- 대상 클래스 포인트만 추출 → 주성분 분석으로 방향, 센트로이드로 위치 결정.
"""
import numpy as np
from typing import Optional
from geometry_msgs.msg import PoseStamped
from tf_transformations import quaternion_from_matrix


TARGET_CLASS = 1   # 예: 용접 비드 / weld bead


def propose_grasp(xyz: np.ndarray, labels: np.ndarray,
                  frame_id: str, stamp) -> Optional[PoseStamped]:
    mask = labels == TARGET_CLASS
    if mask.sum() < 50:
        return None

    pts = xyz[mask]
    centroid = pts.mean(axis=0)

    # PCA — 점군의 주축 / principal axes
    centered = pts - centroid
    cov = np.cov(centered.T)
    eigvals, eigvecs = np.linalg.eigh(cov)   # ascending order
    # 가장 큰 축 = 대상의 길이 방향 / longest axis
    x_axis = eigvecs[:, 2]
    z_axis = eigvecs[:, 0]   # 가장 작은 분산 = 표면 법선 추정 / surface normal proxy
    # 직교화 / orthogonalize
    y_axis = np.cross(z_axis, x_axis)
    y_axis /= np.linalg.norm(y_axis) + 1e-9
    x_axis = np.cross(y_axis, z_axis)

    # 4x4 변환 / homogeneous transform
    R = np.eye(4)
    R[:3, 0] = x_axis
    R[:3, 1] = y_axis
    R[:3, 2] = z_axis
    qx, qy, qz, qw = quaternion_from_matrix(R)

    pose = PoseStamped()
    pose.header.frame_id = frame_id
    pose.header.stamp = stamp
    pose.pose.position.x = float(centroid[0])
    pose.pose.position.y = float(centroid[1])
    pose.pose.position.z = float(centroid[2])
    pose.pose.orientation.x = qx
    pose.pose.orientation.y = qy
    pose.pose.orientation.z = qz
    pose.pose.orientation.w = qw
    return pose

오케스트레이터 노드 — 클라이언트들의 지휘자

최상위에 작은 상태 기계 노드 하나를 두고, 라벨된 점군을 받으면 → grasp proposer 호출 → TF 변환 → MoveIt2 Action 호출 순으로 진행한다. 이 부분은 본인 도메인(파지/용접/검사)에 따라 가장 다채로워지는 부분이다.

pythonorchestrator_node.py — 골격
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from rclpy.duration import Duration
from sensor_msgs.msg import PointCloud2
from geometry_msgs.msg import PoseStamped
from tf2_ros import Buffer, TransformListener
from tf2_geometry_msgs import do_transform_pose_stamped
from control_msgs.action import FollowJointTrajectory   # 또는 MoveGroup


class Orchestrator(Node):
    def __init__(self):
        super().__init__("orchestrator")
        self.buffer = Buffer()
        self.tf_listener = TransformListener(self.buffer, self)
        self.busy = False

        self.sub = self.create_subscription(
            PointCloud2, "/lvs/points_labeled",
            self.on_labeled, 5)

        # MoveIt 액션 (실제로는 move_action 또는 moveit_py를 권장)
        self.move_client = ActionClient(self,
            FollowJointTrajectory, "/joint_trajectory_controller/follow_joint_trajectory")

    def on_labeled(self, msg: PointCloud2):
        if self.busy:
            return
        xyz, labels = parse_labeled(msg)             # CH.16/17 헬퍼
        pose_cam = propose_grasp(xyz, labels,
                                 msg.header.frame_id,
                                 msg.header.stamp)
        if pose_cam is None:
            return

        # camera_optical → base_link 변환 / TF transform
        try:
            tf = self.buffer.lookup_transform(
                "base_link", pose_cam.header.frame_id,
                pose_cam.header.stamp,
                timeout=Duration(seconds=0.1))
        except Exception as e:
            self.get_logger().warn(f"tf failed: {e}")
            return
        pose_base = do_transform_pose_stamped(pose_cam, tf)

        # 실제로는 moveit_py로 plan & execute 하는 것이 현실적
        # In practice, use moveit_py to plan & execute
        self.busy = True
        self._send_to_moveit(pose_base)

    def _send_to_moveit(self, pose: PoseStamped):
        # 의사 코드 — moveit_py 인스턴스에 위임
        # Pseudo: delegate to moveit_py instance from CH.15
        ...
        self.busy = False
PITFALL · 시간 동기화

lookup_transform의 시간 인자에 msg.header.stamp를 그대로 넣었는데 "extrapolation into the past" 에러가 난다면, TF 버퍼의 보관 시간(기본 10초)을 넘어선 케이스다. 추론이 오래 걸려서 한 두 초 지난 stamp를 묻고 있을 수 있다 — "가능한 최신"이 더 자연스러운 경우는 rclpy.time.Time() + 짧은 timeout으로. 반대로 정확한 시점 변환이 꼭 필요하면 TF 버퍼 길이를 늘리고 추론 결과에 입력 점군의 stamp를 그대로 전파하라.

LAB 18 · 캡스톤 실습

"보이는 것을 잡는다" 미니 프로젝트

  1. Gazebo로 단순 박스 여러 개를 테이블에 올린 환경 구축. RGB-D + 6DOF 팔 모델 사용.
  2. pc_processor + 라벨러(처음에는 색상 또는 평면 기반의 가짜 분류기) → /points_labeled 발행.
  3. simple_grasp_proposer로 grasp pose 산출 → RViz Marker로 시각화.
  4. orchestrator가 TF로 base_link 좌표 변환 후 MoveIt2 plan & execute. 접근→그립→후퇴 3단 동작 구현.
  5. 전 구간을 rosbag2 (mcap)로 녹화. 실패 케이스 5건을 재생하며 어디서 깨졌는지 분석.
  6. 가짜 분류기를 PTv3 추론 노드로 교체. 동일한 파이프라인이 그대로 동작하는지 확인 — "인터페이스 분리"의 가치를 체험.

완성 시 얻게 되는 것: 본업의 용접 검사 파이프라인을 ROS2 위에 얹는 데 필요한 거의 모든 패턴을 손끝에 갖게 된다. 남은 것은 도메인별 모델 학습과 안전 인터록뿐.

APPENDIX

치트시트 · 디버깅 · 다음 단계

"한 번 본 명령어는 결국 다시 찾는다.
여기 모아 둔다."
A.1

CLI 치트시트— 매일 손에 닿는 명령들.

목적명령
노드 목록ros2 node list
노드 상세ros2 node info /your_node
토픽 목록 / 타입 / QoSros2 topic list -v · ros2 topic info -v /scan
토픽 내용ros2 topic echo /scan
토픽 주기/대역폭/지연ros2 topic hz /scan · bw · delay
서비스 호출ros2 service call /set_mode my_pkg/srv/SetMode "{mode: 'idle'}"
액션 호출ros2 action send_goal /scan my_pkg/action/Scan "{num_samples: 100}"
파라미터ros2 param list · get · set · dump · load
인터페이스 정의 보기ros2 interface show sensor_msgs/msg/PointCloud2
TF 트리 PDF 덤프ros2 run tf2_tools view_frames
특정 변환 단발 확인ros2 run tf2_ros tf2_echo base_link tool0
환경 진단ros2 doctor --report
패키지 경로ros2 pkg prefix my_pkg
로그 레벨--ros-args --log-level my_node:=debug
A.2

디버깅 플레이북— 증상별 1차 점검 리스트.

SYMPTOM · "토픽이 안 들어온다"
  1. ros2 topic list에 보이나? 안 보이면 노드가 죽은 것이거나 namespace 문제.
  2. ros2 topic info -vQoS 매칭 확인 (Reliability/Durability).
  3. ROS_DOMAIN_ID가 동일한가? 다른 셸에서 환경변수가 다를 수 있음.
  4. 같은 RMW 구현체를 쓰는가? (Fast DDS ↔ Cyclone은 보통 잘 되지만 일부 시나리오 문제).
  5. 방화벽/네트워크 인터페이스 (특히 도커·Nebula·k8s pod 네트워크) — 멀티캐스트가 막혔을 수 있음.
SYMPTOM · "TF extrapolation into the future / past"
  1. 시뮬레이터를 쓴다면 모든 노드use_sim_time:=true인지 확인.
  2. TF 발행 주기가 너무 낮지 않은지 (관절 1Hz는 부족, 최소 30Hz 권장).
  3. lookup_transform에 정확한 stamp 대신 Time()(=최신)을 쓸 수 있는 케이스인지 재검토.
  4. TF buffer 길이(cache_time) 늘리기 — 기본 10s.
SYMPTOM · "ros2 run에서 executable not found"
  1. setup.pyconsole_scripts에 등록했나?
  2. 빌드 후 install/setup.bash를 source 했나? 새 터미널이라면 다시.
  3. 패키지명과 entry name이 정확한가? ros2 pkg executables <pkg>로 확인.
  4. --symlink-install 없이 빌드한 후 .py를 수정했다면 재빌드 필요.
A.3

다음 단계— 이 가이드를 마친 후의 깊이.

  • Nav2 — 모바일 로봇이 필요해지면. SLAM · 비용맵 · BehaviorTree 기반 행동 제어.
  • BehaviorTree.CPP / py_trees_ros — 복잡한 시퀀싱·재시도·페일오버 로직.
  • SROS2 + DDS-Security — 인증·암호화. 현장 배포 시 필요.
  • Foxglove Studio + WebSocket Bridge — 원격 모니터링·디버깅.
  • Isaac Sim · Isaac Lab — 강화학습 sim-to-real, 합성 데이터 생성.
  • ros2_control 커스텀 하드웨어 인터페이스 — 제조사 드라이버가 없는 장비를 직접 붙일 때.
  • micro-ROS — 마이크로컨트롤러(STM32, ESP32, Teensy) 위에 ROS2 노드.
  • 실시간 배포 — PREEMPT_RT 커널 + 메모리 락 + RT 스케줄링.

"ROS2를 쓴다는 것은 결국 — 좌표계와 시간을 정확히 다루고, 메시지 인터페이스를 깨끗이 설계하고, 시스템을 노드라는 작은 책임으로 분해하는 일이다. 라이브러리는 도구일 뿐이다."