我正在学习Arrow结合TensorFlow,根据这个博客,我写了一个mnist的例子。
我的问题是为什么要对数据进行预处理,否则会报错:
tensorflow.python.framework.errors_impl.InternalError:类型错误:箭头类型不匹配:预期 dtype=2,但得到 dtype=9 [[node IteratorGetNext(定义在 home/maqy/tensorflowDemo/arrowDemo/mnist/arrow_mnist_local.py:35) ]] [操作:__inference_distributed_function_1455]
预处理的主要代码片段如下:
# Read a CSV file into an Arrow Table
opts = pyarrow.csv.ReadOptions(use_threads=True, autogenerate_column_names=True)
table = pyarrow.csv.read_csv(filename, opts)
# Fit the feature transform
df = table.to_pandas()
# normalizer the value except 'target'\
mylist = []
for i in range(COLUMNS_LEN - 1):
tmp1 = 'f' + str(i+1)
mylist.append(tmp1)
scaler = Normalizer().fit(df[mylist])
# Iterate over batches in the pyarrow.Table and apply processing
for batch in table.to_batches():
df = batch.to_pandas()
# Process the batch and apply feature transform
X_scaled = scaler.transform(df[mylist])
# preprocess the data without label column
my_dict = {'f0': df['f0']}
for i in range(COLUMNS_LEN - 1):
tmp2 = 'f' + str(i+1)
my_dict[tmp2] = X_scaled[:, i]
# when I use df as the param, the error will happen.
# df_scaled = pd.DataFrame(my_dict)
df_scaled = pd.DataFrame(df)
batch_scaled = pa.RecordBatch.from_pandas(df_scaled, preserve_index=False)
yield batch_scaled
完整代码如下,关键是使用df_scaled = pd.DataFrame(df) // 出现这个错误或者df_scaled = pd.DataFrame(my_dict) // 这个可以
from functools import partial
import tensorflow as tf
import pyarrow.csv
import pyarrow as pa
from sklearn.preprocessing import Normalizer
import tensorflow_io.arrow as arrow_io
import pandas as pd
"""run arrow_mnist local"""
# total columns num of mnist
COLUMNS_LEN = 785
# import os
# os.environ['CUDA_VISIBLE_DEVICES'] = '5'
# set gpu config
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# from tensorflow demo
def model_fit(ds):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 1, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(),
metrics=['accuracy'])
model.fit(ds, epochs=10)
return model
def read_and_process(filename):
"""Read the given CSV file and yield processed Arrow batches."""
# Read a CSV file into an Arrow Table
opts = pyarrow.csv.ReadOptions(use_threads=True, autogenerate_column_names=True)
table = pyarrow.csv.read_csv(filename, opts)
# Fit the feature transform
df = table.to_pandas()
# normalizer the value except 'target'
# for preprocess
mylist = []
for i in range(COLUMNS_LEN - 1):
tmp1 = 'f' + str(i+1)
mylist.append(tmp1)
scaler = Normalizer().fit(df[mylist])
# Iterate over batches in the pyarrow.Table and apply processing
for batch in table.to_batches():
df = batch.to_pandas()
# Process the batch and apply feature transform
X_scaled = scaler.transform(df[mylist])
my_dict = {'f0': df['f0']}
for i in range(COLUMNS_LEN - 1):
tmp2 = 'f' + str(i+1)
my_dict[tmp2] = X_scaled[:, i]
# use my_dict will be successful
# df_scaled = pd.DataFrame(my_dict)
df_scaled = pd.DataFrame(df)
batch_scaled = pa.RecordBatch.from_pandas(df_scaled, preserve_index=False)
yield batch_scaled
def make_local_dataset(filename):
"""Make a TensorFlow Arrow Dataset that reads from a local CSV file."""
# Read the local file and get a record batch iterator
batch_iter = read_and_process(filename)
# make output_types
my_types_list = [tf.int64]
for i in range(COLUMNS_LEN - 1):
my_types_list.append(tf.float64)
my_types_tuple = tuple(my_types_list)
# make output_shapes
my_shapes_list = []
for i in range(COLUMNS_LEN):
my_shapes_list.append(tf.TensorShape([]))
my_shapes_tuple = tuple(my_shapes_list)
# Create the Arrow Dataset as a stream from local iterator of record batches
ds = arrow_io.ArrowStreamDataset.from_record_batches(
batch_iter,
batch_size=500,
batch_mode='keep_remainder',
output_types=my_types_tuple,
output_shapes=my_shapes_tuple,
record_batch_iter_factory=partial(read_and_process, filename))
def my_map_func(*args):
data_list = []
# split the label and data
flag = 1
for i in args:
if flag == 1:
data_label = i
flag = flag + 1
else:
data_list.append(i)
data_tensor = tf.stack(data_list, axis=1)
# reshape data to (28, 28 , 1)
data_tensor_reshape = tf.reshape(data_tensor, (-1, 28, 28, 1))
# return with label
return (data_tensor_reshape, data_label)
ds = ds.map(my_map_func)
return ds
if __name__ == '__main__':
ds = make_local_dataset('/home/maqy/data/mnist_test.csv')
print("hello")
model = model_fit(ds)
完整的日志如下:
/home/maqy/tensorflowDemo/arrowDemo/mnist/arrow_mnist_local.py
2020-03-30 11:27:52.430240: W tensorflow/stream_executor/platform/default/dso_loader.cc:55] Could not load dynamic library 'libnvinfer.so.6'; dlerror: libnvinfer.so.6: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64
2020-03-30 11:27:52.430357: W tensorflow/stream_executor/platform/default/dso_loader.cc:55] Could not load dynamic library 'libnvinfer_plugin.so.6'; dlerror: libnvinfer_plugin.so.6: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64
2020-03-30 11:27:52.430377: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:30] Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
2020-03-30 11:27:53.361412: I tensorflow_io/core/kernels/cpu_check.cc:128] Your CPU supports instructions that this TensorFlow IO binary was not compiled to use: AVX2 AVX512F FMA
2020-03-30 11:27:53.563709: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcuda.so.1
2020-03-30 11:27:53.837795: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1555] Found device 0 with properties:
pciBusID: 0000:b0:00.0 name: Tesla T4 computeCapability: 7.5
coreClock: 1.59GHz coreCount: 40 deviceMemorySize: 14.73GiB deviceMemoryBandwidth: 298.08GiB/s
2020-03-30 11:27:53.838085: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.1
2020-03-30 11:27:53.840256: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcublas.so.10
2020-03-30 11:27:53.843244: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcufft.so.10
2020-03-30 11:27:53.843597: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcurand.so.10
2020-03-30 11:27:53.845977: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusolver.so.10
2020-03-30 11:27:53.847262: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusparse.so.10
2020-03-30 11:27:53.855799: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudnn.so.7
2020-03-30 11:27:53.862652: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1697] Adding visible gpu devices: 0
2020-03-30 11:27:53.867447: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 AVX512F FMA
2020-03-30 11:27:53.892772: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2100000000 Hz
2020-03-30 11:27:53.897115: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x55d4784a6870 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-03-30 11:27:53.897177: I tensorflow/compiler/xla/service/service.cc:176] StreamExecutor device (0): Host, Default Version
2020-03-30 11:27:54.069526: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x55d47850d1e0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
2020-03-30 11:27:54.069589: I tensorflow/compiler/xla/service/service.cc:176] StreamExecutor device (0): Tesla T4, Compute Capability 7.5
2020-03-30 11:27:54.073007: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1555] Found device 0 with properties:
pciBusID: 0000:b0:00.0 name: Tesla T4 computeCapability: 7.5
coreClock: 1.59GHz coreCount: 40 deviceMemorySize: 14.73GiB deviceMemoryBandwidth: 298.08GiB/s
2020-03-30 11:27:54.073145: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.1
2020-03-30 11:27:54.073191: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcublas.so.10
2020-03-30 11:27:54.073232: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcufft.so.10
2020-03-30 11:27:54.073272: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcurand.so.10
2020-03-30 11:27:54.073312: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusolver.so.10
2020-03-30 11:27:54.073350: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcusparse.so.10
2020-03-30 11:27:54.073390: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudnn.so.7
2020-03-30 11:27:54.080029: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1697] Adding visible gpu devices: 0
2020-03-30 11:27:54.080116: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudart.so.10.1
2020-03-30 11:27:54.084759: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1096] Device interconnect StreamExecutor with strength 1 edge matrix:
2020-03-30 11:27:54.084795: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1102] 0
2020-03-30 11:27:54.084823: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1115] 0: N
2020-03-30 11:27:54.091861: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1241] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 14125 MB memory) -> physical GPU (device: 0, name: Tesla T4, pci bus id: 0000:b0:00.0, compute capability: 7.5)
hello
Epoch 1/10
2020-03-30 11:27:56.227513: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcublas.so.10
2020-03-30 11:27:56.601754: W tensorflow/core/common_runtime/base_collective_executor.cc:217] BaseCollectiveExecutor::StartAbort Internal: Type error: Arrow type mismatch: expected dtype=2, but got dtype=9
[[{{node IteratorGetNext}}]]
2020-03-30 11:27:56.710903: W tensorflow/core/common_runtime/base_collective_executor.cc:217] BaseCollectiveExecutor::StartAbort Internal: Type error: Arrow type mismatch: expected dtype=2, but got dtype=9
[[{{node IteratorGetNext}}]]
[[IteratorGetNext/_2]]
1/Unknown - 1s 934ms/stepTraceback (most recent call last):
File "/home/maqy/tensorflowDemo/arrowDemo/mnist/arrow_mnist_local.py", line 175, in <module>
model = model_fit(ds)
File "/home/maqy/tensorflowDemo/arrowDemo/mnist/arrow_mnist_local.py", line 35, in model_fit
model.fit(ds, epochs=10)
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py", line 819, in fit
use_multiprocessing=use_multiprocessing)
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py", line 342, in fit
total_epochs=epochs)
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py", line 128, in run_one_epoch
batch_outs = execution_function(iterator)
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py", line 98, in execution_function
distributed_function(input_fn))
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py", line 568, in __call__
result = self._call(*args, **kwds)
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py", line 632, in _call
return self._stateless_fn(*args, **kwds)
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 2363, in __call__
return graph_function._filtered_call(args, kwargs) # pylint: disable=protected-access
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 1611, in _filtered_call
self.captured_inputs)
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 1692, in _call_flat
ctx, args, cancellation_manager=cancellation_manager))
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 545, in call
ctx=ctx)
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/execute.py", line 67, in quick_execute
six.raise_from(core._status_to_exception(e.code, message), None)
File "<string>", line 3, in raise_from
tensorflow.python.framework.errors_impl.InternalError: Type error: Arrow type mismatch: expected dtype=2, but got dtype=9
[[node IteratorGetNext (defined at home/maqy/tensorflowDemo/arrowDemo/mnist/arrow_mnist_local.py:35) ]] [Op:__inference_distributed_function_1455]
Function call stack:
distributed_function
Fatal Python error: This thread state must be current when releasing
Current thread 0x00007f6d14b0d700 (most recent call first):
File "/root/miniconda3/lib/python3.7/socket.py", line 607 in write
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_io/arrow/python/ops/arrow_dataset_ops.py", line 557 in run_server
File "/root/miniconda3/lib/python3.7/threading.py", line 870 in run
File "/root/miniconda3/lib/python3.7/threading.py", line 926 in _bootstrap_inner
File "/root/miniconda3/lib/python3.7/threading.py", line 890 in _bootstrap
Thread 0x00007f6e4e9ed740 (most recent call first):
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/ops/gen_dataset_ops.py", line 1139 in delete_iterator
File "/root/miniconda3/lib/python3.7/site-packages/tensorflow_core/python/data/ops/iterator_ops.py", line 537 in __del__
Process finished with exit code 134