C++·Python·Linux·네트워크를 이미 다루는 개발자라면, ROS2의 진짜 진입장벽은
"언어"가 아니라 분산 통신 모델과 좌표계의 감각입니다.
이 문서는 그 두 축을 중심으로, 설치에서 시작해 MoveIt2·PointCloud2·
온디바이스 딥러닝 추론까지 실제로 굴려 볼 수 있는 실습 경로를 한 번에 관통합니다.
이 문서의 기준 배포판은 ROS 2 Jazzy Jalisco(Ubuntu 24.04 LTS 기준)이며,
일부 장기지원이 필요한 경우 Humble Hawksbill(22.04)도 병기합니다.
PART 01 · FOUNDATIONS
기반 — 세 가지 감각을 먼저 잡는다
"ROS2는 라이브러리가 아니다. 프로세스 간 통신 프로토콜과 빌드 시스템을 포함한 런타임이다."
CH.01
ROS2 란 무엇인가— ROS1을 알 필요는 없다. 단, "왜 바꿨는지"는 알아야 한다.
ROS2의 본질은 분산 컴퓨팅 미들웨어다.
여러 프로세스(노드)가 이름으로 서로를 찾아 메시지를 주고받는 PubSub 시스템이고,
그 아래에 DDS(Data Distribution Service)라는 산업 표준 통신 계층이 있다.
ROS1 vs ROS2 — 결정적인 차이
관점
ROS1
ROS2
마스터 노드
roscore 필수 (단일장애점)
없음 — DDS가 peer-to-peer 발견
통신 계층
TCPROS 자체 구현
DDS (OMG 표준) — RTPS over UDP
실시간성
사실상 불가
가능 (executor / DDS QoS)
OS 지원
Linux 중심
Linux · Windows · macOS · RTOS
QoS
빈약
Reliability · Durability · History 등 세분화
보안
거의 없음
SROS2 (DDS-Security)
멀티로봇
별도 구성 필요
도메인 ID로 기본 지원
CONCEPT · DDS를 한 문장으로
DDS는 주소가 아니라 "토픽 이름"으로 구독하는 UDP 멀티캐스트 기반 실시간 데이터버스다.
같은 네트워크의 모든 DDS 참여자가 SPDP(Simple Participant Discovery Protocol)로 서로를 발견하고,
서로가 발행하는 토픽 목록과 구독하는 토픽 목록을 교환한 뒤, 일치하는 쌍끼리만 데이터 전송을 시작한다.
그래서 "서버"가 필요 없다.
ROS2 스택의 전체 그림
▸ ROS2 LAYER STACK
기억할 점은 단 하나 — 상단의 내 코드가 어떤 식이든 결국 rmw → DDS로 내려간다는 것.
성능·네트워크 문제의 90%는 이 아래 두 계층에서 발생한다.
TIP · 배포판 선택 기준
장기 프로젝트라면 Humble(2022년 5월 릴리스, 5년 LTS — 2027년 5월까지) 또는
Jazzy(2024년 5월 릴리스, LTS). 실험·최신 기능은 Rolling을 쓰되,
현업 장비에 Rolling을 절대 올리지 말 것. 본 가이드는 Jazzy 기준으로 작성하되
문법이 달라지는 부분만 Humble을 병기한다.
CH.02
환경 구축 & 워크스페이스— colcon 감각이 생기면 반은 끝났다.
1) 설치 — Ubuntu 24.04 기준
우분투 24.04 LTS 위에 Jazzy를 올린다. 핵심 단계만 추려서 보여주면 다음과 같다.
(자세한 키 등록은 공식 문서 참고 — 주기적으로 서명 키가 갱신된다.)
서명 키 오류: Ubuntu 24.04부터 apt 키가 .gpg 바이너리여야 한다. 구글에서 나오는 구 버전 커맨드는 apt-key를 쓰는데 deprecated다.
locale이 C나 POSIX이면 TF2 메시지에서 이상한 floating point 파싱 오류가 난다.
여러 배포판을 설치해 두면 /opt/ros/humble과 /opt/ros/jazzy를 동시에 source하지 말 것. 둘을 한 셸에 올리면 CMake find_package가 충돌한다.
2) 워크스페이스 구조
ROS2 개발의 기본 단위는 워크스페이스다.
src/에 소스를, 빌드 결과는 build/ install/ log/에 격리된다.
shelltree ~/ros2_ws
~/ros2_ws
├── src/ # 소스 패키지가 들어가는 곳 / user packages
│ └── my_first_pkg/
│ ├── package.xml # 메타데이터 & 의존성 / metadata
│ ├── setup.py # Python 빌드 (ament_python)
│ ├── resource/
│ └── my_first_pkg/
│ └── node.py
├── build/ # colcon 빌드 아티팩트 (gitignore)
├── install/ # 설치된 결과물 (gitignore) — source 해야 하는 곳
└── log/ # 빌드 로그 (gitignore)
3) colcon — 빌드의 중심
ROS1의 catkin_make가 사라졌고, colcon이 유일한 빌드 도구다.
메타빌드 시스템이라, 내부적으로 각 패키지의 setup.py(Python)나 CMake(C++)를 호출한다.
bashcolcon 주요 명령
# 워크스페이스 전체 빌드 / build all packages
cd ~/ros2_ws
colcon build --symlink-install
# 특정 패키지만 / single package
colcon build --packages-select my_first_pkg
# 변경된 것만 / incremental
colcon build --packages-up-to my_first_pkg
# 의존성은 빌드하되 해당 패키지만 재빌드
colcon build --packages-select my_first_pkg --cmake-args -DCMAKE_BUILD_TYPE=Release
# 빌드 후에는 반드시 install/ 를 source
source install/setup.bash
# 테스트 / run tests
colcon test --packages-select my_first_pkg
colcon test-result --all --verbose
TIP · --symlink-install
이 옵션을 붙이면 Python 스크립트가 install/에 심볼릭 링크로 깔려서,
.py를 수정해도 재빌드 없이 즉시 반영된다. 개발 중엔 거의 항상 켜둘 것.
(단 package.xml, setup.py, launch/를 고치면 재빌드 필요.)
4) 패키지 생성
bashros2 pkg create
cd ~/ros2_ws/src
# Python 패키지 / Python package
ros2 pkg create --build-type ament_python \
--dependencies rclpy std_msgs \
--license MIT \
my_py_pkg
# C++ 패키지 / C++ package
ros2 pkg create --build-type ament_cmake \
--dependencies rclcpp std_msgs \
--license MIT \
my_cpp_pkg
LAB 02 · 실습
최소 워크스페이스 구성
~/ros2_ws를 만들고, src/hello_ros2 Python 패키지를 ros2 pkg create로 생성하라.
colcon build --symlink-install이 성공하는지 확인.
source install/setup.bash 후 ros2 pkg list | grep hello가 보이는지.
ros2 pkg prefix hello_ros2로 설치 경로를 확인하라.
검증 포인트: 새 터미널을 열어도 ros2 pkg list에 나오려면 .bashrc에 source ~/ros2_ws/install/setup.bash를 추가했거나 매번 source 해야 한다.
CH.03
통신 모델 아키텍처— 5가지 메커니즘의 "언제 무엇을 쓸지" 감각.
ROS2의 모든 상호작용은 다음 다섯 가지 원시 메커니즘 위에서 일어난다.
각각이 정확히 어떤 상황을 위한 것인지 구분할 수 있어야 한다.
▸ FIVE PRIMITIVES — MATRIX
언제 무엇을 쓸까 — 결정 가이드
시나리오
선택
이유
카메라 프레임 / IMU / LiDAR 스트림
Topic
지속 스트림, 다구독자, 비동기
"현재 상태를 주세요" 단발성 질의
Service
즉답 필요, 1-to-1
"좌표 (x,y)로 이동하라"
Action
수초~수분 걸림, 진행률 피드백, 취소 가능
PID 게인, 최대 속도, 모델 경로
Parameter
설정값, 동적 재구성
센서 데이터 덤프 · 재현
rosbag2
ML 학습 데이터셋 원천
CONCEPT · ROS_DOMAIN_ID
같은 네트워크에 여러 ROS2 시스템을 띄워야 할 때 — 예컨대 한 팀에서 여러 명이 개발할 때 —
export ROS_DOMAIN_ID=42처럼 0~101 범위의 정수로 격리할 수 있다.
DDS 레벨에서 UDP 포트가 계산식으로 결정되므로, 같은 ID끼리만 서로를 발견한다.
로봇 현장 배포 시 기본값 0을 그대로 쓰지 말 것 — 옆 팀과 섞인다.
LAB 03 · 실습
talker/listener로 감각 잡기
새 터미널 두 개를 열고 각각 실행:
terminal 1
ros2 run demo_nodes_cpp talker
terminal 2
ros2 run demo_nodes_py listener
또 다른 터미널에서 내부 구조를 들여다보기:
inspection
ros2 node list # 노드 리스트
ros2 topic list # 토픽 리스트
ros2 topic info /chatter # 토픽의 타입과 QoS 확인
ros2 topic echo /chatter # 실시간 수신
ros2 topic hz /chatter # 발행 주기 측정
ros2 topic bw /chatter # 대역폭 측정
# 그래프를 시각적으로 (별도 설치 필요: ros-jazzy-rqt-graph)
rqt_graph
확인: C++과 Python 노드가 아무 설정 없이 서로 통신하고 있다. 이게 DDS 발견의 힘이다.
PART 02 · CORE PRIMITIVES
핵심 — 다섯 메커니즘을 손끝에
"처음엔 Publisher·Subscriber만 써라. 나머지는 정말 필요할 때 꺼내 써도 늦지 않다."
cd ~/ros2_ws
colcon build --packages-select my_py_pkg --symlink-install
source install/setup.bash
ros2 run my_py_pkg hello
PITFALL · 로그가 안 보인다
print()는 안 쓰는 게 좋다. 반드시 self.get_logger().info()를 쓰자.
ROS2 로거는 severity(debug/info/warn/error/fatal)와 노드명을 자동으로 태깅해 /rosout 토픽으로도 뿌린다.
디버그 로그는 기본적으로 가려져 있으니 --ros-args --log-level my_node:=debug로 열어서 봐야 한다.
import rclpy
from rclpy.node import Node
from my_interfaces.srv import SetMode
class ModeServer(Node):
ALLOWED = {"idle", "inspect", "weld"}
def __init__(self):
super().__init__("mode_server")
self.mode = "idle"
self.srv = self.create_service(
SetMode, "set_mode", self.on_request)
def on_request(self, req: SetMode.Request,
res: SetMode.Response):
if req.mode not in self.ALLOWED:
res.success = False
res.message = f"unknown mode: {req.mode}"
return res
self.mode = req.mode
res.success = True
res.message = f"switched to {self.mode}"
self.get_logger().info(res.message)
return res
bashCLI로 서비스 호출
ros2 service list
ros2 service type /set_mode
ros2 service call /set_mode my_interfaces/srv/SetMode "{mode: 'weld'}"
PITFALL · 서비스 콜백에서 오래 블로킹하지 마라
기본 SingleThreadedExecutor에서는 서비스 콜백 안에서 다른 토픽·서비스·타이머가 멈춘다.
5초 넘는 작업이면 Service가 아니라 Action을 써야 하고,
어쩔 수 없다면 MultiThreadedExecutor로 콜백 그룹을 분리해야 한다 (CH.12).
Action — 장기 실행 · 피드백 · 취소
Action은 Goal → Feedback → Result 3단 구조다.
"경로를 따라가", "물체를 집어 올려", "스캔을 실행해"처럼 수초~수분 걸리는 작업에 쓴다.
내부적으로는 4개의 Service + 2개의 Topic으로 구성된 복합체다.
action 정의my_interfaces/action/Scan.action
# Goal — 클라이언트가 보내는 요청
uint32 num_samples # 스캔할 샘플 수
---
# Result — 완료 시 반환
uint32 total_captured
string bag_path
---
# Feedback — 진행 중 주기적으로 발행
uint32 current
float32 percentage
pythonscan_action_server.py
import time
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from my_interfaces.action import Scan
class ScanServer(Node):
def __init__(self):
super().__init__("scan_server")
self._server = ActionServer(
self, Scan, "scan",
execute_callback=self.execute,
goal_callback=self.accept_goal,
cancel_callback=self.accept_cancel,
)
def accept_goal(self, goal_request):
self.get_logger().info(
f"goal received: N={goal_request.num_samples}")
return GoalResponse.ACCEPT
def accept_cancel(self, goal_handle):
return CancelResponse.ACCEPT
def execute(self, goal_handle):
req: Scan.Goal = goal_handle.request
fb = Scan.Feedback()
for i in range(req.num_samples):
# 취소 체크 / cooperative cancellation
if goal_handle.is_cancel_requested:
goal_handle.canceled()
return Scan.Result()
# 실제 센서 샘플링 자리 / real acquisition here
time.sleep(0.05)
fb.current = i + 1
fb.percentage = (i + 1) / req.num_samples * 100.0
goal_handle.publish_feedback(fb)
goal_handle.succeed()
result = Scan.Result()
result.total_captured = req.num_samples
result.bag_path = "/tmp/scan_001.bag"
return result
CONCEPT · 언제 Service, 언제 Action?
1초 룰을 쓰자. 응답이 1초 안에 돌아올 거면 Service, 그 이상이면 Action.
추가로 중간에 취소하고 싶거나 진행률이 필요하면 무조건 Action이다.
로봇팔의 "이 좌표로 가라"는 Nav2/MoveIt2 모두 Action이다 — 실행 중 장애물 감지로 멈출 수 있어야 하니까.
CH.07
파라미터 · 런치— 설정과 오케스트레이션.
Parameter — 노드별 설정
Parameter는 노드에 속한 Key-Value 설정 저장소다.
런타임에 ros2 param set으로 바꿀 수 있으며,
콜백을 등록해 "변경되면 자동으로 반영"도 가능하다.
pythoncontroller_node.py
from rcl_interfaces.msg import ParameterDescriptor, FloatingPointRange
from rcl_interfaces.msg import SetParametersResult
import rclpy
from rclpy.node import Node
class Controller(Node):
def __init__(self):
super().__init__("controller")
# 파라미터 선언 (타입 · 기본값 · 범위 · 설명)
# Declare parameter with type, default, range, description
self.declare_parameter(
"max_speed", 1.5,
ParameterDescriptor(
description="로봇팔 엔드이펙터 최대 속도 m/s",
floating_point_range=[FloatingPointRange(
from_value=0.0, to_value=3.0, step=0.1)],
),
)
self.declare_parameter("gains.kp", 2.0)
self.declare_parameter("gains.ki", 0.1)
self.declare_parameter("gains.kd", 0.05)
# 런타임 변경 콜백 / runtime change callback
self.add_on_set_parameters_callback(self.on_param_change)
def on_param_change(self, params):
for p in params:
self.get_logger().info(f"{p.name} = {p.value}")
return SetParametersResult(successful=True)
bash파라미터 다루기
ros2 param list /controller
ros2 param get /controller max_speed
ros2 param set /controller max_speed 1.2
ros2 param dump /controller > controller_params.yaml
# YAML로 한꺼번에 로드 / load from yaml
ros2 run my_py_pkg controller \
--ros-args --params-file ./controller_params.yaml
Launch — 여러 노드 오케스트레이션
로봇 시스템은 단일 노드로 끝나는 경우가 거의 없다.
Launch 시스템이 그 "전체 시스템을 한 번에 띄우는" 역할을 한다.
ROS2에서는 XML, YAML, Python 세 가지 형식을 지원하지만,
분기·반복·조건부가 들어가면 Python이 가장 낫다.
pythonlaunch/bringup.launch.py
"""
로봇 시스템 통합 런치 / System bring-up
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, GroupAction, LogInfo
from launch.conditions import IfCondition
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node, PushRosNamespace
from launch_ros.substitutions import FindPackageShare
def generate_launch_description():
# 인자 선언 / declare args
use_sim = LaunchConfiguration("use_sim_time")
robot_ns = LaunchConfiguration("namespace")
params_file = PathJoinSubstitution([
FindPackageShare("my_py_pkg"),
"config", "controller.yaml"
])
return LaunchDescription([
DeclareLaunchArgument(
"use_sim_time", default_value="false"),
DeclareLaunchArgument(
"namespace", default_value="arm0"),
LogInfo(msg=["Launching system for ns=", robot_ns]),
GroupAction([
PushRosNamespace(robot_ns),
Node(
package="my_py_pkg",
executable="controller",
name="controller",
parameters=[params_file,
{"use_sim_time": use_sim}],
output="screen",
emulate_tty=True,
),
Node(
package="my_py_pkg",
executable="hello",
name="hello",
condition=IfCondition(LaunchConfiguration("debug",
default="false")),
),
]),
])
"좌표계가 헷갈리기 시작하면 TF2 시간이다. URDF는 기술서지, 렌더링 모델이 아니다."
CH.08
TF2 좌표 변환— 로보틱스에서 가장 많이 쓰는 단일 도구.
로봇에는 수십 개의 좌표계(프레임)가 있다. 베이스·관절1..n·엔드이펙터·카메라·LiDAR·월드.
TF2는 이 프레임들 사이의 상대 변환을 트리 구조로 관리하고, 시간에 따라 보간해 주는 시스템이다.
▸ TYPICAL TF TREE — manipulator + camera
핵심 개념 — Static · Dynamic 두 가지 뿐
Static Transform: "변하지 않는" 관계. 센서와 마운트 사이, base_link와 world 원점 오프셋 등.
한 번 발행하고 latch하는 형태라 비용이 거의 없다. /tf_static 토픽으로 발행.
Dynamic Transform: 시간에 따라 변한다. 로봇 관절이 움직이면 robot_state_publisher가 /tf로 지속 발행.
변환 조회 — 리스너 코드
pythontf_listener_node.py
import rclpy
from rclpy.node import Node
from rclpy.duration import Duration
from tf2_ros import Buffer, TransformListener
from tf2_ros import LookupException, ConnectivityException, ExtrapolationException
class TFUser(Node):
def __init__(self):
super().__init__("tf_user")
self.buffer = Buffer()
self.listener = TransformListener(self.buffer, self)
self.create_timer(0.1, self.tick) # 10 Hz
def tick(self):
try:
# target_frame ← source_frame at time
# "source 기준 target의 위치/자세를 알려줘"
t = self.buffer.lookup_transform(
target_frame="base_link",
source_frame="camera_optical",
time=rclpy.time.Time(), # 최신
timeout=Duration(seconds=0.05),
)
x = t.transform.translation.x
y = t.transform.translation.y
z = t.transform.translation.z
self.get_logger().info(f"cam in base: ({x:+.3f}, {y:+.3f}, {z:+.3f})")
except (LookupException, ConnectivityException,
ExtrapolationException) as e:
self.get_logger().warn(f"tf not ready: {e}")
PITFALL · "lookup_transform 방향 헷갈림"
lookup_transform(target, source, ...)의 반환 값은
"source 프레임 원점이 target 프레임에서 어디에 있는가"를 나타낸다.
즉, source 좌표의 점을 target 좌표로 변환할 때 그대로 쓸 수 있는 변환이다.
순서가 바뀌면 부호가 전부 뒤집히므로, 매번 "target이 먼저, 그 다음 source"라고 속으로 외우자.
Point를 변환하기 — tf2_geometry_msgs
python
from geometry_msgs.msg import PointStamped
from tf2_geometry_msgs import do_transform_point
# 카메라 프레임의 3D 점을 base_link로
pt_in_cam = PointStamped()
pt_in_cam.header.frame_id = "camera_optical"
pt_in_cam.header.stamp = self.get_clock().now().to_msg()
pt_in_cam.point.x, pt_in_cam.point.y, pt_in_cam.point.z = 0.1, 0.05, 0.8
tf = self.buffer.lookup_transform(
"base_link", "camera_optical", pt_in_cam.header.stamp,
timeout=Duration(seconds=0.1))
pt_in_base = do_transform_point(pt_in_cam, tf)
Static Broadcaster — 캘리브레이션 발행
pythonstatic_cam_tf.py
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import TransformStamped
from tf2_ros.static_transform_broadcaster import StaticTransformBroadcaster
from tf_transformations import quaternion_from_euler
import math
class StaticCamTF(Node):
def __init__(self):
super().__init__("static_cam_tf")
self.br = StaticTransformBroadcaster(self)
t = TransformStamped()
t.header.stamp = self.get_clock().now().to_msg()
t.header.frame_id = "tool0" # parent
t.child_frame_id = "camera_optical" # child
t.transform.translation.x = 0.05
t.transform.translation.y = 0.00
t.transform.translation.z = 0.12
q = quaternion_from_euler(0.0, math.radians(15.0), 0.0)
t.transform.rotation.x, t.transform.rotation.y, \
t.transform.rotation.z, t.transform.rotation.w = q
self.br.sendTransform(t) # latched on /tf_static
TIP · rviz2로 TF 트리 확인
ros2 run tf2_tools view_frames로 PDF 형태의 TF 트리를 떨구거나,
rviz2의 TF 플러그인으로 실시간 확인. 트리가 끊어졌으면 반드시 수정 후 진행할 것.
끊어진 채로 SLAM·MoveIt·PointCloud 처리에 들어가면 원인을 잡기가 매우 어렵다.
LAB 08 · 실습
간이 TF 파이프라인
world → base_link를 SLAM인 척 동적 변환으로 발행 (0.5Hz, 임의의 원운동).
base_link → tool0을 정적 변환(0.3, 0, 0.4)으로 발행.
tool0 → camera_optical을 위 static 예제로 발행.
리스너 노드에서 world → camera_optical을 주기적으로 조회해 로그에 찍는다.
rviz2에서 TF 시각화 & tf2_tools view_frames로 트리 덤프.
CH.09
URDF · 로봇 기술— 기하학 + 관성 + 관절 + 충돌.
URDF(Unified Robot Description Format)는 로봇을 링크(link) + 관절(joint) 그래프로 표현하는 XML이다.
같은 기술을 기반으로 rviz2·Gazebo·MoveIt2·ros2_control이 모두 돌아간다.
Xacro는 URDF에 매크로·변수·include를 추가한 템플릿 언어로, 실전에서는 거의 항상 Xacro를 쓴다.
실행 — robot_state_publisher + joint_state_publisher_gui
pythonlaunch/view_urdf.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import ExecuteProcess
from launch.substitutions import Command
from launch_ros.substitutions import FindPackageShare
from launch.substitutions import PathJoinSubstitution
def generate_launch_description():
urdf_path = PathJoinSubstitution([
FindPackageShare("my_robot_description"),
"urdf", "simple_arm.urdf.xacro"
])
robot_description = {"robot_description": Command(["xacro ", urdf_path])}
return LaunchDescription([
Node(package="robot_state_publisher",
executable="robot_state_publisher",
parameters=[robot_description],
output="screen"),
Node(package="joint_state_publisher_gui",
executable="joint_state_publisher_gui"),
ExecuteProcess(cmd=["rviz2"], output="screen"),
])
CONCEPT · robot_state_publisher의 역할
이 노드는 /joint_states를 구독해서, URDF의 관절 위상과 결합해
모든 링크의 TF를 /tf로 발행해 준다.
즉 당신은 관절값만 보내고, TF 계산은 로봇 기술서와 이 노드가 알아서 한다.
ROS2의 아름다운 분리 중 하나다.
TIP · 실제 로봇 제조사 기술서
UR, Franka, Kinova, Doosan, Rainbow 등 주요 제조사들은 공식 URDF/Xacro를 깃허브로 배포한다.
밑바닥부터 모델링할 필요가 없고, 없는 경우도 OnShape · SolidWorks ROS Exporter로 뽑을 수 있다.
본인의 실전에서는 "제조사 기술서를 검증하고 캘리브레이션 보정"이 업무의 95%다.
ROS 2 Jazzy 기본 짝은 Gazebo Harmonic이다.
ros_gz_sim, ros_gz_bridge로 토픽을 Gazebo ↔ ROS2 간 브리지한다.
bash
sudo apt install -y ros-jazzy-ros-gz
# 예: 카메라 토픽 브리지
ros2 run ros_gz_bridge parameter_bridge \
/camera@sensor_msgs/msg/[email protected] \
/camera_info@sensor_msgs/msg/[email protected]
# world 실행 + 브리지
ros2 launch ros_gz_sim gz_sim.launch.py gz_args:="empty.sdf"
PITFALL · sim_time
시뮬레이터를 쓸 때는 모든 노드에 use_sim_time:=true를 설정해야 한다.
이걸 빼면 TF 보간이 실시간 시계로 돌아가 버리고, Gazebo 시계와 어긋나서
"TF extrapolation into the future/past" 에러 폭탄을 맞는다.
CH.11
rosbag2 · QoS— 재현 가능한 데이터와 믿을 수 있는 통신.
rosbag2 — 토픽을 디스크에
rosbag2는 ROS2 토픽 스트림을 SQLite/MCAP에 기록하고 재생한다.
ML 학습 데이터 파이프라인의 시작점이기도 하다 —
현장에서 센서 데이터를 녹화 → 오프라인에서 라벨링 → 학습.
bashrosbag2 basics
# 녹화 (MCAP 포맷이 권장 — 압축·랜덤액세스 유리)
ros2 bag record -s mcap -o mission_01 \
/scan /camera/image_raw /tf /tf_static /joint_states
# 정보 조회
ros2 bag info mission_01
# 재생 (루프 / 속도 조절 가능)
ros2 bag play mission_01 --loop --rate 2.0 \
--clock # /clock 토픽 발행 (sim_time용)
# 특정 토픽 제외하고 재생
ros2 bag play mission_01 --exclude "/joint_states"
TIP · MCAP 선택 이유
기본 SQLite3 스토리지는 파일 락 때문에 동시 녹화·읽기에 불리하고, 압축 옵션도 제한적이다.
MCAP은 Foxglove에서 만든 포맷으로 시간 인덱스 기반 랜덤 액세스, zstd 압축, 다양한 언어의 독립 리더가 강점.
ML 데이터셋으로 쓸 거라면 처음부터 MCAP로.
QoS — Quality of Service
DDS는 각 Pub/Sub 쌍마다 QoS 프로파일을 협상한다.
매칭이 어긋나면 통신이 아예 안 된다 —
"왜 rqt에선 보이는데 내 노드는 못 받지?"의 90%는 QoS 문제다.
rclpy.qos_event와 rclpy.qos에는 자주 쓰이는 조합이 상수로 들어있다.
qos_profile_sensor_data, qos_profile_services_default,
qos_profile_parameters 등. 시작은 이 프리셋을 쓰고, 필요할 때만 커스텀.
"여기부터는 '왜 이게 느리지'를 해결하는 영역. 초보에게는 숨겨져 있던 내부 구조가 드러난다."
CH.12
Lifecycle · Executors— 노드의 상태 기계와 콜백 스케줄링.
Managed (Lifecycle) Node — 상태 기계로 관리
그냥 Node는 시작하자마자 돌고, 죽으면 그냥 죽는다.
반면 LifecycleNode는 unconfigured → inactive → active → finalized 상태를 갖는다.
이걸로 "로봇 전원 인가는 했지만 아직 제어 루프는 안 돌리고 싶다"는 시나리오를 깔끔히 다룰 수 있다.
▸ LIFECYCLE STATE MACHINE
sensor_driver, controller 같이 의존 순서가 있는 노드들을 Lifecycle로 묶으면,
"모두 configure 끝난 후 activate"와 같은 시스템 수준 시퀀싱을 Nav2·MoveIt2·ros2_control이
실제로 하듯 할 수 있다.
Executor — 콜백은 누가 실행하는가
rclpy.spin(node)을 호출하면 내부에서 SingleThreadedExecutor가 돈다.
모든 콜백이 한 스레드에서 순차 실행된다. 그래서:
긴 서비스 콜백이 들어있으면 그 동안 토픽 수신이 막힌다.
타이머 주기가 다른 콜백의 실행 시간 때문에 흔들린다.
해결책은 MultiThreadedExecutor + Reentrant/MutuallyExclusive 콜백 그룹이다.
Python에서 MultiThreadedExecutor는 I/O 바운드에는 의미가 있지만 CPU 바운드에는 GIL 때문에 제한적이다.
무거운 연산(추론·PCL 처리)은 C++ 노드로 옮기거나, 별도 프로세스(composable node)로 분리,
또는 NumPy/PyTorch 벡터 연산(C 레벨에서 GIL 해제됨)으로 처리해야 한다.
CH.13
Component · Composition— 한 프로세스에 여러 노드, 제로카피 통신.
프로세스 간 통신은 DDS가 아무리 빨라도 직렬화 + 네트워크 스택 오버헤드가 있다.
Composition은 여러 노드를 하나의 프로세스 안에 담아서,
토픽 통신을 메모리 포인터 전달로 치환한다. 대역폭이 큰 이미지·포인트클라우드 파이프라인에 필수.
ros2 trace + ros2_tracing — LTTng 기반 커널/유저 공간 트레이싱, 콜백 시간 측정.
top, htop, perf — CPU 스파이크 위치 찾기.
Foxglove Studio — 시각화 & 성능 디버깅.
TIP · 제로카피를 끝까지 얻는 법
Loaned messages(C++): 같은 노드가 pub/sub이면 publisher->borrow_loaned_message()로 중간 버퍼를 건너뛴다.
Composition + Intra-process + Cyclone + SHM까지 조합하면 4K 이미지 60Hz도 여유 있게 흐른다.
최적화 전후 ros2 topic delay로 수치를 반드시 남길 것.
CONCEPT · 실시간(RT) 노드
산업용 안전 루프(1kHz 피드백 제어)는 일반 Linux로 안 된다.
PREEMPT_RT 커널로 바꾸고, 노드에 sched_setscheduler(SCHED_FIFO)로 우선순위 부여.
C++ 전용 영역이며, Python은 이 영역에 들어가지 않는다.
일반 애플리케이션에선 10~100Hz 루프면 충분하니 RT 커널은 "정말 필요할 때만".
PART 05 · TARGET DOMAIN
타겟 — 포인트클라우드 · 딥러닝 · 로봇팔
"여기부터는 진짜 현장이다. 센서에서 추론까지 한 줄로 꿰는 법."
CH.15
MoveIt2 — 로봇팔 모션 계획— FK/IK · 경로 계획 · 충돌 회피.
MoveIt2는 로봇팔의 "어떻게 움직일까"를 담당하는 프레임워크다.
목표 자세(pose) 또는 조인트 목표를 주면, IK를 풀어 조인트 경로를 만들고,
OMPL 플래너로 충돌 없는 궤적을 생성해, 컨트롤러(ros2_control)로 내려보낸다.
▸ MOVEIT2 PIPELINE
설치 & MoveIt Setup Assistant
bash
sudo apt install -y ros-jazzy-moveit ros-jazzy-moveit-py
# Setup Assistant — URDF를 먹여 move_group 설정 패키지 생성
ros2 launch moveit_setup_assistant setup_assistant.launch.py
Setup Assistant는 URDF를 읽어 SRDF(Semantic Robot Description)를 만든다.
관절 그룹, 자기 충돌 매트릭스, 기본 자세(home/ready), 엔드이펙터 링크 등을 선언하는 파일이다.
출력물은 하나의 "moveit_config" 패키지 — 이걸 본인 프로젝트에서 include해 쓴다.
Python으로 모션 계획 — moveit_py
pythonmove_arm.py
"""
MoveIt2 Python API로 팔 움직이기 / Move arm with moveit_py
"""
import rclpy
from moveit.planning import MoveItPy
from geometry_msgs.msg import PoseStamped
def main():
rclpy.init()
moveit = MoveItPy(node_name="moveit_py_demo")
arm = moveit.get_planning_component("manipulator")
# 목표 pose / target pose
target = PoseStamped()
target.header.frame_id = "base_link"
target.pose.position.x = 0.45
target.pose.position.y = 0.0
target.pose.position.z = 0.35
target.pose.orientation.w = 1.0
arm.set_start_state_to_current_state()
arm.set_goal_state(pose_stamped_msg=target,
pose_link="tool0")
# 계획 / plan
plan_result = arm.plan()
if plan_result:
moveit.execute(plan_result.trajectory, controllers=[])
else:
print("planning failed")
rclpy.shutdown()
PITFALL · IK가 왜 실패하나
도달 불가능: workspace 밖이거나 관절 한계 넘음. tf2_echo로 현재 tool0 위치 확인.
기본 KDL은 특이점 근처에서 약함 — TRAC-IK로 바꾸면 훨씬 안정적.
end-effector 프레임 잘못 지정: SRDF의 end_effector_link가 실제 잡고 싶은 링크인지 확인.
MoveIt이 궤적을 만들어도 이를 실제 모터로 내려보낼 장치가 필요하다.
그것이 ros2_control이다. 추상 계층이 잘 되어 있어,
제조사 드라이버(UR의 ur_robot_driver, Franka의 franka_ros2)를 쓰거나
직접 hardware_interface::SystemInterface를 구현할 수 있다.
CONCEPT · 컨트롤러 종류
joint_trajectory_controller: MoveIt이 보내는 궤적을 따라 주는 기본 컨트롤러.
joint_state_broadcaster: 현재 관절 상태를 /joint_states로 내보냄.
Micro-Epsilon scanCONTROL 같은 Laser Line 센서는 한 프레임당 일렬의 프로파일(1D 곡선)을 뱉는다.
축 방향으로 이송시키며 시간축으로 누적하면 3D가 된다.
드라이버가 PointCloud2로 전해 준다면 위 파이프라인에 그대로 얹힌다 —
frame_id를 sensor optical frame으로 정확히 표기하는 것만 잊지 말 것.
그래야 다음 단계의 TF 변환이 정확해진다.
pc_processor 노드로 voxel 0.01m 다운샘플 후 /points_filtered 발행.
RViz2에 두 토픽을 동시에 띄워 색을 다르게 — 점 수 차이를 눈으로 확인.
ros2 topic bw로 대역폭이 얼마나 줄었는지 비교.
CH.17
딥러닝 모델 통합— PyTorch / ONNX를 ROS2 노드에 얹는 법.
ROS2 노드는 결국 Python 프로세스다. PyTorch/ONNXRuntime을 그대로 __init__에서 로드해서
콜백에서 추론하면 끝. 단, 그 단순함 뒤에
배치·비동기·GPU 메모리 관리라는 실전 함정이 있다.
PTv3 · Pointcept 계열 추론 인터페이스
3D 포인트 기반 시맨틱 세그멘테이션 모델(예: Point Transformer v3)은
대체로 (N, C) 형태 텐서를 입력으로 받는다 — C는 최소 3(XYZ), 옵션으로 intensity/normal.
ROS2에서 필요한 건: PointCloud2 → 텐서 → 추론 → 점별 라벨 → PointCloud2(라벨 필드 추가).
pythonptv3_infer_node.py
"""
PTv3 추론 노드 스케치 / PTv3 inference node sketch
실제 Pointcept API에 맞게 import / input dict 구성을 조정해야 한다.
"""
import numpy as np
import torch
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from rclpy.callback_groups import ReentrantCallbackGroup
from sensor_msgs.msg import PointCloud2, PointField
from std_msgs.msg import Header
import threading
class SegInferenceNode(Node):
def __init__(self):
super().__init__("seg_infer")
self.declare_parameter("model_ckpt", "")
self.declare_parameter("device", "cuda:0")
self.device = torch.device(self.get_parameter("device").value)
self.model = self._load_model(self.get_parameter("model_ckpt").value)
self.model.eval()
# 백프레셔 해결: 최신 프레임만 보관
# backpressure: keep only latest frame
self._latest = None
self._lock = threading.Lock()
cb = ReentrantCallbackGroup()
self.sub = self.create_subscription(
PointCloud2, "/lvs/points_filtered",
self.on_cloud, qos_profile_sensor_data,
callback_group=cb)
self.pub = self.create_publisher(
PointCloud2, "/lvs/points_labeled", 5)
# 추론 루프 (sub와 별도 콜백) / inference loop
self.create_timer(0.0, self._infer_loop, callback_group=cb)
def _load_model(self, ckpt_path: str):
# Pointcept의 build_model(cfg) 패턴을 그대로 사용
# 여기서는 의사 코드 / pseudo
...
def on_cloud(self, msg: PointCloud2):
with self._lock:
self._latest = msg # 최신만 / overwrite
@torch.inference_mode()
def _infer_loop(self):
with self._lock:
msg = self._latest
self._latest = None
if msg is None:
return
xyz = pc2_to_xyz_fast(msg)
if xyz.shape[0] < 1024:
return
# PTv3 입력은 보통 dict: coord, feat, offset
coord = torch.from_numpy(xyz).to(self.device)
feat = coord.clone()
offset = torch.tensor([coord.shape[0]], device=self.device)
out = self.model(dict(coord=coord, feat=feat, offset=offset))
logits = out["seg_logits"] if isinstance(out, dict) else out
labels = logits.argmax(dim=-1).to(torch.uint8).cpu().numpy()
# 라벨을 추가 필드로 담은 PointCloud2 / labeled cloud
labeled = np.empty(xyz.shape[0], dtype=np.dtype([
('x', 'f4'), ('y', 'f4'), ('z', 'f4'), ('label', 'u1')
]))
labeled['x'] = xyz[:, 0]
labeled['y'] = xyz[:, 1]
labeled['z'] = xyz[:, 2]
labeled['label'] = labels
out_msg = PointCloud2()
out_msg.header = Header(stamp=msg.header.stamp,
frame_id=msg.header.frame_id)
out_msg.height, out_msg.width = 1, labeled.shape[0]
out_msg.fields = [
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
PointField(name='label', offset=12, datatype=PointField.UINT8, count=1),
]
out_msg.is_bigendian = False
out_msg.point_step = 13
out_msg.row_step = 13 * labeled.shape[0]
out_msg.is_dense = True
out_msg.data = labeled.tobytes()
self.pub.publish(out_msg)
def main():
rclpy.init()
node = SegInferenceNode()
from rclpy.executors import MultiThreadedExecutor
ex = MultiThreadedExecutor(num_threads=2)
ex.add_node(node)
try:
ex.spin()
finally:
ex.shutdown()
PITFALL · 추론이 모든 것을 막는다
on_cloud 콜백에서 200ms짜리 추론을 돌리면 20Hz 센서 스트림은 즉시 무너진다.
해결책 세 가지:
MultiThreadedExecutor + Reentrant 그룹으로 수신과 추론을 분리.
수신 콜백은 최신 프레임만 보관하는 버퍼에 쓰고, 별도 타이머/스레드가 추론(백프레셔 해결) — 위 코드의 패턴.
본격적으로는 별도 프로세스 + shared memory로 격리 (Triton, TensorRT 서버).
TensorRT / ONNX 배포
엣지 PC에서 실시간을 확보하려면 PyTorch로 그대로 돌리지 않는다.
ONNX export → TensorRT 최적화 → ROS2 노드에선 trt 엔진만 로드.
PTv3는 sparse conv·kNN 같은 커스텀 CUDA 연산이 있어 변환이 까다로우니,
Pointcept 커뮤니티의 export 스크립트를 먼저 확인하자.
100MB 넘는 ckpt는 colcon 워크스페이스에 넣지 말 것 — 빌드 시간이 폭발한다.
/opt/models 같은 별도 경로에 두고, 노드는 파라미터로 절대 경로를 받도록 설계.
조직 내 공유는 S3·MinIO·MLflow Model Registry로 관리.
CONCEPT · 데이터 → 모델 → 배포 루프
rosbag2로 현장 데이터 수집 → 오프라인 라벨링 → 학습(MLflow 추적) → ONNX/TRT export →
모델 레지스트리 등록 → 엣지 배포의 폐루프를 갖추는 것이 ROS2 + 딥러닝의 진짜 그림이다.
Kubeflow가 학습 쪽을, ROS2 노드가 추론 쪽을 맡고, 가운데를 모델 레지스트리가 잇는다.
CH.18
End-to-End 파이프라인— 센서 → 세그멘테이션 → 파지 → 팔 실행.
이 장은 앞의 모든 내용을 하나의 흐름으로 꿰는 캡스톤이다.
최종 목표 — "포인트클라우드에서 딥러닝으로 대상을 식별 → 3D 좌표 산출 → MoveIt2로 팔을 보냄".
▸ FULL PIPELINE — sensor to motion
실행 순서
부팅: bringup.launch.py로 robot_state_publisher · ros2_control · 센서 드라이버 · 카메라 TF를 한 번에 띄움.
인지: LVS/RGB-D → pc_processor → seg_infer 노드 체인. 결과 /points_labeled.
파지 제안: 라벨된 점군에서 대상 클래스만 필터 → GPD/ContactGraspNet 또는 직접 구현한 PCA 기반 제안기로 grasp pose 산출.
계획 & 실행: MoveIt2에 pose 전달 → plan → execute. 접근(pre-grasp) → 그립 → 후퇴(post-grasp) 3단계.
관찰: rosbag2로 전 구간 녹화 (디버깅 + 학습데이터 재수집 동시 효과).
파지 제안자 — 미니멀 예시
GPD 같은 무거운 라이브러리를 붙이기 전에, 간단한 기하 기반 제안자로 파이프라인 전체를 굴려 보는 게 정답이다.
전체 파이프라인이 한번 돌아가야 어디가 약점인지 보인다.
pythonsimple_grasp_proposer.py
"""
간이 파지 제안 / Minimal grasp proposer
- 대상 클래스 포인트만 추출 → 주성분 분석으로 방향, 센트로이드로 위치 결정.
"""
import numpy as np
from typing import Optional
from geometry_msgs.msg import PoseStamped
from tf_transformations import quaternion_from_matrix
TARGET_CLASS = 1 # 예: 용접 비드 / weld bead
def propose_grasp(xyz: np.ndarray, labels: np.ndarray,
frame_id: str, stamp) -> Optional[PoseStamped]:
mask = labels == TARGET_CLASS
if mask.sum() < 50:
return None
pts = xyz[mask]
centroid = pts.mean(axis=0)
# PCA — 점군의 주축 / principal axes
centered = pts - centroid
cov = np.cov(centered.T)
eigvals, eigvecs = np.linalg.eigh(cov) # ascending order
# 가장 큰 축 = 대상의 길이 방향 / longest axis
x_axis = eigvecs[:, 2]
z_axis = eigvecs[:, 0] # 가장 작은 분산 = 표면 법선 추정 / surface normal proxy
# 직교화 / orthogonalize
y_axis = np.cross(z_axis, x_axis)
y_axis /= np.linalg.norm(y_axis) + 1e-9
x_axis = np.cross(y_axis, z_axis)
# 4x4 변환 / homogeneous transform
R = np.eye(4)
R[:3, 0] = x_axis
R[:3, 1] = y_axis
R[:3, 2] = z_axis
qx, qy, qz, qw = quaternion_from_matrix(R)
pose = PoseStamped()
pose.header.frame_id = frame_id
pose.header.stamp = stamp
pose.pose.position.x = float(centroid[0])
pose.pose.position.y = float(centroid[1])
pose.pose.position.z = float(centroid[2])
pose.pose.orientation.x = qx
pose.pose.orientation.y = qy
pose.pose.orientation.z = qz
pose.pose.orientation.w = qw
return pose
오케스트레이터 노드 — 클라이언트들의 지휘자
최상위에 작은 상태 기계 노드 하나를 두고, 라벨된 점군을 받으면
→ grasp proposer 호출 → TF 변환 → MoveIt2 Action 호출 순으로 진행한다.
이 부분은 본인 도메인(파지/용접/검사)에 따라 가장 다채로워지는 부분이다.
pythonorchestrator_node.py — 골격
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from rclpy.duration import Duration
from sensor_msgs.msg import PointCloud2
from geometry_msgs.msg import PoseStamped
from tf2_ros import Buffer, TransformListener
from tf2_geometry_msgs import do_transform_pose_stamped
from control_msgs.action import FollowJointTrajectory # 또는 MoveGroup
class Orchestrator(Node):
def __init__(self):
super().__init__("orchestrator")
self.buffer = Buffer()
self.tf_listener = TransformListener(self.buffer, self)
self.busy = False
self.sub = self.create_subscription(
PointCloud2, "/lvs/points_labeled",
self.on_labeled, 5)
# MoveIt 액션 (실제로는 move_action 또는 moveit_py를 권장)
self.move_client = ActionClient(self,
FollowJointTrajectory, "/joint_trajectory_controller/follow_joint_trajectory")
def on_labeled(self, msg: PointCloud2):
if self.busy:
return
xyz, labels = parse_labeled(msg) # CH.16/17 헬퍼
pose_cam = propose_grasp(xyz, labels,
msg.header.frame_id,
msg.header.stamp)
if pose_cam is None:
return
# camera_optical → base_link 변환 / TF transform
try:
tf = self.buffer.lookup_transform(
"base_link", pose_cam.header.frame_id,
pose_cam.header.stamp,
timeout=Duration(seconds=0.1))
except Exception as e:
self.get_logger().warn(f"tf failed: {e}")
return
pose_base = do_transform_pose_stamped(pose_cam, tf)
# 실제로는 moveit_py로 plan & execute 하는 것이 현실적
# In practice, use moveit_py to plan & execute
self.busy = True
self._send_to_moveit(pose_base)
def _send_to_moveit(self, pose: PoseStamped):
# 의사 코드 — moveit_py 인스턴스에 위임
# Pseudo: delegate to moveit_py instance from CH.15
...
self.busy = False
PITFALL · 시간 동기화
lookup_transform의 시간 인자에 msg.header.stamp를 그대로 넣었는데
"extrapolation into the past" 에러가 난다면, TF 버퍼의 보관 시간(기본 10초)을 넘어선 케이스다.
추론이 오래 걸려서 한 두 초 지난 stamp를 묻고 있을 수 있다 —
"가능한 최신"이 더 자연스러운 경우는 rclpy.time.Time() + 짧은 timeout으로.
반대로 정확한 시점 변환이 꼭 필요하면 TF 버퍼 길이를 늘리고
추론 결과에 입력 점군의 stamp를 그대로 전파하라.
LAB 18 · 캡스톤 실습
"보이는 것을 잡는다" 미니 프로젝트
Gazebo로 단순 박스 여러 개를 테이블에 올린 환경 구축. RGB-D + 6DOF 팔 모델 사용.
pc_processor + 라벨러(처음에는 색상 또는 평면 기반의 가짜 분류기) → /points_labeled 발행.
위 simple_grasp_proposer로 grasp pose 산출 → RViz Marker로 시각화.
orchestrator가 TF로 base_link 좌표 변환 후 MoveIt2 plan & execute. 접근→그립→후퇴 3단 동작 구현.
전 구간을 rosbag2 (mcap)로 녹화. 실패 케이스 5건을 재생하며 어디서 깨졌는지 분석.
가짜 분류기를 PTv3 추론 노드로 교체. 동일한 파이프라인이 그대로 동작하는지 확인 — "인터페이스 분리"의 가치를 체험.
완성 시 얻게 되는 것: 본업의 용접 검사 파이프라인을 ROS2 위에 얹는 데 필요한 거의 모든 패턴을 손끝에 갖게 된다.
남은 것은 도메인별 모델 학습과 안전 인터록뿐.
APPENDIX
치트시트 · 디버깅 · 다음 단계
"한 번 본 명령어는 결국 다시 찾는다. 여기 모아 둔다."
A.1
CLI 치트시트— 매일 손에 닿는 명령들.
목적
명령
노드 목록
ros2 node list
노드 상세
ros2 node info /your_node
토픽 목록 / 타입 / QoS
ros2 topic list -v · ros2 topic info -v /scan
토픽 내용
ros2 topic echo /scan
토픽 주기/대역폭/지연
ros2 topic hz /scan · bw · delay
서비스 호출
ros2 service call /set_mode my_pkg/srv/SetMode "{mode: 'idle'}"