You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

182 lines
6.4 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import time
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import RANSACRegressor, LinearRegression
from sklearn.cluster import DBSCAN
from matplotlib.colors import ListedColormap
# Default input path handed to load_data() by main(); the file is expected to
# hold two whitespace-separated numeric columns (X and Y).
txt_name = r"C:\Users\Administrator\Desktop\BYD\0718\new233.1718914966782.txt"
def load_data(txt_name):
    """Load two-column XY data from a text file.

    The file is parsed with ``np.loadtxt`` and must contain exactly two
    numeric columns. Y values are returned mirrored as ``960 - y``.

    Args:
        txt_name: Path of the data file. Surrounding whitespace is ignored;
            the literal ``q`` (any case) is treated as a cancel request.

    Returns:
        ``(x, y)`` as column vectors of shape ``(n, 1)``, or ``(None, None)``
        when cancelled or when the file cannot be read or has the wrong
        number of columns.
    """
    filepath = txt_name.strip()
    if filepath.lower() == 'q':
        return None, None

    # The path is fixed for this call, so a failed load is reported once
    # and signalled to the caller instead of being retried.
    try:
        # ndmin=2 keeps a single-row file as shape (1, 2) so the column
        # check below works for any number of rows.
        data = np.loadtxt(filepath, ndmin=2)
    except (OSError, ValueError) as e:
        print(f"加载文件出错: {e}")
        return None, None

    if data.shape[1] != 2:
        print("错误文件必须包含两列数据X和Y")
        return None, None

    x = data[:, 0].reshape(-1, 1)
    y = data[:, 1].reshape(-1, 1)
    # NOTE(review): 960 looks like an image height used to flip the Y axis
    # from image to plot orientation -- confirm against the data source.
    y = 960 - y
    return x, y
def ransac_fit(x, y, residual_threshold=1.0):
    """Fit a straight line to (x, y) with RANSAC.

    Args:
        x: Feature array passed to ``RANSACRegressor.fit``.
        y: Target array passed to ``RANSACRegressor.fit``.
        residual_threshold: Maximum residual for a sample to count as an
            inlier.

    Returns:
        A tuple ``(estimator, inlier_mask, outlier_mask)``: the fitted
        ``LinearRegression`` selected by RANSAC, the boolean inlier mask,
        and its element-wise negation.
    """
    # random_state is pinned so repeated runs select the same consensus set.
    regressor = RANSACRegressor(
        LinearRegression(),
        residual_threshold=residual_threshold,
        random_state=42,
    )
    regressor.fit(x, y)

    is_inlier = regressor.inlier_mask_
    return regressor.estimator_, is_inlier, ~is_inlier
def cluster_outliers(x_outliers, y_outliers, eps=0.5, min_samples=5):
    """Cluster outlier points with DBSCAN.

    Args:
        x_outliers: X coordinates of the points to cluster.
        y_outliers: Y coordinates of the points to cluster.
        eps: DBSCAN neighbourhood radius.
        min_samples: DBSCAN minimum neighbourhood size for a core point.

    Returns:
        The DBSCAN label array, one label per point (-1 marks noise).
    """
    # Pair the coordinates column-wise into an (n, 2) point matrix.
    xy = np.column_stack((x_outliers, y_outliers))
    dbscan = DBSCAN(eps=eps, min_samples=min_samples)
    return dbscan.fit_predict(xy)
