0

我正在学习Arrow结合TensorFlow,根据这个博客,我写了一个mnist的例子。

我的问题是为什么要对数据进行预处理,否则会报错:

tensorflow.python.framework.errors_impl.InternalError:类型错误:箭头类型不匹配:预期 dtype=2,但得到 dtype=9 [[node IteratorGetNext(定义在 home/maqy/tensorflowDemo/arrowDemo/mnist/arrow_mnist_local.py:35) ]] [操作:__inference_distributed_function_1455]

预处理的主要代码片段如下:

    # Read a CSV file into an Arrow Table
    opts = pyarrow.csv.ReadOptions(use_threads=True, autogenerate_column_names=True)
    table = pyarrow.csv.read_csv(filename, opts)
    # Fit the feature transform
    df = table.to_pandas()

    # normalizer the value except 'target'\
    mylist = []
    for i in range(COLUMNS_LEN - 1):
        tmp1 = 'f' + str(i+1)
        mylist.append(tmp1)

    scaler = Normalizer().fit(df[mylist])
    # Iterate over batches in the pyarrow.Table and apply processing
    for batch in table.to_batches():
        df = batch.to_pandas()

        # Process the batch and apply feature transform
        X_scaled = scaler.transform(df[mylist])

        # preprocess the data without label column
        my_dict = {'f0': df['f0']}
        for i in range(COLUMNS_LEN - 1):
            tmp2 = 'f' + str(i+1)
            my_dict[tmp2] = X_scaled[:, i]

        # when I use df as the param, the error will happen.
        # df_scaled = pd.DataFrame(my_dict)
        df_scaled = pd.DataFrame(df)

        batch_scaled = pa.RecordBatch.from_pandas(df_scaled, preserve_index=False)

        yield batch_scaled

完整代码如下,关键是使用df_scaled = pd.DataFrame(df) // 出现这个错误或者df_scaled = pd.DataFrame(my_dict) // 这个可以

from functools import partial
import tensorflow as tf
import pyarrow.csv
import pyarrow as pa
from sklearn.preprocessing import Normalizer
import tensorflow_io.arrow as arrow_io
import pandas as pd

"""run arrow_mnist local"""

# total columns num of mnist
COLUMNS_LEN = 785
# import os
# os.environ['CUDA_VISIBLE_DEVICES'] = '5'

# set gpu config
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# from tensorflow demo
def model_fit(ds):
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 1, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10)
    ])

    model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  optimizer=tf.keras.optimizers.Adam(),
                  metrics=['accuracy'])

    model.fit(ds, epochs=10)
    return model


def read_and_process(filename):
    """Read the given CSV file and yield processed Arrow batches."""

    # Read a CSV file into an Arrow Table
    opts = pyarrow.csv.ReadOptions(use_threads=True, autogenerate_column_names=True)
    table = pyarrow.csv.read_csv(filename, opts)
    # Fit the feature transform
    df = table.to_pandas()

    # normalizer the value except 'target'
    # for preprocess
    mylist = []
    for i in range(COLUMNS_LEN - 1):
        tmp1 = 'f' + str(i+1)
        mylist.append(tmp1)

    scaler = Normalizer().fit(df[mylist])
    # Iterate over batches in the pyarrow.Table and apply processing
    for batch in table.to_batches():
        df = batch.to_pandas()

        # Process the batch and apply feature transform
        X_scaled = scaler.transform(df[mylist])

        my_dict = {'f0': df['f0']}
        for i in range(COLUMNS_LEN - 1):
            tmp2 = 'f' + str(i+1)
            my_dict[tmp2] = X_scaled[:, i]

        # use my_dict will be successful
        # df_scaled = pd.DataFrame(my_dict)
        df_scaled = pd.DataFrame(df)

        batch_scaled = pa.RecordBatch.from_pandas(df_scaled, preserve_index=False)

        yield batch_scaled


