pandas 的基本使用

Series

基础语法

Series是一种一维数组,与Numpy中的一维array类似。PandasSeries 能存放各种不同类型的对象,Series 能为数据自定义标签,也就是索引(index),然后通过索引来访问数组中的数据。
模块导入:

1
2
import numpy as np
import pandas as pd

基础语法:

1
2
s = pd.Series(data,index)   
# index不是必须的

举一个例子:

1
2
3
index = ['中国','美国','日本']
data = [100,200,300]
pd.Series(data,index)

它所展现的形式如下:

data index
100 中国
200 美国
300 日本

对 Series 进行算术运算操作

基于 index 进行的。我们可以用加减乘除$(+ - * /)$这样的运算符对两个 Series 进行运算,Pandas 将会根据索引 index,对响应的数据进行计算,结果将会以浮点数的形式存储,以避免丢失精度。如果 Pandas 在两个Series 里找不到相同的 index,对应的位置就返回一个空值 NaN

DataFrame

DataFrame:二维的表格型数据结构。很多功能与R中的data.frame类似。可以将DataFrame理解为Series的容器。
DataFrame是一个二维标记的数据结构,可以具有不同类型的列。您可以将其视为电子表格或SQL表,或序列对象的dict。它通常是最常用的pandas对象。与系列一样,DataFrame接受多种不同类型的输入:

  1. 一维ndarraydictlistsdicts, or Series
  2. 二维numpy.ndarray
  3. 结构化或记录 ndarray
  4. A Series
  5. Another DataFrame

创建

  1. 用多个Series创建
1
2
3
4
df = {'Name':pd.Series(data,index),
'Age':pd.Series(data,index)
}
pd.DataFrame(df)
  1. 用一个字典来创建 DataFrame
    其实这个和上面哪个差不多

基本操作

下面演示全部用这个数据来表示

1
2
3
4
5
6
data = {
'name': ['小米','华为', '苹果'],
'price': [1000, 2000, 3000]
}

my_df = pd.DataFrame(data, index=['性价比', '爱国', '奢侈品'])

形状如下:

name price
性价比 小米
爱国 华为
奢侈品 苹果

获取列

my_df['name']

会输出

1
2
3
性价比    小米
爱国 华为
奢侈品 苹果

如果想要获取多个列

my_df['name','price']

添加新列

和dict的添加方法基本类似

my_df['like'] = [1,2,3]

形状如下:

name price like
性价比 小米 1000
爱国 华为 2000
奢侈品 苹果 3000
  • 当然也可以用已经存在的列添加

my_df['txt'] = my_df['price'] + my_df['like']

删除列或者行

想要删除某一行或一列,可以用 drop() 函数。在使用这个函数的时候,你需要先指定具体的删除方向,axis=0 对应的是行 row,而 axis=1 对应的是列 column
举一个例子:

my_df.drop('like', axis=1,inplace=True)

这里就把like这一列给删掉了

为了防止用户误操作丢失数据,在调用 drop() 的时候,Pandas 并不会真的永久性地删除这行/列。可以通过调用 df 来确认数据的完整性(df就可以显示未永久删除的)。如果你确定要永久性删除某一行/列,你需要加上 inplace=True 参数

获取 DataFrame 中的一行或多行数据

其实就是使用lociloc
loc通过按索引(或标签名)引用
iloc按这行在表中的位置(行数(以0开始))来引用
举一个例子:

my_df.loc['性价比']

结果如下:

1
2
3
4
name       小米
price 1000
like 1
Name: 性价比, dtype: object

my_df.iloc[0]
结果和上面一样

同时你可以用 loc[] 来指定具体的行列范围,并生成一个子数据表。比如,提取 ‘性价比’ 行中 name 列的内容 ----> 小米

my_df.loc['性价比','name']

提取多个

my_df.loc[['性价比','爱国'],['name','price']]

结果如下:

1
2
3
    name  price
性价比 小米 1000
爱国 华为 2000

条件筛选

用中括号 [] 的方式,除了直接指定选中某些列外,还能接收一个条件语句,然后筛选出符合条件的行/列。

