You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

297 lines
11 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import numpy as np
import pandas as pd
import calc_way
from scipy import stats
import calc_slope_line
import matplotlib.pyplot as plt
import model
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import RANSACRegressor, LinearRegression
from sklearn.cluster import DBSCAN
# Data truncation line.
# NOTE: the assignment below rebinds the name `model` from the imported module
# to a Model instance, so the module itself is no longer reachable as `model`
# after this line.
model = model.Model()
# The two disabled assignments below would expose the instance's
# limit_slope / limit_intercept attributes (presumably the truncation line's
# slope and intercept -- confirm against the Model class) as module-level names.
# limit_slope = model.limit_slope
# limit_intercept = model.limit_intercept
def grid_downsample(points, cell_size=15):
    """Downsample 2-D points on a regular grid while keeping spatial structure.

    Every point is assigned to a square cell of side ``cell_size`` (by floor
    division of its coordinates) and only the first point encountered in each
    cell is kept.

    Args:
        points: Array-like of shape (n, 2) holding ``x, y`` coordinates.
        cell_size: Side length of a grid cell, in the same units as ``points``.

    Returns:
        np.ndarray of shape (m, 2) with one representative point per occupied
        cell, ordered by cell (x cell first, then y cell). Empty input yields
        an empty (0, 2) array.
    """
    pts = np.asarray(points)
    if pts.size == 0:
        # Nothing to bin; also avoids a shape error when the input is a flat
        # empty array rather than an empty (0, 2) one.
        return pts.reshape(0, 2)

    df = pd.DataFrame(pts, columns=['x', 'y'])
    # Lower-left corner of the cell each point falls into.
    df['x_grid'] = (df['x'] // cell_size) * cell_size
    df['y_grid'] = (df['y'] // cell_size) * cell_size
    # groupby sorts by cell key; first() keeps the earliest point of each cell.
    sampled = df.groupby(['x_grid', 'y_grid']).first().reset_index()
    return sampled[['x', 'y']].values
def _split_about_line(x, y, slope, intercept):
    """Split points into those strictly below the line y = slope*x + intercept and the rest."""
    below = slope * x + intercept - y > 0
    return x[below], y[below], x[~below], y[~below]


def _standardized_outlier_mask(residuals, threshold):
    """Flag residuals larger than ``threshold`` residual standard deviations (n - 2 dof)."""
    residual_std = np.sqrt(np.sum(residuals ** 2) / (len(residuals) - 2))
    return np.abs(residuals / residual_std) > threshold


def get_data(txt_name, jingdu, limit_slope=None, limit_intercept=None, image_height=960):
    """Load curb points, keep the target region and separate lower/upper edges.

    Reads the curb coordinates detected by the YOLO network from a text file,
    grid-downsamples them, discards points beyond the truncation line, then
    splits the remaining points into a lower ("bot") and an upper ("top") set
    with two rounds of residual-based outlier removal.

    NOTE: this module later defines another ``get_data(txt_name)``; that later
    definition rebinds the name, so this version is not reachable as
    ``get_data`` once the whole module has been executed.

    Args:
        txt_name: Path of the text file holding two columns (x, y).
        jingdu: Minimum acceptable R^2 for both edge fits; data whose bot or
            top fit falls below it is reported as invalid.
        limit_slope: Slope of the truncation line. Defaults to
            ``model.limit_slope`` (assumes the module-level Model instance
            provides it -- confirm against the Model class).
        limit_intercept: Intercept of the truncation line. Defaults to
            ``model.limit_intercept`` (same assumption).
        image_height: Value from which the stored y coordinate is subtracted
            to flip the vertical axis.

    Returns:
        ``(1, x_bot, y_bot, x_top, y_top)`` on success, or
        ``(0, None, None, None, None)`` when the data is invalid (too few
        points or a fit quality below ``jingdu``).
    """
    if limit_slope is None:
        limit_slope = model.limit_slope
    if limit_intercept is None:
        limit_intercept = model.limit_intercept

    # Load the data; ndmin=2 keeps a single-row file two-dimensional.
    data = np.loadtxt(txt_name, ndmin=2)
    int_data = data.astype(int)
    # Grid downsampling.
    grid_sampled = grid_downsample(int_data, cell_size=20)

    # Data truncation: flip y, then drop points beyond the truncation line.
    x = grid_sampled[:, 0].astype(int)
    y = image_height - grid_sampled[:, 1].astype(int)
    keep = ~(limit_slope * x + limit_intercept - y < 0)
    x, y = x[keep], y[keep]
    if len(x) < 3:
        # The residual standard deviation below divides by (n - 2).
        print("无效数据")
        return 0, None, None, None, None

    # Coarse classification of the raw data about the overall fit.
    # linear_regression is assumed to return (slope, intercept, r^2).
    slope, intercept, _ = calc_slope_line.linear_regression(x, y)
    y_pred = slope * x + intercept
    x_bot, y_bot, x_top, y_top = _split_about_line(x, y, slope, intercept)
    _, _, r2_bot = calc_slope_line.linear_regression(x_bot, y_bot)
    _, _, r2_top = calc_slope_line.linear_regression(x_top, y_top)
    print(f"未清洗数据拟合上下沿r2_bot = {r2_bot},r2_top = {r2_top}")

    # First cleaning pass: remove mis-detected points using the overall fit.
    residuals = y - y_pred
    outlier_mask = _standardized_outlier_mask(residuals, threshold=2.0)
    print(f"第一次数据清洗发现 {np.sum(outlier_mask)} 个异常点:")
    flagged = zip(x[outlier_mask], y[outlier_mask], residuals[outlier_mask])
    for i, (x_val, y_val, res) in enumerate(flagged, start=1):
        print(f"{i}: x={x_val}, y={y_val}, 残差={res:.2f}")
    clean_x = x[~outlier_mask]
    clean_y = y[~outlier_mask]
    clean_slope, clean_intercept, clean_r_2 = calc_slope_line.linear_regression(clean_x, clean_y)
    # Report the R^2 of the cleaned fit (the pre-cleaning value was printed before).
    print(f"清洗数据后整体拟合参数r_2 = {clean_r_2}")

    # Re-classify the cleaned data about the cleaned overall fit.
    x_bot_clean, y_bot_clean, x_top_clean, y_top_clean = _split_about_line(
        clean_x, clean_y, clean_slope, clean_intercept)
    if len(x_bot_clean) < 3 or len(x_top_clean) < 3:
        # Each per-edge residual standard deviation divides by (n - 2).
        print("无效数据")
        return 0, None, None, None, None

    # Second cleaning pass: remove mis-classified points on each edge.
    clean_slope_bot, clean_intercept_bot, clean_r2_bot = calc_slope_line.linear_regression(x_bot_clean, y_bot_clean)
    clean_slope_top, clean_intercept_top, clean_r2_top = calc_slope_line.linear_regression(x_top_clean, y_top_clean)
    print(f"清洗数据后上下沿拟合参数clean_r2_bot = {clean_r2_bot},clean_r2_top = {clean_r2_top}")
    residuals_bot = y_bot_clean - (clean_slope_bot * x_bot_clean + clean_intercept_bot)
    residuals_top = y_top_clean - (clean_slope_top * x_top_clean + clean_intercept_top)
    outlier_mask_bot = _standardized_outlier_mask(residuals_bot, threshold=1.5)
    outlier_mask_top = _standardized_outlier_mask(residuals_top, threshold=1.5)
    print(f"第二次数据清洗下沿发现 {np.sum(outlier_mask_bot)} 个异常点:")
    print(f"第二次数据清洗上沿发现 {np.sum(outlier_mask_top)} 个异常点:")
    x_bot_clean = x_bot_clean[~outlier_mask_bot]
    y_bot_clean = y_bot_clean[~outlier_mask_bot]
    x_top_clean = x_top_clean[~outlier_mask_top]
    y_top_clean = y_top_clean[~outlier_mask_top]

    # Validity check on the final per-edge fits.
    _, _, clean_r2_bot = calc_slope_line.linear_regression(x_bot_clean, y_bot_clean)
    _, _, clean_r2_top = calc_slope_line.linear_regression(x_top_clean, y_top_clean)
    print(f"清洗数据后上下沿拟合参数clean_r2_bot = {clean_r2_bot},clean_r2_top = {clean_r2_top}")
    # Equivalent to "either R^2 is below jingdu".
    if ((1 - clean_r2_bot) > (1 - jingdu)) or ((1 - clean_r2_top) > (1 - jingdu)):
        print("无效数据")
        return 0, None, None, None, None
    return 1, x_bot_clean, y_bot_clean, x_top_clean, y_top_clean
def filter_middle_80_percent(data, lower_frac=0.2, upper_frac=0.1):
    """Build a boolean mask selecting values inside a trimmed value range.

    The bounds are fractions of the value range (max - min), not percentiles:
    a value is kept when it lies in
    ``[min + lower_frac * range, max - upper_frac * range]``.
    With the defaults this drops the lowest 20% and the highest 10% of the
    range (despite the function's name).

    Args:
        data: Array-like of any shape; it is flattened before processing.
        lower_frac: Fraction of the value range trimmed from the low end.
        upper_frac: Fraction of the value range trimmed from the high end.

    Returns:
        np.ndarray: 1-D boolean mask over the flattened data, True for the
        values to keep. Empty input yields an empty mask.
    """
    # Flatten so that every data point is considered.
    flattened_data = np.asarray(data).ravel()
    if flattened_data.size == 0:
        # np.min / np.max raise on empty input.
        return np.zeros(0, dtype=bool)

    # Minimum, maximum and total value range.
    data_min = np.min(flattened_data)
    data_max = np.max(flattened_data)
    data_range = data_max - data_min

    # Bounds of the retained part of the range.
    lower_bound = data_min + lower_frac * data_range
    upper_bound = data_max - upper_frac * data_range

    return (flattened_data >= lower_bound) & (flattened_data <= upper_bound)
def load_data(txt_name, image_height=960):
    """Load two-column XY data from a text file and trim it by x range.

    The y column is flipped as ``image_height - y`` and rows are kept only
    where ``filter_middle_80_percent`` accepts their x value.

    A failed load is reported once and signalled with ``(None, None)``; the
    path is a fixed argument, so retrying it could never succeed.

    Args:
        txt_name: Path of the text file; surrounding whitespace is ignored.
            The value ``'q'`` (any case) means "no file" and yields
            ``(None, None)``.
        image_height: Value from which y is subtracted to flip the axis.

    Returns:
        ``(x, y)`` as column vectors of shape (k, 1), or ``(None, None)`` when
        the path is ``'q'``, the file cannot be read or parsed, or it does not
        hold two columns of data.
    """
    filepath = txt_name.strip()
    if filepath.lower() == 'q':
        return None, None

    try:
        # ndmin=2 keeps a single-row file two-dimensional so shape[1] is valid.
        data = np.loadtxt(filepath, ndmin=2)
    except (OSError, ValueError) as e:
        print(f"加载文件出错: {e}")
        return None, None

    if data.shape[0] == 0 or data.shape[1] != 2:
        print("错误文件必须包含两列数据X和Y")
        return None, None

    x = data[:, 0].reshape(-1, 1)
    y = image_height - data[:, 1].reshape(-1, 1)
    # 1-D boolean mask over the rows, computed from the x values only.
    mask = filter_middle_80_percent(x)
    return x[mask].reshape(-1, 1), y[mask].reshape(-1, 1)
def ransac_fit(x, y, residual_threshold=2.0):
    """Fit a linear model with RANSAC and report inliers and outliers.

    Args:
        x: Feature array passed to ``RANSACRegressor.fit``.
        y: Target array passed to ``RANSACRegressor.fit``.
        residual_threshold: Maximum residual for a sample to count as an inlier.

    Returns:
        Tuple ``(estimator, inlier_mask, outlier_mask)``: the fitted
        LinearRegression estimator, the boolean inlier mask and its negation.
    """
    # Fixed seed keeps the random sampling reproducible between runs.
    regressor = RANSACRegressor(
        LinearRegression(),
        residual_threshold=residual_threshold,
        random_state=42,
    ).fit(x, y)
    inliers = regressor.inlier_mask_
    return regressor.estimator_, inliers, ~inliers
def _plot_edges(txt_name, x_bot, y_bot, x_top, y_top):
    """Show a scatter plot of the lower and upper edge points."""
    plt.figure(figsize=(14, 7))
    plt.scatter(x_bot, y_bot,
                color='limegreen', marker='o', s=30, alpha=0.7,
                label='bot')
    plt.scatter(x_top, y_top,
                color='red', marker='*', s=100, edgecolor='black',
                label='top')
    plt.xlabel('X', fontsize=12)
    plt.ylabel('Y', fontsize=12)
    plt.title(f'{txt_name}', fontsize=14)
    plt.legend(fontsize=10, loc='best')
    plt.grid(True, alpha=0.3)
    plt.show()


def get_data(txt_name, show_plot=True):
    """Separate upper and lower edge points with two successive RANSAC fits.

    The first RANSAC line is fitted to all loaded points; the second one is
    fitted to the outliers of the first. The line predicting the larger y at
    x = 600 is labelled "top", the other "bot".

    Args:
        txt_name: Path of the two-column XY text file, passed to ``load_data``.
        show_plot: When True (default) a scatter plot of both edges is shown
            via ``plt.show()`` before returning.

    Returns:
        ``(1, x_bot, y_bot, slope_bot, intercept_bot,
        x_top, y_top, slope_top, intercept_top)`` on success, or a tuple of
        ``0`` followed by eight ``None`` values when the data cannot be loaded
        or the first fit leaves too few outliers for a second fit.
    """
    failure = (0,) + (None,) * 8

    # Load the data.
    x, y = load_data(txt_name)
    if x is None:
        return failure

    # First RANSAC fit on all points.
    model1, inlier_mask1, outlier_mask1 = ransac_fit(x, y, residual_threshold=3.0)
    x_inliers1, y_inliers1 = x[inlier_mask1], y[inlier_mask1]
    x_outliers1, y_outliers1 = x[outlier_mask1], y[outlier_mask1]

    # The second edge is fitted on the first fit's outliers; without enough of
    # them there is no second line to report.
    if len(x_outliers1) <= 10:
        return failure

    # Second RANSAC fit on the outliers of the first.
    model2, inlier_mask2, _ = ransac_fit(x_outliers1, y_outliers1, residual_threshold=3.0)
    x_inliers2, y_inliers2 = x_outliers1[inlier_mask2], y_outliers1[inlier_mask2]

    # Decide which line is the upper one by comparing predictions at x = 600.
    probe = np.array([600]).reshape(-1, 1)
    m1 = model1.predict(probe).item()
    m2 = model2.predict(probe).item()
    if m1 > m2:
        model_top, model_bot = model1, model2
        x_top, x_bot = x_inliers1, x_inliers2
        y_top, y_bot = y_inliers1, y_inliers2
    else:
        model_top, model_bot = model2, model1
        x_top, x_bot = x_inliers2, x_inliers1
        y_top, y_bot = y_inliers2, y_inliers1

    # Slope and intercept; the indexing relies on y being a column vector,
    # which makes coef_ two-dimensional and intercept_ one-dimensional.
    slope_top = model_top.coef_[0][0]
    intercept_top = model_top.intercept_[0]
    slope_bot = model_bot.coef_[0][0]
    intercept_bot = model_bot.intercept_[0]

    if show_plot:
        _plot_edges(txt_name, x_bot, y_bot, x_top, y_top)

    return 1, x_bot, y_bot, slope_bot, intercept_bot, x_top, y_top, slope_top, intercept_top