You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

368 lines
16 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import numpy as np
import pandas as pd
import calc_way
from scipy import stats
import calc_slope_line
import matplotlib.pyplot as plt
import model
import os
# Data truncation line: slope and intercept of the cut-off line are read from
# a Model instance and used by the loaders below to discard points.
# NOTE(review): this rebinds the name `model` from the imported module to a
# Model instance, so the module itself is no longer reachable as `model`
# after this line.
model = model.Model()
limit_slope = model.limit_slope
limit_intercept = model.limit_intercept
def grid_downsample(points, cell_size=15):
    """Grid-based downsampling that preserves the spatial structure.

    Every point is assigned to a square cell of side ``cell_size`` (by floor
    division of its coordinates) and one point per occupied cell is kept.

    Args:
        points: ``(N, 2)`` array-like of ``x, y`` coordinates. A single point
            given as a flat ``(2,)`` sequence and an empty input are accepted
            as well.
        cell_size: side length of a grid cell, in the same units as the
            coordinates.

    Returns:
        ``(M, 2)`` array with one ``x, y`` row per occupied cell, ordered by
        cell (``x_grid`` first, then ``y_grid``).
    """
    pts = np.asarray(points)
    if pts.size == 0:
        # Nothing to sample; keep the two-column shape callers index into.
        return pts.reshape(0, 2)
    if pts.ndim == 1:
        # A lone point (e.g. np.loadtxt on a one-line file) arrives flat.
        pts = pts.reshape(1, -1)

    df = pd.DataFrame(pts, columns=['x', 'y'])
    # Snap each coordinate down to the origin of the cell that contains it.
    df['x_grid'] = (df['x'] // cell_size) * cell_size
    df['y_grid'] = (df['y'] // cell_size) * cell_size
    # groupby sorts by cell key, so the output order is by cell, not by input.
    sampled = df.groupby(['x_grid', 'y_grid']).first().reset_index()
    return sampled[['x', 'y']].values
"""
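Read the curb coordinates detected by the YOLO network, keep only the points inside the target region, and separate the lower-edge points from the upper-edge points.
Parameter: path of the txt file that stores the data.
Returns: a status flag (1 = valid, 0 = invalid) followed by the lower-edge point coordinates x_bot, y_bot and the upper-edge point coordinates x_top, y_top inside the target region (all None when the flag is 0).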
"""
def _split_about_line(x, y, slope, intercept):
    """Split points into those strictly below a line and all the others.

    Returns ``(x_below, y_below, x_rest, y_rest)``. A point counts as below
    when ``slope * x + intercept - y > 0``; points on the line go to the
    second group.
    """
    below = x * slope + intercept - y > 0
    return x[below], y[below], x[~below], y[~below]


def _flag_outliers(residuals, threshold):
    """Return a boolean mask of residuals beyond ``threshold`` standard errors.

    The standard error is ``sqrt(sum(residuals**2) / (n - 2))``. When there
    are too few points for that estimate (``n <= 2``) or the fit is exact,
    nothing is flagged.
    """
    dof = residuals.size - 2
    if dof <= 0:
        return np.zeros(residuals.size, dtype=bool)
    residual_std = np.sqrt(np.sum(residuals ** 2) / dof)
    if residual_std == 0:
        return np.zeros(residuals.size, dtype=bool)
    return np.abs(residuals / residual_std) > threshold


def get_data(txt_name):
    """Load curb points, clean them and separate the lower and upper edges.

    Pipeline: load the txt file, grid-downsample (cell size 20), flip y,
    drop points above the truncation line, fit one line through everything,
    remove points whose standardized residual exceeds 2.0, split the rest
    into a lower and an upper set about the refitted line, remove per-edge
    outliers beyond 1.5, and finally accept the result only if both edge
    fits report r^2 of at least about 0.98.

    Args:
        txt_name: path of the text file holding the ``x y`` point rows.

    Returns:
        ``(1, x_bot, y_bot, x_top, y_top)`` with the cleaned lower/upper edge
        coordinates as arrays, or ``(0, None, None, None, None)`` when the
        data is empty, too sparse to fit, or fails the r^2 check.
    """
    invalid = (0, None, None, None, None)
    # Unpacked below as (slope, intercept, r^2).
    regress = calc_slope_line.linear_regression
    # NOTE(review): 960 is presumably the image height used to turn image
    # rows into a y-up coordinate; confirm against the capture setup.
    y_origin = 960

    # ndmin=2 keeps a one-line file as a (1, 2) array instead of a flat one.
    data = np.loadtxt(txt_name, ndmin=2)
    if data.size == 0:
        print("无效数据")
        return invalid

    # Grid downsampling.
    grid_sampled = grid_downsample(data.astype(int), cell_size=20)

    # Data truncation: flip y, then discard points above the limit line.
    px = grid_sampled[:, 0].astype(int)
    py = y_origin - grid_sampled[:, 1].astype(int)
    keep = ~(limit_slope * px + limit_intercept - py < 0)
    x = px[keep]
    y = py[keep]
    if x.size < 2:
        # A line cannot be fitted through fewer than two points.
        print("无效数据")
        return invalid

    # Coarse classification of the raw data (diagnostic output only).
    slope, intercept, r_2 = regress(x, y)
    x_bot, y_bot, x_top, y_top = _split_about_line(x, y, slope, intercept)
    if x_bot.size >= 2 and x_top.size >= 2:
        _, _, r2_bot = regress(x_bot, y_bot)
        _, _, r2_top = regress(x_top, y_top)
        print(f"未清洗数据拟合上下沿r2_bot = {r2_bot},r2_top = {r2_top}")

    # First cleaning pass: remove mis-detected points relative to the
    # overall fit. The original note cites 2.5-3.0 standard deviations as
    # common; this code uses 2.0.
    residuals = y - (slope * x + intercept)
    outlier_mask = _flag_outliers(residuals, threshold=2.0)
    print(f"第一次数据清洗发现 {np.sum(outlier_mask)} 个异常点:")
    flagged = zip(x[outlier_mask], y[outlier_mask], residuals[outlier_mask])
    for idx, (x_val, y_val, res) in enumerate(flagged, start=1):
        print(f"{idx}: x={x_val}, y={y_val}, 残差={res:.2f}")

    clean_x = x[~outlier_mask]
    clean_y = y[~outlier_mask]
    clean_slope, clean_intercept, clean_r_2 = regress(clean_x, clean_y)
    # Report the r^2 of the cleaned fit (previously the pre-cleaning value
    # was printed under this message).
    print(f"清洗数据后整体拟合参数r_2 = {clean_r_2}")

    # Re-classify the cleaned points about the refitted line.
    x_bot_clean, y_bot_clean, x_top_clean, y_top_clean = _split_about_line(
        clean_x, clean_y, clean_slope, clean_intercept
    )
    if x_bot_clean.size < 2 or x_top_clean.size < 2:
        # One edge is missing, so the per-edge fits below are impossible.
        print("无效数据")
        return invalid

    # Second cleaning pass: remove mis-classified points per edge (1.5).
    clean_slope_bot, clean_intercept_bot, clean_r2_bot = regress(x_bot_clean, y_bot_clean)
    clean_slope_top, clean_intercept_top, clean_r2_top = regress(x_top_clean, y_top_clean)
    print(f"清洗数据后上下沿拟合参数clean_r2_bot = {clean_r2_bot},clean_r2_top = {clean_r2_top}")

    residuals_bot = y_bot_clean - (clean_slope_bot * x_bot_clean + clean_intercept_bot)
    residuals_top = y_top_clean - (clean_slope_top * x_top_clean + clean_intercept_top)
    outlier_mask_bot = _flag_outliers(residuals_bot, threshold=1.5)
    outlier_mask_top = _flag_outliers(residuals_top, threshold=1.5)
    print(f"第二次数据清洗下沿发现 {np.sum(outlier_mask_bot)} 个异常点:")
    print(f"第二次数据清洗上沿发现 {np.sum(outlier_mask_top)} 个异常点:")

    x_bot_clean = x_bot_clean[~outlier_mask_bot]
    y_bot_clean = y_bot_clean[~outlier_mask_bot]
    x_top_clean = x_top_clean[~outlier_mask_top]
    y_top_clean = y_top_clean[~outlier_mask_top]

    # Validity check on the final per-edge fits.
    clean_slope_bot, clean_intercept_bot, clean_r2_bot = regress(x_bot_clean, y_bot_clean)
    clean_slope_top, clean_intercept_top, clean_r2_top = regress(x_top_clean, y_top_clean)
    print(f"清洗数据后上下沿拟合参数clean_r2_bot = {clean_r2_bot},clean_r2_top = {clean_r2_top}")
    if ((1 - clean_r2_bot) > (1 - 0.98)) or ((1 - clean_r2_top) > (1 - 0.98)):
        print("无效数据")
        return invalid
    return 1, x_bot_clean, y_bot_clean, x_top_clean, y_top_clean
def test3_get_data(txt_name):
    """Debug variant of the curb-point cleaning pipeline that plots each stage.

    Runs the same stages as the production loader (load, grid-downsample
    with cell size 20, flip y, truncate, overall fit, first outlier pass,
    lower/upper split, second outlier pass) and draws them into a 4x3
    matplotlib figure that is shown with ``plt.show()`` before returning.
    Both outlier passes use a threshold of 2.0 here and the final check
    requires ``1 - r^2 <= 1e-3`` for both edges.

    Args:
        txt_name: path of the text file holding the ``x y`` point rows; also
            used as the figure title.

    Returns:
        ``(1, x_bot, y_bot, x_top, y_top)`` with the cleaned lower/upper edge
        coordinates, or ``(0, None, None, None, None)`` when the file is
        empty, fewer than two points survive truncation, or the r^2 check
        fails. Later stages assume each edge keeps enough points to fit.
    """
    invalid = (0, None, None, None, None)
    # Unpacked below as (slope, intercept, r^2).
    regress = calc_slope_line.linear_regression

    def draw(ax, title, xs, ys, fit=None, label='orgin'):
        # Scatter the points and, when given, overlay the fitted line.
        ax.set_title(title)
        ax.scatter(xs, ys, color='blue', label=label)
        if fit is not None:
            ax.plot(xs, fit, color='red', label='fix')

    # ndmin=2 keeps a one-line file as a (1, 2) array instead of a flat one.
    data = np.loadtxt(txt_name, ndmin=2)
    if data.size == 0:
        print("无效数据")
        return invalid
    grid_sampled = grid_downsample(data.astype(int), cell_size=20)

    # Flip y (NOTE(review): 960 is presumably the image height - confirm),
    # then discard points above the truncation line.
    px = grid_sampled[:, 0].astype(int)
    py = 960 - grid_sampled[:, 1].astype(int)
    keep = ~(limit_slope * px + limit_intercept - py < 0)
    x = px[keep]
    y = py[keep]
    if x.size < 2:
        # A line cannot be fitted through fewer than two points.
        print("无效数据")
        return invalid

    slope, intercept, r_2 = regress(x, y)
    print(f"原始数据拟合参数r_2 = {r_2}")

    fig1, axes1 = plt.subplots(nrows=4, ncols=3, figsize=(10, 8))
    fig1.tight_layout()
    fig1.suptitle(f"{txt_name}")

    # Row 0: raw data, overall fit and the coarse lower/upper split.
    y_pred = slope * x + intercept
    draw(axes1[0, 0], "original data", x, y, fit=y_pred)

    below = x * slope + intercept - y > 0
    x_bot, y_bot = x[below], y[below]
    x_top, y_top = x[~below], y[~below]
    slope_bot, intercept_bot, r2_bot = regress(x_bot, y_bot)
    slope_top, intercept_top, r2_top = regress(x_top, y_top)
    print(f"未清洗数据拟合上下沿r2_bot = {r2_bot},r2_top = {r2_top}")
    draw(axes1[0, 1], "original bot data", x_bot, y_bot,
         fit=slope_bot * x_bot + intercept_bot)
    draw(axes1[0, 2], "original top data", x_top, y_top,
         fit=slope_top * x_top + intercept_top)

    # First cleaning pass: standardized residuals against the overall fit.
    residuals = y - y_pred
    # Residual standard error (square root of the MSE, n - 2 dof).
    residual_std = np.sqrt(np.sum(residuals ** 2) / (len(x) - 2))
    print(f"residual_std = {residual_std}")
    standardized_residuals = residuals / residual_std
    for i, z in enumerate(standardized_residuals):
        print(f"{i+1}个点的坐标为:{x[i],y[i]},标准化残差为{z}\n")
    # The original note cites 2.5-3.0 standard deviations as common; 2.0 is used.
    threshold = 2.0
    outlier_mask = np.abs(standardized_residuals) > threshold
    draw(axes1[2, 0], "abnormal data", x[outlier_mask], y[outlier_mask])
    print(f"发现 {np.sum(outlier_mask)} 个异常点:")

    # Row 1: data after the first pass, refitted and re-split.
    clean_x = x[~outlier_mask]
    clean_y = y[~outlier_mask]
    clean_slope, clean_intercept, clean_r_2 = regress(clean_x, clean_y)
    # Report the r^2 of the cleaned fit (previously the pre-cleaning value
    # was printed under this message).
    print(f"清洗数据后整体拟合参数r_2 = {clean_r_2}")
    draw(axes1[1, 0], "clean data", clean_x, clean_y,
         fit=clean_slope * clean_x + clean_intercept)

    below_clean = clean_x * clean_slope + clean_intercept - clean_y > 0
    x_bot_clean, y_bot_clean = clean_x[below_clean], clean_y[below_clean]
    x_top_clean, y_top_clean = clean_x[~below_clean], clean_y[~below_clean]
    clean_slope_bot, clean_intercept_bot, clean_r2_bot = regress(x_bot_clean, y_bot_clean)
    clean_slope_top, clean_intercept_top, clean_r2_top = regress(x_top_clean, y_top_clean)
    print(f"清洗数据后上下沿拟合参数clean_r2_bot = {clean_r2_bot},clean_r2_top = {clean_r2_top}")
    y_bot_pred = clean_slope_bot * x_bot_clean + clean_intercept_bot
    y_top_pred = clean_slope_top * x_top_clean + clean_intercept_top
    draw(axes1[1, 1], "clean bot data", x_bot_clean, y_bot_clean, fit=y_bot_pred)
    draw(axes1[1, 2], "clean top data", x_top_clean, y_top_clean, fit=y_top_pred)

    # Second cleaning pass: per-edge standardized residuals.
    residuals_bot = y_bot_clean - y_bot_pred
    residuals_top = y_top_clean - y_top_pred
    residual_std_bot = np.sqrt(np.sum(residuals_bot ** 2) / (len(x_bot_clean) - 2))
    residual_std_top = np.sqrt(np.sum(residuals_top ** 2) / (len(x_top_clean) - 2))
    print(f"residual_std_bot = {residual_std_bot}")
    print(f"residual_std_top = {residual_std_top}")
    threshold = 2.0
    outlier_mask_bot = np.abs(residuals_bot / residual_std_bot) > threshold
    outlier_mask_top = np.abs(residuals_top / residual_std_top) > threshold
    draw(axes1[2, 1], "re clean abnormal bot data",
         x_bot_clean[outlier_mask_bot], y_bot_clean[outlier_mask_bot],
         label='delet_bot')
    draw(axes1[2, 2], "re clean abnormal top data",
         x_top_clean[outlier_mask_top], y_top_clean[outlier_mask_top],
         label='delet_top')
    print(f"发现 {np.sum(outlier_mask_bot)} 个异常点:")
    print(f"发现 {np.sum(outlier_mask_top)} 个异常点:")

    # Row 3: final per-edge data and fits.
    x_bot_clean = x_bot_clean[~outlier_mask_bot]
    y_bot_clean = y_bot_clean[~outlier_mask_bot]
    x_top_clean = x_top_clean[~outlier_mask_top]
    y_top_clean = y_top_clean[~outlier_mask_top]
    clean_slope_bot, clean_intercept_bot, clean_r2_bot = regress(x_bot_clean, y_bot_clean)
    clean_slope_top, clean_intercept_top, clean_r2_top = regress(x_top_clean, y_top_clean)
    print(f"清洗数据后上下沿拟合参数clean_r2_bot = {clean_r2_bot},clean_r2_top = {clean_r2_top}")
    draw(axes1[3, 1], "re clean bot data", x_bot_clean, y_bot_clean,
         fit=clean_slope_bot * x_bot_clean + clean_intercept_bot)
    draw(axes1[3, 2], "re clean top data", x_top_clean, y_top_clean,
         fit=clean_slope_top * x_top_clean + clean_intercept_top)
    plt.show()

    # Validity check: stricter here (1e-3) than a 0.98 r^2 cut-off.
    if ((1 - clean_r2_bot) > 1e-3) or ((1 - clean_r2_top) > 1e-3):
        print("无效数据")
        return invalid
    return 1, x_bot_clean, y_bot_clean, x_top_clean, y_top_clean