스크린캐스트 2025년 11월 29일 11시 48분 53초.webm

#!/usr/bin/env python

import os
import rclpy
import csv
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy
from mavros_msgs.srv import CommandBool
from geometry_msgs.msg import PoseStamped, Vector3, Quaternion
from std_msgs.msg import Bool,Float32
from tf_transformations import quaternion_from_euler
import time
import math
import numpy as np

NAV_PERIOD_SEC = 0.1

class WaypointNode(Node):
    def __init__(self):
        super().__init__('WaypointNode')
        
        qos_profile = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=10
        )

        qos_mavros = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            durability=QoSDurabilityPolicy.VOLATILE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=10
        )
        
        # Subscriptions
        self.position_subscriber = self.create_subscription(
            PoseStamped,
            '/mavros/local_position/pose',
            self.position_callback,
            qos_mavros
        ) 

        # Publishers
        self.arm_publisher = self.create_publisher(
            Bool,
            '/arm_message',
            qos_profile
        )

        self.disarm_publisher = self.create_publisher(
            Bool,
            '/disarm_message',
            qos_profile
        )
        
        
        self.vtol_publisher_fw = self.create_publisher(
            Bool,
            '/vtol_message_fw',
            qos_profile
        ) ## vtol transition(fw)
        
        
        
        self.vtol_publisher_mc = self.create_publisher(
            Bool,
            '/vtol_message_mc',
            qos_profile
        ) ## vtol transition(mc)

        self.offboard_position_publisher = self.create_publisher(
            PoseStamped,
            '/mavros/setpoint_position/local',
            qos_profile
        )

        self.offboard_position_publisher = self.create_publisher(
            PoseStamped,
            '/mavros/setpoint_position/local',
            qos_profile
        )

        self.current_position = {'x': 0.0, 'y': 0.0, 'z': 0.0}  # Current position of the drone

        self.target_waypoints = [                             
            {'x': 0.0, 'y': 0.0, 'z': 10.0},                     #wp0                                                           
            {'x': 30.0, 'y': 0.0, 'z': 10.0},                    #wp1  
            {'x': 30.0, 'y': 30.0, 'z': 10.0},                   #wp2
            {'x': 0.0, 'y': 30.0, 'z': 10.0},                    #wp3
            {'x': 0.0, 'y': 0.0, 'z': 10.0},                     #wp4
            {'x': 0.0, 'y': 0.0, 'z': 0.0},                      #wp5
        ] 
        self.curr_way_index = 0                             # Current index in waypoint list
        self.max_waypoint = len(self.target_waypoints)  # Total number of waypoints
        self.target_waypoint = self.target_waypoints[0]               # Current target waypoint                

        time.sleep(5)
        self.get_logger().info("Main launched")
        self.arm_drone(True)
        self.navigate_waypoints()

    def navigate_waypoints(self):
        self.wp_timer = self.create_timer(NAV_PERIOD_SEC, self.navigate_waypoint_callback)

    def position_callback(self, msg):
        self.current_position['x'] = msg.pose.position.x
        self.current_position['y'] = msg.pose.position.y
        self.current_position['z'] = msg.pose.position.z
        self.current_orientation = (
            msg.pose.orientation.x,
            msg.pose.orientation.y,
            msg.pose.orientation.z,
            msg.pose.orientation.w
        )

    def navigate_waypoint_callback(self):
        self.offboard_position_publisher.publish(self.calculate_position_command(self.target_waypoint))

        if self.waypoint_reached(self.target_waypoint):
            self.get_logger().info(f"Waypoint {self.curr_way_index + 1}/{self.max_waypoint} reached")
            self.curr_way_index += 1

            if self.curr_way_index >= self.max_waypoint:
                self.get_logger().info("✅ All waypoints reached. Landing and disarming drone.")
                self.disarm_drone()
                self.destroy_timer(self.wp_timer)
            else:
                self.target_waypoint = self.target_waypoints[self.curr_way_index]
                self.get_logger().info(f"Moving to next waypoint: "
                                       f"({int(self.target_waypoint['x'])}, {int(self.target_waypoint['y'])}, {int(self.target_waypoint['z'])})")

    def calculate_position_command(self, target):
        pose = PoseStamped()
        pose.header.stamp = self.get_clock().now().to_msg()
        pose.pose.position.x = target['x'] 
        pose.pose.position.y = target['y']
        pose.pose.position.z = target['z']
        
        dx = target['x'] - self.current_position['x']
        dy = target['y'] - self.current_position['y']
        dz = target['z'] - self.current_position['z']
        d = math.sqrt(dx*dx + dy*dy)
        
        yaw = math.atan2(dy,dx)
        
        q = self.euler_to_quaternion(0.0,0.0,yaw)

        pose.pose.orientation = q
        
        return pose

    def euler_to_quaternion(self, roll, pitch, yaw):
        """
        Convert an Euler angle to a quaternion.
        roll, pitch, yaw: in radians
        """
        qx, qy, qz, qw = quaternion_from_euler(roll, pitch, yaw)

        quaternion = Quaternion()
        quaternion.x = qx
        quaternion.y = qy
        quaternion.z = qz
        quaternion.w = qw

        return quaternion

    def waypoint_reached(self, target, tolerance = 1.0):

        x_dist = abs(target['x'] - self.current_position['x'])
        y_dist = abs(target['y'] - self.current_position['y'])
        z_dist = abs(target['z'] - self.current_position['z'])

        return (x_dist < tolerance and
                y_dist < tolerance and
                z_dist < tolerance)

    def arm_drone(self, arm):
        arm_msg = Bool()
        arm_msg.data = arm
        self.arm_publisher.publish(arm_msg)
        self.get_logger().info("Drone armed")

    def disarm_drone(self):
        msg = Bool()
        msg.data = True
        self.disarm_publisher.publish(msg)
        self.get_logger().info("Drone disarmed")
    
    def fw_transition(self):
        msg= Bool()
        msg.data
        self.fw_publisher.publish(msg)
        
        
    def mc_transition(self):
        msg= Bool()
        self.mc_publisher.publish(msg)
        

