这是使用Gluon API实现的对比损失:
class ContrastiveLoss(Loss):
    """Contrastive loss for pairs of embeddings.

    For every pair the loss is
    ``(1 - label) * d**2 + label * max(margin - d, 0)**2`` where ``d`` is the
    Euclidean distance between the two embeddings. A label of 0 therefore
    pulls the pair together, while a label of 1 pushes it apart until the
    distance reaches ``margin``.

    Parameters
    ----------
    margin : float
        Distance beyond which pairs labelled 1 stop contributing to the loss.
    weight, batch_axis, **kwargs
        Forwarded unchanged to the ``Loss`` base class; ``hybrid_forward``
        itself does not use them.
    """

    def __init__(self, margin=2.0, weight=None, batch_axis=0, **kwargs):
        super(ContrastiveLoss, self).__init__(weight, batch_axis, **kwargs)
        self.margin = margin

    def hybrid_forward(self, F, output1, output2, label):
        """Return the mean contrastive loss over the batch.

        ``output1`` and ``output2`` are assumed to be ``(batch, features)``
        embeddings; ``label`` holds one value per pair.
        """
        # Reduce over the feature axis: without the sum the expression is
        # only an element-wise absolute difference, not a distance.
        squared_distance = F.sum(F.square(output1 - output2),
                                 axis=1, keepdims=True)
        # A tiny offset keeps the gradient of sqrt finite when both
        # embeddings coincide.
        euclidean_distance = F.sqrt(squared_distance + 1e-8)
        # One label per pair, shaped like the per-pair distance.
        label = F.reshape(label, shape=(-1, 1))
        # relu implements max(margin - d, 0) without an arbitrary upper cap,
        # so margins larger than 10 are handled correctly too.
        hinge = F.relu(self.margin - euclidean_distance)
        loss_contrastive = F.mean((1 - label) * squared_distance +
                                  label * F.square(hinge))
        return loss_contrastive
我是参照一个演示如何使用孪生网络(Siamese network)的 PyTorch 示例来实现它的,该示例取自这里。
PyTorch 和 MXNet 之间有很多差异,所以如果你想试一试,下面是完整的可运行示例。你需要下载 AT&T 人脸数据集,并将图像转换为 JPEG 格式,因为 MXNet 不支持直接加载 .pgm 图像。
import matplotlib.pyplot as plt
import numpy as np
import random
from PIL import Image
import PIL.ImageOps
import mxnet as mx
from mxnet import autograd
from mxnet.base import numeric_types
from mxnet.gluon import nn, HybridBlock, Trainer
from mxnet.gluon.data import DataLoader
from mxnet.gluon.data.vision.datasets import ImageFolderDataset
from mxnet.gluon.loss import Loss
def imshow(img, text=None, should_save=False):
    """Display a channel-first image with matplotlib.

    Parameters
    ----------
    img : array-like
        Image laid out as (C, H, W); it is transposed to (H, W, C) before
        being handed to matplotlib.
    text : str, optional
        Caption drawn on top of the image.
    should_save : bool
        Accepted for signature compatibility; currently ignored.
    """
    # MXNet NDArrays expose ``asnumpy()``; ``numpy()`` is kept as a fallback
    # for tensor types that use that name instead.
    if hasattr(img, "asnumpy"):
        npimg = img.asnumpy()
    elif hasattr(img, "numpy"):
        npimg = img.numpy()
    else:
        npimg = np.asarray(img)
    plt.axis("off")
    if text:
        plt.text(75, 8, text, style='italic', fontweight='bold',
                 bbox={'facecolor': 'white', 'alpha': 0.8, 'pad': 10})
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()
def show_plot(iteration, loss):
    """Draw ``loss`` against ``iteration`` on the current figure and show it."""
    plt.plot(iteration, loss)
    plt.show()
class Config:
    """Static settings shared by the training and prediction routines."""

    # Root folders of the image datasets.
    training_dir = "./faces/training/"
    testing_dir = "./faces/testing/"

    # Training hyper-parameters.
    train_batch_size = 5
    train_number_epochs = 100
