You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

182 lines
6.4 KiB

3 months ago
import time
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import RANSACRegressor, LinearRegression
from sklearn.cluster import DBSCAN
from matplotlib.colors import ListedColormap
# Default input file handed to load_data() by main(): a whitespace-separated
# text file with two numeric columns (X and Y). This is a hard-coded absolute
# Windows path, so it has to be edited to run the script on another machine.
txt_name = r"C:\Users\Administrator\Desktop\BYD\0718\new233.1718914966782.txt"
def load_data(txt_name, image_height=960):
    """Load two-column XY data from a text file.

    The file is read with ``np.loadtxt`` and must contain exactly two
    columns (X, Y). The Y column is mirrored as ``image_height - y``.

    The load is attempted exactly once. Because the path is a fixed
    argument rather than interactive input, retrying after a failure could
    never succeed and would only repeat the error message forever.

    Args:
        txt_name: Path of the data file. Surrounding whitespace is stripped.
            The single letter ``q`` (any case) means "quit" and yields
            ``(None, None)`` without touching the file system.
        image_height: Value the Y column is subtracted from. Defaults to
            960 (presumably the source image height in pixels -- confirm
            against the acquisition setup).

    Returns:
        ``(x, y)`` as two float arrays of shape ``(n, 1)``, or
        ``(None, None)`` if the user quit, the file could not be read or
        parsed, or it does not have exactly two columns.
    """
    filepath = txt_name.strip()
    if filepath.lower() == 'q':
        return None, None

    try:
        # ndmin=2 keeps a single-row file two-dimensional, so the column
        # check below works instead of raising IndexError.
        data = np.loadtxt(filepath, ndmin=2)
    except (OSError, ValueError) as e:
        print(f"加载文件出错: {e}")
        return None, None

    if data.shape[1] != 2:
        print("错误文件必须包含两列数据X和Y")
        return None, None

    x = data[:, 0].reshape(-1, 1)
    # Mirror the Y axis about image_height.
    y = image_height - data[:, 1].reshape(-1, 1)
    return x, y
def ransac_fit(x, y, residual_threshold=1.0):
    """Fit a line with RANSAC and split the samples into inliers and outliers.

    A ``LinearRegression`` is wrapped in a ``RANSACRegressor`` seeded with
    ``random_state=42`` so repeated runs on the same data give the same fit.

    Args:
        x: Feature values passed to ``fit``.
        y: Target values passed to ``fit``.
        residual_threshold: Forwarded to ``RANSACRegressor`` as the maximum
            residual for a sample to count as an inlier.

    Returns:
        A tuple ``(estimator, inlier_mask, outlier_mask)``: the fitted final
        estimator, the boolean inlier mask reported by RANSAC, and its
        element-wise negation.
    """
    regressor = RANSACRegressor(
        LinearRegression(),
        residual_threshold=residual_threshold,
        random_state=42,
    ).fit(x, y)

    is_inlier = regressor.inlier_mask_
    return regressor.estimator_, is_inlier, ~is_inlier
def cluster_outliers(x_outliers, y_outliers, eps=0.5, min_samples=5):
    """Cluster outlier points with DBSCAN.

    Args:
        x_outliers: X coordinates of the outlier points.
        y_outliers: Y coordinates of the outlier points.
        eps: Forwarded to ``DBSCAN`` (neighbourhood radius).
        min_samples: Forwarded to ``DBSCAN``.

    Returns:
        The ``labels_`` array of the fitted ``DBSCAN`` model, one label per
        input point.
    """
    # Pair the coordinates up as one (x, y) row per point.
    xy = np.column_stack([x_outliers, y_outliers])
    dbscan = DBSCAN(eps=eps, min_samples=min_samples)
    dbscan.fit(xy)
    return dbscan.labels_
