import numpy as np
import pandas as pd
import calc_way
from scipy import stats
import calc_slope_line
import matplotlib.pyplot as plt
import model
import os

# Data truncation line: points lying above y = limit_slope * x + limit_intercept
# are discarded by get_data().
# NOTE: the next statement rebinds the module name ``model`` to a Model
# instance; kept as-is because other code may rely on this module-level name.
model = model.Model()
limit_slope = model.limit_slope
limit_intercept = model.limit_intercept

# Standardized-residual cut-off for the first cleaning pass (whole data set).
_FIRST_PASS_THRESHOLD = 2.0
# Standardized-residual cut-off for the second cleaning pass (per side).
_SECOND_PASS_THRESHOLD = 1.0
# A side is accepted only when 1 - R^2 does not exceed this tolerance.
_R2_TOLERANCE = 1e-3
# The residual standard deviation divides by (n - 2), so n must be >= 3.
_MIN_TOTAL_POINTS = 3
# Minimum number of points handed to a line fit.
_MIN_FIT_POINTS = 2


def grid_downsample(points, cell_size=15):
    """Down-sample 2-D points on a regular grid, keeping the spatial layout.

    Each point is assigned to the square cell of side ``cell_size`` that
    contains it, and only the first point encountered in every cell is kept.

    Args:
        points: Two-column array-like of (x, y) coordinates.
        cell_size: Side length of a grid cell; must be positive.

    Returns:
        A 2-D NumPy array with one (x, y) row per occupied cell, ordered by
        the cell keys (pandas ``groupby`` sorts its group keys).

    Raises:
        ValueError: If ``cell_size`` is not positive.
    """
    if cell_size <= 0:
        raise ValueError(f"cell_size must be positive, got {cell_size}")
    df = pd.DataFrame(points, columns=['x', 'y'])
    df['x_grid'] = (df['x'] // cell_size) * cell_size
    df['y_grid'] = (df['y'] // cell_size) * cell_size
    sampled = df.groupby(['x_grid', 'y_grid']).first().reset_index()
    return sampled[['x', 'y']].values


def _invalid_result():
    """Report unusable data and return the failure tuple used by get_data()."""
    print("无效数据")
    return 0, None, None, None, None


def _split_by_line(x, y, slope, intercept):
    """Split points into those strictly below the line and all the others."""
    below = x * slope + intercept - y > 0
    return x[below], y[below], x[~below], y[~below]


def _outlier_mask(residuals, threshold):
    """Flag residuals whose standardized magnitude exceeds ``threshold``."""
    no_outliers = np.zeros(len(residuals), dtype=bool)
    dof = len(residuals) - 2
    if dof <= 0:
        # Residual standard deviation is undefined; nothing can be flagged.
        return no_outliers
    # Square root of the MSE with (n - 2) degrees of freedom.
    residual_std = np.sqrt(np.sum(residuals ** 2) / dof)
    if not residual_std > 0:
        # Perfect fit (or NaN): avoid a 0/0 division, flag nothing.
        return no_outliers
    return np.abs(residuals / residual_std) > threshold


def _r2_is_acceptable(r2):
    """Return True when ``r2`` is finite and within tolerance of 1."""
    return bool(np.isfinite(r2)) and (1 - r2) <= _R2_TOLERANCE


def get_data(txt_name, y_max=960):
    """Load detected curb points and separate them into lower/upper edges.

    Reads the curb coordinates recognized by the YOLO network from a text
    file, keeps the points inside the target region (on or below the
    module-level truncation line), removes outliers in two passes and splits
    the remaining points into the lower and upper side of the curb.

    Args:
        txt_name: Path of the text file holding the (x, y) points, one point
            per row.
        y_max: Value from which every y coordinate is subtracted to flip the
            vertical axis (960 in the original code; presumably the image
            height in pixels -- confirm against the detector output).

    Returns:
        A tuple ``(status, x_bot, y_bot, x_top, y_top)``. ``status`` is 1 and
        the four entries are NumPy arrays when both edges fit a straight line
        well; otherwise ``status`` is 0 and the four entries are ``None``.
    """
    # Load the data; ndmin=2 keeps a single-row file two-dimensional.
    data = np.loadtxt(txt_name, ndmin=2)
    if data.size == 0:
        return _invalid_result()
    int_data = data.astype(int)

    # Grid down-sampling.
    grid_sampled = grid_downsample(int_data, cell_size=20)

    # Truncation: flip the y axis, then drop points above the limit line.
    xs = grid_sampled[:, 0].astype(int)
    ys = y_max - grid_sampled[:, 1].astype(int)
    inside = ~(limit_slope * xs + limit_intercept - ys < 0)
    x = xs[inside]
    y = ys[inside]
    if len(x) < _MIN_TOTAL_POINTS:
        return _invalid_result()

    # Coarse classification of the raw data around the overall fit line.
    slope, intercept, r_2 = calc_slope_line.linear_regression(x, y)
    y_pred = slope * x + intercept
    x_bot, y_bot, x_top, y_top = _split_by_line(x, y, slope, intercept)
    # Diagnostic only: fit quality of each side before any cleaning.
    if min(len(x_bot), len(x_top)) >= _MIN_FIT_POINTS:
        _, _, r2_bot = calc_slope_line.linear_regression(x_bot, y_bot)
        _, _, r2_top = calc_slope_line.linear_regression(x_top, y_top)
        print(f"未清洗数据拟合上下沿:r2_bot = {r2_bot},r2_top = {r2_top}")

    # First cleaning pass: remove mis-detected points.
    residuals = y - y_pred
    outlier_mask = _outlier_mask(residuals, _FIRST_PASS_THRESHOLD)
    print(f"第一次数据清洗发现 {np.sum(outlier_mask)} 个异常点:")
    flagged = zip(x[outlier_mask], y[outlier_mask], residuals[outlier_mask])
    for i, (x_val, y_val, res_val) in enumerate(flagged):
        print(f"点 {i + 1}: x={x_val}, y={y_val}, 残差={res_val:.2f}")

    clean_x = x[~outlier_mask]
    clean_y = y[~outlier_mask]
    if len(clean_x) < _MIN_FIT_POINTS:
        return _invalid_result()
    clean_slope, clean_intercept, clean_r_2 = calc_slope_line.linear_regression(clean_x, clean_y)
    # Report the R^2 of the cleaned fit (the original printed the raw r_2).
    print(f"清洗数据后整体拟合参数r_2 = {clean_r_2}")

    # Re-classify the data that survived the first cleaning pass.
    x_bot_clean, y_bot_clean, x_top_clean, y_top_clean = _split_by_line(
        clean_x, clean_y, clean_slope, clean_intercept)
    if min(len(x_bot_clean), len(x_top_clean)) < _MIN_FIT_POINTS:
        return _invalid_result()

    # Second cleaning pass: remove points assigned to the wrong side.
    clean_slope_bot, clean_intercept_bot, clean_r2_bot = calc_slope_line.linear_regression(
        x_bot_clean, y_bot_clean)
    clean_slope_top, clean_intercept_top, clean_r2_top = calc_slope_line.linear_regression(
        x_top_clean, y_top_clean)
    print(f"清洗数据后上下沿拟合参数clean_r2_bot = {clean_r2_bot},clean_r2_top = {clean_r2_top}")

    residuals_bot = y_bot_clean - (clean_slope_bot * x_bot_clean + clean_intercept_bot)
    residuals_top = y_top_clean - (clean_slope_top * x_top_clean + clean_intercept_top)
    outlier_mask_bot = _outlier_mask(residuals_bot, _SECOND_PASS_THRESHOLD)
    outlier_mask_top = _outlier_mask(residuals_top, _SECOND_PASS_THRESHOLD)
    print(f"第二次数据清洗下沿发现 {np.sum(outlier_mask_bot)} 个异常点:")
    print(f"第二次数据清洗上沿发现 {np.sum(outlier_mask_top)} 个异常点:")

    x_bot_clean = x_bot_clean[~outlier_mask_bot]
    y_bot_clean = y_bot_clean[~outlier_mask_bot]
    x_top_clean = x_top_clean[~outlier_mask_top]
    y_top_clean = y_top_clean[~outlier_mask_top]
    if min(len(x_bot_clean), len(x_top_clean)) < _MIN_FIT_POINTS:
        return _invalid_result()

    # Validate the final data: both edges must be almost perfectly linear.
    _, _, clean_r2_bot = calc_slope_line.linear_regression(x_bot_clean, y_bot_clean)
    _, _, clean_r2_top = calc_slope_line.linear_regression(x_top_clean, y_top_clean)
    print(f"清洗数据后上下沿拟合参数clean_r2_bot = {clean_r2_bot},clean_r2_top = {clean_r2_top}")
    # A NaN R^2 (degenerate fit) is rejected here instead of slipping through.
    if not (_r2_is_acceptable(clean_r2_bot) and _r2_is_acceptable(clean_r2_top)):
        return _invalid_result()
    return 1, x_bot_clean, y_bot_clean, x_top_clean, y_top_clean