
I am loading data from Netezza into a dataframe and then trying to write it out to dashDB. I am using ibmdbpy to load the data into dashDB on Bluemix. Ibmdbpy requires a pandas DataFrame, so I convert the Spark DataFrame to pandas in order to load it into dashDB.

all_disputes_df = sqlContext.read.format('jdbc').options(
    url='jdbc:netezza://pda1-wall.pok.ibm.com:5480/BACC_PRD_ISCNZ_GAPNZ',
    user=user, password=password,
    dbtable='METRICS.AR_EM_D2_02_AGG',
    driver='org.netezza.Driver').load()
from ibmdbpy import IdaDataBase
idadb = IdaDataBase(dsn='BLUDB', uid='dash107474', pwd='k5TY24AbzFjE')
print("current_schema is %s" % idadb.current_schema)
print("tables %s" % idadb.show_tables())
idadb.as_idadataframe(all_disputes_df.toPandas(), "all_disputes")

I get the following traceback.


ValueError                                Traceback (most recent call last)
<ipython-input-4-63dde713c67b> in <module>()
----> 1 idadb.as_idadataframe(all_disputes_df.toPandas(), "all_disputes")

/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in toPandas(self)
1379         """
1380         import pandas as pd
-> 1381         return pd.DataFrame.from_records(self.collect(), columns=self.columns)
1382 
1383     ##########################################################################################

/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in collect(self)
279         with SCCallSiteSync(self._sc) as css:
280             port = self._jdf.collectToPython()
--> 281         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
282 
283     @ignore_unicode_prefix

/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.pyc in _load_from_socket(port, serializer)
140     try:
141         rf = sock.makefile("rb", 65536)
--> 142         for item in serializer.load_stream(rf):
143             yield item
144     finally:

/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/serializers.pyc in load_stream(self, stream)
137         while True:
138             try:
--> 139                 yield self._read_with_length(stream)
140             except EOFError:
141                 return

/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/serializers.pyc in _read_with_length(self, stream)
162         if len(obj) < length:
163             raise EOFError
--> 164         return self.loads(obj)
165 
166     def dumps(self, obj):

/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/serializers.pyc in loads(self, obj, encoding)
420     else:
421         def loads(self, obj, encoding=None):
--> 422             return pickle.loads(obj)
423 
424 

/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/types.pyc in <lambda>(*a)
1157 # This is used to unpickle a Row from JVM
1158 def _create_row_inbound_converter(dataType):
-> 1159     return lambda *a: dataType.fromInternal(a)
1160 
1161 

/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/types.pyc in fromInternal(self, obj)
563             return obj
564         if self._needSerializeAnyField:
--> 565             values = [f.fromInternal(v) for f, v in zip(self.fields, obj)]
566         else:
567             values = obj

/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/types.pyc in fromInternal(self, obj)
436 
437     def fromInternal(self, obj):
--> 438         return self.dataType.fromInternal(obj)
439 
440 

/home/brente/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/types.pyc in fromInternal(self, v)
174     def fromInternal(self, v):
175         if v is not None:
--> 176             return datetime.date.fromordinal(v + self.EPOCH_ORDINAL)
177 
178 
ValueError: ('ordinal must be >= 1', <function <lambda> at 0x7f97c0be76e0>, (u'788', u'10', u'00620000  ', u'0129101548  ', 1, u'000028628   ', 16520, Decimal('2124.76'), Decimal('2124.76'), 16525, 16525, u'000611099    

Any ideas what the problem is?


1 Answer


Reading your data from Netezza into the dataframe is what is failing. Everything beyond that is guesswork on my part:

  1. Is there invalid data stored in Netezza that breaks deserialization into the dataframe? The fromordinal call at the bottom of your traceback points at a DATE column holding a value outside the range datetime.date can represent.
  2. Maybe try some other queries, to make sure there is no connection problem, no typo in the database name, that sort of thing (see the sketch below).
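
A rough sketch of both checks in PySpark follows. DISPUTE_DATE is a hypothetical placeholder, since the question does not name the table's columns; substitute whichever DATE column METRICS.AR_EM_D2_02_AGG actually has.

# Shared JDBC options, copied from the question.
jdbc_opts = dict(url='jdbc:netezza://pda1-wall.pok.ibm.com:5480/BACC_PRD_ISCNZ_GAPNZ',
                 user=user, password=password, driver='org.netezza.Driver')

# 1. Tiny probe query to rule out connection / table-name problems.
probe = sqlContext.read.format('jdbc').options(
    dbtable='(SELECT COUNT(*) AS N FROM METRICS.AR_EM_D2_02_AGG) probe',
    **jdbc_opts).load()
probe.show()

# 2. Pull the suspect DATE column as VARCHAR so Spark never converts it to
#    datetime.date; out-of-range values then show up as plain text.
raw_dates = sqlContext.read.format('jdbc').options(
    dbtable='(SELECT CAST(DISPUTE_DATE AS VARCHAR(32)) AS RAW_DATE '
            'FROM METRICS.AR_EM_D2_02_AGG LIMIT 100) t',
    **jdbc_opts).load()
raw_dates.show(100, truncate=False)

If the probe succeeds but the raw values include sentinel or far-out-of-range dates, filter or repair those rows on the Netezza side before calling toPandas().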
answered 2016-06-15T06:33:55.503