def main(args=None):
    rclpy.init(args=args)
    waypoint_node = WaypointNode()
    rclpy.spin(waypoint_node)
    print("node destroyed")
    waypoint_node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy
from rclpy.clock import Clock

import math
import numpy as np

from std_msgs.msg import Bool
from mavros_msgs.msg import State, ExtendedState
from mavros_msgs.srv import CommandBool, SetMode, CommandVtolTransition

class FlightstateNode(Node):

    def __init__(self):
        super().__init__('flight_state_controller')

        qos_profile = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=10
        )
        cmd_vel_qos_profile = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            durability=QoSDurabilityPolicy.VOLATILE,  # Match publisher's settings
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=10
        )

        # Subscriptions
        self.status_sub = self.create_subscription(
            State,
            '/mavros/state',
            self.vehicle_status_callback,
            cmd_vel_qos_profile
        )  ## current state

        self.arm_sub = self.create_subscription(
            Bool,
            '/arm_message',
            self.arm_message_callback,
            qos_profile
        ) ## arming message from waypoint_controller.py

        self.disarm_sub = self.create_subscription(
            Bool,
            '/disarm_message',
            self.disarm_message_callback,
            qos_profile
        ) # disarm message from waypoint_controller.py
        

        self.vtol_state = self.create_subscription(
            ExtendedState,
            '/mavros/extended_state',
            self.vtol_msg_callback,
            qos_profile
        ) ## vtol state
        
        
        self.vtol_publisher_fw = self.create_subscription(
            Bool,
            '/vtol_message_fw',
            self.vtol_msg_callback_fw,
            qos_profile
        ) ## vtol transition(fw)

        self.vtol_publisher_mc = self.create_subscription(
            Bool,
            '/vtol_message_mc',
            self.vtol_msg_callback_mc,
            qos_profile
        ) ## vtol transition(mc)

        
        self.arming_client = self.create_client(CommandBool, "/mavros/cmd/arming")  ## arming command
        self.set_mode_client = self.create_client(SetMode, "/mavros/set_mode") ## offboard command
        self.vtol_transition_client = self.create_client(CommandVtolTransition,"/mavros/cmd/vtol_transition") ## transition command
        
        
      # Timers
        state_timer_period = 0.1
        self.state_timer = self.create_timer(state_timer_period, self.flight_state_callback)

        # Variables
        self.arm_state = False

        self.arm_message = False
        self.disarm_message = False

        self.current_state = None
        self.offboard_reached = False
 
        self.vtol_state = 0
        self.vtol_msg_fw = False
        self.vtol_msg_mc = False
        self.vtol_count = 0
    
        
        
    #subscription callback

    def vehicle_status_callback(self, msg):      
        if (msg.armed != self.arm_state):
            self.get_logger().info(f"ARM STATUS: {msg.armed}")
        self.current_state = msg.mode
        self.arm_state = msg.armed

    def arm_message_callback(self, msg):
        self.arm_message = msg.data
        self.get_logger().info(f"Arm message recieved as {self.arm_message}")

    def disarm_message_callback(self, msg):
        self.disarm_message = msg.data
        self.get_logger().info(f"Disarm message recieved as {self.arm_message}")

    def vtol_msg_callback(self, msg):
        if (self.vtol_state != msg.vtol_state):
            if msg.vtol_state == 3:
                self.get_logger().info("current VTOL state : MC")
            if msg.vtol_state == 4:
                self.get_logger().info("current VTOL state : FW")
        self.vtol_state = msg.vtol_state

    def vtol_msg_callback_fw(self, msg):
        if msg.data and self.vtol_count == 0:
            self.vtol_transition(transition_state = 4)
            self.vtol_count = 1
        self.get_logger().info("Transition message recieved (fw)")

    def vtol_msg_callback_mc(self, msg):
        if msg.data and self.vtol_count == 1:
            self.vtol_transition(transition_state = 3)
            self.vtol_count = 0
        self.get_logger().info("Transition message recieved (mc)")
    

    def flight_state_callback(self):
            
        if (self.arm_message and not self.arm_state):
            self.get_logger().info("ARMING START")
            self.arm_drone()

        if (self.arm_state and not self.offboard_reached):
            self.set_mode("OFFBOARD")
            
        if (self.current_state == 'OFFBOARD' and not self.offboard_reached):
            self.offboard_reached = True
            self.get_logger().info("OFFBOARD CONVERTED")
            
        if self.vtol_msg_fw:
            self.get_logger().info("MODE fw")
            self.vtol_transition(4)

        if self.vtol_msg_mc:
            self.get_logger().info("MODE mc")
            self.vtol_transition(3)

        if self.disarm_message:
            self.arm_drone(False)
            self.destroy_timer(self.state_timer)
    

    
    # Commands
    def set_mode(self, mode_name):
        req = SetMode.Request()
        req.custom_mode = mode_name
        self.set_mode_client.call_async(req)

    def arm_drone(self, msg = True):
        req = CommandBool.Request()
        req.value = msg
        self.arming_client.call_async(req)
        
    def vtol_transition(self, transition_state):
        req = CommandVtolTransition.Request()
        req.state = transition_state
        self.vtol_transition_client.call_async(req)
        

def main(args=None):
    rclpy.init(args=args)
    flight_state_node = FlightstateNode()
    rclpy.spin(flight_state_node)
    flight_state_node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()