스크린캐스트 2025년 11월 29일 11시 48분 53초.webm
#!/usr/bin/env python
import os
import rclpy
import csv
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy
from mavros_msgs.srv import CommandBool
from geometry_msgs.msg import PoseStamped, Vector3, Quaternion
from std_msgs.msg import Bool,Float32
from tf_transformations import quaternion_from_euler
import time
import math
import numpy as np
NAV_PERIOD_SEC = 0.1
class WaypointNode(Node):
def __init__(self):
super().__init__('WaypointNode')
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)
qos_mavros = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
durability=QoSDurabilityPolicy.VOLATILE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)
# Subscriptions
self.position_subscriber = self.create_subscription(
PoseStamped,
'/mavros/local_position/pose',
self.position_callback,
qos_mavros
)
# Publishers
self.arm_publisher = self.create_publisher(
Bool,
'/arm_message',
qos_profile
)
self.disarm_publisher = self.create_publisher(
Bool,
'/disarm_message',
qos_profile
)
self.vtol_publisher_fw = self.create_publisher(
Bool,
'/vtol_message_fw',
qos_profile
) ## vtol transition(fw)
self.vtol_publisher_mc = self.create_publisher(
Bool,
'/vtol_message_mc',
qos_profile
) ## vtol transition(mc)
self.offboard_position_publisher = self.create_publisher(
PoseStamped,
'/mavros/setpoint_position/local',
qos_profile
)
self.offboard_position_publisher = self.create_publisher(
PoseStamped,
'/mavros/setpoint_position/local',
qos_profile
)
self.current_position = {'x': 0.0, 'y': 0.0, 'z': 0.0} # Current position of the drone
self.target_waypoints = [
{'x': 0.0, 'y': 0.0, 'z': 10.0}, #wp0
{'x': 30.0, 'y': 0.0, 'z': 10.0}, #wp1
{'x': 30.0, 'y': 30.0, 'z': 10.0}, #wp2
{'x': 0.0, 'y': 30.0, 'z': 10.0}, #wp3
{'x': 0.0, 'y': 0.0, 'z': 10.0}, #wp4
{'x': 0.0, 'y': 0.0, 'z': 0.0}, #wp5
]
self.curr_way_index = 0 # Current index in waypoint list
self.max_waypoint = len(self.target_waypoints) # Total number of waypoints
self.target_waypoint = self.target_waypoints[0] # Current target waypoint
time.sleep(5)
self.get_logger().info("Main launched")
self.arm_drone(True)
self.navigate_waypoints()
def navigate_waypoints(self):
self.wp_timer = self.create_timer(NAV_PERIOD_SEC, self.navigate_waypoint_callback)
def position_callback(self, msg):
self.current_position['x'] = msg.pose.position.x
self.current_position['y'] = msg.pose.position.y
self.current_position['z'] = msg.pose.position.z
self.current_orientation = (
msg.pose.orientation.x,
msg.pose.orientation.y,
msg.pose.orientation.z,
msg.pose.orientation.w
)
def navigate_waypoint_callback(self):
self.offboard_position_publisher.publish(self.calculate_position_command(self.target_waypoint))
if self.waypoint_reached(self.target_waypoint):
self.get_logger().info(f"Waypoint {self.curr_way_index + 1}/{self.max_waypoint} reached")
self.curr_way_index += 1
if self.curr_way_index >= self.max_waypoint:
self.get_logger().info("✅ All waypoints reached. Landing and disarming drone.")
self.disarm_drone()
self.destroy_timer(self.wp_timer)
else:
self.target_waypoint = self.target_waypoints[self.curr_way_index]
self.get_logger().info(f"Moving to next waypoint: "
f"({int(self.target_waypoint['x'])}, {int(self.target_waypoint['y'])}, {int(self.target_waypoint['z'])})")
def calculate_position_command(self, target):
pose = PoseStamped()
pose.header.stamp = self.get_clock().now().to_msg()
pose.pose.position.x = target['x']
pose.pose.position.y = target['y']
pose.pose.position.z = target['z']
dx = target['x'] - self.current_position['x']
dy = target['y'] - self.current_position['y']
dz = target['z'] - self.current_position['z']
d = math.sqrt(dx*dx + dy*dy)
yaw = math.atan2(dy,dx)
q = self.euler_to_quaternion(0.0,0.0,yaw)
pose.pose.orientation = q
return pose
def euler_to_quaternion(self, roll, pitch, yaw):
"""
Convert an Euler angle to a quaternion.
roll, pitch, yaw: in radians
"""
qx, qy, qz, qw = quaternion_from_euler(roll, pitch, yaw)
quaternion = Quaternion()
quaternion.x = qx
quaternion.y = qy
quaternion.z = qz
quaternion.w = qw
return quaternion
def waypoint_reached(self, target, tolerance = 1.0):
x_dist = abs(target['x'] - self.current_position['x'])
y_dist = abs(target['y'] - self.current_position['y'])
z_dist = abs(target['z'] - self.current_position['z'])
return (x_dist < tolerance and
y_dist < tolerance and
z_dist < tolerance)
def arm_drone(self, arm):
arm_msg = Bool()
arm_msg.data = arm
self.arm_publisher.publish(arm_msg)
self.get_logger().info("Drone armed")
def disarm_drone(self):
msg = Bool()
msg.data = True
self.disarm_publisher.publish(msg)
self.get_logger().info("Drone disarmed")
def fw_transition(self):
msg= Bool()
msg.data
self.fw_publisher.publish(msg)
def mc_transition(self):
msg= Bool()
self.mc_publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
waypoint_node = WaypointNode()
rclpy.spin(waypoint_node)
print("node destroyed")
waypoint_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy
from rclpy.clock import Clock
import math
import numpy as np
from std_msgs.msg import Bool
from mavros_msgs.msg import State, ExtendedState
from mavros_msgs.srv import CommandBool, SetMode, CommandVtolTransition
class FlightstateNode(Node):
def __init__(self):
super().__init__('flight_state_controller')
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)
cmd_vel_qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
durability=QoSDurabilityPolicy.VOLATILE, # Match publisher's settings
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)
# Subscriptions
self.status_sub = self.create_subscription(
State,
'/mavros/state',
self.vehicle_status_callback,
cmd_vel_qos_profile
) ## current state
self.arm_sub = self.create_subscription(
Bool,
'/arm_message',
self.arm_message_callback,
qos_profile
) ## arming message from waypoint_controller.py
self.disarm_sub = self.create_subscription(
Bool,
'/disarm_message',
self.disarm_message_callback,
qos_profile
) # disarm message from waypoint_controller.py
self.vtol_state = self.create_subscription(
ExtendedState,
'/mavros/extended_state',
self.vtol_msg_callback,
qos_profile
) ## vtol state
self.vtol_publisher_fw = self.create_subscription(
Bool,
'/vtol_message_fw',
self.vtol_msg_callback_fw,
qos_profile
) ## vtol transition(fw)
self.vtol_publisher_mc = self.create_subscription(
Bool,
'/vtol_message_mc',
self.vtol_msg_callback_mc,
qos_profile
) ## vtol transition(mc)
self.arming_client = self.create_client(CommandBool, "/mavros/cmd/arming") ## arming command
self.set_mode_client = self.create_client(SetMode, "/mavros/set_mode") ## offboard command
self.vtol_transition_client = self.create_client(CommandVtolTransition,"/mavros/cmd/vtol_transition") ## transition command
# Timers
state_timer_period = 0.1
self.state_timer = self.create_timer(state_timer_period, self.flight_state_callback)
# Variables
self.arm_state = False
self.arm_message = False
self.disarm_message = False
self.current_state = None
self.offboard_reached = False
self.vtol_state = 0
self.vtol_msg_fw = False
self.vtol_msg_mc = False
self.vtol_count = 0
#subscription callback
def vehicle_status_callback(self, msg):
if (msg.armed != self.arm_state):
self.get_logger().info(f"ARM STATUS: {msg.armed}")
self.current_state = msg.mode
self.arm_state = msg.armed
def arm_message_callback(self, msg):
self.arm_message = msg.data
self.get_logger().info(f"Arm message recieved as {self.arm_message}")
def disarm_message_callback(self, msg):
self.disarm_message = msg.data
self.get_logger().info(f"Disarm message recieved as {self.arm_message}")
def vtol_msg_callback(self, msg):
if (self.vtol_state != msg.vtol_state):
if msg.vtol_state == 3:
self.get_logger().info("current VTOL state : MC")
if msg.vtol_state == 4:
self.get_logger().info("current VTOL state : FW")
self.vtol_state = msg.vtol_state
def vtol_msg_callback_fw(self, msg):
if msg.data and self.vtol_count == 0:
self.vtol_transition(transition_state = 4)
self.vtol_count = 1
self.get_logger().info("Transition message recieved (fw)")
def vtol_msg_callback_mc(self, msg):
if msg.data and self.vtol_count == 1:
self.vtol_transition(transition_state = 3)
self.vtol_count = 0
self.get_logger().info("Transition message recieved (mc)")
def flight_state_callback(self):
if (self.arm_message and not self.arm_state):
self.get_logger().info("ARMING START")
self.arm_drone()
if (self.arm_state and not self.offboard_reached):
self.set_mode("OFFBOARD")
if (self.current_state == 'OFFBOARD' and not self.offboard_reached):
self.offboard_reached = True
self.get_logger().info("OFFBOARD CONVERTED")
if self.vtol_msg_fw:
self.get_logger().info("MODE fw")
self.vtol_transition(4)
if self.vtol_msg_mc:
self.get_logger().info("MODE mc")
self.vtol_transition(3)
if self.disarm_message:
self.arm_drone(False)
self.destroy_timer(self.state_timer)
# Commands
def set_mode(self, mode_name):
req = SetMode.Request()
req.custom_mode = mode_name
self.set_mode_client.call_async(req)
def arm_drone(self, msg = True):
req = CommandBool.Request()
req.value = msg
self.arming_client.call_async(req)
def vtol_transition(self, transition_state):
req = CommandVtolTransition.Request()
req.state = transition_state
self.vtol_transition_client.call_async(req)
def main(args=None):
rclpy.init(args=args)
flight_state_node = FlightstateNode()
rclpy.spin(flight_state_node)
flight_state_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()