您现在的位置是:首页 >技术杂谈 >【路径规划】基于RRT算法和改进人工势场法的无人机任务规划方法研究(Python代码实现)先放源码网站首页技术杂谈

【路径规划】基于RRT算法和改进人工势场法的无人机任务规划方法研究(Python代码实现)先放源码

简介【路径规划】基于RRT算法和改进人工势场法的无人机任务规划方法研究(Python代码实现)先放源码
import random
import math
import copy
import time
import heapq

from scipy import stats
from matplotlib.patches import Ellipse

import matplotlib

matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
import numpy as np


class GMM(object):
    def __init__(self, k: int, d: int):
        self.K = k
        # 初始化参数
        self.p = np.random.rand(k)
        self.p = self.p / self.p.sum()  # 保证所有p_k的和为1
        self.means = np.random.rand(k, d)  # 初始化均值
        self.covs = np.empty((k, d, d))  # 初始化方差
        for i in range(k):  # 随机生成协方差矩阵,必须是半正定矩阵
            self.covs[i] = np.eye(d) * np.random.rand(1) * k

    # 画出聚类图像
    def plot_clusters(self, data, Mu=None, Var=None):
        colors = ['b', 'g', 'r']
        n_clusters = len(self.means)
        plt.figure(figsize=(10, 8))
        plt.axis([-10, 15, -5, 15])
        plt.scatter(data[:, 0], data[:, 1], s=5)
        ax = plt.gca()
        for i in range(n_clusters):
            # 迭代模型区域
            # Ellipse,参数:坐标,椭圆胖瘦
            ellipse = Ellipse(list(self.means[i]), 3 * self.covs[i][0][0], 3 * self.covs[i][1][1], alpha=0.5, lw=2,
                              edgecolor=colors[i], ls='--')

            ax.add_patch(ellipse)
        if (Mu is not None) & (Var is not None):
            # 原始数据
            for i in range(n_clusters):
                # plot_args = {'fc': 'None', 'lw': 2, 'edgecolor': colors[i], 'alpha': 0.5}
                ellipse = Ellipse(Mu[i], 3 * Var[i][0], 3 * Var[i][1], fc='None', lw=2, edgecolor=colors[i])
                ax.add_patch(ellipse)
        plt.show()

    def fit(self, data: np.ndarray):
        for _ in range(100):
            density = np.empty((len(data), self.K))
            for i in range(self.K):
                # 生成K个概率密度函数并计算对于所有样本的概率密度
                norm = stats.multivariate_normal(self.means[i], self.covs[i], allow_singular=True)
                density[:, i] = norm.pdf(data)
            # 计算所有样本属于每一类别的后验
            posterior = density * self.p  # 概率*权重
            posterior = posterior / posterior.sum(axis=1, keepdims=True)  # 归一化:N*3
            # print(posterior)
            # 计算下一时刻的参数值
            p_hat = posterior.sum(axis=0)
            # print(p_hat)
            mean_hat = np.tensordot(posterior, data, axes=[0, 0])  # 3*N*N*2=3*2
            # print(mean_hat)
            # 计算协方差
            cov_hat = np.empty(self.covs.shape)
            for i in range(self.K):
                tmp = data - self.means[i]
                cov_hat[i] = np.dot(tmp.T * posterior[:, i], tmp) / p_hat[i]
            # 更新参数
            self.covs = cov_hat
            self.means = mean_hat / p_hat.reshape(-1, 1)
            self.p = p_hat / len(data)
            # if _ % 10 == 0:
            #     GMM.plot_clusters(self, data)
        print(self.p)
        print(self.means)
        print(self.covs)


