How ROS2 efficiently deals with callbacks
이 글에서는 ROS2의 메인 루프가 어떻게 돌아가는 것인지, 콜백이 어떻게 효율적으로 처리되는지, executor/callback group/wait set/guard condition 등의 개념이 무엇인지를 다룹니다. 이 글의 많은 부분은 ROS2 Jazzy의 공식 문서와 rclpy를 참고했습니다.
이 글을 읽으면 다음 의문을 풀 수 있게 됩니다.
- ROS에서는 어떻게 여러 노드에서 사용하는 여러 콜백을 동시에 효율적으로 처리할 수 있는가?
- ROS에서는 어떻게 cpu-intensive한 polling 없이 여러 콜백을 처리할 수 있는가?
- ROS에서, 노드를 구체적으로 어떻게 "실행"하는 것인가?
- ROS에서, 노드의 추가/제거는 어떻게 수행하는 것인가? 나아가, Publisher/Timer/Subscription 등은 어떻게 추가/제거하는 것인가?
Prerequisites
독자가 다음 내용을 알고 있는 것을 전제로 합니다.
- ROS2의 미들웨어 디자인과 DDS
- Node, Publisher, Subscriber, Timer 등에 대한 개념
- ROS2 Application Code에 대한 이해
이는 ROS2의 기초 책자에서 다루는 내용인 만큼 알고 있는 사람들이 많으리라 추정했습니다. (역으로, 이 글에서 다루는 내용은 제가 구글링해서 공식 자료와 소스코드 외에서 구체적인 설명을 확인할 수 없는 내용입니다.)
오로카의 로봇 운영체제 ROS 강좌 초반부를 읽으면 해당하는 개념을 습득할 수 있습니다.
Main Loop of ROS2 Application
먼저, 이 글의 내용은 rclpy rolling branch의 34f9e13e72ea4528bd96f1d739bc7a8711cc923b에 해당하는 내용을 바탕으로 작성되었음을 밝힙니다.
ROS2 어플리케이션을 파이썬으로 작성했을 때, 메인 루프는 다음과 같이 구성됩니다. 실제로 이는 어떤 ROS2 예제 파일을 봐도 확인할 수 있습니다. executor에 대해서는 추후 설명합니다.
- rclpy.init
- 이는 InitContextManager을 인스턴스화하는데, 이 과정에서 domain id를 고려한 rclpy의 default context가 init됩니다. context.init은 ROS communication을 init합니다.
- executor을 선언한다.
- executor에 노드를 추가한다.
executor.spin()
으로 루프를 돌린다. 이 루프가 메인 루프가 된다.- executor.spin은 종료 조건이 만족되지 않으면 executor.spin_once를 무한히 반복해서 실행합니다.
- spin_once의 구성
handler, entity, node = self.wait_for_ready_callbacks()
: ready된 콜백을 가져온다.handler(), handler.result()
: 가져온 콜백을 처리한다.
위의 4단계가 끝입니다. 매우 간단하며, 동시에 어느 정도로 철저하게 ROS2의 개발자들이 좋은 디자인을 숙고하고 골랐는지 알 수 있습니다. rclpy.spin
는 봤지만 executor을 명시적으로 선언하고 사용하는 것을 보지 못했을 수도 있는데, rclpy.spin
은 내부적으로 default executor에 대한 .spin을 사용하는 함수입니다. rclpy의 __init__.py를 보면 확인할 수 있습니다.
What's Callback in ROS2?
ROS2에서 무엇을 콜백이라고 하는지는 아래 문서에서 확인 가능합니다. 문서에 잘 나와있기에 설명은 덧붙이지 않겠습니다.
주요하게 눈여겨볼만한 점은,
Examples of callbacks in this context are
- subscription callbacks (receiving and handling data from a topic),
- timer callbacks,
- service callbacks (for executing service requests in a server),
- different callbacks in action servers and clients,
- done-callbacks of Futures.
• Almost everything in ROS 2 is a callback! Every function that is run by an executor is, by definition, a callback. The non-callback functions in a ROS 2 system are found mainly at the edge of the system (user and sensor inputs etc).
About Executor
executor에 대해서는 아래 문서에서 확인 가능합니다.
주요하게 눈여겨볼만한 점은,
In order not to counteract the QoS settings of the middleware, an incoming message is not stored in a queue on the Client Library layer but kept in the middleware until it is taken for processing by a callback function. (This is a crucial difference to ROS 1.)
The Multi-Threaded Executor creates a configurable number of threads to allow for processing multiple messages or events in parallel. The Static Single-Threaded Executor optimizes the runtime costs for scanning the structure of a node in terms of subscriptions, timers, service servers, action servers, etc. It performs this scan only once when the node is added, while the other two executors regularly scan for such changes. Therefore, the Static Single-Threaded Executor should be used only with nodes that create all subscriptions, timers, etc. during initialization.
Implementation of executor
executor에서 주요하게 눈여겨볼 지점은 wait_for_ready_callbacks
의 구현입니다. 이 함수가 executor.spin_once를 하면 실행되는 함수이기 때문입니다. wait_for_ready_callbacks는 내부 함수인 _wait_for_ready_callbacks를 부르는데요, 이것의 구현은 다음에서 확인할 수 있습니다.
순서대로 설명해보겠습니다. 참고로 ROS2 코드에서는 모든 종류의 콜백을 모아 Entity라고 명명합니다.
- 실행 가능한 entity 취합. 이때 실행 가능 여부
can_execute
를 callback group가 판별 - context_stack 진입
- 모든 실행 가능 Entity에 대해 enter_context 및 핸들 취합
- wait_set에 모든 컬백 추가
wait_set.wait(timeout_sec)
wait_set.get_ready_entities
를 이용해 ready entity 취합- context_stack 탈출 전 wait_set이 파괴되기 전 waitables 반환
- 다양한 ready callback에 대한 handler 반환
- 이 handler들은 SingleThreadExecutor / MultiThreadedExecutor에서
_spin_once_impl
에 의해 처리된다. https://github.com/ros2/rclpy/blob/c009b0de286101da1caedef860b88f1520edb97b/rclpy/rclpy/executors.py#L838, https://github.com/ros2/rclpy/blob/c009b0de286101da1caedef860b88f1520edb97b/rclpy/rclpy/executors.py#L912
즉, 실제 구현을 확인해보니 executor 문서에 나와있는 그대로 구현이 되어 있다는 사실을 알 수 있습니다.
처음의 질문에 대한 답
- ROS에서는 어떻게 여러 노드에서 사용하는 여러 콜백을 동시에 효율적으로 처리할 수 있는가?
- MultiThreadedExecutor을 사용하며, 여러 콜백에 따로 콜백 그룹을 할당하거나 Reentrant Callback Group을 사용하면 여러 콜백을 병렬적으로 처리할 수 있습니다.
- 다만, concurrent 라이브러리의 ThreadPoolExecutor을 사용하므로(
self._executor = ThreadPoolExecutor(num_threads)
) Python GIL에 걸릴 것으로 예상됩니다.
- ROS에서는 어떻게 cpu-intensive한 polling 없이 여러 콜백을 처리할 수 있는가?
- waitset에 대한 .wait(timeout_sec)으로 polling 없이 구독을 구현합니다. DDS를 포함한 하위 레벨 구현에서 알아서 효율적으로 처리하겠죠.
- ROS에서, 노드를 구체적으로 어떻게 "실행"하는 것인가?
- 노드를 실행한다는 건 없습니다. ROS는 노드에 존재하는 콜백을 처리해줄 뿐입니다. 거의 모든 것은 콜백이며 ROS2는 콜백을 잘 다뤄주는 프레임워크이자 라이브러리라고도 할 수 있습니다.
- ROS에서, 노드의 추가/제거는 어떻게 수행하는 것인가? 나아가, Publisher/Timer/Subscription 등은 어떻게 추가/제거하는 것인가?
- 이에 대해서는 guard condition에 대해 알아야 합니다. executor.add_node 구현에 따르면, 노드를 추가하면 executor의
_guard
가 트리거됩니다. 이는Rebuild the wait set so it includes this new node
라고 주석이 달려있는데, 상세한 과정은 아래와 같습니다. - 노드가 추가되면, executor._guard가 트리거되고, 이러면 여러 timer, subscription 등과 함께 guard condition 역시도 기다리고 있던 executor._wait_for_ready_callbacks 중간의 wait_set.wait()가 끊어지고, 그럼 _wait_for_ready_callbacks 루프를 다시 돌면서 루프 초반부의 node refresh
nodes_to_use = self.get_nodes()
를 실행하게 된다. - 즉, 구독 대상이 되는 콜백/엔티티의 추가/제거 역시도 guard condition을 이용해 refresh하도록 제어하고 있습니다.
- 노드 내부에서 Pubilsher, Timer, Subscription 등을 인스턴스화할때도 똑같이 타고타고 내려가면
executor._guard.trigger()
을 하는 작업이 일어납니다.. 이에 대해서는 직접 Node 소스코드를 확인해보시면 알 수 있습니다.
- 이에 대해서는 guard condition에 대해 알아야 합니다. executor.add_node 구현에 따르면, 노드를 추가하면 executor의