
I'm using Keras with the TensorFlow backend. Since I couldn't get masking to work, I worked out a way to train and classify sequences of different lengths without it. In the toy example I'm working with, I'm trying to train an LSTM to detect whether a sequence of arbitrary length starts with a one. Here is my code, which ends up batching the variable-length sequences badly:

from keras.models import Sequential 
from keras.layers import LSTM, Dense 
import numpy as np 


def gen_sig(num_samples, seq_len):
    # half of the samples start with a 1; the label marks which ones
    one_indices = np.random.choice(a=num_samples, size=num_samples // 2, replace=False)

    x_val = np.zeros((num_samples, seq_len), dtype=bool)
    x_val[one_indices, 0] = 1

    y_val = np.zeros(num_samples, dtype=bool)
    y_val[one_indices] = 1

    return x_val, y_val


N_train = 100 
N_test = 10 
recall_len = 20 

X_train, y_train = gen_sig(N_train, recall_len) 

X_test, y_test = gen_sig(N_test, recall_len)

print('Build STATEFUL model...') 
model = Sequential() 
model.add(LSTM(10, batch_input_shape=(1, 1, 1), return_sequences=False, stateful=True)) 
model.add(Dense(1, activation='sigmoid')) 
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy']) 

print('Train...') 
for epoch in range(15): 
    mean_tr_acc = [] 
    mean_tr_loss = [] 

    for seq_idx in range(X_train.shape[0]):
        start_val = X_train[seq_idx, 0]
        assert y_train[seq_idx] == start_val
        assert np.nonzero(X_train[seq_idx, :])[0].shape[0] == start_val

        y_in = np.array([y_train[seq_idx]], dtype=bool)

        # feed one timestep at a time, for a random length between 5 and recall_len
        for j in range(np.random.choice(a=np.arange(5, recall_len + 1))):
            x_in = np.array([[[X_train[seq_idx][j]]]])
            tr_loss, tr_acc = model.train_on_batch(x_in, y_in)

            mean_tr_acc.append(tr_acc)
            mean_tr_loss.append(tr_loss)

        # reset the LSTM state between sequences, not between timesteps
        model.reset_states()

    print('accuracy training = {}'.format(np.mean(mean_tr_acc))) 
    print('loss training = {}'.format(np.mean(mean_tr_loss))) 
    print('___________________________________') 

    mean_te_acc = [] 
    mean_te_loss = [] 
    for seq_idx in range(X_test.shape[0]):
        start_val = X_test[seq_idx, 0]
        assert y_test[seq_idx] == start_val
        assert np.nonzero(X_test[seq_idx, :])[0].shape[0] == start_val

        y_in = np.array([y_test[seq_idx]], dtype=bool)

        for j in range(np.random.choice(a=np.arange(5, recall_len + 1))):
            x_in = np.array([[[X_test[seq_idx][j]]]], dtype=bool)
            te_loss, te_acc = model.test_on_batch(x_in, y_in)
            mean_te_acc.append(te_acc)
            mean_te_loss.append(te_loss)
        model.reset_states()

    print('accuracy testing = {}'.format(np.mean(mean_te_acc))) 
    print('loss testing = {}'.format(np.mean(mean_te_loss))) 
    print('___________________________________') 

As you can see from the code, the error is computed and the weights are updated at every single time step. This is bad for multiple reasons. How can I do the training in two steps instead? For example (one possible approach is sketched after the list):

  1. Run a bunch of values through the network, accumulating the error
  2. Adjust the weights of the network given that accumulated error
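
A minimal sketch of that two-step training (an editorial addition, not from the original post): feed each whole sequence as a single batch of shape (1, seq_len, 1). With input_shape=(None, 1) the LSTM accepts sequences of any length, the error is backpropagated through all timesteps at once, and train_on_batch then applies exactly one weight update per sequence:

from keras.models import Sequential
from keras.layers import LSTM, Dense
import numpy as np

model = Sequential()
model.add(LSTM(10, input_shape=(None, 1)))  # None: any sequence length
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# one variable-length sequence per call; the label is 1 iff it starts with a 1
seq_len = np.random.randint(5, 21)
x_in = np.zeros((1, seq_len, 1))
x_in[0, 0, 0] = np.random.randint(0, 2)
y_in = x_in[0, 0, :]  # shape (1,)

loss, acc = model.train_on_batch(x_in, y_in)  # one update for the whole sequence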

Answer


The simplest way to do what is described in the original question is to train the original network with masking, and then test with a stateful network so that input of any length can be classified:

import numpy as np 
np.random.seed(1) 

import tensorflow as tf 
tf.set_random_seed(1) 

from keras import models 
from keras.layers import Dense, Masking, LSTM 

import matplotlib.pyplot as plt 


def stateful_model(): 
    hidden_units = 256 

    model = models.Sequential() 
    model.add(LSTM(hidden_units, batch_input_shape=(1, 1, 1), return_sequences=False, stateful=True)) 
    # sigmoid keeps the output in (0, 1), as binary_crossentropy expects
    model.add(Dense(1, activation='sigmoid', name='output'))

    model.compile(loss='binary_crossentropy', optimizer='rmsprop') 

    return model 


def train_rnn(x_train, y_train, max_len, mask): 
    epochs = 10 
    batch_size = 200 

    vec_dims = 1 
    hidden_units = 256 
    in_shape = (max_len, vec_dims) 

    model = models.Sequential() 

    model.add(Masking(mask, name="in_layer", input_shape=in_shape))
    model.add(LSTM(hidden_units, return_sequences=False))
    # match the stateful model: sigmoid output for binary_crossentropy
    model.add(Dense(1, activation='sigmoid', name='output'))

    model.compile(loss='binary_crossentropy', optimizer='rmsprop') 

    model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, 
       validation_split=0.05) 

    return model 


def gen_train_sig_cls_pair(t_stops, num_examples, mask): 
    x = [] 
    y = [] 
    max_t = int(np.max(t_stops)) 

    for t_stop in t_stops:
        one_indices = np.random.choice(a=num_examples, size=num_examples // 2, replace=False)

        # pad every sequence to max_t; the padded tail carries the mask value
        sig = np.zeros((num_examples, max_t), dtype=np.int8)
        sig[one_indices, 0] = 1
        sig[:, t_stop:] = mask
        x.append(sig)

        cls = np.zeros(num_examples, dtype=bool)
        cls[one_indices] = 1
        y.append(cls)

    return np.concatenate(x, axis=0), np.concatenate(y, axis=0) 


def gen_test_sig_cls_pair(t_stops, num_examples): 
    x = [] 
    y = [] 

    for t_stop in t_stops:
        one_indices = np.random.choice(a=num_examples, size=num_examples // 2, replace=False)

        # test sequences keep their true length; no padding needed here
        sig = np.zeros((num_examples, t_stop), dtype=bool)
        sig[one_indices, 0] = 1
        x.extend(list(sig))

        cls = np.zeros((num_examples, t_stop), dtype=bool)
        cls[one_indices] = 1
        y.extend(list(cls))

    return x, y 


if __name__ == '__main__': 
    noise_mag = 0.01 
    mask_val = -10 
    signal_lengths = (10, 15, 20) 

    x_in, y_in = gen_train_sig_cls_pair(signal_lengths, 10, mask_val) 

    mod = train_rnn(x_in[:, :, None], y_in, int(np.max(signal_lengths)), mask_val) 

    testing_dat, expected = gen_test_sig_cls_pair(signal_lengths, 3) 

    state_mod = stateful_model() 
    state_mod.set_weights(mod.get_weights()) 

    res = []
    for s_i in range(len(testing_dat)):
        seq_in = list(testing_dat[s_i])
        seq_len = len(seq_in)

        # feed one timestep at a time; the stateful LSTM carries state across calls
        for t_i in range(seq_len):
            res.extend(state_mod.predict(np.array([[[seq_in[t_i]]]])))

        state_mod.reset_states()

    fig, axes = plt.subplots(2) 
    axes[0].plot(np.concatenate(testing_dat), label="input") 

    axes[1].plot(res, "ro", label="result", alpha=0.2) 
    axes[1].plot(np.concatenate(expected, axis=0), "bo", label="expected", alpha=0.2) 
    axes[1].legend(bbox_to_anchor=(1.1, 1)) 

    plt.show()
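
As an aside (an editorial addition, not part of the original answer): the padded training tensor that gen_train_sig_cls_pair builds by hand can also be produced with keras.preprocessing.sequence.pad_sequences, as long as it is given the same value the Masking layer is configured with:

from keras.preprocessing.sequence import pad_sequences
import numpy as np

mask_val = -10
seqs = [np.array([1, 0, 0]), np.array([0, 1, 0, 0, 1])]  # ragged example inputs

# pad at the end ('post') up to maxlen with the mask value, then add the
# trailing feature axis the LSTM expects: (num_seqs, maxlen, 1)
padded = pad_sequences(seqs, maxlen=20, padding='post', value=mask_val, dtype='float32')
x_train = padded[:, :, None]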