def plot_results(x, y, model1, inlier_mask1, model2=None, inlier_mask2=None,
                 outlier_mask2=None, cluster_labels=None):
    """Plot the first RANSAC fit and, when given, the second fit on its outliers.

    Args:
        x: X coordinates of all points.
        y: Y coordinates of all points.
        model1: Fitted model of the first pass; must provide ``predict``.
        inlier_mask1: Boolean mask over ``x``/``y`` selecting first-pass inliers.
        model2: Optional fitted model of the second pass.
        inlier_mask2: Optional boolean mask over the first-pass outliers
            selecting second-pass inliers.
        outlier_mask2: Optional boolean mask over the first-pass outliers
            selecting second-pass outliers.
        cluster_labels: Accepted for interface compatibility; not drawn by
            this function.
    """
    plt.figure(figsize=(14, 7))

    # Points accepted by the first fit.
    plt.scatter(
        x[inlier_mask1], y[inlier_mask1],
        color='limegreen', marker='o', s=30, alpha=0.7,
        label='first inlier',
    )

    # The fitted lines are drawn across the full X extent of the data.
    span_x = np.array([[x.min()], [x.max()]])
    first_line_y = model1.predict(span_x)
    plt.plot(span_x, first_line_y, color='darkgreen', linewidth=3,
             label='first RANSAC')

    # Points rejected by the first fit; the second-pass masks index into these.
    rejected = ~inlier_mask1
    rest_x = x[rejected]
    rest_y = y[rejected]

    second_fit_available = not (model2 is None or inlier_mask2 is None)
    if second_fit_available:
        second_line_y = model2.predict(span_x)
        plt.plot(span_x, second_line_y, color='red', linestyle='--',
                 linewidth=3, label='second RANSAC')

        # Second-pass inliers first, then second-pass outliers.
        second_pass_layers = (
            (inlier_mask2, 'red', '*', 'second inliner'),
            (outlier_mask2, 'gray', 'x', 'second outliner'),
        )
        for mask, colour, marker, legend_text in second_pass_layers:
            plt.scatter(rest_x[mask], rest_y[mask],
                        color=colour, marker=marker, s=100,
                        edgecolor='black', label=legend_text)

    plt.xlabel('X', fontsize=12)
    plt.ylabel('Y', fontsize=12)
    plt.title('RANSAC拟合与异常值聚类结果', fontsize=14)
    plt.legend(fontsize=10, loc='best')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
def main():
    """Run the two-pass RANSAC line analysis on the file named by ``txt_name``.

    Steps: load the XY data, fit a first RANSAC line, cluster the points it
    rejects, fit a second RANSAC line on those rejected points, plot
    everything, then decide which of the two lines is the top edge and
    which is the bottom edge and print their slope and intercept.

    The second fit only runs when more than 10 points were rejected by the
    first one. When it is skipped, the plot is still shown and the function
    finishes without the top/bottom comparison.
    """
    print("=== 双重RANSAC拟合与异常值聚类分析 ===")

    x, y = load_data(txt_name)
    if x is None:
        return

    # First pass over all points.
    print("\n正在进行第一次RANSAC拟合...")
    model1, inlier_mask1, outlier_mask1 = ransac_fit(x, y, residual_threshold=3.0)
    x_outliers1 = x[outlier_mask1]
    y_outliers1 = y[outlier_mask1]

    # Cluster the rejected points (needs more than 5 of them).
    print("正在对异常值进行聚类分析...")
    cluster_labels = None
    if len(x_outliers1) > 5:
        cluster_labels = cluster_outliers(x_outliers1, y_outliers1)

    # Second pass over the rejected points (needs more than 10 of them).
    model2, inlier_mask2, outlier_mask2 = None, None, None
    if len(x_outliers1) > 10:
        print("\n正在进行第二次RANSAC拟合...")
        model2, inlier_mask2, outlier_mask2 = ransac_fit(
            x_outliers1, y_outliers1, residual_threshold=3.0
        )

    print("\n生成可视化结果...")
    plot_results(x, y, model1, inlier_mask1, model2, inlier_mask2,
                 outlier_mask2, cluster_labels)

    # Without a second line there is nothing to compare; finish cleanly
    # instead of dereferencing the missing model.
    if model2 is None:
        print("外点数量不足, 已跳过第二次拟合与上下沿判断")
        print("\n分析完成!")
        return

    # Compare both lines at a fixed probe position to decide which is higher.
    # NOTE(review): x = 600 is presumably a position inside the data range --
    # confirm it suits the input files.
    probe_x = np.array([[600]])
    m1 = float(np.ravel(model1.predict(probe_x))[0])
    m2 = float(np.ravel(model2.predict(probe_x))[0])
    if m1 > m2:
        model_top, model_bot = model1, model2
    else:
        model_top, model_bot = model2, model1

    # ravel() makes the extraction independent of whether the model was
    # fitted on a 1-D or a column-vector target.
    slope_top = float(np.ravel(model_top.coef_)[0])
    intercept_top = float(np.ravel(model_top.intercept_)[0])
    slope_bot = float(np.ravel(model_bot.coef_)[0])
    intercept_bot = float(np.ravel(model_bot.intercept_)[0])

    print()
    print(f"上沿: 斜率={slope_top}, 截距={intercept_top}")
    print(f"下沿: 斜率={slope_bot}, 截距={intercept_bot}")
    print("\n分析完成!")
if __name__ == "__main__":
    # Report the wall-clock duration of the whole run.
    started_at = time.time()
    main()
    elapsed = time.time() - started_at
    print(f"time: {elapsed}")