我正在将数据从 Netezza 加载到 Spark 数据帧(DataFrame)中,然后尝试将其写入 Bluemix 上的 dashDB。我使用 ibmdbpy 来完成写入;ibmdbpy 需要 pandas 数据帧,因此我先将 Spark 数据帧转换为 pandas 数据帧,再加载到 dashDB。
# Read the Netezza table METRICS.AR_EM_D2_02_AGG into a Spark DataFrame over JDBC.
# `sqlContext`, `user` and `password` are not defined in this snippet; they are
# presumably set up earlier in the notebook session.
all_disputes_df = sqlContext.read.format('jdbc').options(url='jdbc:netezza://pda1-wall.pok.ibm.com:5480/BACC_PRD_ISCNZ_GAPNZ', user=user, password=password, dbtable='METRICS.AR_EM_D2_02_AGG', driver='org.netezza.Driver').load()
from ibmdbpy import IdaDataBase
# Connect to the target database through the 'BLUDB' data source name.
# NOTE(review): the uid/pwd are hard-coded in plain text here; they should be
# rotated and loaded from configuration or the environment instead.
idadb = IdaDataBase(dsn='BLUDB', uid='dash107474', pwd='k5TY24AbzFjE')
# Sanity checks: show the schema in use and the tables visible on the connection.
print("current_schema is %s" % idadb.current_schema)
print("tables %s" % idadb.show_tables())
# toPandas() collects every row of the Spark DataFrame to the driver before the
# upload. The ValueError in the traceback is raised inside this toPandas() call,
# while a date column value is converted with datetime.date.fromordinal, so
# as_idadataframe() itself is never reached.
# NOTE(review): this presumably means the source table holds a date that falls
# outside the range Python's datetime.date can represent - confirm against the data.
idadb.as_idadataframe(all_disputes_df.toPandas(), "all_disputes")
我得到以下回溯。
ValueError Traceback (most recent call last)
<ipython-input-4-63dde713c67b> in <module>()
----> 1 idadb.as_idadataframe(all_disputes_df.toPandas(), "all_disputes")
/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in toPandas(self)
1379 """
1380 import pandas as pd
-> 1381 return pd.DataFrame.from_records(self.collect(), columns=self.columns)
1382
1383 ##########################################################################################
/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in collect(self)
279 with SCCallSiteSync(self._sc) as css:
280 port = self._jdf.collectToPython()
--> 281 return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
282
283 @ignore_unicode_prefix
/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.pyc in _load_from_socket(port, serializer)
140 try:
141 rf = sock.makefile("rb", 65536)
--> 142 for item in serializer.load_stream(rf):
143 yield item
144 finally:
/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/serializers.pyc in load_stream(self, stream)
137 while True:
138 try:
--> 139 yield self._read_with_length(stream)
140 except EOFError:
141 return
/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/serializers.pyc in _read_with_length(self, stream)
162 if len(obj) < length:
163 raise EOFError
--> 164 return self.loads(obj)
165
166 def dumps(self, obj):
/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/serializers.pyc in loads(self, obj, encoding)
420 else:
421 def loads(self, obj, encoding=None):
--> 422 return pickle.loads(obj)
423
424
/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/types.pyc in <lambda>(*a)
1157 # This is used to unpickle a Row from JVM
1158 def _create_row_inbound_converter(dataType):
-> 1159 return lambda *a: dataType.fromInternal(a)
1160
1161
/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/types.pyc in fromInternal(self, obj)
563 return obj
564 if self._needSerializeAnyField:
--> 565 values = [f.fromInternal(v) for f, v in zip(self.fields, obj)]
566 else:
567 values = obj
/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/types.pyc in fromInternal(self, obj)
436
437 def fromInternal(self, obj):
--> 438 return self.dataType.fromInternal(obj)
439
440
/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/types.pyc in fromInternal(self, v)
174 def fromInternal(self, v):
175 if v is not None:
--> 176 return datetime.date.fromordinal(v + self.EPOCH_ORDINAL)
177
178
ValueError: ('ordinal must be >= 1', <function <lambda> at 0x7f97c0be76e0>, (u'788', u'10', u'00620000 ', u'0129101548 ', 1, u'000028628 ', 16520, Decimal('2124.76'), Decimal('2124.76'), 16525, 16525, u'000611099
有人知道问题出在哪里吗?