class RRT:

    # 初始化
    def __init__(self,
                 obstacle_list,  # 障碍物
                 rand_area,  # 采样的区域
                 expand_dis=2.0,  # 步长
                 goal_sample_rate=10,  # 目标采样率
                 max_iter=200):  # 最大迭代次数

        self.start = None
        self.goal = None
        self.min_rand = rand_area[0]
        self.max_rand = rand_area[1]
        self.expand_dis = expand_dis
        self.goal_sample_rate = goal_sample_rate
        self.max_iter = max_iter
        self.obstacle_list = obstacle_list
        self.node_list = None

    def rrt_path_optimize(self, path):
        # random_list = [[self.goal.x, self.goal.y]]
        # for i in range(1,len(path)):
        #     rnd_node = self.random_point_circle(path[i][0],path[i][1])
        #     random_list.append(rnd_node)
        #
        # random_list.append([self.start.x, self.start.y])
        # #
        random_list = []
        cost = []
        for j in range(0, 21):
            random_list.append([[self.goal.x, self.goal.y]])
            flag = False
            for i in range(1, len(path) - 1):
                rnd_node = self.random_point_circle(path[i][0], path[i][1])
                # rnd_node = self.random_specific_point_Gaussin(path[i][0], path[i][1])
                random_list[j].append(rnd_node)
            random_list[j].append([self.start.x, self.start.y])
            for i in range(0, len(path) - 2):
                no_collision = self.check_segment_collision(random_list[j][i][0], random_list[j][i][1],
                                                            random_list[j][i + 1][0], random_list[j][i + 1][1])
                if not no_collision:
                    flag = True
                    break
            if flag:
                cost.append(100000)
            else:
                cost.append(self.get_path_len(random_list[j]))

        least_5 = heapq.nsmallest(5, cost)
        avg_x = [0] * (len(random_list[0]) - 2)
        avg_y = [0] * (len(random_list[0]) - 2)
        for i in range(0, 5):
            index = cost.index(least_5[i])
            for j in range(1, len(avg_x) + 1):
                avg_x[j - 1] += random_list[index][j][0]
                avg_y[j - 1] += random_list[index][j][1]

        for i in range(0, len(avg_x)):
            avg_x[i] = avg_x[i] / 5
            avg_y[i] = avg_y[i] / 5

        optmized_path = [[self.goal.x, self.goal.y]]
        for i in range(0, len(avg_x)):
            optmized_path.append([avg_x[i], avg_y[i]])
        optmized_path.append([self.start.x, self.start.y])

        path_length = self.get_path_len(optmized_path)
        print("优化后的路径长度为:{}".format(path_length))

        return optmized_path

        #
        # return random_list

    def rrt_optimize_one_point(self, path):
        for j in range(1, len(path) - 1):
            # rnd_node = self.random_point_circle(path[j][0], path[j][1])
            rnd_node = self.random_specific_point_Gaussin(path[j][0], path[j][1])
            origin_cost = math.sqrt(
                math.pow((path[j][0] - path[j - 1][0]), 2) + math.pow((path[j][1] - path[j - 1][1]), 2)) 
                          + math.sqrt(
                math.pow((path[j + 1][0] - path[j][0]), 2) + math.pow((path[j + 1][1] - path[j][1]), 2))

            new_cost = math.sqrt(
                math.pow((rnd_node[0] - path[j - 1][0]), 2) + math.pow((rnd_node[1] - path[j - 1][1]), 2)) 
                       + math.sqrt(
                math.pow((path[j + 1][0] - rnd_node[0]), 2) + math.pow((path[j + 1][1] - rnd_node[1]), 2))

            if new_cost < origin_cost:
                no_collision_1 = self.check_segment_collision(path[j - 1][0], path[j - 1][1], rnd_node[0], rnd_node[1])
                no_collision_2 = self.check_segment_collision(rnd_node[0], rnd_node[1], path[j + 1][0], path[j + 1][1])
                if no_collision_1 and no_collision_2:
                    path[j] = rnd_node
                    path_length = self.get_path_len(path)
                    print("优化后的路径长度为:{}".format(path_length))

    def rrt_planning(self, start, goal, animation=True):
        self.start = Node(start[0], start[1])
        self.goal = Node(goal[0], goal[1])
        self.node_list = [self.start]
        path = None

        for i in range(self.max_iter):
            # 1. 在环境中随机采样点
            rnd = self.sample()

            # 2. 找到结点树中距离采样点最近的结点
            n_ind = self.get_nearest_list_index(self.node_list, rnd)
            nearest_node = self.node_list[n_ind]

            # 3. 在采样点的方向生长一个步长,得到下一个树的结点。
            theta = math.atan2(rnd[1] - nearest_node.y, rnd[0] - nearest_node.x)
            new_node = self.get_new_node(theta, n_ind, nearest_node)

            # 4. 检测碰撞,检测到新生成的结点的路径是否会与障碍物碰撞
            no_collision = self.check_segment_collision(new_node.x, new_node.y, nearest_node.x, nearest_node.y)
            if no_collision:
                self.node_list.append(new_node)

                # 一步一绘制
                if animation:
                    time.sleep(0.3)
                    self.draw_graph(new_node, path)

                # 判断新结点是否临近目标点
                if self.is_near_goal(new_node):
                    if self.check_segment_collision(new_node.x, new_node.y,
                                                    self.goal.x, self.goal.y):
                        last_index = len(self.node_list) - 1
                        path = self.get_final_course(last_index)  # 回溯路径
                        path_length = self.get_path_len(path)  # 计算路径的长度
                        print("当前的路径长度为:{}".format(path_length))

                        if animation:
                            self.draw_graph(new_node, path)


                        # 原始版本:均匀采样/单点高斯采样优化
                        cost = []
                        opt_path = self.rrt_path_optimize(path)
                        cost.append(self.get_path_len(opt_path))
                        for j in range(0, 100):
                            # opt_path = self.rrt_path_optimize(opt_path)
                            self.rrt_optimize_one_point(opt_path)
                            cost.append(self.get_path_len(opt_path))
                            self.draw_graph(None, opt_path, True)
                            plt.pause(0.08)
                        self.draw_graph(None, opt_path, False)

                        plt.figure()
                        plt.ylim((15, 35))
                        plt.title("Global Path Cost")
                        plt.plot(range(len(cost)), cost)

                        # opt_path = self.rrt_optimize_GMM(path)
                        return opt_path

    def test(self):
        # random_point_Gaussin 测试无误
        # random = []
        # for i in range(500):
        #     random.append(self.random_point_Gaussin(2, 2, [[0.2, 0], [0, 1]]))
        #     plt.plot(random[i][0], random[i][1], "og")
        # plt.show()


        # sample_GMM 测试无误
        # self.goal = Node(12, 18)
        # gmm = GMM(3, 2)
        # gmm.means[0] = [0.5, 0.5]
        # gmm.means[1] = [2, 3]
        # gmm.means[2] = [5, 5]
        # gmm.covs[0] = [[0.1, 0], [0, 0.1]]
        # gmm.covs[1] = [[0.1, 0], [0, 0.3]]
        # gmm.covs[2] = [[0.3, 0], [0, 0.1]]
        # gmm.p = [0.333, 0.333, 0.333]
        # random = []
        # for i in range(500):
        #     random.append(self.sample_GMM(gmm))
        #     plt.plot(random[i][0], random[i][1], "og")
        # plt.show()

        a = 1

    def rrt_optimize_GMM(self, init_path):
        path_points_num = len(init_path) - 2

        # !!! 在这里要先算出足够数量的路径,有足够数量的样本之后才能进行参数估计
        init_random_path = []
        init_cost = []
        for j in range(50):
            for i in range(self.max_iter):
                # 1. 在环境中随机采样点
                rnd = self.sample()

                # 2. 找到结点树中距离采样点最近的结点
                n_ind = self.get_nearest_list_index(self.node_list, rnd)
                nearest_node = self.node_list[n_ind]

                # 3. 在采样点的方向生长一个步长,得到下一个树的结点。
                theta = math.atan2(rnd[1] - nearest_node.y, rnd[0] - nearest_node.x)
                new_node = self.get_new_node(theta, n_ind, nearest_node)

                # 4. 检测碰撞,检测到新生成的结点的路径是否会与障碍物碰撞
                no_collision = self.check_segment_collision(new_node.x, new_node.y, nearest_node.x, nearest_node.y)
                if no_collision:
                    self.node_list.append(new_node)

                    # 判断新结点是否临近目标点
                    if self.is_near_goal(new_node):
                        if self.check_segment_collision(new_node.x, new_node.y,
                                                        self.goal.x, self.goal.y):
                            last_index = len(self.node_list) - 1
                            path = self.get_final_course(last_index)  # 回溯路径
                            path_length = self.get_path_len(path)  # 计算路径的长度
                            print("当前的路径长度为:{}".format(path_length))
                            init_random_path.append(path)
                            init_cost.append(path_length)

        data = []
        least_5 = heapq.nsmallest(5, init_cost)
        for i in range(0, 5):
            index = init_cost.index(least_5[i])
            for j in range(1, len(init_random_path[index]) - 1):
                data.append(init_random_path[i][j])

        # 需要在正式迭代优化之前先进行预处理,按均匀采样得到一组样本以估计高斯混合模型的参数。   高斯混合模型的参数可以适当给一个大一点的定值(以平均路径点数为基础) !!

        gmm = GMM(path_points_num, 2)
        gmm.fit(init_path)
        animation = True

        random_path_list = []
        cost = []
        for k in range(50):  # 迭代优化50次
            for j in range(20):  # 每次采样20条路径再取最好的5条
                for i in range(self.max_iter):
                    # 1. 在环境中随机采样点
                    rnd = self.sample_GMM(gmm)

                    # 2. 找到结点树中距离采样点最近的结点
                    n_ind = self.get_nearest_list_index(self.node_list, rnd)
                    nearest_node = self.node_list[n_ind]

                    # 3. 在采样点的方向生长一个步长,得到下一个树的结点。
                    theta = math.atan2(rnd[1] - nearest_node.y, rnd[0] - nearest_node.x)
                    new_node = self.get_new_node(theta, n_ind, nearest_node)

                    # 4. 检测碰撞,检测到新生成的结点的路径是否会与障碍物碰撞
                    no_collision = self.check_segment_collision(new_node.x, new_node.y, nearest_node.x, nearest_node.y)
                    if no_collision:
                        self.node_list.append(new_node)

                        # 判断新结点是否临近目标点
                        if self.is_near_goal(new_node):
                            if self.check_segment_collision(new_node.x, new_node.y,
                                                            self.goal.x, self.goal.y):
                                last_index = len(self.node_list) - 1
                                path = self.get_final_course(last_index)  # 回溯路径
                                path_length = self.get_path_len(path)  # 计算路径的长度
                                print("当前的路径长度为:{}".format(path_length))
                                random_path_list.append(path)
                                cost.append(path_length)

                                if animation:
                                    self.draw_graph(new_node, path)

            data = []
            least_5 = heapq.nsmallest(5, cost)
            for i in range(0, 5):
                index = cost.index(least_5[i])
                for j in range(1, len(random_path_list[index]) - 1):
                    data.append(random_path_list[i][j])
            gmm.fit(data)

        optimized_path = []
        optimized_path.append(self.goal)
        for i in range(1, len(gmm.means) - 1):
            optimized_path.append(gmm.means[i])
        optimized_path.append(self.start)
        return optimized_path

    def sample_GMM(self, gmm):
        if random.randint(0, 100) > self.goal_sample_rate:
            points_num = 0
            index_sample = random.uniform(0, 1)
            while index_sample > 0:
                index_sample -= gmm.p[points_num]
                points_num += 1
            points_num -= 1
            rnd = self.random_point_Gaussin(gmm.means[points_num][0], gmm.means[points_num][1], gmm.covs[points_num])
        else:
            rnd = [self.goal.x, self.goal.y]
        return rnd

    def sample(self):
        if random.randint(0, 100) > self.goal_sample_rate:
            rnd = [random.uniform(self.min_rand, self.max_rand),
                   random.uniform(self.min_rand, self.max_rand)]
        else:
            rnd = [self.goal.x, self.goal.y]
        return rnd

    @staticmethod
    def get_nearest_list_index(nodes, rnd):
        d_list = [(node.x - rnd[0]) ** 2 + (node.y - rnd[1]) ** 2
                  for node in nodes]
        min_index = d_list.index(min(d_list))
        return min_index

    def get_new_node(self, theta, n_ind, nearest_node):
        new_node = copy.deepcopy(nearest_node)

        new_node.x += self.expand_dis * math.cos(theta)
        new_node.y += self.expand_dis * math.sin(theta)

        new_node.cost += self.expand_dis
        new_node.parent = n_ind

        return new_node

    def check_segment_collision(self, x1, y1, x2, y2):
        for (ox, oy, radius) in self.obstacle_list:
            dd = self.distance_squared_point_to_segment(
                np.array([x1, y1]),
                np.array([x2, y2]),
                np.array([ox, oy])
            )
            if dd <= radius ** 2:
                return False
        return True

    @staticmethod
    def distance_squared_point_to_segment(v, w, p):
        if np.array_equal(v, w):  # 点 v 和 点 w 重合的情况
            return (p - v).dot(p - v)

        l2 = (w - v).dot(w - v)  # 线段 vw 长度的平方
        t = max(0, min(1, (p - v).dot(w - v) / l2))
        projection = v + t * (w - v)
        return (p - projection).dot(p - projection)

    def draw_graph(self, rnd=None, path=None, optimize=False):

        plt.clf()

        # 绘制新的结点
        if rnd is not None:
            plt.plot(rnd.x, rnd.y, '^k')

        # 绘制路径
        for node in self.node_list:
            if node.parent is not None:
                if node.x or node.y is not None:
                    plt.plot([node.x, self.node_list[node.parent].x],
                             [node.y, self.node_list[node.parent].y],
                             '-g')

        # 绘制起点、终点
        plt.plot(self.start.x, self.start.y, "og")
        plt.plot(self.goal.x, self.goal.y, "or")

        # 绘制障碍物
        for (ox, oy, size) in self.obstacle_list:
            plt.plot(ox, oy, "ok", ms=30 * size)

        if not optimize:
            # 绘制路径
            if path is not None:
                plt.plot([x for (x, y) in path], [y for (x, y) in path], '-r')

        else:
            if path is not None:
                plt.plot([x for (x, y) in path], [y for (x, y) in path], '-b')

        # 绘制图的设置
        plt.axis([-2, 18, -2, 15])
        plt.grid(True)
        plt.pause(0.01)

    def is_near_goal(self, node):
        d = self.line_cost(node, self.goal)
        if d < self.expand_dis:
            return True
        return False

    @staticmethod
    def line_cost(node1, node2):
        return math.sqrt((node1.x - node2.x) ** 2 + (node1.y - node2.y) ** 2)

    @staticmethod
    def random_point_circle(x, y):
        radius = random.uniform(0, 0.5)
        theta = random.uniform(0, 2 * math.pi)
        rnd = [x + radius * math.cos(theta), y + radius * math.sin(theta)]
        return rnd

    def random_specific_point_Gaussin(self, x, y):
        cov = np.array([[0.2, 0], [0, 0.2
                                   ]])
        Gaussin = np.random.multivariate_normal([x, y], cov, 1)
        rx, ry = Gaussin.T
        return [rx[0], ry[0]]

    def random_point_Gaussin(self, x, y, cov):
        Gaussin = np.random.multivariate_normal([x, y], cov, 1)
        rx, ry = Gaussin.T
        return [rx[0], ry[0]]

    def get_final_course(self, last_index):
        path = [[self.goal.x, self.goal.y]]
        while self.node_list[last_index].parent is not None:
            node = self.node_list[last_index]
            path.append([node.x, node.y])
            last_index = node.parent
        path.append([self.start.x, self.start.y])
        return path

    @staticmethod
    def get_path_len(path):
        path_length = 0
        for i in range(1, len(path)):
            node1_x = path[i][0]
            node1_y = path[i][1]
            node2_x = path[i - 1][0]
            node2_y = path[i - 1][1]
            path_length += math.sqrt((node1_x - node2_x) ** 2 + (node1_y - node2_y) ** 2)
        return path_length


