
I have two very large (multi-TB) datasets and am using Petastorm to train a TensorFlow model.

What I am doing is loading the datasets with Petastorm and then zipping them into a single (features, labels) dataset, since I can't pass two separate datasets to model.fit:

train_X_mlp = lm_df_train.select(mlp_feature)  # features DataFrame with 11 columns
train_Y = lm_df_train.select(out_feature)      # single label column
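
For context, a minimal sketch of what these selects assume: mlp_feature is a list of 11 feature column names and out_feature a single label column (the names below are hypothetical placeholders):

mlp_feature = ["feat_{}".format(i) for i in range(11)]  # hypothetical feature column names
out_feature = "target"                                  # hypothetical label column name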

Using Petastorm to create the Spark dataset converters:

penta_train_X_mlp = make_spark_converter(train_X_mlp)
penta_train_Y = make_spark_converter(train_Y)
penta_test_X_mlp = make_spark_converter(test_X_mlp)
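
For completeness: make_spark_converter requires a parent cache directory to be configured on the Spark session before it is called; a minimal setup sketch (the cache path is a hypothetical placeholder):

from petastorm.spark import SparkDatasetConverter, make_spark_converter

spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF,
               "file:///dbfs/tmp/petastorm/cache")  # hypothetical cache location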

Model function:

import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, LeakyReLU
from tensorflow.keras.losses import Huber
from tensorflow.keras.metrics import MeanAbsoluteError

LEARNING_RATE = 0.001
BATCH_SIZE = 128
TRAIN_MAX_STEPS = None
STEPS = None
NUM_EPOCHS = 2
LEAKY_RELU_ALPHA = 0.1

def build_model_mlp(in_shape=None, learning_rate=LEARNING_RATE):
    print("input shape:", in_shape)

    input_layer_mlp = Input(shape=(in_shape,))
    m1 = Dense(32, activation=LeakyReLU(alpha=LEAKY_RELU_ALPHA),
               kernel_initializer='glorot_uniform')(input_layer_mlp)
    m3 = Dense(16, activation=LeakyReLU(alpha=LEAKY_RELU_ALPHA))(m1)
    out = Dense(1, activation=LeakyReLU(alpha=LEAKY_RELU_ALPHA), name="output_mlp")(m3)

    losses_mlp = {'output_mlp': Huber(delta=1.0)}
    metrics_mlp = {'output_mlp': MeanAbsoluteError()}
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

    model_mlp = tf.keras.Model(inputs=input_layer_mlp, outputs=out)
    model_mlp.compile(optimizer=optimizer, loss=losses_mlp, metrics=metrics_mlp)
    return model_mlp
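
As a quick sanity check, the builder can be exercised on its own with the 11 feature columns described above (a minimal sketch; nothing is trained here):

model = build_model_mlp(in_shape=11)  # 11 feature columns, as above
model.summary()                       # verify layer shapes before training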

Training loop:

def mlp_split_window(x):
    # split one combined row [11 features..., label] along its first axis
    # into an 11-element feature vector and a 1-element label
    features = x[0:-1]
    labels = x[-1:]
    return features, labels
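
This helper isn't actually called in the loop below; presumably it would apply to a single combined dataset of 12-element rows, something like this (combined_ds is hypothetical):

split_ds = combined_ds.map(mlp_split_window).batch(BATCH_SIZE)  # yields (features, label) pairs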

from tensorflow.keras.callbacks import EarlyStopping

with penta_train_X_mlp.make_tf_dataset(batch_size=BATCH_SIZE) as train_dataset_mlp, \
     penta_train_Y.make_tf_dataset(batch_size=BATCH_SIZE) as train_dataset_Y:

    # Petastorm yields batches as namedtuples of columns; convert them into
    # plain float tensors of shape [batch, 11] and [batch, 1]
    train_dataset_mlp = train_dataset_mlp.map(
        lambda x: tf.reshape(tf.convert_to_tensor(x, dtype=tf.float64), [-1, 11]))
    train_dataset_Y = train_dataset_Y.map(
        lambda x: tf.reshape(tf.convert_to_tensor(x, dtype=tf.float64), [-1, 1]))

    model_mlp = build_model_mlp(in_shape=mlp_size_input)
    train_data=tf.data.Dataset.zip((train_dataset_mlp, train_dataset_Y))

    
    # note: monitor='val_accuracy' has no effect here, since fit() receives no
    # validation data and the compiled metric is MAE, not accuracy
    early_stopping = EarlyStopping(patience=3, monitor='val_accuracy',
                                   restore_best_weights=True, verbose=1)
    print(train_dataset_mlp, train_dataset_Y, train_data)


    model_mlp.fit(train_data, epochs=5, verbose=2, callbacks=[early_stopping])
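
For debugging the hang, a minimal sketch (run inside the same with block, before fit): pulling a single batch out of train_data shows whether the input pipeline itself stalls, independent of training:

    for features, labels in train_data.take(1):
        print(features.shape, labels.shape)  # expect (batch, 11) and (batch, 1)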

Error:

Training gets stuck at Epoch 1/5 for a long time; the only output is this Petastorm warning:

/databricks/python/lib/python3.7/site-packages/petastorm/arrow_reader_worker.py:53: FutureWarning: Calling .data on ChunkedArray is provided for compatibility after Column was removed, simply drop this attribute
  column_as_pandas = column.data.chunks[0].to_pandas()

Any help would be great.
