I want to implement a custom S3 partitioner class that uses some Avro message fields plus some extra logic to generate the output S3 path prefix.
The project is in Kotlin; here is my class:
```kotlin
package co.kafkaProcessor.connect

import io.confluent.connect.storage.errors.PartitionException
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class MachineAwareHourlyPartitioner<T> : TimeBasedPartitioner<T>() {
    private val log: Logger = LoggerFactory.getLogger(MachineAwareHourlyPartitioner::class.java)

    private lateinit var environmentName: String

    override fun configure(config: MutableMap<String, Any>?) {
        super.configure(config)
        environmentName = config!!["environment.prefix"] as String
    }

    private fun encodedPartitionForTimestamp(sinkRecord: SinkRecord, timestamp: Long?): String? {
        // Our custom logic goes here
    }
}
```
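For context, the custom logic is along these lines. This is a hypothetical, self-contained sketch of the prefix computation only; the `machine=` field name, the date pattern, and the `encodedPartition` helper are placeholders of mine, not the real partitioner code:

```kotlin
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

// Hourly bucket pattern, formatted in UTC. The exact layout is a placeholder.
private val hourFormat: DateTimeFormatter =
    DateTimeFormatter.ofPattern("'year'=yyyy/'month'=MM/'day'=dd/'hour'=HH")
        .withZone(ZoneOffset.UTC)

// Combine the environment prefix, a field taken from the Avro message
// (here a hypothetical machine id), and the record timestamp.
fun encodedPartition(environment: String, machineId: String, timestampMs: Long): String =
    "$environment/machine=$machineId/${hourFormat.format(Instant.ofEpochMilli(timestampMs))}"

fun main() {
    // 2021-01-01T10:15:00Z
    println(encodedPartition("prod", "m-42", 1609496100000L))
    // prints prod/machine=m-42/year=2021/month=01/day=01/hour=10
}
```

The real implementation would pull `machineId` out of the `SinkRecord` value instead of taking it as a parameter.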
At first I tried to generate the jar file by creating a custom shadowJar task:
```kotlin
tasks {
    withType<ShadowJar> {
        mergeServiceFiles()
        append("META-INF/spring.handlers")
        append("META-INF/spring.schemas")
        append("META-INF/spring.tooling")
        transform(PropertiesFileTransformer::class.java) {
            paths = listOf("META-INF/spring.factories")
            mergeStrategy = "append"
        }
    }

    // Custom jars for kafka connect
    create<ShadowJar>("kafkaConnectUtilsJar") {
        archiveClassifier.set("connect-utils")
        include("co/kafkaProcessor/connect/**")
        include("co/kafkaProcessor/serializer/**")
        from(project.sourceSets.main.get().output)
        configurations = listOf(project.configurations.runtimeClasspath.get())
    }
}
```
but `jar -tvf filename.jar` shows that the resulting jar only contains my own code, and Kafka Connect fails with `java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.TimeBasedPartitioner`. I don't think you're supposed to bundle the Kafka Connect code in the custom jar anyway, also because if I configure the task with `TimeBasedPartitioner` itself, the class is available.
I then tried changing the custom jar definition to:
```kotlin
tasks {
    withType<ShadowJar> {
        mergeServiceFiles()
        append("META-INF/spring.handlers")
        append("META-INF/spring.schemas")
        append("META-INF/spring.tooling")
        transform(PropertiesFileTransformer::class.java) {
            paths = listOf("META-INF/spring.factories")
            mergeStrategy = "append"
        }
    }

    // Custom jars for kafka connect
    create<ShadowJar>("kafkaConnectUtilsJar") {
        archiveClassifier.set("connect-utils")
        dependencies {
            include(dependency("io.confluent:kafka-connect-storage-partitioner:10.2.4"))
        }
        from(project.sourceSets.main.get().output)
        configurations = listOf(project.configurations.runtimeClasspath.get())
    }
}
```
Unfortunately this includes all of my application code, but I can see that the partitioner is included in the jar file.
Kafka Connect now fails with this error:
```
java.lang.ClassCastException: class co.kafkaProcessor.connect.MachineAwareHourlyPartitioner cannot be cast to class io.confluent.connect.storage.partitioner.Partitioner (co.kafkaProcessor.connect.MachineAwareHourlyPartitioner is in unnamed module of loader 'app'; io.confluent.connect.storage.partitioner.Partitioner is in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @63a6dffd)
    at io.confluent.connect.s3.S3SinkTask.newPartitioner(S3SinkTask.java:196)
    at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:117)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:312)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
```
Update: I also tried changing how the function is overridden by overriding the public method `encodePartition` instead, but that made no difference.
I also tried adding a test like this (which I hoped would force the cast to `Partitioner`):
```kotlin
val partitioner = MachineAwareHourlyPartitioner<String>()
val implementedPartitioner = partitioner as Partitioner<String>
```
and it does not fail.
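My understanding of why the test can't reproduce the error: the cast only fails when the class and the interface are defined by *different* classloaders (as happens with Connect's `PluginClassLoader`), while the test runs everything in the single application loader. A minimal, self-contained sketch of that mechanism; the `IsolatedLoader` and `Marker` names are made up for illustration:

```kotlin
open class Marker

// A loader with no parent: instead of delegating to the application
// classloader, it defines Marker itself from the same class bytes.
class IsolatedLoader : ClassLoader(null) {
    override fun findClass(name: String): Class<*> {
        val bytes = Marker::class.java
            .getResourceAsStream("/${name.replace('.', '/')}.class")!!
            .readBytes()
        return defineClass(name, bytes, 0, bytes.size)
    }
}

fun main() {
    val reloaded = IsolatedLoader().loadClass("Marker")
    // Same bytes, different defining loader => a distinct runtime class,
    // so any cast between the two would throw ClassCastException.
    println(reloaded == Marker::class.java)                // false
    println(Marker::class.java.isAssignableFrom(reloaded)) // false
}
```

In a unit test both `MachineAwareHourlyPartitioner` and `Partitioner` come from the same loader, so the cast succeeds there even though it fails inside Connect's plugin isolation.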