def plot_results(x, y, model1, inlier_mask1, model2=None, inlier_mask2=None,
                 outlier_mask2=None, cluster_labels=None):
    """Plot the first RANSAC fit and, when available, the second one.

    Draws the first-pass inliers and fitted line. If a second model and its
    inlier mask are given, also draws the second fitted line and marks the
    second-pass inliers and outliers (both masks index into the first-pass
    outliers, i.e. ``x[~inlier_mask1]``). The figure is shown with
    ``plt.show()``.

    Args:
        x: X coordinates, indexable by the boolean masks.
        y: Y coordinates, same length as ``x``.
        model1: Fitted estimator of the first pass (needs ``predict``).
        inlier_mask1: Boolean mask over ``x``/``y`` for the first pass.
        model2: Optional fitted estimator of the second pass.
        inlier_mask2: Optional boolean mask over the first-pass outliers.
        outlier_mask2: Optional boolean mask over the first-pass outliers.
            When omitted it is derived as the complement of
            ``inlier_mask2``; indexing with ``None`` would otherwise select
            every first-pass outlier.
        cluster_labels: Accepted for interface compatibility; cluster labels
            are currently not drawn.
    """
    plt.figure(figsize=(14, 7))

    # First-pass inliers.
    plt.scatter(x[inlier_mask1], y[inlier_mask1],
                color='limegreen', marker='o', s=30, alpha=0.7,
                label='first inlier')

    # First fitted line, drawn across the full X range of the data.
    line_x = np.array([x.min(), x.max()]).reshape(-1, 1)
    line_y_1 = model1.predict(line_x)
    plt.plot(line_x, line_y_1, color='darkgreen', linewidth=3,
             label='first RANSAC')

    # Second fit (if any), evaluated on the first-pass outliers.
    if model2 is not None and inlier_mask2 is not None:
        outlier_mask1 = ~inlier_mask1
        x_outliers = x[outlier_mask1]
        y_outliers = y[outlier_mask1]
        if outlier_mask2 is None:
            outlier_mask2 = np.logical_not(inlier_mask2)

        line_y_2 = model2.predict(line_x)
        plt.plot(line_x, line_y_2, color='red', linestyle='--', linewidth=3,
                 label='second RANSAC')

        # Highlight the second-pass inliers.
        plt.scatter(x_outliers[inlier_mask2], y_outliers[inlier_mask2],
                    color='red', marker='*', s=100, edgecolor='black',
                    label='second inliner')
        # 'x' is an unfilled marker: matplotlib ignores edgecolor for it and
        # warns, so none is passed here.
        plt.scatter(x_outliers[outlier_mask2], y_outliers[outlier_mask2],
                    color='gray', marker='x', s=100,
                    label='second outliner')

    plt.xlabel('X', fontsize=12)
    plt.ylabel('Y', fontsize=12)
    plt.title('RANSAC拟合与异常值聚类结果', fontsize=14)
    plt.legend(fontsize=10, loc='best')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
def main():
    """Run the double-RANSAC analysis on the file named by ``txt_name``.

    Steps:
      1. Load the XY data; stop if loading fails.
      2. Fit a first RANSAC line (residual threshold 3.0).
      3. Cluster the first-pass outliers when there are more than 5.
      4. Fit a second RANSAC line on the first-pass outliers when there are
         more than 10 of them.
      5. Plot the results.
      6. If a second line exists, decide which line is the top and which the
         bottom edge by comparing their predictions at a reference X, and
         print slope and intercept of both.

    When there are too few outliers for a second fit, the function reports
    that and finishes normally instead of calling ``predict`` on ``None``.
    """
    print("=== 双重RANSAC拟合与异常值聚类分析 ===")

    # Load the data.
    x, y = load_data(txt_name)
    if x is None:
        return

    # First RANSAC fit on all points.
    print("\n正在进行第一次RANSAC拟合...")
    model1, inlier_mask1, outlier_mask1 = ransac_fit(x, y, residual_threshold=3.0)

    # Points rejected by the first fit.
    x_outliers1 = x[outlier_mask1]
    y_outliers1 = y[outlier_mask1]

    # Cluster the outliers (only when there are more than 5 of them).
    print("正在对异常值进行聚类分析...")
    cluster_labels = None
    if len(x_outliers1) > 5:
        cluster_labels = cluster_outliers(x_outliers1, y_outliers1)

    # Second RANSAC fit, restricted to the first-pass outliers; needs more
    # than 10 points.
    model2, inlier_mask2, outlier_mask2 = None, None, None
    if len(x_outliers1) > 10:
        print("\n正在进行第二次RANSAC拟合...")
        model2, inlier_mask2, outlier_mask2 = ransac_fit(
            x_outliers1, y_outliers1, residual_threshold=3.0)

    # Visualise the results.
    print("\n生成可视化结果...")
    plot_results(x, y, model1, inlier_mask1, model2, inlier_mask2,
                 outlier_mask2, cluster_labels)

    if model2 is None:
        # No second line was fitted, so there are no two edges to compare.
        print("外点数量不足,跳过第二次RANSAC拟合和上下沿判断")
        print("\n分析完成!")
        return

    # Compare both lines at a fixed reference X to decide which is the top
    # edge (600 is presumably inside the image's X range -- confirm).
    reference_x = np.array([600]).reshape(-1, 1)
    m1 = model1.predict(reference_x).item()
    m2 = model2.predict(reference_x).item()
    if m1 > m2:
        model_top, model_bot = model1, model2
    else:
        model_top, model_bot = model2, model1

    # y is fitted as an (n, 1) column, so coef_ is 2-D and intercept_ is 1-D.
    slope_top = model_top.coef_[0][0]
    intercept_top = model_top.intercept_[0]
    slope_bot = model_bot.coef_[0][0]
    intercept_bot = model_bot.intercept_[0]

    print()
    print(f"上沿: 斜率={slope_top:.6f}, 截距={intercept_top:.6f}")
    print(f"下沿: 斜率={slope_bot:.6f}, 截距={intercept_bot:.6f}")
    print("\n分析完成!")
if __name__ == "__main__":
    # Time the whole run. perf_counter() is a monotonic, high-resolution
    # clock, so the reported duration is not distorted by system clock
    # adjustments the way a time.time() difference can be.
    start = time.perf_counter()
    main()
    print(f"time: {time.perf_counter() - start}")