class SiameseNetworkDataset(ImageFolderDataset):
    """Image-folder dataset that yields random image pairs.

    Each item is a tuple ``(img0, img1, label)`` where ``label`` is 0 when
    both images come from the same class and 1 otherwise. Images are read
    with ``flag=0``.

    Parameters
    ----------
    root : str
        Folder handed to ``ImageFolderDataset``.
    transform : callable, optional
        ``transform(image, label)`` forwarded to ``ImageFolderDataset``.
    """

    def __init__(self, root, transform=None):
        super().__init__(root, flag=0, transform=transform)
        self.root = root
        # NOTE(review): this attribute appears to shadow gluon's
        # ``Dataset.transform`` method on the instance - confirm nothing
        # relies on that method before renaming or removing it.
        self.transform = transform

    def __getitem__(self, index):
        """Return a randomly sampled pair; ``index`` is intentionally unused."""
        num_items = len(self.items)
        # Draw indices directly instead of materialising an enumerated copy
        # of the whole item list for every sample.
        img0_index = random.randrange(num_items)
        img0_class = self.items[img0_index][1]

        # We need to make sure approx 50% of pairs are in the same class.
        should_get_same_class = random.randint(0, 1)
        if should_get_same_class:
            while True:
                # Keep drawing until an image of the same class is found.
                img1_index = random.randrange(num_items)
                if self.items[img1_index][1] == img0_class:
                    break
        else:
            img1_index = random.randrange(num_items)
        img1_class = self.items[img1_index][1]

        img0 = super().__getitem__(img0_index)[0]
        img1 = super().__getitem__(img1_index)[0]
        # Images are assumed to arrive as (H, W, C). Move channels first
        # explicitly: an argument-less transpose() would reverse all axes
        # and swap height with width.
        img0 = img0.transpose(axes=(2, 0, 1))
        img1 = img1.transpose(axes=(2, 0, 1))
        label = mx.nd.array([int(img1_class != img0_class)])
        return img0, img1, label
class ReflectionPad2D(HybridBlock):
    """Pads the input tensor using the reflection of the input boundary.

    Parameters
    ----------
    padding : int or sequence of 8 ints
        A single number pads height and width by that amount on both sides;
        a sequence is passed to ``F.pad`` as ``pad_width`` unchanged.

    Shape:
        - Input: :math:`(N, C, H_{in}, W_{in})`
        - Output: :math:`(N, C, H_{out}, W_{out})` where, for a scalar
          ``padding``,
          :math:`H_{out} = H_{in} + 2 * padding` and
          :math:`W_{out} = W_{in} + 2 * padding`
    """

    def __init__(self, padding=0, **kwargs):
        super().__init__(**kwargs)
        pad_width = padding
        if isinstance(pad_width, numeric_types):
            # No padding on the first two axes, ``padding`` on each side of
            # the last two.
            pad_width = (0, 0, 0, 0) + (pad_width,) * 4
        assert len(pad_width) == 8
        self._padding = pad_width

    def hybrid_forward(self, F, x, *args, **kwargs):
        """Apply reflect-mode padding to ``x``."""
        return F.pad(x, mode='reflect', pad_width=self._padding)
class SiameseNetwork(HybridBlock):
    """Siamese network: one shared CNN + MLP applied to both inputs.

    ``cnn1`` holds three (reflection pad -> 3x3 conv -> ReLU -> batch norm)
    stages; ``fc1`` holds the dense head that produces a 5-dimensional
    embedding.
    """

    def __init__(self, **kwargs):
        super(SiameseNetwork, self).__init__(**kwargs)

        self.cnn1 = nn.HybridSequential()
        with self.cnn1.name_scope():
            for in_channels, channels in ((1, 4), (4, 8), (8, 8)):
                self.cnn1.add(
                    ReflectionPad2D(padding=1),
                    nn.Conv2D(in_channels=in_channels, channels=channels,
                              kernel_size=3),
                    nn.Activation('relu'),
                    nn.BatchNorm(),
                )

        self.fc1 = nn.HybridSequential()
        with self.fc1.name_scope():
            # The dense head belongs to fc1; adding these layers to cnn1
            # would leave fc1 empty.
            self.fc1.add(
                nn.Dense(500),
                nn.Activation('relu'),
                nn.Dense(500),
                nn.Activation('relu'),
                nn.Dense(5),
            )

    def hybrid_forward(self, F, input1, input2):
        """Embed both inputs with the same weights and return the pair."""
        output1 = self._forward_once(input1)
        output2 = self._forward_once(input2)
        return output1, output2

    def _forward_once(self, x):
        """Run a single input through the convolutional and dense stacks."""
        output = self.cnn1(x)
        output = self.fc1(output)
        return output
