0

我的开发环境是centos7,hbase 1.2.5,happybase 1.1.0,python 2.7,PyCharm,hadoop 2.7.3,spark 2.1 我正在开发一个大数据软件。我需要将值放入 HBase 表中。这些值来自 Spark RDD。以下是代码:

import happybase
from pyspark import SparkContext, SparkConf

connection = happybase.Connection('localhost') 
table = connection.table('tablename')
conf = SparkConf().setAppName("myFirstSparkApp").setMaster("local")
sc = SparkContext(conf=conf)
distFile = sc.textFile("/inputFilePath/")  
newLines = distFile.filter(lambda x: 'filter":' in x) 
newLines = newLines.map(lambda line:line.split('"'))
# The following line is working. Insert a row into the table.
table.put(b'row-key0', {'billCode:': '222', 'trayCode:': '222', 'pipeline:': '333'})
# But the following line is not working. what is wrong? Why?
newLines.foreach(lambda x: table.put(b'row-key', {'billCode:': x[7], 'trayCode:': x[3], 'pipeline:': x[11]}))

但是最后一行代码不起作用。错误消息是:

ImportError:没有名为 cybin pickle.PicklingError:无法序列化对象:ImportError:没有名为 cybin 的模块

我是 spark+happybase+python 的新开发者。如何解决?请需要你的帮助。谢谢你。

4

1 回答 1

0

这是一个简单的例子。

import happybase
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("App").setMaster("local")
sc = SparkContext(conf=conf)
rdd = parallelize([("a","1"),("b","2")])
def func(x):
    conn = happybase.Connection('localhost')
    table = conn.table("table_name")
    table.put(x[0],{"cf:c":x[1]})
    conn.close()
rdd.foreach(func)

但并不完美,可以参考http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd 祝你好运。

于 2017-06-09T14:48:51.177 回答