0
点赞
收藏
分享

微信扫一扫

爬虫图像验证码识别

杰森wang 2021-09-28 阅读 59

字母数字汉子均可cnn+lstm+ctc

config_demo.yaml

System:
  GpuMemoryFraction:0.7
  TrainSetPath:'train/'
  TestSetPath:'test/'
  ValSetPath:'dev/'
  LabelRegex:'([\u4E00-\u9FA5]{4,8}).jpg'
  MaxTextLenth:8
  IMG_W:200
  IMG_H:100
  ModelName:'captcha2.h5'
  Alphabet:'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
 
 
NeuralNet:
  RNNSize:256
  Dropout:0.25
 
 
TrainParam:
  EarlyStoping:
    monitor:'val_acc'
    patience:10
    mode:'auto'
    baseline:0.02
  Epochs:10
  BatchSize:100
  TestBatchSize:10

train.py

# coding=utf-8
"""
将三通道的图片转为灰度图进行训练
"""
import itertools
import os
import re
import random
import string
from collectionsimport Counter
from os.pathimport join
import yaml
import cv2
import numpy as np
import tensorflow as tf
from kerasimport backend as K
from keras.callbacksimport ModelCheckpoint, EarlyStopping, Callback
from keras.layersimport Input, Dense, Activation, Dropout, BatchNormalization, Reshape, Lambda
from keras.layers.convolutionalimport Conv2D, MaxPooling2D
from keras.layers.mergeimport add, concatenate
from keras.layers.recurrentimport GRU
from keras.modelsimport Model, load_model
 
f= open('./config/config_demo.yaml','r', encoding='utf-8')
cfg= f.read()
cfg_dict= yaml.load(cfg)
 
config= tf.ConfigProto()
config.gpu_options.allow_growth= True
session= tf.Session(config=config)
K.set_session(session)
 
TRAIN_SET_PTAH= cfg_dict['System']['TrainSetPath']
VALID_SET_PATH= cfg_dict['System']['TrainSetPath']
TEST_SET_PATH= cfg_dict['System']['TestSetPath']
IMG_W= cfg_dict['System']['IMG_W']
IMG_H= cfg_dict['System']['IMG_H']
MODEL_NAME= cfg_dict['System']['ModelName']
LABEL_REGEX= cfg_dict['System']['LabelRegex']
 
RNN_SIZE= cfg_dict['NeuralNet']['RNNSize']
DROPOUT= cfg_dict['NeuralNet']['Dropout']
 
MONITOR= cfg_dict['TrainParam']['EarlyStoping']['monitor']
PATIENCE= cfg_dict['TrainParam']['EarlyStoping']['patience']
MODE= cfg_dict['TrainParam']['EarlyStoping']['mode']
BASELINE= cfg_dict['TrainParam']['EarlyStoping']['baseline']
EPOCHS= cfg_dict['TrainParam']['Epochs']
BATCH_SIZE= cfg_dict['TrainParam']['BatchSize']
TEST_BATCH_SIZE= cfg_dict['TrainParam']['TestBatchSize']
 
letters_dict= {}
MAX_LEN= 0
 
def get_maxlen():
    global MAX_LEN
    maxlen= 0
    lines= open("train.csv","r", encoding="utf-8").readlines()
    for linein lines:
        name,label= line.strip().split(",")
        if len(label)>maxlen:
            maxlen= len(label)
    MAX_LEN= maxlen
    return maxlen
 
def get_letters():
    global letters_dict
    letters= ""
 
    lines= open("train.csv","r",encoding="utf-8").readlines()
    maxlen= get_maxlen()
    for linein lines:
        name,label= line.strip().split(",")
        letters= letters+label
        if len(label) < maxlen:
            label= label+ '_' * (maxlen- len(label))
        letters_dict[name]= label
 
    if os.path.exists("letters.txt"):
        letters= open("letters.txt","r",encoding="utf-8").read()
        return letters
    return "".join(set(letters))
 
letters= get_letters()
f_W= open("letters.txt","w",encoding="utf-8")
f_W.write("".join(letters))
class_num= len(letters)+ 1   # plus 1 for blank
print('Letters:', ''.join(letters))
print("letters_num:",class_num)
 
def labels_to_text(labels):
    return ''.join([letters[int(x)] if int(x) != len(letters) else ''for xin labels])
 
def text_to_labels(text):
    return [letters.find(x)if letters.find(x) >-1 else len(letters)for xin text]
 
 
def is_valid_str(s):
    for chin s:
        if not chin letters:
            return False
    return True
 
 
