在我的 java web 应用程序中,我正在向kafka发送消息。
我想在发送之前压缩我的消息,所以我在我的生产者属性中设置:
props.put("compression.codec", "2");
据我了解,“2”代表 snappy,但是在发送消息时,我得到:
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:66)
at kafka.message.SnappyCompression.<init>(CompressionUtils.scala:61)
at kafka.message.CompressionFactory$.apply(CompressionUtils.scala:82)
at kafka.message.CompressionUtils$.compress(CompressionUtils.scala:109)
at kafka.message.MessageSet$.createByteBuffer(MessageSet.scala:71)
at kafka.message.ByteBufferMessageSet.<init>(ByteBufferMessageSet.scala:44)
at kafka.producer.async.DefaultEventHandler$$anonfun$3.apply(DefaultEventHandler.scala:94)
at kafka.producer.async.DefaultEventHandler$$anonfun$3.apply(DefaultEventHandler.scala:82)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
at scala.collection.mutable.HashMap.map(HashMap.scala:45)
at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:82)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
at scala.collection.immutable.Stream.foreach(Stream.scala:526)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
为了解决它,我尝试向我的 pom 添加 snappy 依赖项:
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>${snappy-version}</version>
<scope>provided</scope>
</dependency>
并将 jar 添加到 /lib/ext 下的码头服务器,但仍然出现此错误。
如果我在“compression.codec”属性中设置“0”而不是“2”,我不会得到异常,如预期的那样。
我应该怎么做才能使用 snappy 压缩?
这是我的活泼版本(我应该使用不同的版本吗?):1.1.0.1
我在 Ubuntu 12.10 上运行的 jetty 8.1.9 上部署我的应用程序。