def make_local_dataset(filename):
    """Make a TensorFlow Arrow Dataset that reads from a local CSV file."""

    # Read the local file and get a record batch iterator
    batch_iter = read_and_process(filename)

    # make output_types
    my_types_list = [tf.int64]
    for i in range(COLUMNS_LEN - 1):
        my_types_list.append(tf.float64)
    my_types_tuple = tuple(my_types_list)

    # make output_shapes
    my_shapes_list = []
    for i in range(COLUMNS_LEN):
        my_shapes_list.append(tf.TensorShape([]))
    my_shapes_tuple = tuple(my_shapes_list)

    # Create the Arrow Dataset as a stream from local iterator of record batches
    ds = arrow_io.ArrowStreamDataset.from_record_batches(
        batch_iter,
        batch_size=500,
        batch_mode='keep_remainder',
        output_types=my_types_tuple,
        output_shapes=my_shapes_tuple,
        record_batch_iter_factory=partial(read_and_process, filename))

    def my_map_func(*args):
        data_list = []
        # split the label and data
        flag = 1
        for i in args:
            if flag == 1:
                data_label = i
                flag = flag + 1
            else:
                data_list.append(i)

        data_tensor = tf.stack(data_list, axis=1)

        # reshape data to (28, 28 , 1)
        data_tensor_reshape = tf.reshape(data_tensor, (-1, 28, 28, 1))

        # return with label
        return (data_tensor_reshape, data_label)

    ds = ds.map(my_map_func)
    return ds


if __name__ == '__main__':

    ds = make_local_dataset('/home/maqy/data/mnist_test.csv')

    print("hello")
    model = model_fit(ds)

完整的日志如下:

/home/maqy/tensorflowDemo/arrowDemo/mnist/arrow_mnist_local.py
2020-03-30 11:27:52.430240: W tensorflow/stream_executor/platform/default/dso_loader.cc:55] Could not load dynamic library 'libnvinfer.so.6'; dlerror: libnvinfer.so.6: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64
2020-03-30 11:27:52.430357: W tensorflow/stream_executor/platform/default/dso_loader.cc:55] Could not load dynamic library 'libnvinfer_plugin.so.6'; dlerror: libnvinfer_plugin.so.6: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64
2020-03-30 11:27:52.430377: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:30] Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
2020-03-30 11:27:53.361412: I tensorflow_io/core/kernels/cpu_check.cc:128] Your CPU supports instructions that this TensorFlow IO binary was not compiled to use: AVX2 AVX512F FMA
2020-03-30 11:27:53.563709: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcuda.so.1
2020-03-30 11:27:53.837795: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1555] Found device 0 with properties: 
pciBusID: 0000:b0:00.0 name: Tesla T4 computeCapability: 7.5
coreClock: 1.59GHz coreCount: 40 deviceMemorySize: 14.73GiB deviceMemoryBandwidth: 298.08GiB/s
2020-03-30 11:27:53.838085: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.1
2020-03-30 11:27:53.840256: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcublas.so.10
2020-03-30 11:27:53.843244: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcufft.so.10
2020-03-30 11:27:53.843597: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcurand.so.10
2020-03-30 11:27:53.845977: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusolver.so.10
2020-03-30 11:27:53.847262: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusparse.so.10
2020-03-30 11:27:53.855799: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudnn.so.7
2020-03-30 11:27:53.862652: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1697] Adding visible gpu devices: 0
2020-03-30 11:27:53.867447: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 AVX512F FMA
2020-03-30 11:27:53.892772: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2100000000 Hz
2020-03-30 11:27:53.897115: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x55d4784a6870 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-03-30 11:27:53.897177: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2020-03-30 11:27:54.069526: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x55d47850d1e0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
2020-03-30 11:27:54.069589: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Tesla T4, Compute Capability 7.5
2020-03-30 11:27:54.073007: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1555] Found device 0 with properties: 
pciBusID: 0000:b0:00.0 name: Tesla T4 computeCapability: 7.5
coreClock: 1.59GHz coreCount: 40 deviceMemorySize: 14.73GiB deviceMemoryBandwidth: 298.08GiB/s
2020-03-30 11:27:54.073145: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.1
2020-03-30 11:27:54.073191: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcublas.so.10
2020-03-30 11:27:54.073232: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcufft.so.10
2020-03-30 11:27:54.073272: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcurand.so.10
2020-03-30 11:27:54.073312: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusolver.so.10
2020-03-30 11:27:54.073350: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusparse.so.10
2020-03-30 11:27:54.073390: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudnn.so.7
2020-03-30 11:27:54.080029: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1697] Adding visible gpu devices: 0
2020-03-30 11:27:54.080116: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.1
2020-03-30 11:27:54.084759: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1096] Device interconnect StreamExecutor with strength 1 edge matrix:
2020-03-30 11:27:54.084795: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1102]      0 
2020-03-30 11:27:54.084823: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1115] 0:   N 
2020-03-30 11:27:54.091861: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1241] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 14125 MB memory) -> physical GPU (device: 0, name: Tesla T4, pci bus id: 0000:b0:00.0, compute capability: 7.5)
hello
Epoch 1/10
2020-03-30 11:27:56.227513: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcublas.so.10
2020-03-30 11:27:56.601754: W tensorflow/core/common_runtime/base_collective_executor.cc:217] BaseCollectiveExecutor::StartAbort Internal: Type error: Arrow type mismatch: expected dtype=2, but got dtype=9
     [[{{node IteratorGetNext}}]]
