"""Two-pass RANSAC line fitting on 2-D point files (top and bottom edges)."""

import numpy as np
from sklearn.linear_model import RANSACRegressor, LinearRegression
import matplotlib.pyplot as plt
import pandas as pd
import calc_way

# Value returned by get_data() whenever no pair of lines can be produced:
# status 0 followed by eight ``None`` placeholders.
_GET_DATA_FAILURE = (0, None, None, None, None, None, None, None, None)


def grid_downsample(points, cell_size=15):
    """Downsample 2-D points to one point per grid cell.

    Each point is assigned to the ``cell_size`` x ``cell_size`` cell that
    contains it, and the first point encountered in every cell is kept.

    Args:
        points: Array-like of shape (n, 2) holding x/y pairs.
        cell_size: Edge length of a grid cell, in the same units as the points.

    Returns:
        np.ndarray of shape (m, 2) with one original point per occupied cell,
        ordered by cell (the group-by sorts on the cell coordinates).
    """
    df = pd.DataFrame(points, columns=['x', 'y'])
    df['x_grid'] = (df['x'] // cell_size) * cell_size
    df['y_grid'] = (df['y'] // cell_size) * cell_size
    sampled = df.groupby(['x_grid', 'y_grid']).first().reset_index()
    return sampled[['x', 'y']].values


def filter_middle_percent(data_x, data_y, cameraModel):
    """Build a mask that trims both ends of the x value range.

    The bounds are computed from the *range* of the x values, not from
    percentiles: with ``r = max - min`` and ``p = cameraModel.filt_percent``
    the kept interval is ``[min + p * r, max - p * r]`` (e.g. ``p = 0.1``
    keeps the middle 80 % of the range).

    Args:
        data_x: Array of x values; any shape, it is flattened first.
        data_y: Unused. Kept so existing callers do not break.
        cameraModel: Object providing the ``filt_percent`` fraction.

    Returns:
        1-D boolean np.ndarray, one entry per flattened x value, True for the
        values inside the kept interval.
    """
    # NOTE: an earlier variant filtered on world coordinates obtained from
    # calc_way.calc_distance(); the current code filters on data_x directly.
    flattened_data = np.asarray(data_x).flatten()
    if flattened_data.size == 0:
        # np.min / np.max raise on empty input; nothing to keep anyway.
        return np.zeros(0, dtype=bool)

    data_min = np.min(flattened_data)
    data_max = np.max(flattened_data)
    data_range = data_max - data_min

    lower_bound = data_min + cameraModel.filt_percent * data_range
    upper_bound = data_max - cameraModel.filt_percent * data_range

    return (flattened_data >= lower_bound) & (flattened_data <= upper_bound)


def load_data(cameraModel, txt_name):
    """Load two-column XY data from a text file and pre-process it.

    Processing steps, in order: optional grid downsampling
    (``cameraModel.grid_downsample`` / ``cameraModel.cell_size``), flipping y
    to ``cameraModel.camera_height - y``, and optional range filtering on x
    (``cameraModel.filter``).

    Args:
        cameraModel: Configuration object providing the attributes above.
        txt_name: Path of the text file; surrounding whitespace is stripped.
            The value ``'q'`` (any case) means "no file".

    Returns:
        ``(x, y)`` as column vectors of shape (n, 1), or ``(None, None)`` when
        the path is ``'q'``, the file cannot be read, or it does not contain
        exactly two columns.
    """
    filepath = txt_name.strip()
    if filepath.lower() == 'q':
        return None, None

    # The path never changes between attempts, so retrying in a loop could
    # only repeat the same failure forever; report once and give up instead.
    try:
        # ndmin=2 keeps a single-row file as shape (1, 2) instead of (2,).
        data = np.loadtxt(filepath, ndmin=2)
    except (OSError, ValueError) as e:
        print(f"加载文件出错: {e}")
        return None, None

    # Validate before downsampling: grid_downsample() needs exactly 2 columns.
    if data.shape[1] != 2:
        print("错误:文件必须包含两列数据(X和Y)")
        return None, None

    if cameraModel.grid_downsample:
        int_data = data.astype(int)
        data = grid_downsample(int_data, cell_size=cameraModel.cell_size)

    x = data[:, 0].reshape(-1, 1)
    y = data[:, 1].reshape(-1, 1)
    y = cameraModel.camera_height - y

    if cameraModel.filter:
        mask = filter_middle_percent(x, y, cameraModel)
        x_clean = x[mask]
        y_clean = y[mask]
    else:
        x_clean = x
        y_clean = y

    return x_clean.reshape(-1, 1), y_clean.reshape(-1, 1)


def ransac_fit(x, y, residual_threshold=2.5):
    """Fit a line with RANSAC and return the model plus inlier/outlier masks.

    Args:
        x: Feature array of shape (n, 1).
        y: Target array of shape (n, 1).
        residual_threshold: Maximum residual for a sample to count as inlier.

    Returns:
        ``(estimator, inlier_mask, outlier_mask)`` where ``estimator`` is the
        fitted ``LinearRegression`` and the masks are complementary boolean
        arrays of length n.
    """
    ransac = RANSACRegressor(
        LinearRegression(),
        residual_threshold=residual_threshold,
        random_state=42,  # fixed seed so repeated runs give the same fit
    )
    ransac.fit(x, y)
    inlier_mask = ransac.inlier_mask_
    outlier_mask = ~inlier_mask
    return ransac.estimator_, inlier_mask, outlier_mask


def _plot_fits(title, x, model1, x_inliers1, y_inliers1,
               model2, x_inliers2, y_inliers2, x_outliers2, y_outliers2):
    """Show both RANSAC fits, their inliers and the leftover outliers."""
    plt.figure(figsize=(14, 7))
    line_x = np.array([x.min(), x.max()]).reshape(-1, 1)

    plt.scatter(x_inliers1, y_inliers1,
                color='limegreen', marker='o', s=30, alpha=0.7,
                label='first_inliers')
    plt.plot(line_x, model1.predict(line_x), color='darkgreen', linewidth=3,
             label='first RANSAC')

    plt.scatter(x_inliers2, y_inliers2,
                color='yellow', marker='o', s=100, edgecolor='black',
                label='second_inliers')
    plt.plot(line_x, model2.predict(line_x), color='darkgreen', linewidth=3,
             label='second RANSAC')

    plt.scatter(x_outliers2, y_outliers2,
                color='gray', marker='*', s=100, edgecolor='black',
                label='second_outliers')

    plt.xlabel('X', fontsize=12)
    plt.ylabel('Y', fontsize=12)
    plt.title(f'{title}', fontsize=14)
    plt.legend(fontsize=10, loc='best')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()


def get_data(cameraModel, txt_name, *, compare_x=600, debug_plot=False):
    """Fit two lines to a point file and label them top and bottom.

    A first RANSAC fit is run on all points; a second one is run on the
    outliers of the first. The two lines are then evaluated at ``compare_x``:
    the one with the larger predicted y is reported as "top".

    Args:
        cameraModel: Configuration object; besides what load_data() reads,
            this uses ``ransac_residual_threshold`` and ``outlier_num``.
        txt_name: Path of the data file, forwarded to load_data().
        compare_x: x position at which the two lines are compared.
        debug_plot: When True, display a matplotlib figure of both fits
            (blocks until the window is closed).

    Returns:
        A 9-tuple ``(status, x_bot, y_bot, slope_bot, intercept_bot,
        x_top, y_top, slope_top, intercept_top)``. ``status`` is 1 on success.
        On failure (no data loaded, or the first fit leaves no more than
        ``cameraModel.outlier_num`` outliers) ``status`` is 0 and the other
        eight entries are ``None``.
    """
    x, y = load_data(cameraModel, txt_name)
    if x is None:
        return _GET_DATA_FAILURE

    threshold = cameraModel.ransac_residual_threshold

    # First pass: dominant line over all points.
    model1, inlier_mask1, outlier_mask1 = ransac_fit(
        x, y, residual_threshold=threshold)
    x_inliers1 = x[inlier_mask1]
    y_inliers1 = y[inlier_mask1]
    x_outliers1 = x[outlier_mask1]
    y_outliers1 = y[outlier_mask1]

    # The second line is fitted to the leftovers, so enough must remain.
    if len(x_outliers1) <= cameraModel.outlier_num:
        print(f"外点数量不足\n第一次拟合内点数量:{len(x_inliers1)}\n外点数量:{len(x_outliers1)}\n")
        return _GET_DATA_FAILURE

    # Second pass: fit only the outliers of the first pass.
    model2, inlier_mask2, outlier_mask2 = ransac_fit(
        x_outliers1, y_outliers1, residual_threshold=threshold)
    x_inliers2 = x_outliers1[inlier_mask2]
    y_inliers2 = y_outliers1[inlier_mask2]
    x_outliers2 = x_outliers1[outlier_mask2]
    print(f"第二次拟合内点数量:{len(x_inliers2)}\n外点数量:{len(x_outliers2)}\n")

    # Compare plain scalars instead of relying on truthiness of 1x1 arrays.
    probe = np.array([compare_x]).reshape(-1, 1)
    m1 = model1.predict(probe).item()
    m2 = model2.predict(probe).item()

    if m1 > m2:
        model_top, model_bot = model1, model2
        x_top, x_bot = x_inliers1, x_inliers2
        y_top, y_bot = y_inliers1, y_inliers2
    else:
        model_top, model_bot = model2, model1
        x_top, x_bot = x_inliers2, x_inliers1
        y_top, y_bot = y_inliers2, y_inliers1

    # y is a column vector, so coef_ is 2-D and intercept_ is 1-D.
    slope_top = model_top.coef_[0][0]
    intercept_top = model_top.intercept_[0]
    slope_bot = model_bot.coef_[0][0]
    intercept_bot = model_bot.intercept_[0]

    if debug_plot:
        _plot_fits(txt_name, x, model1, x_inliers1, y_inliers1,
                   model2, x_inliers2, y_inliers2,
                   x_outliers2, y_outliers1[outlier_mask2])

    return (1, x_bot, y_bot, slope_bot, intercept_bot,
            x_top, y_top, slope_top, intercept_top)