class Node:
    def __init__(self, x, y):
        self.x = x
        self.y = y
        self.cost = 0.0
        self.parent = None


def main():
    print('Start RRT planning!')
    show_animation = True
    start = [0, 0]
    goal = [15, 12]
    # 障碍物 (x, y, radius)
    obstacle_list = [
        (3, 3, 1.5),
        (12, 2, 3),
        (3, 9, 2),
        (9, 11, 2),
        (5, 5, 3),
        (9, 11, 2)
    ]

    rrt = RRT(rand_area=[-2, 18], obstacle_list=obstacle_list, max_iter=200)
    # rrt.test()
    path = rrt.rrt_planning(start=[0, 0], goal=[15, 12], animation=show_animation)
    print('Done!')
    if show_animation and path:
        plt.show()


if __name__ == '__main__':
    main()

这个程序实现了 RRT(Rapidly-exploring Random Tree,快速探索随机树)算法。RRT 是一种用于路径规划的随机化算法,适用于解决高维空间中的路径规划问题。它主要用于机器人运动规划、自动驾驶汽车、虚拟现实等领域。

这个程序中定义了 RRT 类,Node 类,以及一个 main 函数。RRT 类包含了 RRT 算法的主要逻辑。以下是每个类和函数的简要概述:

  1. class RRT: 实现了 RRT 算法的类。它包含了初始化函数、rrt_planning 函数、rrt_optimize_GMM 函数以及其他辅助函数。

  2. __init__(self, start, goal, obstacle_list, expand_dis=1.0, path_resolution=0.5, goal_sample_rate=5, max_iter=300): RRT 类的初始化函数。它接收起点、终点、障碍物列表、扩展距离、路径分辨率、目标采样率和最大迭代次数作为参数。

  3. rrt_planning(self, start, goal, animation=True): RRT 规划的主要函数。它实现了 RRT 算法,并在找到一条路径时返回该路径。

  4. rrt_optimize_GMM(self, init_path): 这个函数使用高斯混合模型(GMM)对初始路径进行优化。它首先生成一些随机路径,然后用 GMM 对路径进行拟合和优化。

  5. class Node: 一个简单的类,表示 RRT 中的节点。它包含 x 和 y 坐标、代价和父节点。

  6. main(): 主函数。它定义了起点、终点和障碍物列表,然后创建一个 RRT 实例并调用 rrt_planning 函数进行路径规划。如果找到路径,它还会显示路径。

 这个程序的主要目的是在有障碍物的二维环境中找到一条从起点到终点的可行路径。它首先初始化 RRT 类,然后在 main 函数中调用 rrt_planning 函数进行路径规划。在规划过程中,它会随机采样点,寻找最近的节点,并在该节点的方向上扩展一个步长。然后检查扩展后的路径是否与障碍物碰撞。如果没有碰撞,它会将新节点添加到节点列表中。当找到一条到达目标点的路径时,算法停止并返回路径。