class ContrastiveLoss(Loss):
    """Contrastive loss for pairs of embeddings.

    For every pair the loss is
    ``(1 - label) * d**2 + label * max(margin - d, 0)**2`` where ``d`` is the
    Euclidean distance between the two embeddings. A label of 0 therefore
    pulls the pair together, while a label of 1 pushes it apart until the
    distance reaches ``margin``.

    Parameters
    ----------
    margin : float
        Distance beyond which pairs labelled 1 stop contributing to the loss.
    weight, batch_axis, **kwargs
        Forwarded unchanged to the ``Loss`` base class; ``hybrid_forward``
        itself does not use them.
    """

    def __init__(self, margin=2.0, weight=None, batch_axis=0, **kwargs):
        super(ContrastiveLoss, self).__init__(weight, batch_axis, **kwargs)
        self.margin = margin

    def hybrid_forward(self, F, output1, output2, label):
        """Return the mean contrastive loss over the batch.

        ``output1`` and ``output2`` are assumed to be ``(batch, features)``
        embeddings; ``label`` holds one value per pair.
        """
        # Reduce over the feature axis: without the sum the expression is
        # only an element-wise absolute difference, not a distance.
        squared_distance = F.sum(F.square(output1 - output2),
                                 axis=1, keepdims=True)
        # A tiny offset keeps the gradient of sqrt finite when both
        # embeddings coincide.
        euclidean_distance = F.sqrt(squared_distance + 1e-8)
        # One label per pair, shaped like the per-pair distance.
        label = F.reshape(label, shape=(-1, 1))
        # relu implements max(margin - d, 0) without an arbitrary upper cap,
        # so margins larger than 10 are handled correctly too.
        hinge = F.relu(self.margin - euclidean_distance)
        loss_contrastive = F.mean((1 - label) * squared_distance +
                                  label * F.square(hinge))
        return loss_contrastive
# Augmenter lists keyed by data shape, built on first use.
_AUGMENTERS_BY_SHAPE = {}


def _get_augmenters(data_shape):
    """Return the augmenter list for ``data_shape``, creating it once."""
    key = tuple(data_shape)
    if key not in _AUGMENTERS_BY_SHAPE:
        _AUGMENTERS_BY_SHAPE[key] = mx.image.CreateAugmenter(data_shape=key)
    return _AUGMENTERS_BY_SHAPE[key]


def aug_transform(data, label, data_shape=(1, 100, 100)):
    """Apply MXNet's default augmenters for ``data_shape`` to ``data``.

    Parameters
    ----------
    data : image array
        Sample to transform; passed through every augmenter in turn.
    label
        Returned untouched.
    data_shape : tuple of int
        Shape given to ``mx.image.CreateAugmenter``; defaults to the
        (1, 100, 100) shape this example has always used.

    Returns
    -------
    tuple
        ``(transformed_data, label)``.
    """
    # The augmenters are reused across samples instead of being rebuilt on
    # every call.
    for aug in _get_augmenters(data_shape):
        data = aug(data)
    return data, label
def run_training():
    """Train a ``SiameseNetwork`` on ``Config.training_dir`` and return it.

    The contrastive loss is logged every 10 batches and recorded as plain
    Python floats together with a running iteration counter.
    """
    siamese_dataset = SiameseNetworkDataset(root=Config.training_dir,
                                            transform=aug_transform)
    train_dataloader = DataLoader(siamese_dataset, shuffle=True, num_workers=1,
                                  batch_size=Config.train_batch_size)

    # Kept so the curve can be drawn with show_plot(counter, loss_history).
    counter = []
    loss_history = []
    iteration_number = 0

    net = SiameseNetwork()
    net.initialize(init=mx.init.Xavier())
    trainer = Trainer(net.collect_params(), 'adam', {'learning_rate': 0.0005})
    loss = ContrastiveLoss(margin=2.0)

    for epoch in range(Config.train_number_epochs):
        for i, (img0, img1, label) in enumerate(train_dataloader):
            with autograd.record():
                output1, output2 = net(img0, img1)
                loss_contrastive = loss(output1, output2, label)
            loss_contrastive.backward()
            # The loss is already a mean over the batch, so the trainer must
            # not divide the gradient by the batch size a second time.
            trainer.step(1)

            if i % 10 == 0:
                # Convert to a float so the log is readable and the history
                # can be plotted directly.
                loss_value = loss_contrastive.mean().asscalar()
                print("Epoch number {}\n Current loss {}\n".format(epoch, loss_value))
                iteration_number += 10
                counter.append(iteration_number)
                loss_history.append(loss_value)

    return net
def run_predict(net):
    """Print the embedding distance between two test images.

    The first image comes from the first batch of the test loader and the
    second from the next batch, so the loader must yield at least two
    batches of equal size.
    """
    folder_dataset_test = SiameseNetworkDataset(root=Config.testing_dir,
                                                transform=aug_transform)
    test_dataloader = DataLoader(folder_dataset_test, shuffle=True, num_workers=1,
                                 batch_size=Config.train_batch_size)
    dataiter = iter(test_dataloader)
    x0, _, _ = next(dataiter)
    _, x1, _ = next(dataiter)

    output1, output2 = net(x0, x1)
    # Sum the squared differences over the feature axis before the square
    # root; otherwise the value is just one coordinate's absolute difference.
    euclidean_distance = mx.nd.sqrt(
        mx.nd.sum(mx.nd.square(output1 - output2), axis=1))
    print('x0 vs x1 dissimilarity is {}'.format(euclidean_distance[0].asscalar()))
if __name__ == '__main__':
    # Train first, then run the dissimilarity check with the trained network.
    run_predict(run_training())