I am trying to read data from a Parquet file for a language model.

The Parquet file contains two columns:

  • target (integer)
  • feature_vec (array of integers)
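
For completeness, the schema can be double-checked with pyarrow (a quick sketch; the path is the one used in the code below):

import pyarrow.parquet as pq

# inspect the schema of the toy dataset written by the code below
dataset = pq.ParquetDataset('/dbfs/ml/langmod/petastorm/toy_dataset.parquet')
print(dataset.schema)
# expect target as an int64 column and feature_vec as a list<int64> column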

I am adapting the code from this post (which works as-is for me). When I run the code below, however, I get an InvalidArgumentError while fitting the model.

import os
import random

from pyspark.sql import Row

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Embedding, LSTM
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import Sequence

from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset
from petastorm.etl.dataset_metadata import materialize_dataset
import pyarrow.parquet as pq

## build toy dataset
vocab_size = 250
seq_length = 100
parquet_path = '/dbfs/ml/langmod/petastorm/toy_dataset.parquet'

def row_generator(x):
    """Return a single Row of random values for the toy dataset."""
    # randint is inclusive on both ends, so cap at vocab_size - 1 to keep
    # every index inside the embedding table
    return Row(target = random.randint(0, vocab_size - 1),
               feature_vec = [random.randint(0, vocab_size - 1) for i in range(seq_length)])


rows_count = 1000
rows_rdd = sc.parallelize(range(rows_count)).map(row_generator)

df = spark.createDataFrame(rows_rdd)


df.write.parquet(parquet_path, mode = 'overwrite')


# get_local_path is a helper (from the post being adapted) that maps the DBFS
# path to a local filesystem path; hide Spark's _SUCCESS/_committed marker
# files so pyarrow does not try to read them as parquet
underscore_files = [f for f in os.listdir(get_local_path(parquet_path)) if f.startswith("_")]
pq.EXCLUDED_PARQUET_PATHS.update(underscore_files)
## build model and read in data from parquet, converting to tf.Dataset format

with make_batch_reader('file:/dbfs/' + parquet_path, num_epochs = None) as train_reader:
    # tensor-ify feature_vec and one-hot the target, then split the reader's
    # row-group batches into single examples and re-batch to a fixed size of 10
    train_dataset = make_petastorm_dataset(train_reader) \
        .map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size))) \
        .apply(tf.data.experimental.unbatch()) \
        .batch(10, drop_remainder = True)

    model = Sequential()
    model.add(Embedding(vocab_size, 20, mask_zero = True, input_length = None, name = 'embed'))
    model.add(LSTM(50, return_sequences = False, name = 'lstm1'))
    model.add(Dense(vocab_size, activation = 'softmax', name = 'dense_layer'))
    model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['categorical_accuracy'])
    model.fit(train_dataset, epochs = 2, steps_per_epoch = 10, verbose = 1)

The error:

InvalidArgumentError                      Traceback (most recent call last)
<command-2202319388737190> in <module>
     10     model.add(Dense(vocab_size, activation = 'softmax', name = 'dense_layer'))
     11     model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['categorical_accuracy'])
---> 12     model.fit(train_dataset, epochs = 2, steps_per_epoch = 10, verbose = 1)

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, max_queue_size, workers, use_multiprocessing, **kwargs)
    725         max_queue_size=max_queue_size,
    726         workers=workers,
--> 727         use_multiprocessing=use_multiprocessing)
    728 
    729   def evaluate(self,

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_arrays.py in fit(self, model, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, **kwargs)
    673         validation_steps=validation_steps,
    674         validation_freq=validation_freq,
--> 675         steps_name='steps_per_epoch')
    676 
    677   def evaluate(self,

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_arrays.py in model_iteration(model, inputs, targets, sample_weights, batch_size, epochs, verbose, callbacks, val_inputs, val_targets, val_sample_weights, shuffle, initial_epoch, steps_per_epoch, validation_steps, validation_freq, mode, validation_in_fit, prepared_feed_values_from_dataset, steps_name, **kwargs)
    298           else:
    299             actual_inputs = ins()
--> 300           batch_outs = f(actual_inputs)
    301         except errors.OutOfRangeError:
    302           if is_dataset:

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/backend.py in __call__(self, inputs)
   3474 
   3475     fetched = self._callable_fn(*array_vals,
-> 3476                                 run_metadata=self.run_metadata)
   3477     self._call_fetch_callbacks(fetched[-len(self._fetches):])
   3478     output_structure = nest.pack_sequence_as(

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/client/session.py in __call__(self, *args, **kwargs)
   1470         ret = tf_session.TF_SessionRunCallable(self._session._session,
   1471                                                self._handle, args,
-> 1472                                                run_metadata_ptr)
   1473         if run_metadata:
   1474           proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)

InvalidArgumentError: 2 root error(s) found.
  (0) Invalid argument: transpose expects a vector of size 4. But input(1) is a vector of size 3
     [[{{node lstm1_3/transpose}}]]
     [[lstm1_3/TensorArrayUnstack_1/range/_459]]
  (1) Invalid argument: transpose expects a vector of size 4. But input(1) is a vector of size 3
     [[{{node lstm1_3/transpose}}]]
0 successful operations.
0 derived errors ignored.

This error is surprising, because it seems to point at the shape of an intermediate layer of the model, which should simply accept whatever shape the previous layer outputs.
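
If I read the message right, the tensor arriving at the LSTM has rank 4 at run time while the graph was built expecting rank 3 (the LSTM's internal time-major transpose appears to use a perm vector of size 3). A toy reproduction of the same message, unrelated to petastorm (my own sketch, not from the code above):

import numpy as np
import tensorflow as tf  # 1.15, graph mode

# rank is unknown when the graph is built, so perm is fixed at size 3,
# but the value fed at run time is rank 4 -- the same mismatch as in the traceback
x = tf.placeholder(tf.float32, shape = None)
y = tf.transpose(x, perm = [1, 0, 2])

with tf.Session() as sess:
    sess.run(y, feed_dict = {x: np.zeros((10, 1, 100, 20), np.float32)})
# InvalidArgumentError: transpose expects a vector of size 4. But input(1) is a vector of size 3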

However, if I convert the dataset to an iterator and pull out the Xs and Ys myself, fitting on that single batch works as expected:

with make_batch_reader('file:/dbfs/' + parquet_path, num_epochs = None) as train_reader:
    train_dataset = make_petastorm_dataset(train_reader) \
        .map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size))) \
        .apply(tf.data.experimental.unbatch()) \
        .batch(10, drop_remainder = True)

    # pull one concrete batch out of the dataset (TF 1.x one-shot iterator)
    iterator = train_dataset.make_one_shot_iterator()
    tensor = iterator.get_next()

    with tf.Session() as sess:
        features, target = sess.run(tensor)

        model = Sequential()
        model.add(Embedding(vocab_size, 20, mask_zero = True, input_length = None, name = 'embed'))
        model.add(LSTM(50, return_sequences = False, name = 'lstm1'))
        model.add(Dense(vocab_size, activation = 'softmax', name = 'dense_layer'))
        model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['categorical_accuracy'])

        # fit on the raw NumPy arrays instead of the tf.data pipeline
        model.fit(x = features, y = target, verbose = 1)
10/10 [==============================] - 1s 76ms/sample - loss: 5.5202 - categorical_accuracy: 0.1000
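
For what it's worth, the raw arrays coming out of the iterator look right to me; a quick shape check (values implied by the toy data and the batch size of 10):

print(features.shape)   # (10, 100) -- batch of 10 feature_vec rows
print(target.shape)     # (10, 250) -- batch of 10 one-hot targets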

My guess is that something goes wrong in how the integer-array column is read and converted to the tf.Dataset format, but I cannot see what would cause it. I suspect the problem is in this line from the block above:

train_dataset = make_petastorm_dataset(train_reader) \
    .map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size))) \
    .apply(tf.data.experimental.unbatch()) \
    .batch(10, drop_remainder = True)
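
One way to narrow this down might be to print what the pipeline claims it will produce, right after building it (a sketch using the TF 1.x dataset attributes):

# inside the make_batch_reader block, immediately after building train_dataset
print(train_dataset.output_types)    # dtypes the pipeline advertises
print(train_dataset.output_shapes)   # if feature_vec shows up here with an unexpected
                                     # rank, the LSTM's transpose would fail exactly as
                                     # in the traceback above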

I am running Databricks Runtime ML 6.2 with:

  • tensorflow 1.15.0
  • petastorm 0.8.0