Problem Statement
I need to read several JSON Lines (.jsonl) files from Google Cloud Storage. To do this, I have built a dataset from the records I want to read: a numpy array of the form [[<gs:// url>, id], ...], where id is a row number used to check whether a line belongs to the train, test, or validation split.
Code
The main function creates the TF Dataset from a generator that yields the rows of the np.ndarray described above, then applies a map function to download and parse each file:
import numpy as np
import tensorflow as tf


def load_dataset(records: np.ndarray) -> tf.data.Dataset:
    """Create a TensorFlow MapDataset from an array of gs:// data URLs.

    Args:
        records (np.ndarray): Rows of [url, line index], where each url is a gs://<foo>/foo<N>/*.jsonl.gz file

    Returns:
        tf.data.Dataset: MapDataset which can be used for training Keras models.
    """
    dataset = tf.data.Dataset.from_generator(lambda: _generator(records), (tf.string, tf.int8))
    # Download and parse each file (the map step described above).
    dataset = dataset.map(_load_and_preprocess)
    return dataset


def _generator(records):
    for r in records:
        yield r[0], r[1]
As you can see, the generator simply iterates over the np.ndarray to yield each URL and its line index.
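One detail worth checking in the generator above, sketched here under the assumption that `records` was built from a nested Python list mixing URLs and integers: NumPy stores such an array with a single string dtype, so the line index comes back as a string and may need an explicit cast before it can match `tf.int8`. The bucket paths below are hypothetical and only for illustration.

```python
import numpy as np

# Hypothetical bucket paths, for illustration only.
records = np.array([
    ["gs://example-bucket/foo1/part-0.jsonl.gz", 3],
    ["gs://example-bucket/foo2/part-0.jsonl.gz", 7],
])

# Mixing str and int in one array promotes every element to a string.
print(records.dtype.kind)  # "U" means a unicode string dtype


def generator(records):
    """Yield (url, line_index) pairs, casting the index back to int."""
    for url, line_index in records:
        yield str(url), int(line_index)


pairs = list(generator(records))
```

If the array was instead created with an object dtype or as a structured array, the cast is harmless but not needed.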
Next, I have to load and preprocess the file at each URL to get a list of dicts, one per parsed JSON line.
import zlib
from typing import Dict, Iterable, List


def _load_and_preprocess(filepath, selected_sample):
    """Read a file from a GCS or local path and extract the selected lines.

    Args:
        filepath (tensor): path string, pointer to GCS or local path
        selected_sample (tensor): line index selecting which lines to keep

    Returns:
        tensor: processed input
    """
    sample_raw_input = tf.io.read_file(filepath)
    uncompressed_inputs = tf.py_function(_get_uncompressed_inputs, [sample_raw_input], tf.string)
    sample = tf.py_function(_load_sampled_sample, [uncompressed_inputs, selected_sample], tf.float32)  # This `tf.float32` is definitely wrong
    return sample  # This is not a tensor, but a list of dicts which I will process later


def _get_uncompressed_inputs(record):
    # 16 + zlib.MAX_WBITS makes zlib expect a gzip header and trailer.
    return zlib.decompress(record.numpy(), 16 + zlib.MAX_WBITS)


def _load_sampled_sample(inputs: Iterable, selected_sample: List[int]) -> List[Dict[str, str]]:
    if not tf.executing_eagerly():
        raise RuntimeError("TensorFlow must be executing eagerly.")
    inputs = inputs.numpy()
    selected_sample = selected_sample.numpy()
    sample = _load__sampled_sample_from_jsonl(inputs, selected_sample)
    return sample


def _load__sampled_sample_from_jsonl(jsonl: bytes, selected_sample: List[int]) -> List[Dict[str, str]]:
    json_lines = _read_jsonl(jsonl).split("\n")
    sample = list()
    for n, sample_json in enumerate(json_lines):
        # Only parse the lines whose index was selected.
        sample_obj = _read_json(sample_json) if n in selected_sample else None
        if sample_obj:
            sample.append(sample_obj)
    return sample


def _read_jsonl(jsonl: bytes) -> str:
    return jsonl.decode()
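The decompress-and-select logic above can be exercised without TensorFlow or GCS, which helps separate parsing problems from tensor-conversion problems. The following is a minimal sketch using only the standard library. It assumes that `_read_json` behaves like `json.loads` (its definition is not shown in the question) and that the selected lines are given as a set of indices; the helper names and the in-memory payload are hypothetical.

```python
import gzip
import json
import zlib
from typing import Dict, List, Set


def decompress_gzip(raw: bytes) -> bytes:
    # 16 + zlib.MAX_WBITS tells zlib to expect a gzip header and trailer.
    return zlib.decompress(raw, 16 + zlib.MAX_WBITS)


def select_records(jsonl: bytes, selected: Set[int]) -> List[Dict]:
    """Parse only the lines whose index is in `selected`."""
    records = []
    for n, line in enumerate(jsonl.decode().split("\n")):
        # Skip unselected lines and the empty string after a trailing newline.
        if n in selected and line.strip():
            records.append(json.loads(line))  # assumed stand-in for _read_json
    return records


# Hypothetical three-line .jsonl.gz payload built in memory.
raw = gzip.compress(b'{"id": "a"}\n{"id": "b"}\n{"id": "c"}\n')
sample = select_records(decompress_gzip(raw), {0, 2})
```

Here `sample` holds plain Python dicts, which is exactly the kind of value that `tf.py_function` cannot convert into a `tf.float32` tensor.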
Executing
I then create the dataset with the above code, and try to retrieve a single sample from it to test.
val_ds = load_dataset(validation_records)
samples = tf.data.experimental.get_single_element(
    val_ds
)  # This should be a list of dicts
This raises:
InvalidArgumentError: ValueError: Attempt to convert a value ({...}) with an unsupported type (<class 'dict'>) to a Tensor.
# `...` stands for the dict contents, which are very large, so I have shortened them
Traceback (most recent call last):
  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/ops/script_ops.py", line 242, in __call__
    return func(device, token, args)
  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/ops/script_ops.py", line 140, in __call__
    outputs = [
  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/ops/script_ops.py", line 141, in <listcomp>
    _maybe_copy_to_context_device(self._convert(x, dtype=dtype),
  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/ops/script_ops.py", line 120, in _convert
    return ops.convert_to_tensor(value, dtype=dtype)
  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/framework/ops.py", line 1499, in convert_to_tensor
    ret = conversion_func(value, dtype=dtype, name=name, as_ref=as_ref)
  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 338, in _constant_tensor_conversion_function
    return constant(v, dtype=dtype, name=name)
  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 263, in constant
    return _constant_impl(value, dtype, shape, name, verify_shape=False,
  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 275, in _constant_impl
    return _constant_eager_impl(ctx, value, dtype, shape, verify_shape)
  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 300, in _constant_eager_impl
    t = convert_to_eager_tensor(value, ctx, dtype)
  File "/home/victor/anaconda3/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 98, in convert_to_eager_tensor
    return ops.EagerTensor(value, ctx.device_name, dtype)
ValueError: Attempt to convert a value ({...}) with an unsupported type (<class 'dict'>) to a Tensor.
[[{{node EagerPyFunc_1}}]] [Op:DatasetToSingleElement]
Conclusion
Is there any way I can work with a list of dicts without eager execution (which is not available inside a TF Dataset map function)?
This list of dicts is not the input to my model. However, I cannot even work with it in the preprocessing function, because the error is raised before the values are passed to any other function.
Additional Information:
- Python version: 3.8
- TensorFlow version: 2.3.1