|  |  |  |  | import time | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | import numpy as np | 
					
						
							|  |  |  |  | import matplotlib.pyplot as plt | 
					
						
							|  |  |  |  | from sklearn.linear_model import RANSACRegressor, LinearRegression | 
					
						
							|  |  |  |  | from sklearn.cluster import DBSCAN | 
					
						
							|  |  |  |  | from matplotlib.colors import ListedColormap | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | txt_name = r"C:\Users\Administrator\Desktop\BYD\0718\new233.1718914966782.txt" | 
					
						
							|  |  |  |  | def load_data(txt_name): | 
					
						
							|  |  |  |  |     """从用户输入的文件路径加载二维XY数据""" | 
					
						
							|  |  |  |  |     while True: | 
					
						
							|  |  |  |  |         filepath = txt_name.strip() | 
					
						
							|  |  |  |  |         if filepath.lower() == 'q': | 
					
						
							|  |  |  |  |             return None, None | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         try: | 
					
						
							|  |  |  |  |             data = np.loadtxt(filepath) | 
					
						
							|  |  |  |  |             if data.shape[1] != 2: | 
					
						
							|  |  |  |  |                 print("错误:文件必须包含两列数据(X和Y)") | 
					
						
							|  |  |  |  |                 continue | 
					
						
							|  |  |  |  |             x = data[:, 0].reshape(-1, 1) | 
					
						
							|  |  |  |  |             y = data[:, 1].reshape(-1, 1) | 
					
						
							|  |  |  |  |             y = 960 - y | 
					
						
							|  |  |  |  |             return x, y | 
					
						
							|  |  |  |  |         except Exception as e: | 
					
						
							|  |  |  |  |             print(f"加载文件出错: {e}") | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | def ransac_fit(x, y, residual_threshold=1.0): | 
					
						
							|  |  |  |  |     """执行RANSAC拟合并返回模型和内点/外点""" | 
					
						
							|  |  |  |  |     ransac = RANSACRegressor( | 
					
						
							|  |  |  |  |         LinearRegression(), | 
					
						
							|  |  |  |  |         residual_threshold=residual_threshold, | 
					
						
							|  |  |  |  |         random_state=42 | 
					
						
							|  |  |  |  |     ) | 
					
						
							|  |  |  |  |     ransac.fit(x, y) | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     inlier_mask = ransac.inlier_mask_ | 
					
						
							|  |  |  |  |     outlier_mask = ~inlier_mask | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     return ransac.estimator_, inlier_mask, outlier_mask | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | def cluster_outliers(x_outliers, y_outliers, eps=0.5, min_samples=5): | 
					
						
							|  |  |  |  |     """对异常值进行聚类""" | 
					
						
							|  |  |  |  |     points = np.column_stack((x_outliers, y_outliers)) | 
					
						
							|  |  |  |  |     clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(points) | 
					
						
							|  |  |  |  |     return clustering.labels_ | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | def plot_results(x, y, model1, inlier_mask1, model2=None, inlier_mask2=None,outlier_mask2=None, cluster_labels=None): | 
					
						
							|  |  |  |  |     """可视化拟合结果和聚类""" | 
					
						
							|  |  |  |  |     plt.figure(figsize=(14, 7)) | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     # 绘制原始内点 | 
					
						
							|  |  |  |  |     plt.scatter(x[inlier_mask1], y[inlier_mask1], | 
					
						
							|  |  |  |  |                 color='limegreen', marker='o', s=30, alpha=0.7, | 
					
						
							|  |  |  |  |                 label='first inlier') | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     # 绘制第一次拟合的直线 | 
					
						
							|  |  |  |  |     line_x = np.array([x.min(), x.max()]).reshape(-1, 1) | 
					
						
							|  |  |  |  |     line_y_1 = model1.predict(line_x) | 
					
						
							|  |  |  |  |     plt.plot(line_x, line_y_1, color='darkgreen', linewidth=3, | 
					
						
							|  |  |  |  |              label='first RANSAC') | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     # 处理外点 | 
					
						
							|  |  |  |  |     outlier_mask1 = ~inlier_mask1 | 
					
						
							|  |  |  |  |     x_outliers = x[outlier_mask1] | 
					
						
							|  |  |  |  |     y_outliers = y[outlier_mask1] | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     # if cluster_labels is not None and len(cluster_labels) > 0: | 
					
						
							|  |  |  |  |     #     # 如果有聚类结果,使用不同颜色显示不同簇 | 
					
						
							|  |  |  |  |     #     unique_labels = np.unique(cluster_labels) | 
					
						
							|  |  |  |  |     #     colors = plt.cm.viridis(np.linspace(0, 1, len(unique_labels))) | 
					
						
							|  |  |  |  |     # | 
					
						
							|  |  |  |  |     #     for label, color in zip(unique_labels, colors): | 
					
						
							|  |  |  |  |     #         if label == -1:  # 噪声点 | 
					
						
							|  |  |  |  |     #             mask = cluster_labels == label | 
					
						
							|  |  |  |  |     #             plt.scatter(x_outliers[mask], y_outliers[mask], | 
					
						
							|  |  |  |  |     #                         color='gray', marker='x', s=20, | 
					
						
							|  |  |  |  |     #                         label='zaoshengdian' if label == -1 else None) | 
					
						
							|  |  |  |  |     #         else: | 
					
						
							|  |  |  |  |     #             mask = cluster_labels == label | 
					
						
							|  |  |  |  |     #             plt.scatter(x_outliers[mask], y_outliers[mask], | 
					
						
							|  |  |  |  |     #                         color=color, marker='^', s=40, | 
					
						
							|  |  |  |  |     #                         label=f'cu {label + 1}') | 
					
						
							|  |  |  |  |     # else: | 
					
						
							|  |  |  |  |     #     # 如果没有聚类结果,统一显示为金色 | 
					
						
							|  |  |  |  |     #     plt.scatter(x_outliers, y_outliers, | 
					
						
							|  |  |  |  |     #                 color='gold', marker='^', s=40, | 
					
						
							|  |  |  |  |     #                 label='first outliner') | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     # 绘制第二次拟合的直线(如果有) | 
					
						
							|  |  |  |  |     if model2 is not None and inlier_mask2 is not None: | 
					
						
							|  |  |  |  |         line_y_2 = model2.predict(line_x) | 
					
						
							|  |  |  |  |         plt.plot(line_x, line_y_2, color='red', linestyle='--', linewidth=3, | 
					
						
							|  |  |  |  |                  label='second RANSAC') | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         # 高亮显示第二次拟合的内点 | 
					
						
							|  |  |  |  |         plt.scatter(x_outliers[inlier_mask2], y_outliers[inlier_mask2], | 
					
						
							|  |  |  |  |                     color='red', marker='*', s=100, edgecolor='black', | 
					
						
							|  |  |  |  |                     label='second inliner') | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         plt.scatter(x_outliers[outlier_mask2], y_outliers[outlier_mask2], | 
					
						
							|  |  |  |  |                     color='gray', marker='x', s=100, edgecolor='black', | 
					
						
							|  |  |  |  |                     label='second outliner') | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     plt.xlabel('X', fontsize=12) | 
					
						
							|  |  |  |  |     plt.ylabel('Y', fontsize=12) | 
					
						
							|  |  |  |  |     plt.title('RANSAC拟合与异常值聚类结果', fontsize=14) | 
					
						
							|  |  |  |  |     plt.legend(fontsize=10, loc='best') | 
					
						
							|  |  |  |  |     plt.grid(True, alpha=0.3) | 
					
						
							|  |  |  |  |     plt.tight_layout() | 
					
						
							|  |  |  |  |     plt.show() | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | def main(): | 
					
						
							|  |  |  |  |     print("=== 双重RANSAC拟合与异常值聚类分析 ===") | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     # 加载数据 | 
					
						
							|  |  |  |  |     x, y = load_data(txt_name) | 
					
						
							|  |  |  |  |     if x is None: | 
					
						
							|  |  |  |  |         return | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     # 第一次RANSAC拟合 | 
					
						
							|  |  |  |  |     print("\n正在进行第一次RANSAC拟合...") | 
					
						
							|  |  |  |  |     model1, inlier_mask1, outlier_mask1 = ransac_fit(x, y, residual_threshold=3.0) | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     x_inliers1 = x[inlier_mask1] | 
					
						
							|  |  |  |  |     y_inliers1 = y[inlier_mask1] | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     # 获取第一次拟合的外点 | 
					
						
							|  |  |  |  |     x_outliers1 = x[outlier_mask1] | 
					
						
							|  |  |  |  |     y_outliers1 = y[outlier_mask1] | 
					
						
							|  |  |  |  |     # 对外点进行聚类 | 
					
						
							|  |  |  |  |     print("正在对异常值进行聚类分析...") | 
					
						
							|  |  |  |  |     cluster_labels = None | 
					
						
							|  |  |  |  |     if len(x_outliers1) > 5:  # 至少有5个外点才进行聚类 | 
					
						
							|  |  |  |  |         cluster_labels = cluster_outliers(x_outliers1, y_outliers1) | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     # 第二次RANSAC拟合(在外点上) | 
					
						
							|  |  |  |  |     model2, inlier_mask2, outlier_mask2 = None, None, None | 
					
						
							|  |  |  |  |     if len(x_outliers1) > 10:  # 确保有足够的外点进行第二次拟合 | 
					
						
							|  |  |  |  |         print("\n正在进行第二次RANSAC拟合...") | 
					
						
							|  |  |  |  |         model2, inlier_mask2, outlier_mask2 = ransac_fit(x_outliers1, y_outliers1, residual_threshold=3.0) | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     # 可视化结果 | 
					
						
							|  |  |  |  |     print("\n生成可视化结果...") | 
					
						
							|  |  |  |  |     plot_results(x, y, model1, inlier_mask1, model2, inlier_mask2, outlier_mask2, cluster_labels) | 
					
						
							|  |  |  |  |     x_inliers2 = x_outliers1[inlier_mask2] | 
					
						
							|  |  |  |  |     y_inliers2 = y_outliers1[inlier_mask2] | 
					
						
							|  |  |  |  |     # 获取第二次拟合的外点 | 
					
						
							|  |  |  |  |     x_outliers2 = x_outliers1[outlier_mask2] | 
					
						
							|  |  |  |  |     y_outliers2 = y_outliers1[outlier_mask2] | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     m1 = model1.predict(np.array([600]).reshape(-1, 1)) | 
					
						
							|  |  |  |  |     m2 = model2.predict(np.array([600]).reshape(-1, 1)) | 
					
						
							|  |  |  |  |     # 判断上下沿 | 
					
						
							|  |  |  |  |     if m1 > m2: | 
					
						
							|  |  |  |  |         model_top, model_bot = model1, model2 | 
					
						
							|  |  |  |  |         x_top, x_bot = x_inliers1, x_inliers2 | 
					
						
							|  |  |  |  |         y_top, y_bot = y_inliers1, y_inliers2 | 
					
						
							|  |  |  |  |     else: | 
					
						
							|  |  |  |  |         model_top, model_bot = model2, model1 | 
					
						
							|  |  |  |  |         x_top, x_bot = x_inliers2, x_inliers1 | 
					
						
							|  |  |  |  |         y_top, y_bot = y_inliers2, y_inliers1 | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     # 统一提取斜率和截距 | 
					
						
							|  |  |  |  |     slope_top = model_top.coef_[0][0] | 
					
						
							|  |  |  |  |     intercept_top = model_top.intercept_[0] | 
					
						
							|  |  |  |  |     slope_bot = model_bot.coef_[0][0] | 
					
						
							|  |  |  |  |     intercept_bot = model_bot.intercept_[0] | 
					
						
							|  |  |  |  |     print() | 
					
						
							|  |  |  |  |     print("\n分析完成!") | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | if __name__ == "__main__": | 
					
						
							|  |  |  |  |     t = time.time() | 
					
						
							|  |  |  |  |     main() | 
					
						
							|  |  |  |  |     print(f"time: {time.time() - t}") |