my_df[my_df['like'] == 1]

结果如下

1
2
    name  price  like
性价比 小米 1000 1

重置并设置DataFrame 的索引

1
df.reset_index()

和删除操作差不多,.reset_index() 并不会永久改变你表格的索引,除非你调用的时候明确传入了 inplace 参数,比如:.reset_index(inplace=True)然后用 .set_index() 方法,将 DataFrame 里的某一列作为索引来用。

注意,不像 .reset_index() 会保留一个备份,然后才用默认的索引值代替原索引,.set_index() 将会完全覆盖原来的索引值。

数据处理

缺失值的处理

在许多情况下,如果你用 Pandas 来读取大量数据,往往会发现原始数据中会存在不完整的地方。在 DataFrame 中缺少数据的位置, Pandas 会自动填入一个空值NAN

缺失值的删除:.dropna() 来丢弃这些自动填充的值,当你使用 .dropna() 方法时,就是告诉 Pandas 删除掉存在一个或多个空值的行(或者列)。删除行用的是 .dropna(axis=0) ,删除列用的是 .dropna(axis=1) ,默认axis = 0

缺失值的填充:.fillna() 来自动给这些空值填充数据,使用 .fillna() 方法,Pandas 将对这个 DataFrame 里所有的空值位置填上你指定的默认值。

同样地,.dropna()方法 和 .fillna() 方法并不会永久性改变你的数据,除非你传入了inplace=True 参数。

分组统计

Pandas 的分组统计功能可以按某一列的内容对数据行进行分组,并对其应用统计函数,比如求和,平均数,中位数,标准差等
.groupby() 方法,我们可以按 xxx 列进行分组,并用 .mean() 求每组的平均值,用 .count() 方法,能对 DataFrame 中的某个元素出现的次数进行计数

数据描述

.describe() 方法将对 DataFrame 里的数据进行分析,并一次性生成多个描述性的统计指标,方便用户对数据有一个直观上的认识。生成的指标,从左到右分别是:计数、平均数、标准差、最小值、25% 50% 75% 位置的值、最大值。用 .transpose() 方法获得一个竖排的格式

其他操作

  1. 堆叠(Concat)
    堆叠基本上就是简单地把多个 DataFrame 堆在一起,拼成一个更大的 DataFrame。当你进行堆叠的时候,堆叠时要注意数据表的索引和列的延伸方向,堆叠的方向要和它一致。
    默认按行的方向堆叠,把每个表的索引按顺序叠加。
    按列的方向堆叠,那你需要传入 axis=1 参数

  2. 归并(Merge)
    使用 pd.merge() 函数,能将多个 DataFrame 归并在一起,它的合并方式类似合并 SQL 数据表的方式

1
2
3
4
5
6
7
pd.merge(left, right, how='inner', on='Key') 

其中 left 参数代表放在左侧的 DataFrame,而 right 参数代表放在右边的 DataFrame;

how='inner' 指的是当左右两个 DataFrame 中存在不重合的 Key 时,取结果的方式:默认:inner 内连接,取交集”,outer 外连接,取并集,并用nan填充”,left 左连接, 左侧取全部,右侧取部分”,right 右连接,左侧取部分,右侧取全部”

最后,on='Key' 代表需要合并的键值所在的列,最后整个表格会以该列为准进行归并。
  1. 连接(Join)
    若要把两个表连在一起,然而它们之间没有太多共同的列,可以选择使用.join() 方法。和 .merge() 不同,连接采用索引作为公共的键,而不是某一列。
    同样inner 表示内连接,取交集”,outer 外连接,取并集,并用nan填充”,left 左连接, 左侧取全部,右侧取部分”,right 右连接,左侧取部分,右侧取全部”

  2. 查找不重复的值

  • .unique() 方法
  • .nunique() 方法,获取所有不重复值的个数
  • .value_counts() 同时获得所有值和对应值的计数
  1. apply() 方法
    .apply() 方法,可以对 DataFrame 中的数据应用自定义函数,进行数据处理。通常使用lambda

获取 DataFrame 的属性