class TextImageGenerator:
 
    def __init__(self,
                 dirpath,
                 tag,
                 img_w, img_h,
                 batch_size,
                 downsample_factor,
                 ):
        global letters_dict
        self.img_h= img_h
        self.img_w= img_w
        self.batch_size= batch_size
        self.downsample_factor= downsample_factor
        self.letters_dict= letters_dict
        self.n= len(self.letters_dict)
        self.indexes= list(range(self.n))
        self.cur_index= 0
        self.imgs= np.zeros((self.n,self.img_h,self.img_w))
        self.texts= []
 
        for i, (img_filepath, text)in enumerate(self.letters_dict.items()):
 
 
            img_filepath= dirpath+img_filepath
            if i== 0:
                img_filepath= "train/0.jpg"
            img= cv2.imread(img_filepath)
            img= cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)    # cv2默认是BGR模式
            img= cv2.resize(img, (self.img_w,self.img_h))
            img= img.astype(np.float32)
            img/= 255
            self.imgs[i, :, :]= img
            self.texts.append(text)
        print(len(self.texts),len(self.imgs),self.n)
 
    @staticmethod
    def get_output_size():
        return len(letters)+ 1
 
    def next_sample(self):  #每次返回一个数据和对应标签
        self.cur_index+= 1
        if self.cur_index >= self.n:
            self.cur_index= 0
            random.shuffle(self.indexes)
        return self.imgs[self.indexes[self.cur_index]],self.texts[self.indexes[self.cur_index]]
 
    def next_batch(self):  #
        while True:
            # width and height are backwards from typical Keras convention
            # because width is the time dimension when it gets fed into the RNN
            if K.image_data_format()== 'channels_first':
                X_data= np.ones([self.batch_size,1,self.img_w,self.img_h])
            else:
                X_data= np.ones([self.batch_size,self.img_w,self.img_h,1])
            Y_data= np.ones([self.batch_size, MAX_LEN])
            input_length= np.ones((self.batch_size,1))* (self.img_w// self.downsample_factor- 2)
            label_length= np.zeros((self.batch_size,1))
            source_str= []
 
            for iin range(self.batch_size):
                img, text= self.next_sample()
                img= img.T
                if K.image_data_format()== 'channels_first':
                    img= np.expand_dims(img,0)    #增加一个维度
                else:
                    img= np.expand_dims(img,-1)
                X_data[i]= img
                Y_data[i]= text_to_labels(text)
                source_str.append(text)
                text= text.replace("_", "") # important step
                label_length[i]= len(text)
 
            inputs= {
                'the_input': X_data,
                'the_labels': Y_data,
                'input_length': input_length,
                'label_length': label_length,
                # 'source_str': source_str
            }
            outputs= {'ctc': np.zeros([self.batch_size])}
            yield (inputs, outputs)
 
# # Loss and train functions, network architecture
def ctc_lambda_func(args):   #ctc损失是时间序列损失函数
    y_pred, labels, input_length, label_length= args
    # the 2 is critical here since the first couple outputs of the RNN
    # tend to be garbage:
    y_pred= y_pred[:,2:, :]
    return K.ctc_batch_cost(labels, y_pred, input_length, label_length)
 
 
downsample_factor= 4
 
 
def train(img_w=IMG_W, img_h=IMG_H, dropout=DROPOUT, batch_size=BATCH_SIZE, rnn_size=RNN_SIZE):
    # Input Parameters
    # Network parameters
    conv_filters= 16
    kernel_size= (3,3)
    pool_size= 2
    time_dense_size= 32
 
    if K.image_data_format()== 'channels_first':
        input_shape= (1, img_w, img_h)
    else:
        input_shape= (img_w, img_h,1)
 
    global downsample_factor
    downsample_factor= pool_size** 2
    tiger_train= TextImageGenerator(TRAIN_SET_PTAH,'train', img_w, img_h, batch_size, downsample_factor)
    tiger_val= TextImageGenerator(VALID_SET_PATH,'val', img_w, img_h, batch_size, downsample_factor)
 
    act= 'relu'
    input_data= Input(name='the_input', shape=input_shape, dtype='float32')
    inner= Conv2D(conv_filters, kernel_size, padding='same',
                   activation=None, kernel_initializer='he_normal',
                   name='conv1')(input_data)
    inner= BatchNormalization()(inner) # add BN
    inner= Activation(act)(inner)
 
    inner= MaxPooling2D(pool_size=(pool_size, pool_size), name='max1')(inner)
    inner= Conv2D(conv_filters, kernel_size, padding='same',
                   activation=None, kernel_initializer='he_normal',
                   name='conv2')(inner)
    inner= BatchNormalization()(inner) # add BN
    inner= Activation(act)(inner)
 
    inner= MaxPooling2D(pool_size=(pool_size, pool_size), name='max2')(inner)
 
    conv_to_rnn_dims= (img_w// (pool_size** 2), (img_h// (pool_size** 2))* conv_filters)
    inner= Reshape(target_shape=conv_to_rnn_dims, name='reshape')(inner)
 
    # cuts down input size going into RNN:
    inner= Dense(time_dense_size, activation=None, name='dense1')(inner)
    inner= BatchNormalization()(inner) # add BN
    inner= Activation(act)(inner)
    if dropout:
        inner= Dropout(dropout)(inner) # 防止过拟合
 
    # Two layers of bidirecitonal GRUs
    # GRU seems to work as well, if not better than LSTM:
    gru_1= GRU(rnn_size, return_sequences=True, kernel_initializer='he_normal', name='gru1')(inner)
    gru_1b= GRU(rnn_size, return_sequences=True, go_backwards=True, kernel_initializer='he_normal', name='gru1_b')(
        inner)
    gru1_merged= add([gru_1, gru_1b])
    gru_2= GRU(rnn_size, return_sequences=True, kernel_initializer='he_normal', name='gru2')(gru1_merged)
    gru_2b= GRU(rnn_size, return_sequences=True, go_backwards=True, kernel_initializer='he_normal', name='gru2_b')(
        gru1_merged)
 
    inner= concatenate([gru_2, gru_2b])
 
    if dropout:
        inner= Dropout(dropout)(inner) # 防止过拟合
 
    # transforms RNN output to character activations:
    inner= Dense(tiger_train.get_output_size(), kernel_initializer='he_normal',
                  name='dense2')(inner)
    y_pred= Activation('softmax', name='softmax')(inner)
    base_model= Model(inputs=input_data, outputs=y_pred)
    base_model.summary()
 
    labels= Input(name='the_labels', shape=[MAX_LEN], dtype='float32')
    input_length= Input(name='input_length', shape=[1], dtype='int64')
    label_length= Input(name='label_length', shape=[1], dtype='int64')
    # Keras doesn't currently support loss funcs with extra parameters
    # so CTC loss is implemented in a lambda layer
    loss_out= Lambda(ctc_lambda_func, output_shape=(1,), name='ctc')([y_pred, labels, input_length, label_length])
 
    model= Model(inputs=[input_data, labels, input_length, label_length], outputs=loss_out)
    # the loss calc occurs elsewhere, so use a dummy lambda func for the loss
    model.compile(loss={'ctc':lambda y_true, y_pred: y_pred}, optimizer='adadelta')
 
    earlystoping= EarlyStopping(monitor=MONITOR, patience=PATIENCE, verbose=1, mode=MODE, baseline=BASELINE)
    train_model_path= './tmp/train_' + MODEL_NAME
    checkpointer= ModelCheckpoint(filepath=train_model_path,
                                   verbose=1,
                                   save_best_only=True)
 
    if os.path.exists(train_model_path):
        model.load_weights(train_model_path)
        print('load model weights:%s' % train_model_path)
 
    evaluator= Evaluate(model)
    model.fit_generator(generator=tiger_train.next_batch(),
                        steps_per_epoch=tiger_train.n,
                        epochs=EPOCHS,
                        initial_epoch=1,
                        validation_data=tiger_val.next_batch(),
                        validation_steps=tiger_val.n,
                        callbacks=[checkpointer, earlystoping, evaluator])
 
    print('----train end----')
 
 
# For a real OCR application, this should be beam search with a dictionary
# and language model.  For this example, best path is sufficient.
def decode_batch(out):
    ret= []
    for jin range(out.shape[0]):
        out_best= list(np.argmax(out[j,2:],1))
        out_best= [kfor k, gin itertools.groupby(out_best)]
        outstr= ''
        for cin out_best:
            if c <len(letters):
                outstr+= letters[c]
        ret.append(outstr)
    return ret
 
 
class Evaluate(Callback):
    def __init__(self, model):
        self.accs= []
        self.model= model
 
    def on_epoch_end(self, epoch, logs=None):
        acc= evaluate(self.model)
        self.accs.append(acc)
 
 
# Test on validation images
def evaluate(model):
    global downsample_factor
    tiger_test= TextImageGenerator(VALID_SET_PATH,'test', IMG_W, IMG_H, TEST_BATCH_SIZE, downsample_factor)
 
    net_inp= model.get_layer(name='the_input').input
    net_out= model.get_layer(name='softmax').output
    predict_model= Model(inputs=net_inp, outputs=net_out)
 
    equalsIgnoreCaseNum= 0.00
    equalsNum= 0.00
    totalNum= 0.00
    for inp_value, _in tiger_test.next_batch():
        batch_size= inp_value['the_input'].shape[0]
        X_data= inp_value['the_input']
 
        net_out_value= predict_model.predict(X_data)
        pred_texts= decode_batch(net_out_value)
        labels= inp_value['the_labels']
        texts= []
        for labelin labels:
            text= labels_to_text(label)
            texts.append(text)
 
        for iin range(batch_size):
 
            totalNum+= 1
            if pred_texts[i]== texts[i]:
                equalsNum+= 1
            if pred_texts[i].lower()== texts[i].lower():
                equalsIgnoreCaseNum+= 1
            else:
                print('Predict: %s ---> Label: %s' % (pred_texts[i], texts[i]))
        if totalNum >= 10000:
            break
    print('---Result---')
    print('Test num: %d, accuracy: %.5f, ignoreCase accuracy: %.5f' % (
    totalNum, equalsNum/ totalNum, equalsIgnoreCaseNum/ totalNum))
    return equalsIgnoreCaseNum/ totalNum
 
 
if __name__== '__main__':
    train()
    test= True
    if test:
        model_path= './tmp/train_' + MODEL_NAME
        model= load_model(model_path,compile=False)
        evaluate(model)
    print('----End----')

interface_testset.py

import itertools
import string
import yaml
from tqdmimport tqdm
import cv2
import numpy as np
import os
import tensorflow as tf
from kerasimport backend as K
from keras.modelsimport Model, load_model
 
f= open('./config/config_demo.yaml','r', encoding='utf-8')
cfg= f.read()
cfg_dict= yaml.load(cfg)
config= tf.ConfigProto()
config.gpu_options.allow_growth= True
session= tf.Session(config=config)
K.set_session(session)
 
MODEL_NAME= cfg_dict['System']['ModelName']
 
 
letters= string.ascii_uppercase+ string.ascii_lowercase+string.digits
 
def decode_batch(out):
    ret= []
    for jin range(out.shape[0]):
        out_best= list(np.argmax(out[j,2:],1))
        out_best= [kfor k, gin itertools.groupby(out_best)]
        outstr= ''
        for cin out_best:
            if c <len(letters):
                outstr+= letters[c]
        ret.append(outstr)
    return ret
 
def get_x_data(img_data, img_w, img_h):
    img= cv2.cvtColor(img_data, cv2.COLOR_RGB2GRAY)
    img= cv2.resize(img, (img_w, img_h))
    img= img.astype(np.float32)
    img/= 255
    batch_size= 1
    if K.image_data_format()== 'channels_first':
        X_data= np.ones([batch_size,1, img_w, img_h])
    else:
        X_data= np.ones([batch_size, img_w, img_h,1])
    img= img.T
    if K.image_data_format()== 'channels_first':
        img= np.expand_dims(img,0)
    else:
        img= np.expand_dims(img,-1)
    X_data[0]= img
    return X_data
 
# Test on validation images
def interface(datapath="./testset" ,img_w= 200,img_h= 100):
 
    save_file= open("answer.csv","a",encoding="utf-8")
    save_file.truncate()
    model_path= './tmp/train_' + MODEL_NAME
    model= load_model(model_path,compile=False)
 
    net_inp= model.get_layer(name='the_input').input
    net_out= model.get_layer(name='softmax').output
    predict_model= Model(inputs=net_inp, outputs=net_out)
 
    print("开始预测,预测结果:")
    listdir= os.listdir(datapath)
 
    bar= tqdm(range(len(listdir)),total=len(listdir))
    for idxin bar:
        img_data= cv2.imread(datapath+"/" + str(idx)+ ".jpg")
        X_data= get_x_data(img_data, img_w, img_h)
        net_out_value= predict_model.predict(X_data)
        pred_texts= decode_batch(net_out_value)
        #print(str(idx) + ".jpg" + "\t", pred_texts[0])
        save_file.write(str(idx)+","+pred_texts[0]+"\r\n")
 
if __name__== '__main__':
    interface(datapath="./testset")
举报

相关推荐

0 条评论