1

应用信息:IBM MQ 9.2、Cloudera CDP 7.1.6、Spark 2.4.5

我正在将 Spark 代码从 Spark 1.6 升级到 Spark 2.4.5。我有一个 json 内容(复杂模式)推送到消息长度超过 4096 的 MQ 队列。我能够直接读取具有相同内容的 json 文件,但是当相同的内容推送到 MQ 时,我得到了损坏的记录尝试使用以下代码打印架构。

val myMsg = JmsStreamUtils.createAsynchronousJmsQueueStream(ssc, MQConsumerFactory(host,port.toInt, qm, qn, user, credentials, qc), converter, Session.AUTO_ACKNOWLEDGE, StorageLevel.MEMORY_AND_DISK_SER)
myMsg.foreachRDD(rdd => {
  val sqlContext = SparkSession.builder.getOrCreate()
  val myDS = sqlContext.createDataset(rdd)
  val readJson = sqlContext.read.json(myDS)
  readJson.printSchema()
  rdd.collect().foeach(println)
}

当我发出 时rdd.collect().foreach(println),它只在日志文件中显示 4095 个字符。

是否有任何线索可能是损坏记录的原因?

我的 run.sh

APPNAME="$(basename "$PWD")"
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
CDPPATH="/opt/cloudera/parcels/CDH/lib"
MQJARPH="/spark/mqjars"
LOGPH="/sparklogs"
JARLIST="$MQJARPH/MQCredentialUtil.jar,$MQJARPH/spark-core_2.11-1.5.2.logging.jar,$MQJARPH/config-1.3.0.jar,$MQJARPH/com.ibm.mq.allclient.jar,$MQJARPH/fscontext.jar,$MQJARPH/guava-15.0-rc1.jar,$MQJARPH/javax.jms.jar,$MQJARPH/jta.jar,$MQJARPH/spark-jms-receiver-0.1.2-s_2.11.jar,$MQJARPH/spark-mq-jms-receiver_2.11-0.0.1-SNAPSHOT.jar,$MQJARPH/jms.jar,$MQJARPH/providerutil.jar"
$CDPPATH/spark/bin/spark-submit --master local[2] --conf spark.ui.enabled=false --jars $JARLIST --packages com.databricks:spark-csv_2.11:1.5.0 --class sparkintegration.SparkMQ "$DIR/target/scala-2.11/spark-mq-jms_2.11-0.0.1-SNAPSHOT.jar" >> $LOGPH/"$APPNAME-application-log.out" 2>> $LOGPH/"$APPNAME-log.out"

是否有任何配置设置可以增加火花端的缓冲区大小/字符串长度?

4

1 回答 1

0

我对 Scala 或 Spark 一无所知,但 Google 先生说: Scala 在 JVM 上运行,因此 Java 和 Scala 堆栈可以自由混合以实现完全无缝集成。

那么,您使用的是 Java/MQ JAR 文件???真的???

IBM MQ Labs 用 Ja​​va/MQ 和 JMS/MQ 客户端库做了一些非常非常奇怪的事情。MQ 客户端库最初将使用 4KB 缓冲区来获取消息。如果它未能获取整个消息,则它将缓冲区大小增加到消息的大小并再次执行获取。

早在 2019 年夏天,我就写了很多很多关于此的博客文章。这些是 Java/MQ 相关的文章,还有另一套 JMS/MQ。

尝试将以下 JVM 参数设置为大于您尝试检索的消息大小的值。

IE

java -Dcom.ibm.mq.jmqi.defaultMaxMsgSize=250000 blah blah blah

其中 250000 是您的消息的最大消息大小。你可以使用任何你想要的值。

您应该说出您正在使用的 MQ/Java JAR 文件的版本。您可以尝试不同版本的 MQ/Java JAR 文件,以防您正在使用的文件中存在错误。

于 2021-09-07T22:38:27.720 回答