DataFrame 的属性包括列和索引的名字。假如你不确定表中的某个列名是否含有空格之类的字符,你可以通过 .columns 来获取属性值,以查看具体的列名。

排序(sort)

.sort_values() 将整个表按某一列的值进行排序

查找空值

数据集很庞大时,使用用 Pandas.isnull() 方法,方便快捷地发现表中的空值。

该方法返回的是一个新的 DataFrame,里面用布尔值(True/False)表示原 DataFrame 中对应位置的数据是否是空值。

数据透视表

使用 Excel 的时候,有数据透视表的功能,同样,pandas也有。数据透视表是一种汇总统计表,它展现了原表格中数据的汇总统计结果。Pandas 的数据透视表能自动帮你对数据进行分组、切片、筛选、排序、计数、求和或取平均值,并将结果直观地显示出来。

1
2
3
4
5
df .pivot_table(data, values='', index=[''], columns=['']) 

#values代表我们需要汇总统计的数据所在的列
#index 表示按该列进行分组索引
#columns 则表示最后结果将按该列的数据进行分列

导入导出数据(文件操作)

pd.read_ 这样的方法,你可以用 Pandas 读取各种不同格式的数据文件,包括 Excel 表格、CSV 文件、SQL 数据库,甚至 HTML 文件等。

  1. 读写 CSV 文件
    pd.read_csv() 就能将 CSV 文件里的数据转换成 DataFrame 对象
    .to_csv(),DataFrame 对象存入 csv 文件
    不希望 Pandas 把索引写入加上index=False即可
  2. 读写 Excel 表格文件
    pd.read_excel(),能将 Excel 表格中的数据导入 Pandas 中。但是请注意,Pandas 只能导入表格文件中的数据,其他对象,例如宏、图形和公式等都不会被导入。如果文件中存在有此类对象,可能会导致 pd.read_excel() 方法执行失败
    写入 CSV 文件类似,我们可以将一个 DataFrame 对象存成 .xlsx 文件,语法是 .to_excel()

数据结构以及操作——时间序列(Time- Series)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
pandas.date_range(start=None, end=None, periods=None, freq=‘D’, tz=None, normalize=False, name=None,closed=None, **kwargs)
功能:返回一个固定的频率DatetimeIndex,以日(日历D)作为默认频率

参数:start : string or datetime-like, default None 生成日期的左边界

end : string or datetime-like, default None 生成日期的右边界

periods : integer, default None 要生成的时间段数

freq : string or DateOffset, default ‘D’ (日历日期) 频率字符串可以有多个,例如“S”表示秒频率

返回:rng : DatetimeIndex

注意:在三个参数中:startand和periods,必须指定两个参数。

利用CNN模型实现MNIST图像分类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import numpy as np
import pandas as pd
import torch
import torchvision
import torchvision.transforms as transforms
from matplotlib import pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader


# 定义网络结构
class CNN_NET(torch.nn.Module):
def __init__(self):
super(CNN_NET, self).__init__()
self.conv1 = torch.nn.Conv2d(in_channels=1, out_channels=10, kernel_size=5, stride=1)
self.conv1_bn = torch.nn.BatchNorm2d(10)
self.pool = torch.nn.MaxPool2d(2)
self.conv2 = torch.nn.Conv2d(in_channels=10, out_channels=20, kernel_size=5, stride=1)
self.conv2_bn = torch.nn.BatchNorm2d(20)
self.fc1 = torch.nn.Linear(320, 128)
self.fc2 = torch.nn.Linear(128, 64)
self.fc3 = torch.nn.Linear(64, 10)
self.dropout = torch.nn.Dropout(0.5)

def forward(self, x):
x = self.pool(F.relu(self.conv1_bn(self.conv1(x))))
x = self.pool(F.relu(self.conv2_bn(self.conv2(x))))
x = x.view(x.size(0), -1)

x = self.fc1(x)
x = self.dropout(x)
x = self.fc2(x)
x = self.dropout(x)
x = self.fc3(x)
return x


if __name__ == '__main__':

EPOCH = 8

