import numpy as np
import pandas as pd
import calc_way
from scipy import stats
import calc_slope_line
import matplotlib.pyplot as plt
import model
import os
from sklearn.linear_model import RANSACRegressor, LinearRegression
from sklearn.cluster import DBSCAN

# Data truncation line: points beyond this line are discarded in get_data(txt_name, jingdu) below.
model = model.Model()  # note: rebinds the module name `model` to a Model instance
limit_slope = model.limit_slope
limit_intercept = model.limit_intercept

def grid_downsample(points, cell_size=15):
    """Grid-based downsampling that preserves the spatial structure:
    keep one point per cell_size x cell_size grid cell."""
    df = pd.DataFrame(points, columns=['x', 'y'])
    df['x_grid'] = (df['x'] // cell_size) * cell_size
    df['y_grid'] = (df['y'] // cell_size) * cell_size
    sampled = df.groupby(['x_grid', 'y_grid']).first().reset_index()
    return sampled[['x', 'y']].values

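
# Hedged usage sketch for grid_downsample(): this demo function is an addition for
# illustration only (the name `_demo_grid_downsample` is not part of the original
# module and nothing here calls it). It shows that at most one point survives per
# grid cell when a synthetic point cloud is downsampled.
def _demo_grid_downsample():
    rng = np.random.default_rng(0)
    pts = rng.integers(0, 960, size=(500, 2))      # synthetic pixel coordinates
    sampled = grid_downsample(pts, cell_size=20)   # keep one point per 20x20 cell
    print(f"{len(pts)} points downsampled to {len(sampled)}")
    # Every grid cell should now appear at most once.
    cells = {(px // 20, py // 20) for px, py in sampled}
    assert len(cells) == len(sampled)
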
"""
|
|
|
读取yolo网络识别路沿的坐标数据,筛选出目标区域的数据点,并将路沿上下侧数据分离
|
|
|
参数:保存数据的txt文件路径
|
|
|
返回值:在目标区域内的下侧数据点坐标x_bot、y_bot,上侧数据点坐标x_top,y_top
|
|
|
"""
|
|
|
def get_data(txt_name,jingdu):
|
|
|
# 加载数据
|
|
|
data = np.loadtxt(txt_name)
|
|
|
int_data = data.astype(int)
|
|
|
|
|
|
# 网格化降采样
|
|
|
grid_sampled = grid_downsample(int_data, cell_size=20)
|
|
|
|
|
|
# 数据截断
|
|
|
x = []
|
|
|
y = []
|
|
|
for i in range(grid_sampled.shape[0]):
|
|
|
grid_sampled[i][1] = 960 - int(grid_sampled[i][1])
|
|
|
if limit_slope * int(grid_sampled[i][0]) + limit_intercept - int(grid_sampled[i][1]) < 0:
|
|
|
continue
|
|
|
x.append(int(grid_sampled[i][0]))
|
|
|
y.append(int(grid_sampled[i][1]))
|
|
|
x = np.array(x)
|
|
|
y = np.array(y)
|
|
|
|
|
|
# 原始数据粗分类
|
|
|
slope, intercept, r_2 = calc_slope_line.linear_regression(x, y)
|
|
|
y_pred = slope * x + intercept
|
|
|
x_bot = []
|
|
|
y_bot = []
|
|
|
x_top = []
|
|
|
y_top = []
|
|
|
for i in range(len(x)):
|
|
|
if x[i] * slope + intercept - y[i] > 0:
|
|
|
x_bot.append(x[i])
|
|
|
y_bot.append(y[i])
|
|
|
else:
|
|
|
x_top.append(x[i])
|
|
|
y_top.append(y[i])
|
|
|
x_bot = np.array(x_bot)
|
|
|
y_bot = np.array(y_bot)
|
|
|
x_top = np.array(x_top)
|
|
|
y_top = np.array(y_top)
|
|
|
slope_bot, intercept_bot, r2_bot = calc_slope_line.linear_regression(x_bot, y_bot)
|
|
|
slope_top, intercept_top, r2_top = calc_slope_line.linear_regression(x_top, y_top)
|
|
|
print(f"未清洗数据拟合上下沿:r2_bot = {r2_bot},r2_top = {r2_top}")
|
|
|
|
|
|
# 第一次数据清洗,消除误识别点
|
|
|
# 计算残差
|
|
|
residuals = y - y_pred
|
|
|
# 计算残差的标准差 (MSE的平方根)
|
|
|
residual_std = np.sqrt(np.sum(residuals ** 2) / (len(x) - 2))
|
|
|
standardized_residuals = residuals / residual_std
|
|
|
# 设置阈值 (常用 2.5-3.0 个标准差)
|
|
|
threshold = 2.0
|
|
|
# 标记异常点
|
|
|
outlier_mask = np.abs(standardized_residuals) > threshold
|
|
|
outliers_x = x[outlier_mask]
|
|
|
outliers_y = y[outlier_mask]
|
|
|
print(f"第一次数据清洗发现 {np.sum(outlier_mask)} 个异常点:")
|
|
|
for i, (x_val, y_val) in enumerate(zip(outliers_x, outliers_y)):
|
|
|
print(f"点 {i + 1}: x={x_val}, y={y_val}, 残差={residuals[outlier_mask][i]:.2f}")
|
|
|
# 剔除异常点
|
|
|
clean_x = x[~outlier_mask]
|
|
|
clean_y = y[~outlier_mask]
|
|
|
clean_slope, clean_intercept, clean_r_2 = calc_slope_line.linear_regression(clean_x, clean_y)
|
|
    print(f"Overall fit after cleaning: clean_r_2 = {clean_r_2}")

    # Reclassify the data after the first cleaning pass
    x_bot_clean = []
    y_bot_clean = []
    x_top_clean = []
    y_top_clean = []
    for i in range(len(clean_x)):
        if clean_x[i] * clean_slope + clean_intercept - clean_y[i] > 0:
            x_bot_clean.append(clean_x[i])
            y_bot_clean.append(clean_y[i])
        else:
            x_top_clean.append(clean_x[i])
            y_top_clean.append(clean_y[i])
    x_bot_clean = np.array(x_bot_clean)
    y_bot_clean = np.array(y_bot_clean)
    x_top_clean = np.array(x_top_clean)
    y_top_clean = np.array(y_top_clean)

    # Second data cleaning: remove misclassified points
    clean_slope_bot, clean_intercept_bot, clean_r2_bot = calc_slope_line.linear_regression(x_bot_clean, y_bot_clean)
    clean_slope_top, clean_intercept_top, clean_r2_top = calc_slope_line.linear_regression(x_top_clean, y_top_clean)
    print(f"Lower/upper edge fit after cleaning: clean_r2_bot = {clean_r2_bot}, clean_r2_top = {clean_r2_top}")
    # Predicted values of the two edge fits
    y_bot_pred = clean_slope_bot * x_bot_clean + clean_intercept_bot
    y_top_pred = clean_slope_top * x_top_clean + clean_intercept_top
    # Residuals
    residuals_bot = y_bot_clean - y_bot_pred
    residuals_top = y_top_clean - y_top_pred
    # Standard deviation of the residuals (square root of the MSE)
    residual_std_bot = np.sqrt(np.sum(residuals_bot ** 2) / (len(x_bot_clean) - 2))
    residual_std_top = np.sqrt(np.sum(residuals_top ** 2) / (len(x_top_clean) - 2))
    # Standardized residuals (Z-scores)
    standardized_residuals_bot = residuals_bot / residual_std_bot
    standardized_residuals_top = residuals_top / residual_std_top
    # Threshold (2.5-3.0 standard deviations is common; a stricter 1.5 is used here)
    threshold = 1.5
    # Flag outliers
    outlier_mask_bot = np.abs(standardized_residuals_bot) > threshold
    outlier_mask_top = np.abs(standardized_residuals_top) > threshold
    outliers_x_bot = x_bot_clean[outlier_mask_bot]
    outliers_y_bot = y_bot_clean[outlier_mask_bot]
    outliers_x_top = x_top_clean[outlier_mask_top]
    outliers_y_top = y_top_clean[outlier_mask_top]
    print(f"Second cleaning pass found {np.sum(outlier_mask_bot)} outliers on the lower edge:")
    # for i, (x_val, y_val) in enumerate(zip(outliers_x_bot, outliers_y_bot)):
    #     print(f"Point {i + 1}: x={x_val}, y={y_val}, residual={residuals_bot[outlier_mask_bot][i]:.2f}")
    print(f"Second cleaning pass found {np.sum(outlier_mask_top)} outliers on the upper edge:")
    # for i, (x_val, y_val) in enumerate(zip(outliers_x_top, outliers_y_top)):
    #     print(f"Point {i + 1}: x={x_val}, y={y_val}, residual={residuals_top[outlier_mask_top][i]:.2f}")
    # Remove the outliers
    x_bot_clean = x_bot_clean[~outlier_mask_bot]
    y_bot_clean = y_bot_clean[~outlier_mask_bot]
    x_top_clean = x_top_clean[~outlier_mask_top]
    y_top_clean = y_top_clean[~outlier_mask_top]

    # Check whether the data are usable
    clean_slope_bot, clean_intercept_bot, clean_r2_bot = calc_slope_line.linear_regression(x_bot_clean, y_bot_clean)
    clean_slope_top, clean_intercept_top, clean_r2_top = calc_slope_line.linear_regression(x_top_clean, y_top_clean)
    print(f"Lower/upper edge fit after cleaning: clean_r2_bot = {clean_r2_bot}, clean_r2_top = {clean_r2_top}")
    if clean_r2_bot < jingdu or clean_r2_top < jingdu:  # equivalent to (1 - r2) > (1 - jingdu)
        print("Invalid data")
        return 0, None, None, None, None
    return 1, x_bot_clean, y_bot_clean, x_top_clean, y_top_clean

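
# Hedged sketch: the residual-based cleaning used twice inside get_data(txt_name, jingdu)
# above, factored into a standalone helper for clarity. The name
# `_standardized_residual_outliers` is an illustrative addition, not part of the
# original module, and nothing in this file calls it.
def _standardized_residual_outliers(x, y, slope, intercept, threshold=2.0):
    """Return a boolean mask marking points whose standardized residual against the
    line y = slope * x + intercept exceeds `threshold`."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    residuals = y - (slope * x + intercept)
    # Residual standard deviation = sqrt(SSE / (n - 2)), as in get_data above
    residual_std = np.sqrt(np.sum(residuals ** 2) / (len(x) - 2))
    return np.abs(residuals / residual_std) > threshold
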

def filter_middle_80_percent(data):
    """
    Return a boolean mask selecting the central portion of the data.

    Despite the function name, the current bounds keep values between
    min + 20% of the value range and max - 10% of the value range
    (roughly the middle 70%), and a mask is returned rather than the
    filtered values themselves.

    Arguments:
        data (np.ndarray): input array (1-D or multi-dimensional; it is flattened first).

    Returns:
        np.ndarray: boolean mask of the values that fall inside the bounds.
    """
    # Flatten the array so every data point is considered
    flattened_data = data.flatten()

    # (An earlier variant, left commented out in the original, used the 15th and 75th
    # percentiles as bounds instead of the range-based bounds below.)

    # Minimum, maximum and total range of the data
    data_min = np.min(flattened_data)
    data_max = np.max(flattened_data)
    data_range = data_max - data_min

    # Lower and upper bounds of the retained band
    lower_bound = data_min + 0.2 * data_range
    upper_bound = data_max - 0.1 * data_range

    # Build the mask
    mask = (flattened_data >= lower_bound) & (flattened_data <= upper_bound)

    return mask

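
# Hedged usage sketch for filter_middle_80_percent(): a hand-checked example added
# for clarity (not part of the original module). For
# data = np.array([0, 10, 50, 90, 100]) the range is 100, so the bounds are
# 0 + 0.2*100 = 20 and 100 - 0.1*100 = 90, and the returned mask is
# [False, False, True, True, False], i.e. only 50 and 90 are kept.
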
def load_data(txt_name):
    """Load 2-D XY data from the given file path (passing 'q' aborts)."""
    filepath = txt_name.strip()
    if filepath.lower() == 'q':
        return None, None

    try:
        data = np.loadtxt(filepath)
        if data.ndim != 2 or data.shape[1] != 2:
            print("Error: the file must contain two columns of data (X and Y)")
            return None, None
        x = data[:, 0].reshape(-1, 1)
        y = data[:, 1].reshape(-1, 1)
        y = 960 - y  # flip the y axis (image height 960)
        # Keep only the central band of x values
        mask = filter_middle_80_percent(x)
        x_clean = x[mask]
        y_clean = y[mask]
        return x_clean.reshape(-1, 1), y_clean.reshape(-1, 1)
    except Exception as e:
        # The original looped back here, which could spin forever on a fixed path;
        # failing fast keeps the same (None, None) contract as the 'q' branch.
        print(f"Error while loading the file: {e}")
        return None, None

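
# Hedged usage sketch for load_data(): a self-contained demo that writes a synthetic
# two-column file and loads it back. The helper name `_demo_load_data` and the use of
# a temporary file are additions for illustration only; nothing in this module calls it.
def _demo_load_data():
    import tempfile
    rng = np.random.default_rng(1)
    xs = rng.uniform(0, 1280, size=200)
    ys = 0.1 * xs + 400 + rng.normal(0, 3, size=200)   # a noisy line in image coordinates
    tmp = tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False)
    np.savetxt(tmp, np.column_stack([xs, ys]))
    tmp.close()
    x, y = load_data(tmp.name)
    print(x.shape, y.shape)   # column vectors, central x band only, y flipped as 960 - y
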
def ransac_fit(x, y, residual_threshold=2.0):
    """Run a RANSAC fit and return the fitted linear model plus inlier/outlier masks."""
    ransac = RANSACRegressor(
        LinearRegression(),
        residual_threshold=residual_threshold,
        random_state=42
    )
    ransac.fit(x, y)

    inlier_mask = ransac.inlier_mask_
    outlier_mask = ~inlier_mask

    return ransac.estimator_, inlier_mask, outlier_mask

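
# Hedged usage sketch for ransac_fit(): a demo on synthetic data with injected outliers.
# The helper name `_demo_ransac_fit` and all values below are illustrative additions,
# not part of the original module, and nothing in this file calls it.
def _demo_ransac_fit():
    rng = np.random.default_rng(2)
    x = np.linspace(0, 100, 200).reshape(-1, 1)
    y = 0.5 * x + 10 + rng.normal(0, 1.0, size=x.shape)   # true line with mild noise
    y[::10] += 50                                          # every 10th point is an outlier
    line, inliers, outliers = ransac_fit(x, y, residual_threshold=3.0)
    print(f"slope ~ {line.coef_[0][0]:.3f}, intercept ~ {line.intercept_[0]:.3f}, "
          f"{outliers.sum()} points rejected")
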
def get_data(txt_name):
    """Double RANSAC fit: the first fit picks out one curb edge, and a second fit on the
    first fit's outliers picks out the other edge.

    NOTE: this definition overrides the earlier get_data(txt_name, jingdu) above,
    because both share the same name.
    """
    # Load the data
    x, y = load_data(txt_name)
    if x is None:
        return 0, None, None, None, None, None, None, None, None

    # First RANSAC fit
    # print("\nRunning the first RANSAC fit...")
    model1, inlier_mask1, outlier_mask1 = ransac_fit(x, y, residual_threshold=3.0)

    x_inliers1 = x[inlier_mask1]
    y_inliers1 = y[inlier_mask1]

    # Outliers of the first fit
    x_outliers1 = x[outlier_mask1]
    y_outliers1 = y[outlier_mask1]

    # Second RANSAC fit (on the outliers of the first fit)
    if len(x_outliers1) <= 10:
        # Not enough outliers for a second fit (the original left model2 as None here,
        # which would crash below); report failure instead.
        return 0, None, None, None, None, None, None, None, None
    # print("\nRunning the second RANSAC fit...")
    model2, inlier_mask2, outlier_mask2 = ransac_fit(x_outliers1, y_outliers1, residual_threshold=3.0)

    x_inliers2 = x_outliers1[inlier_mask2]
    y_inliers2 = y_outliers1[inlier_mask2]
    # Outliers of the second fit (currently unused)
    x_outliers2 = x_outliers1[outlier_mask2]
    y_outliers2 = y_outliers1[outlier_mask2]

    # Mean heights of the two inlier sets (currently unused)
    mean_outliers1 = np.mean(y_inliers1)
    mean_outliers2 = np.mean(y_inliers2)

    # Decide which fit is the upper edge and which is the lower edge by comparing
    # the two lines at x = 600
    m1 = model1.predict(np.array([600]).reshape(-1, 1))
    m2 = model2.predict(np.array([600]).reshape(-1, 1))
    if m1 > m2:
        model_top, model_bot = model1, model2
        x_top, x_bot = x_inliers1, x_inliers2
        y_top, y_bot = y_inliers1, y_inliers2
    else:
        model_top, model_bot = model2, model1
        x_top, x_bot = x_inliers2, x_inliers1
        y_top, y_bot = y_inliers2, y_inliers1

    # Extract the slope and intercept of each edge
    slope_top = model_top.coef_[0][0]
    intercept_top = model_top.intercept_[0]
    slope_bot = model_bot.coef_[0][0]
    intercept_bot = model_bot.intercept_[0]

    # Plot the classified points
    plt.figure(figsize=(14, 7))
    plt.scatter(x_bot, y_bot,
                color='limegreen', marker='o', s=30, alpha=0.7,
                label='bot')
    plt.scatter(x_top, y_top,
                color='red', marker='*', s=100, edgecolor='black',
                label='top')
    plt.xlabel('X', fontsize=12)
    plt.ylabel('Y', fontsize=12)
    plt.title(f'{txt_name}', fontsize=14)
    plt.legend(fontsize=10, loc='best')
    plt.grid(True, alpha=0.3)
    # plt.tight_layout()
    plt.show()
    # print(f"model_top = {model_top.coef_, model_top.intercept_}")
    # print("\nAnalysis finished!")
    return 1, x_bot, y_bot, slope_bot, intercept_bot, x_top, y_top, slope_top, intercept_top
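
# Hedged end-to-end sketch for the RANSAC-based get_data(txt_name): it synthesizes two
# parallel curb edges, writes them to a temporary file, and runs the pipeline. Everything
# below (the guard, the synthetic data, the temporary file) is an illustrative addition,
# not part of the original module, and it only runs when the file is executed directly
# (which also requires the project-local modules imported at the top to be importable).
if __name__ == "__main__":
    import tempfile

    rng = np.random.default_rng(3)
    xs = rng.uniform(100, 1200, size=400)
    # Two noisy parallel lines in image coordinates (load_data() flips y as 960 - y)
    y_low = 0.05 * xs + 500 + rng.normal(0, 2, size=400)
    y_high = 0.05 * xs + 550 + rng.normal(0, 2, size=400)
    pts = np.column_stack([np.concatenate([xs, xs]), np.concatenate([y_low, y_high])])

    tmp = tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False)
    np.savetxt(tmp, pts)
    tmp.close()

    ok, x_bot, y_bot, slope_bot, intercept_bot, x_top, y_top, slope_top, intercept_top = get_data(tmp.name)
    if ok:
        print(f"bot: y = {slope_bot:.3f} * x + {intercept_bot:.1f}")
        print(f"top: y = {slope_top:.3f} * x + {intercept_top:.1f}")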