Application info: IBM MQ 9.2, Cloudera CDP 7.1.6, Spark 2.4.5
I am upgrading Spark code from Spark 1.6 to Spark 2.4.5. JSON content (with a complex schema) is pushed to an MQ queue, and the message length exceeds 4096 characters. I can read a JSON file containing the same content directly without any problem, but when the same content is pushed through MQ I get a corrupted record when I try to print the schema with the code below.
val myMsg = JmsStreamUtils.createAsynchronousJmsQueueStream(ssc, MQConsumerFactory(host, port.toInt, qm, qn, user, credentials, qc), converter, Session.AUTO_ACKNOWLEDGE, StorageLevel.MEMORY_AND_DISK_SER)
myMsg.foreachRDD(rdd => {
  val sqlContext = SparkSession.builder.getOrCreate()
  import sqlContext.implicits._  // needed for the String encoder used by createDataset
  val myDS = sqlContext.createDataset(rdd)
  val readJson = sqlContext.read.json(myDS)
  readJson.printSchema()
  rdd.collect().foreach(println)
})
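To see what the JSON reader is actually rejecting, I can also dump the corrupt rows inside the same foreachRDD block. This is just a sketch using the standard Spark JSON reader options in PERMISSIVE mode; "_corrupt_record" is the default corrupt-record column name (I have not overridden spark.sql.columnNameOfCorruptRecord):

import org.apache.spark.sql.functions.col

val parsed = sqlContext.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(myDS)
  .cache()  // cache first: Spark 2.3+ disallows queries that reference only the corrupt-record column on the raw source

// show the raw text of the rows that failed to parse
parsed.filter(col("_corrupt_record").isNotNull)
  .select("_corrupt_record")
  .show(false)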
When I issue rdd.collect().foreach(println), only 4095 characters show up in the log file.
Is there any clue as to what might be causing the corrupted record?
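For reference, this is the kind of converter I would expect to read the full message body. It is only a minimal sketch, assuming the converter argument is a function from javax.jms.Message to Option[String] (I am not certain that is the exact signature the receiver library expects); if the real converter copies the body into a fixed 4096-byte buffer, that would match the truncation I am seeing:

import java.nio.charset.StandardCharsets
import javax.jms.{BytesMessage, Message, TextMessage}

// Sketch only: read the entire JMS message body instead of a fixed-size buffer.
val fullBodyConverter: Message => Option[String] = {
  case txt: TextMessage =>
    Option(txt.getText)  // getText returns the whole text body, no length limit
  case bytes: BytesMessage =>
    val buf = new Array[Byte](bytes.getBodyLength.toInt)  // size the buffer to the actual body length
    bytes.readBytes(buf)
    Some(new String(buf, StandardCharsets.UTF_8))
  case _ => None
}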
My run.sh:
APPNAME="$(basename "$PWD")"
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
CDPPATH="/opt/cloudera/parcels/CDH/lib"
MQJARPH="/spark/mqjars"
LOGPH="/sparklogs"
JARLIST="$MQJARPH/MQCredentialUtil.jar,$MQJARPH/spark-core_2.11-1.5.2.logging.jar,$MQJARPH/config-1.3.0.jar,$MQJARPH/com.ibm.mq.allclient.jar,$MQJARPH/fscontext.jar,$MQJARPH/guava-15.0-rc1.jar,$MQJARPH/javax.jms.jar,$MQJARPH/jta.jar,$MQJARPH/spark-jms-receiver-0.1.2-s_2.11.jar,$MQJARPH/spark-mq-jms-receiver_2.11-0.0.1-SNAPSHOT.jar,$MQJARPH/jms.jar,$MQJARPH/providerutil.jar"
$CDPPATH/spark/bin/spark-submit --master local[2] --conf spark.ui.enabled=false --jars $JARLIST --packages com.databricks:spark-csv_2.11:1.5.0 --class sparkintegration.SparkMQ "$DIR/target/scala-2.11/spark-mq-jms_2.11-0.0.1-SNAPSHOT.jar" >> $LOGPH/"$APPNAME-application-log.out" 2>> $LOGPH/"$APPNAME-log.out"
Is there any configuration setting that can increase the buffer size / string length on the Spark side?
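This is the check I plan to run to see whether the string is already cut off before Spark parses it as JSON. It is a sketch that continues the snippet above and assumes myMsg is a DStream of String messages:

// Diagnostic sketch: log the raw length of each received message.
// If the lengths already max out at 4095, the truncation happens in the
// receiver/converter, not in spark.read.json or in the log output.
myMsg.foreachRDD { rdd =>
  rdd.collect().foreach(msg => println(s"raw message length: ${msg.length}"))
}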