# 加载数据集
train_dataset = datasets.MNIST('data/手写数字识别数据集+标签', train=True, download=True, transform=transforms.ToTensor())
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True,
num_workers=2)
test_dataset = datasets.MNIST('data/手写数字识别数据集+标签', train=False, download=True, transform=transforms.ToTensor())
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=True,
num_workers=2)

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

net = CNN_NET() # 初始化网络

criterion = nn.CrossEntropyLoss() # 损失函数
optimizer = optim.Adam(net.parameters(), lr=0.0001) # 优化器

# 训练网络
losses = []
accuracies = []
for epoch in range(EPOCH):
running_loss = 0.0
correct = 0
total = 0
for i, data in enumerate(train_loader, 0):
inputs, labels = data
optimizer.zero_grad() # 梯度清零
outputs = net(inputs) # 前向传播
loss = criterion(outputs, labels) # 计算损失
loss.backward() # 反向传播
optimizer.step() # 更新参数
running_loss += loss.item()
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
if i % 10 == 9: # 每10个批次打印一次信息
print(f'[Epoch {epoch + 1}, Batch {i + 1}] loss: {running_loss / 10:.3f}')
losses.append(running_loss / 10)
accuracies.append(100 * correct / total)
running_loss = 0.0

print('Finished Training')

# 可视化训练损失和准确率
fig, ax1 = plt.subplots()

color = 'tab:red'
ax1.set_xlabel('Iterations')
ax1.set_ylabel('Loss', color=color)
ax1.plot(losses, color=color)
ax1.tick_params(axis='y', labelcolor=color)

ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Accuracy (%)', color=color)
ax2.plot(accuracies, color=color)
ax2.tick_params(axis='y', labelcolor=color)

fig.tight_layout()
plt.title('Training Loss and Accuracy')
plt.show()

# 在测试集上测试模型
correct = 0
total = 0
with torch.no_grad():
for data in test_loader:
images, labels = data
outputs = net(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (100 * correct / total))

利用CNN模型实现人马分类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import numpy as np
import pandas as pd
import torch
import torchvision.transforms as transforms
from matplotlib import pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader



# 定义网络结构
class CNN_NET(torch.nn.Module):
"""人马是二分类问题
"""

def __init__(self):
super(CNN_NET, self).__init__()
self.conv1 = torch.nn.Conv2d(in_channels=3, out_channels=128, kernel_size=4, stride=2, padding=1)
self.conv1_bn = torch.nn.BatchNorm2d(128)

self.conv2 = torch.nn.Conv2d(in_channels=128, out_channels=64, kernel_size=3, stride=1, padding=1)
self.conv2_bn = torch.nn.BatchNorm2d(64)

self.pool = torch.nn.MaxPool2d(kernel_size=2, stride=2)

self.conv3 = torch.nn.Conv2d(in_channels=64, out_channels=32, kernel_size=3, stride=1, padding=1)
self.conv3_bn = torch.nn.BatchNorm2d(32)

self.conv4 = torch.nn.Conv2d(in_channels=32, out_channels=16, kernel_size=2, stride=1, padding=1)
self.conv4_bn = torch.nn.BatchNorm2d(16)

self.fc1 = torch.nn.Linear(9 * 9 * 16, 1024)

self.fc2 = torch.nn.Linear(1024, 512)

self.fc3 = torch.nn.Linear(512, 256)

self.fc4 = torch.nn.Linear(256, 128)

self.fc5 = torch.nn.Linear(128, 64)

self.fc6 = torch.nn.Linear(64, 2)

self.dropout = torch.nn.Dropout(0.5)

def forward(self, x):
x = self.pool(F.relu(self.conv1_bn(self.conv1(x))))
x = self.pool(F.relu(self.conv2_bn(self.conv2(x))))
x = self.pool(F.relu(self.conv3_bn(self.conv3(x))))
x = self.pool(F.relu(self.conv4_bn(self.conv4(x))))

x = x.view(-1, 9 * 9 * 16)
x = F.relu(self.fc1(x))
x = self.dropout(x)
x = F.relu(self.fc2(x))
x = self.dropout(x)
x = F.relu(self.fc3(x))
x = self.dropout(x)
x = F.relu(self.fc4(x))
x = self.dropout(x)
x = F.relu(self.fc5(x))
x = self.dropout(x)
x = self.fc6(x)
return x

