
For context, I have a verification folder full of images and a txt file that marks whether each image pair is positive (1) or negative (0).

Example of the txt file:
verification_data/00041961.jpg verification_data/00044353.jpg 0
verification_data/00007133.jpg verification_data/00060449.jpg 1
verification_data/00041961.jpg verification_data/00020166.jpg 0
verification_data/00013102.jpg verification_data/00055525.jpg 1
verification_data/00002921.jpg verification_data/00041331.jpg 0

In most tutorials I've seen, there are separate anchor, positive, and negative folders, and the image data is loaded with code like this:

    #Make Folder Structure
    #Path directories
    POS_PATH = os.path.join('data', 'positive')
    NEG_PATH = os.path.join('data', 'negative')
    ANC_PATH = os.path.join('data', 'anchor')
    anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(300)
    positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(300)
    negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(300)       

    #Make Labelled Dataset
    positives= tf.data.Dataset.zip((anchor,positive,tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
    negatives= tf.data.Dataset.zip((anchor,negative,tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
    data= positives.concatenate(negatives)

But here is the problem: the tutorial (youtube.com/watch?v=UMjW4Db4E_g&list=PLgNJO2hghbmhHuhURAGbe6KWpiYZt0AMH&index=3) loads its images from folders that each contain only one person's face, so extraction is easy because the folder acts as the class. My txt file, however, distinguishes the image pairs itself, and all of the images sit in a single folder. I tried using an array to separate the image pairs:

    verifylist = []
    with open('verification_pairs_val.txt') as file:
        for line in file:
            img1, img2, label = line.split()
            verifylist.append([img1, img2, int(label)])
    print(verifylist)

But I don't know how to load the individual image pairs into the Siamese network.
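
My rough idea (a minimal sketch, untested, assuming `verifylist` is parsed as above and the paths in the txt file are valid relative to the working directory) is to feed the parsed path and label lists straight to `tf.data.Dataset.from_tensor_slices`, so the dataset yields the same `(anchor, image, label)` triples as the tutorial:

    # Minimal sketch (untested): build the paired dataset directly from verifylist,
    # assuming it holds [img1_path, img2_path, label] entries as parsed above
    img1_paths = [pair[0] for pair in verifylist]
    img2_paths = [pair[1] for pair in verifylist]
    labels = [float(pair[2]) for pair in verifylist]

    data = tf.data.Dataset.from_tensor_slices((img1_paths, img2_paths, labels))
    # Each element is now (path, path, label), which is what preprocess_twin expects

I'm not sure whether this actually produces a `data` variable equivalent to the tutorial's, which is what I'm trying to confirm.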

Entire Code:


#Import Libraries
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

import uuid
from scipy.spatial import distance as dist
import argparse
import imutils




use_dataset=True
img_size=64
EPOCHS = 1000

#Image preprocess function
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img,(img_size,img_size))
    img = img/255.0
    return img
    
#Twin image preprocess function
def preprocess_twin(input_img,validation_img,label):
    return (preprocess(input_img),preprocess(validation_img),label)

#Detect faces in a frame with the YOLOv4 net; uses the global MIN_CONF and
#NMS_THRESH thresholds set in the use_dataset==False branch below
def detect_face(frame, net, ln, objIdx=0):
    (H, W) = frame.shape[:2]
    results = []
    blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
    net.setInput(blob)
    layerOutputs = net.forward(ln)
    boxes = []
    centroids = []
    confidences = []
    for output in layerOutputs:
        for detection in output:
            scores = detection[5:]
            classID = np.argmax(scores)
            confidence = scores[classID]
            if classID == objIdx and confidence > MIN_CONF:
                box = detection[0:4] * np.array([W, H, W, H])
                (centerX, centerY, width, height) = box.astype("int")
                x = int(centerX - (width / 2))
                y = int(centerY - (height / 2))
                boxes.append([x, y, int(width), int(height)])
                centroids.append((centerX, centerY))
                confidences.append(float(confidence))
    idxs = cv2.dnn.NMSBoxes(boxes, confidences, MIN_CONF, NMS_THRESH)
    if len(idxs) > 0:
        for i in idxs.flatten():
            (x, y) = (boxes[i][0], boxes[i][1])
            (w, h) = (boxes[i][2], boxes[i][3])
            r = (confidences[i], (x, y, x + w, y + h), centroids[i])
            results.append(r)
    return results

#Set GPU Memory Limit
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)



if use_dataset==False:
    
    #Set parameters for yoloV4
    MODEL_PATH = "yolov4-trained"
    MIN_CONF = 0.3
    NMS_THRESH = 0.3
    USE_GPU = True
     
    

        
    #Make Folder Structure
    #Path directories
    POS_PATH = os.path.join('data', 'positive')
    NEG_PATH = os.path.join('data', 'negative')
    ANC_PATH = os.path.join('data', 'anchor')
    

    
    #Load files from folder "yolov4-trained"
    ap = argparse.ArgumentParser()
    ap.add_argument("-i", "--input", type=str, default="",
        help="path to (optional) input video file")
    ap.add_argument("-o", "--output", type=str, default="",
        help="path to (optional) output video file")
    ap.add_argument("-d", "--display", type=int, default=1,
        help="whether or not output frame should be displayed")
    args = vars(ap.parse_args())
    labelsPath = os.path.sep.join([MODEL_PATH, "obj.names"])
    LABELS = open(labelsPath).read().strip().split("\n")
    weightsPath = os.path.sep.join([MODEL_PATH, "yolov4-obj_last.weights"])
    configPath = os.path.sep.join([MODEL_PATH, "yolov4-obj.cfg"])
    
    #Load YoloV4
    print("[INFO] loading YOLO from disk...")
    net = cv2.dnn.readNetFromDarknet(configPath, weightsPath)
    
    #Enable GPU
    if USE_GPU:
        print("[INFO] setting preferable backend and target to CUDA...")
        net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
        net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
    
    ln = net.getLayerNames()
    ln = [ln[i - 1] for i in net.getUnconnectedOutLayers()]
    
    
    #Collect Anchor and Positive Classes with WebCam
    print("[INFO] accessing video stream...")
    capture = cv2.VideoCapture(0)
    writer = None
    
    while capture.isOpened():
        (grabbed, frame) = capture.read()
        if not grabbed:
            break
        frame = imutils.resize(frame, width=700)
        results = detect_face(frame, net, ln,objIdx=LABELS.index("face"))
    
        for (i, (prob, bbox, centroid)) in enumerate(results):
            (startX, startY, endX, endY) = bbox
            (cX, cY) = centroid
            color = (255, 255, 0)
            
            if cv2.waitKey(1) & 0XFF == ord('a'):
                img = frame
                crop_img = img[cY-125:cY-125+250, cX-125:cX-125+250,:]
                imgname=os.path.join(ANC_PATH,'{}.jpg'.format(uuid.uuid1()))
                try:
                    cv2.imwrite(imgname,crop_img)
                except:
                    pass
            
            if cv2.waitKey(1) & 0XFF == ord('p'):
                img = frame
                crop_img = img[cY-125:cY-125+250, cX-125:cX-125+250,:]
                imgname=os.path.join(POS_PATH,'{}.jpg'.format(uuid.uuid1()))
                try:
                    cv2.imwrite(imgname,crop_img)
                except:
                    pass
                
            cv2.putText(frame, "Face_Detect %s" % ("{:.3f}".format(prob)), (startX, startY-5),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
            
            cv2.rectangle(frame, (startX, startY), (endX, endY), color, 2)
            cv2.circle(frame, (cX, cY), 5, color, 1)
    
        text = "Press \"Q\" to stop"
        cv2.putText(frame, text, (10, frame.shape[0] - 25),
            cv2.FONT_HERSHEY_SIMPLEX, 0.85, (0, 0, 255), 2)
    
        if args["display"] > 0:
            cv2.imshow("Image Collection", frame)
            key = cv2.waitKey(1) & 0xFF
            
            if key == ord("q"):
                break
    capture.release()
    cv2.destroyAllWindows()
    
    #Load and preprocess Images
    #get file paths
    anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(300)
    positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(300)
    negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(300)       

    #Make Labelled Dataset
    positives= tf.data.Dataset.zip((anchor,positive,tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
    negatives= tf.data.Dataset.zip((anchor,negative,tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
    data= positives.concatenate(negatives)
else:

    #My own dataset: parse "img1 img2 label" lines from the txt file
    verifylist = []
    with open('verification_pairs_val.txt') as file:
        for line in file:
            img1, img2, label = line.split()
            verifylist.append([img1, img2, int(label)])
    print(verifylist)
    
    
"""
THIS  SECTION OF THE CODE IS WHERE MY PROBLEM LIES
I need to figure out a way to get the data variable to match up with the tutorial video.
"""
    
    if verifylist[0][2]==1:
        pos_anchor = tf.data.Dataset.list_files(verifylist[0][0]).take(1)
        pos = tf.data.Dataset.list_files(verifylist[0][1]).take(1)
        data= tf.data.Dataset.zip((pos_anchor,pos,tf.data.Dataset.from_tensor_slices(tf.ones(1))))
    elif verifylist[0][2]==0:
        neg_anchor = tf.data.Dataset.list_files(verifylist[0][0]).take(1)
        neg = tf.data.Dataset.list_files(verifylist[0][1]).take(1)
        data= tf.data.Dataset.zip((neg_anchor,neg,tf.data.Dataset.from_tensor_slices(tf.zeros(1))))
    
    #for i in range(1,2000):
    for i in range(1,len(verifylist)):
        if verifylist[i][2]==1:
            pos_anchor = tf.data.Dataset.list_files(verifylist[i][0]).take(1)
            pos = tf.data.Dataset.list_files(verifylist[i][1]).take(1)
            positives= tf.data.Dataset.zip((pos_anchor,pos,tf.data.Dataset.from_tensor_slices(tf.ones(1))))
            data= data.concatenate(positives)
            
        elif verifylist[i][2]==0:
            neg_anchor = tf.data.Dataset.list_files(verifylist[i][0]).take(1)
            neg = tf.data.Dataset.list_files(verifylist[i][1]).take(1)
            negatives= tf.data.Dataset.zip((neg_anchor,neg,tf.data.Dataset.from_tensor_slices(tf.zeros(1))))
            data= data.concatenate(negatives)


#Build dataloader pipeline
data= data.map(preprocess_twin)
data= data.cache()
data= data.shuffle(buffer_size=1024)

#Training Partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)
    
#Test Partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

#Model Setup
#Embedding layer function
def make_embedding():
    inp = Input(shape=(img_size,img_size,3),name="input_image")
    
    c1 = Conv2D(64,(10,10),activation='relu')(inp)
    m1 = MaxPooling2D(64,(2,2),padding='same')(c1)
    
    c2 = Conv2D(128,(7,7),activation='relu')(m1)
    m2 = MaxPooling2D(64,(2,2),padding='same')(c2)
    
    c3 = Conv2D(128,(4,4),activation='relu')(m2)
    m3 = MaxPooling2D(64,(2,2),padding='same')(c3)
    
    c4 = Conv2D(256,(4,4),activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096,activation='sigmoid')(f1)
    
    return Model(inputs=[inp],outputs=[d1],name='embedding')

embedding=make_embedding()
#embedding.summary()

#L1 Siamese Distance layer function
class L1Dist(Layer):
    # Initial method - inheritance
    def __init__(self, **kwargs):
        super().__init__()  
    #Similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)
    
#l1 = L1Dist()
#l1(anchor_embedding, validation_embedding)

#Siamese Model

def make_siamese_model():
    input_image = Input(name='input_img', shape=(img_size,img_size,3))
    validation_image = Input(name='validation_img', shape=(img_size,img_size,3))
    
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

siamese_model = make_siamese_model()
siamese_model.summary()

#Training Model
#Loss
binary_cross_loss = tf.losses.BinaryCrossentropy()
#Optimizer
opt = tf.keras.optimizers.Adam(0.0001) 
#Checkpoint
checkpoint_dir = 'training_checkpoints'
if os.path.isdir(checkpoint_dir)==False:
    os.mkdir(checkpoint_dir)
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

#Training Step function
@tf.function
def train_step(batch):
    
    with tf.GradientTape() as tape:     
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    
    # Return loss
    return loss

#Training Loop function
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            train_step(batch)
            progbar.update(idx+1)
        
        # Save checkpoints
        if epoch % 100 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)

train(train_data, EPOCHS)