rrt_optimize_GMM 函数中,程序使用高斯混合模型(GMM)对初始路径进行优化。它首先生成一些随机路径,然后用 GMM 对路径进行拟合和 

优化。

以下是这个程序中的一些辅助函数:

  1. distance(self, from_node, to_node): 计算两个节点之间的欧氏距离。

  2. get_random_node(self): 在搜索区域内生成一个随机节点。

  3. get_nearest_node_index(self, node_list, random_node): 计算节点列表中离给定随机节点最近的节点的索引。

  4. steer(self, from_node, to_node, extend_length=float("inf")): 控制从一个节点到另一个节点的移动。这个函数会返回一个新的节点,这个节点在 from_node 和 to_node 之间,与 from_node 的距离不超过 extend_length。

  5. collision_check(self, new_node, near_node): 检查从 near_node 到 new_node 的路径是否与障碍物发生碰撞。如果有碰撞,返回 True,否则返回 False。

  6. find_near_nodes(self, new_node): 寻找与新节点距离在一个预定范围内的节点。

  7. update_parent(self, n, near_inds, obstacle_list): 更新节点的父节点以减少总路径长度。

  8. get_best_last_index(self): 获取到达目标点的最佳路径的最后一个节点的索引。

  9. generate_final_course(self, goal_ind): 根据给定的目标节点索引,生成从起点到终点的路径。

  10. draw_graph(self, rnd=None): 绘制 RRT 图,包括节点、边和障碍物。如果提供了 rnd 参数,还会绘制随机生成的节点。

  11. propagate_cost_to_leaves(self, parent_node): 将从根节点到指定父节点的代价传播到所有叶子节点。

在主函数中,首先定义了起点、终点和障碍物列表。然后创建一个 RRT 实例并调用 rrt_planning 函数进行路径规划。如果找到了路径,会显示出来。如果需要对路径进行优化,可以调用 rrt_optimize_GMM 函数。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。