if __name__ == '__main__':
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
EPOCH = 12
# 定义数据预处理
transform = transforms.Compose([
transforms.Resize((300, 300)),
transforms.ToTensor(), # 将图像转换为张量
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) # 标准化图像数据
])

# 加载数据集
train_dataset = ImageFolder(root='data/horse-or-human/horse-or-human', transform=transform)
test_dataset = ImageFolder(root='data/horse-or-human/validation-horse-or-human', transform=transform)

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

net = CNN_NET().to(device) # 初始化网络并移到GPU

criterion = nn.CrossEntropyLoss() # 损失函数
optimizer = optim.Adam(net.parameters(), lr=0.001) # 优化器

# 训练网络
losses = []
accuracies = []
for epoch in range(EPOCH):
running_loss = 0.0
correct = 0

total = 0
for i, data in enumerate(train_loader, 0):
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device) # 将数据移到GPU
optimizer.zero_grad() # 梯度清零
outputs = net(inputs) # 前向传播
loss = criterion(outputs, labels) # 计算损失
loss.backward() # 反向传播
optimizer.step() # 更新参数
running_loss += loss.item()
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
if i % 10 == 9: # 每10个批次打印一次信息
print(f'[Epoch {epoch + 1}, Batch {i + 1}] loss: {running_loss / 10:.3f}')
losses.append(running_loss / 10)
accuracies.append(100 * correct / total)
running_loss = 0.0

print('Finished Training')

# 可视化训练损失和准确率
fig, ax1 = plt.subplots()

color = 'tab:red'
ax1.set_xlabel('Iterations')
ax1.set_ylabel('Loss', color=color)
ax1.plot(losses, color=color)
ax1.tick_params(axis='y', labelcolor=color)

ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Accuracy (%)', color=color)
ax2.plot(accuracies, color=color)
ax2.tick_params(axis='y', labelcolor=color)

fig.tight_layout()
plt.title('Training Loss and Accuracy')
plt.show()

# 在测试集上测试模型
correct = 0
total = 0
with torch.no_grad():
for data in test_loader:
images, labels = data
images, labels = images.to(device), labels.to(device) # 将数据移到GPU
outputs = net(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (100 * correct / total))

利用PyTorch框架,使用R-CNN模型,在生活垃圾数据集上完成目标检测任务

这些是导入必要的库和模块。warnings.filterwarnings(‘ignore’)用于忽略警告信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import os
import torch
import torchvision.transforms as transforms
from PIL import Image
import xml.etree.ElementTree as ET
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import warnings

# 忽略警告
warnings.filterwarnings('ignore')

自定义的数据集类TrashDataset,用于加载和预处理图像及其对应的标注数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 数据预处理类
class TrashDataset(Dataset):
def __init__(self, img_dir, annotations_dir, transform=None, imgs=None):
self.img_dir = img_dir # 图片目录
self.annotations_dir = annotations_dir # 标注文件目录
self.transform = transform # 数据变换
if imgs is None:
self.imgs = list(sorted(os.listdir(img_dir))) # 获取所有图片文件名并排序
self.annotations = list(sorted(os.listdir(annotations_dir))) # 获取所有标注文件名并排序
self.imgs = [img for img in self.imgs if img.replace('.jpg', '.xml') in self.annotations] # 确保图片和标注匹配
else:
self.imgs = imgs # 使用传入的图片列表

def __len__(self):
return len(self.imgs) # 返回数据集大小

def __getitem__(self, idx):
img_path = os.path.join(self.img_dir, self.imgs[idx]) # 获取图片路径
annotation_path = os.path.join(self.annotations_dir, self.imgs[idx].replace('.jpg', '.xml')) # 获取对应的标注路径
img = Image.open(img_path).convert("RGB") # 打开图片并转换为RGB格式

