import time import numpy as np import matplotlib.pyplot as plt from sklearn.linear_model import RANSACRegressor, LinearRegression from sklearn.cluster import DBSCAN from matplotlib.colors import ListedColormap txt_name = r"C:\Users\Administrator\Desktop\BYD\0718\new233.1718914966782.txt" def load_data(txt_name): """从用户输入的文件路径加载二维XY数据""" while True: filepath = txt_name.strip() if filepath.lower() == 'q': return None, None try: data = np.loadtxt(filepath) if data.shape[1] != 2: print("错误:文件必须包含两列数据(X和Y)") continue x = data[:, 0].reshape(-1, 1) y = data[:, 1].reshape(-1, 1) y = 960 - y return x, y except Exception as e: print(f"加载文件出错: {e}") def ransac_fit(x, y, residual_threshold=1.0): """执行RANSAC拟合并返回模型和内点/外点""" ransac = RANSACRegressor( LinearRegression(), residual_threshold=residual_threshold, random_state=42 ) ransac.fit(x, y) inlier_mask = ransac.inlier_mask_ outlier_mask = ~inlier_mask return ransac.estimator_, inlier_mask, outlier_mask def cluster_outliers(x_outliers, y_outliers, eps=0.5, min_samples=5): """对异常值进行聚类""" points = np.column_stack((x_outliers, y_outliers)) clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(points) return clustering.labels_ def plot_results(x, y, model1, inlier_mask1, model2=None, inlier_mask2=None,outlier_mask2=None, cluster_labels=None): """可视化拟合结果和聚类""" plt.figure(figsize=(14, 7)) # 绘制原始内点 plt.scatter(x[inlier_mask1], y[inlier_mask1], color='limegreen', marker='o', s=30, alpha=0.7, label='first inlier') # 绘制第一次拟合的直线 line_x = np.array([x.min(), x.max()]).reshape(-1, 1) line_y_1 = model1.predict(line_x) plt.plot(line_x, line_y_1, color='darkgreen', linewidth=3, label='first RANSAC') # 处理外点 outlier_mask1 = ~inlier_mask1 x_outliers = x[outlier_mask1] y_outliers = y[outlier_mask1] # if cluster_labels is not None and len(cluster_labels) > 0: # # 如果有聚类结果,使用不同颜色显示不同簇 # unique_labels = np.unique(cluster_labels) # colors = plt.cm.viridis(np.linspace(0, 1, len(unique_labels))) # # for label, color in zip(unique_labels, colors): # if label == -1: # 噪声点 # mask = cluster_labels == label # plt.scatter(x_outliers[mask], y_outliers[mask], # color='gray', marker='x', s=20, # label='zaoshengdian' if label == -1 else None) # else: # mask = cluster_labels == label # plt.scatter(x_outliers[mask], y_outliers[mask], # color=color, marker='^', s=40, # label=f'cu {label + 1}') # else: # # 如果没有聚类结果,统一显示为金色 # plt.scatter(x_outliers, y_outliers, # color='gold', marker='^', s=40, # label='first outliner') # 绘制第二次拟合的直线(如果有) if model2 is not None and inlier_mask2 is not None: line_y_2 = model2.predict(line_x) plt.plot(line_x, line_y_2, color='red', linestyle='--', linewidth=3, label='second RANSAC') # 高亮显示第二次拟合的内点 plt.scatter(x_outliers[inlier_mask2], y_outliers[inlier_mask2], color='red', marker='*', s=100, edgecolor='black', label='second inliner') plt.scatter(x_outliers[outlier_mask2], y_outliers[outlier_mask2], color='gray', marker='x', s=100, edgecolor='black', label='second outliner') plt.xlabel('X', fontsize=12) plt.ylabel('Y', fontsize=12) plt.title('RANSAC拟合与异常值聚类结果', fontsize=14) plt.legend(fontsize=10, loc='best') plt.grid(True, alpha=0.3) plt.tight_layout() plt.show() def main(): print("=== 双重RANSAC拟合与异常值聚类分析 ===") # 加载数据 x, y = load_data(txt_name) if x is None: return # 第一次RANSAC拟合 print("\n正在进行第一次RANSAC拟合...") model1, inlier_mask1, outlier_mask1 = ransac_fit(x, y, residual_threshold=3.0) x_inliers1 = x[inlier_mask1] y_inliers1 = y[inlier_mask1] # 获取第一次拟合的外点 x_outliers1 = x[outlier_mask1] y_outliers1 = y[outlier_mask1] # 对外点进行聚类 print("正在对异常值进行聚类分析...") cluster_labels = None if len(x_outliers1) > 5: # 至少有5个外点才进行聚类 cluster_labels = cluster_outliers(x_outliers1, y_outliers1) # 第二次RANSAC拟合(在外点上) model2, inlier_mask2, outlier_mask2 = None, None, None if len(x_outliers1) > 10: # 确保有足够的外点进行第二次拟合 print("\n正在进行第二次RANSAC拟合...") model2, inlier_mask2, outlier_mask2 = ransac_fit(x_outliers1, y_outliers1, residual_threshold=3.0) # 可视化结果 print("\n生成可视化结果...") plot_results(x, y, model1, inlier_mask1, model2, inlier_mask2, outlier_mask2, cluster_labels) x_inliers2 = x_outliers1[inlier_mask2] y_inliers2 = y_outliers1[inlier_mask2] # 获取第二次拟合的外点 x_outliers2 = x_outliers1[outlier_mask2] y_outliers2 = y_outliers1[outlier_mask2] m1 = model1.predict(np.array([600]).reshape(-1, 1)) m2 = model2.predict(np.array([600]).reshape(-1, 1)) # 判断上下沿 if m1 > m2: model_top, model_bot = model1, model2 x_top, x_bot = x_inliers1, x_inliers2 y_top, y_bot = y_inliers1, y_inliers2 else: model_top, model_bot = model2, model1 x_top, x_bot = x_inliers2, x_inliers1 y_top, y_bot = y_inliers2, y_inliers1 # 统一提取斜率和截距 slope_top = model_top.coef_[0][0] intercept_top = model_top.intercept_[0] slope_bot = model_bot.coef_[0][0] intercept_bot = model_bot.intercept_[0] print() print("\n分析完成!") if __name__ == "__main__": t = time.time() main() print(f"time: {time.time() - t}")