
Problem Statement

I have to read several JSON Lines (.jsonl) files from Google Cloud Storage. To do this, I have created a dataset from the records I want to read: a numpy array containing [[<gs:// url>, id], ...], where id is the row number used to decide which lines belong to train/test/validation.
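For concreteness, a minimal sketch of such a records array (the bucket and file names here are made up, not from the actual data):

```python
# Hypothetical records structure: each row pairs a GCS URL with the line
# index that selects which row of that file belongs to this split.
records = [
    ["gs://my-bucket/shard-00/data-00.jsonl.gz", 0],
    ["gs://my-bucket/shard-00/data-00.jsonl.gz", 2],
    ["gs://my-bucket/shard-01/data-01.jsonl.gz", 1],
]
# In the question this is held as a numpy array, e.g. np.array(records, dtype=object).
```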

Code

The main function creates the TF Dataset from a generator, which yields the previously described np.ndarray, and then runs a map function to download and parse each file:

def load_dataset(records: np.ndarray) -> tf.data.Dataset:
    """Create a TensorFlow Dataset (generator-backed) from a list of gs:// data URLs.

    Args:
        records (np.ndarray): Array of [url, id] rows, where url points to a
            gs://<foo>/foo<N>/*.jsonl.gz file

    Returns:
        tf.data.Dataset: Dataset generator which can be used for training Keras models.
    """
    dataset = tf.data.Dataset.from_generator(lambda: _generator(records), (tf.string, tf.int8))
    return dataset


def _generator(records):
    for r in records:
        yield r[0], r[1]

As you can see, the generator simply iterates through the np.ndarray to yield a URL and a line index.
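A stand-alone illustration of what the generator yields, runnable without TensorFlow (the URLs are made up):

```python
# Mirrors the _generator above: each record row becomes a (url, index) pair.
def _generator(records):
    for r in records:
        yield r[0], r[1]

records = [["gs://bucket/a.jsonl.gz", 0], ["gs://bucket/b.jsonl.gz", 3]]
pairs = list(_generator(records))
# Each yielded pair becomes a (tf.string, tf.int8) element of the Dataset.
```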

Then I have to load and preprocess the file from the URL to get a list of JSON objects (dicts).

def _load_and_preprocess(filepath, selected_sample):
    """Read a file from a GCS or local path and process it into a tensor.

    Args:
        filepath (tensor): path string, pointer to a GCS or local path
        selected_sample (tensor): line indices to keep from the file

    Returns:
        tensor: processed input
    """
    sample_raw_input = tf.io.read_file(filepath)
    uncompressed_inputs = tf.py_function(_get_uncompressed_inputs, [sample_raw_input], tf.string)
    sample = tf.py_function(_load_sampled_sample, [uncompressed_inputs, selected_sample], tf.float32)  # This `tf.float32` is definitely wrong
    return sample  # This is not a tensor, but a list of dicts which I will process later


def _get_uncompressed_inputs(record):
    return zlib.decompress(record.numpy(), 16 + zlib.MAX_WBITS)


def _load_sampled_sample(inputs: Iterable, selected_sample: List[int]) -> List[Dict[str, str]]:
    if not tf.executing_eagerly():
        raise RuntimeError("TensorFlow must be executing eagerly.")
    inputs = inputs.numpy()
    selected_sample = selected_sample.numpy()
    sample = _load_sampled_sample_from_jsonl(inputs, selected_sample)
    return sample


def _load_sampled_sample_from_jsonl(jsonl: bytes, selected_sample: List[int]) -> List[Dict[str, str]]:
    json_lines = _read_jsonl(jsonl).split("\n")
    sample = []
    for n, sample_json in enumerate(json_lines):
        sample_obj = _read_json(sample_json) if n in selected_sample else None
        if sample_obj:
            sample.append(sample_obj)
    return sample


def _read_jsonl(jsonl: bytes) -> str:
    return jsonl.decode()
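The decompress-and-filter logic above can be exercised without TensorFlow or GCS. A self-contained sketch using synthetic data (the file contents are made up, and `json.loads` stands in for the post's `_read_json` helper):

```python
import gzip
import json
import zlib

# Synthetic stand-in for a downloaded .jsonl.gz payload.
lines = [json.dumps({"id": i, "text": f"row {i}"}) for i in range(4)]
compressed = gzip.compress("\n".join(lines).encode())

# 16 + zlib.MAX_WBITS tells zlib to expect a gzip header, which is why the
# decompression helper above can handle .jsonl.gz bytes read by tf.io.read_file.
raw = zlib.decompress(compressed, 16 + zlib.MAX_WBITS)

# Keep only the selected line numbers, as the line-filtering helper above does.
selected = [0, 2]
sample = [json.loads(line) for n, line in enumerate(raw.decode().split("\n")) if n in selected]
```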

Executing

I then create the dataset with the above code and try to retrieve a single sample from it as a test.

val_ds = load_dataset(validation_records)
samples = tf.data.experimental.get_single_element(
    val_ds
) # This should be a list of Dicts

Which raises:

InvalidArgumentError: ValueError: Attempt to convert a value ({...}) with an unsupported type (<class 'dict'>) to a Tensor.
# ... are the dict values, which is really big so I've shortened it to `...`
Traceback (most recent call last):

  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/ops/script_ops.py", line 242, in __call__
    return func(device, token, args)

  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/ops/script_ops.py", line 140, in __call__
    outputs = [

  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/ops/script_ops.py", line 141, in <listcomp>
    _maybe_copy_to_context_device(self._convert(x, dtype=dtype),

  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/ops/script_ops.py", line 120, in _convert
    return ops.convert_to_tensor(value, dtype=dtype)

  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/framework/ops.py", line 1499, in convert_to_tensor
    ret = conversion_func(value, dtype=dtype, name=name, as_ref=as_ref)

  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 338, in _constant_tensor_conversion_function
    return constant(v, dtype=dtype, name=name)

  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 263, in constant
    return _constant_impl(value, dtype, shape, name, verify_shape=False,

  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 275, in _constant_impl
    return _constant_eager_impl(ctx, value, dtype, shape, verify_shape)

  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 300, in _constant_eager_impl
    t = convert_to_eager_tensor(value, ctx, dtype)

  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 98, in convert_to_eager_tensor
    return ops.EagerTensor(value, ctx.device_name, dtype)

ValueError: Attempt to convert a value ({...}) with an unsupported type (<class 'dict'>) to a Tensor.
# ... are the dict values, which is really big so I've shortened it to `...`


     [[{{node EagerPyFunc_1}}]] [Op:DatasetToSingleElement]

Conclusion

Is there any way I can work with a list of dicts without eager execution (which is not allowed inside a TF Dataset)?

This list of dicts is not the input to my model; however, I simply cannot work with it in the preprocessing function because this error is raised before the values reach any other function.
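One direction worth noting (a sketch of my own, under the assumption that the dicts are JSON-serializable, not code from the post): a list of dicts cannot be converted to a tensor, but a single JSON string is a valid `tf.string` value, so the sample can ride through the pipeline serialized and be parsed back into dicts afterwards.

```python
import json

# Serialize the list of dicts before returning it from a py_function, and
# parse it back outside the pipeline. (Dict contents here are made up.)
sample = [{"id": 0, "text": "a"}, {"id": 2, "text": "b"}]
serialized = json.dumps(sample)    # could be returned as a tf.string tensor
restored = json.loads(serialized)  # downstream consumer recovers the dicts
```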

Additional Information:

  • Python Version: 3.8
  • Tensorflow Version: 2.3.1

1 Answer


OK, I think I have fixed it by running a dataset.map function that executes eagerly:

dataset.map(lambda file, samples: tf.py_function(_load_and_preprocess, [file, samples], tf.variant))

As described here: How can you map values in a tf.data.Dataset using a dictionary

Answered 2020-10-28T14:27:17.903