tree = ET.parse(annotation_path) # 解析标注文件
root = tree.getroot() # 获取根节点
boxes = [] # 存储边界框
labels = [] # 存储标签
for obj in root.findall('object'):
labels.append(1) # 假设所有对象标签都为1(垃圾)
bndbox = obj.find('bndbox')
xmin = int(bndbox.find('xmin').text) # 获取边界框的坐标
ymin = int(bndbox.find('ymin').text)
xmax = int(bndbox.find('xmax').text)
ymax = int(bndbox.find('ymax').text)
if xmax > xmin and ymax > ymin:
boxes.append([xmin, ymin, xmax, ymax]) # 添加有效的边界框

boxes = torch.as_tensor(boxes, dtype=torch.float32) # 转换为tensor
labels = torch.as_tensor(labels, dtype=torch.int64) # 转换为tensor
target = {'boxes': boxes, 'labels': labels} # 创建目标字典

if self.transform:
img = self.transform(img) # 应用变换

return img, target # 返回图片和目标字典

定义了一个数据转换管道,将图片转换为Tensor格式

1
2
3
4
# 数据转换
transform = transforms.Compose([
transforms.ToTensor(), # 将图片转换为Tensor
])

定义了一个collate_fn函数,用于DataLoader在批处理数据时的处理方式

1
2
3
# 定义collate_fn函数
def collate_fn(batch):
return tuple(zip(*batch))

加载数据集

1
2
# 加载数据集
dataset = TrashDataset('data\\trash', 'data\\annotated', transform=transform)

将数据集划分为训练集和验证集,80%用于训练,20%用于验证

1
2
3
4
# 划分数据集
train_imgs, val_imgs = train_test_split(dataset.imgs, test_size=0.2, random_state=42)
train_dataset = TrashDataset('data\\trash', 'data\\annotated', transform=transform, imgs=train_imgs)
val_dataset = TrashDataset('data\\trash', 'data\\annotated', transform=transform, imgs=val_imgs)

创建DataLoader用于加载训练和验证数据,batch_size设置为2

1
2
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False, collate_fn=collate_fn)

加载预训练的Faster R-CNN模型,并将其分类器调整为输出2个类别(垃圾和背景)

1
2
3
4
5
# 加载预训练的Faster R-CNN模型并微调
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
num_classes = 2 # 1 class (trash) + background
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

将模型移动到GPU(如果可用)或CPU上

1
2
3
# 训练模型
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model.to(device)

定义优化器,这里使用Adam优化器,也可以选择使用SGD优化器

1
2
3
4
5
# 优化器
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(params, lr=0.0001, weight_decay=0.0005)
# optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
num_epochs = 10

训练循环,遍历每个epoch和batch,计算损失并进行反向传播和优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 训练循环
for epoch in range(num_epochs):
model.train()
i = 0
for images, targets in train_loader:
images = list(image.to(device) for image in images)
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()
i += 1
if i % 10 == 0:
print(f'Epoch: {epoch}, Step: {i}, Loss: {losses.item()}')
print("Training complete")

定义一个可视化函数,显示模型在验证集上的预测结果,绘制边界框和置信度分数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 定义可视化函数
def visualize_predictions(dataset, model, device, threshold=0.65, num_images=5):
model.eval()
fig, axs = plt.subplots(1, num_images, figsize=(20, 5))
with torch.no_grad():
for i in range(num_images):
img, _ = dataset[i]
prediction = model([img.to(device)])
img = img.permute(1, 2, 0).numpy()
ax = axs[i] if num_images > 1 else axs
ax.imshow(img)
boxes = prediction[0]['boxes'].cpu().numpy().astype(np.int32)
scores = prediction[0]['scores'].cpu().numpy()
filtered_boxes = boxes[scores > threshold]
filtered_scores = scores[scores > threshold]
for box, score in zip(filtered_boxes, filtered_scores):
xmin, ymin, xmax, ymax = box
ax.add_patch(
plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin, fill=False, color='red', linewidth=2))
ax.text(xmin, ymin, f'{score:.2f}', bbox=dict(facecolor='yellow', alpha=0.5))
ax.axis('off')
plt.show()