2020-03-30 11:27:56.710903: W tensorflow/core/common_runtime/base_collective_executor.cc:217] BaseCollectiveExecutor::StartAbort Internal: Type error: Arrow type mismatch: expected dtype=2, but got dtype=9
     [[{{node IteratorGetNext}}]]
     [[IteratorGetNext/_2]]
      1/Unknown - 1s 934ms/stepTraceback (most recent call last):
  File "/home/maqy/tensorflowDemo/arrowDemo/mnist/arrow_mnist_local.py", line 175, in <module>
    model = model_fit(ds)
  File "/home/maqy/tensorflowDemo/arrowDemo/mnist/arrow_mnist_local.py", line 35, in model_fit
    model.fit(ds, epochs=10)
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py", line 819, in fit
    use_multiprocessing=use_multiprocessing)
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py", line 342, in fit
    total_epochs=epochs)
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py", line 128, in run_one_epoch
    batch_outs = execution_function(iterator)
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py", line 98, in execution_function
    distributed_function(input_fn))
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py", line 568, in __call__
    result = self._call(*args, **kwds)
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py", line 632, in _call
    return self._stateless_fn(*args, **kwds)
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 2363, in __call__
    return graph_function._filtered_call(args, kwargs)  # pylint: disable=protected-access
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 1611, in _filtered_call
    self.captured_inputs)
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 1692, in _call_flat
    ctx, args, cancellation_manager=cancellation_manager))
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 545, in call
    ctx=ctx)
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/execute.py", line 67, in quick_execute
    six.raise_from(core._status_to_exception(e.code, message), None)
  File "<string>", line 3, in raise_from
tensorflow.python.framework.errors_impl.InternalError:  Type error: Arrow type mismatch: expected dtype=2, but got dtype=9
     [[node IteratorGetNext (defined at home/maqy/tensorflowDemo/arrowDemo/mnist/arrow_mnist_local.py:35) ]] [Op:__inference_distributed_function_1455]

Function call stack:
distributed_function

Fatal Python error: This thread state must be current when releasing

Current thread 0x00007f6d14b0d700 (most recent call first):
  File "/root/miniconda3/lib/python3.7/socket.py", line 607 in write
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_io/arrow/python/ops/arrow_dataset_ops.py", line 557 in run_server
  File "/root/miniconda3/lib/python3.7/threading.py", line 870 in run
  File "/root/miniconda3/lib/python3.7/threading.py", line 926 in _bootstrap_inner
  File "/root/miniconda3/lib/python3.7/threading.py", line 890 in _bootstrap

Thread 0x00007f6e4e9ed740 (most recent call first):
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/ops/gen_dataset_ops.py", line 1139 in delete_iterator
  File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/data/ops/iterator_ops.py", line 537 in __del__

Process finished with exit code 134
4

1 回答 1

1

It's not necessary to preprocess the data, it was just part of the example. The error indicates there is a mixup in columns, where ArrowStreamDataset.from_record_batches was expecting a float64 but got an int64. If you are using python <= 3.5, then making a dict could reorder the columns. You can make a Pandas DataFrame and specify column ordering like pd.DataFrame({'f': ...}, columns=['f', ...]). Check the column ordering in the DataFrame and the converted Arrow RecordBatch, and make sure it matches up with output types in ArrowStreamDataset.

于 2020-04-02T23:10:58.500 回答