# 测试并可视化预测结果
visualize_predictions(val_dataset, model, device, threshold=0.55, num_images=5)

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import os
import torch
import torchvision.transforms as transforms
from PIL import Image
import xml.etree.ElementTree as ET
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import warnings

# 忽略警告
warnings.filterwarnings('ignore')


# 数据预处理类
class TrashDataset(Dataset):
def __init__(self, img_dir, annotations_dir, transform=None, imgs=None):
self.img_dir = img_dir
self.annotations_dir = annotations_dir
self.transform = transform
if imgs is None:
self.imgs = list(sorted(os.listdir(img_dir)))
self.annotations = list(sorted(os.listdir(annotations_dir)))
self.imgs = [img for img in self.imgs if img.replace('.jpg', '.xml') in self.annotations]
else:
self.imgs = imgs

def __len__(self):
return len(self.imgs)

def __getitem__(self, idx):
img_path = os.path.join(self.img_dir, self.imgs[idx])
annotation_path = os.path.join(self.annotations_dir, self.imgs[idx].replace('.jpg', '.xml'))
img = Image.open(img_path).convert("RGB")

tree = ET.parse(annotation_path)
root = tree.getroot()
boxes = []
labels = []
for obj in root.findall('object'):
labels.append(1)
bndbox = obj.find('bndbox')
xmin = int(bndbox.find('xmin').text)
ymin = int(bndbox.find('ymin').text)
xmax = int(bndbox.find('xmax').text)
ymax = int(bndbox.find('ymax').text)
if xmax > xmin and ymax > ymin:
boxes.append([xmin, ymin, xmax, ymax])

boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)
target = {'boxes': boxes, 'labels': labels}

if self.transform:
img = self.transform(img)

return img, target


# 数据转换
transform = transforms.Compose([
transforms.ToTensor(),
])


# 定义collate_fn函数
def collate_fn(batch):
return tuple(zip(*batch))


# 加载数据集
dataset = TrashDataset('data\\trash', 'data\\annotated', transform=transform)

# 划分数据集
train_imgs, val_imgs = train_test_split(dataset.imgs, test_size=0.2, random_state=42)
train_dataset = TrashDataset('data\\trash', 'data\\annotated', transform=transform, imgs=train_imgs)
val_dataset = TrashDataset('data\\trash', 'data\\annotated', transform=transform, imgs=val_imgs)

train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False, collate_fn=collate_fn)

# 加载预训练的Faster R-CNN模型并微调
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
num_classes = 2 # 1 class (trash) + background
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# 训练模型
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model.to(device)

# 优化器
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(params, lr=0.0001, weight_decay=0.0005)
# optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
num_epochs = 10

# 训练循环
for epoch in range(num_epochs):
model.train()
i = 0
for images, targets in train_loader:
images = list(image.to(device) for image in images)
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()
i += 1
if i % 10 == 0:
print(f'Epoch: {epoch}, Step: {i}, Loss: {losses.item()}')

print("Training complete")


# 定义可视化函数
def visualize_predictions(dataset, model, device, threshold=0.65, num_images=5):
model.eval()
fig, axs = plt.subplots(1, num_images, figsize=(20, 5))
with torch.no_grad():
for i in range(num_images):
img, _ = dataset[i]
prediction = model([img.to(device)])
img = img.permute(1, 2, 0).numpy()
ax = axs[i] if num_images > 1 else axs
ax.imshow(img)
boxes = prediction[0]['boxes'].cpu().numpy().astype(np.int32)
scores = prediction[0]['scores'].cpu().numpy()
filtered_boxes = boxes[scores > threshold]
filtered_scores = scores[scores > threshold]
for box, score in zip(filtered_boxes, filtered_scores):
xmin, ymin, xmax, ymax = box
ax.add_patch(
plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin, fill=False, color='red', linewidth=2))
ax.text(xmin, ymin, f'{score:.2f}', bbox=dict(facecolor='yellow', alpha=0.5))
ax.axis('off')
plt.show()


# 测试并可视化预测结果
visualize_predictions(val_dataset, model, device, threshold=0.55, num_images=5)