我在本地机器上部署了分布式配置。
1 个管理员 1 个容器 1 个动物园管理员 1 个 hsql 1 个 redis
我有以下流:
stream create --name activityLog --definition "tail --name=/home/bruno/randomName | log"
stream create --name activityLogCounterTap --definition "tap:stream:activityLog > json-to-tuple | filter --expression=payload.reportable.equals(true) | counter --name=activitylogcount"
stream create --name activityLogEventTypeCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=status --name=status"
stream create --name activityLogClientTypeCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=clientType --name=clientType"
stream create --name activityLogIndexCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=index --name=index"
stream create --name activityLogCustomerCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=customer --name=customer"
stream create --name activityLogChannelCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=channel --name=channel"
stream create --name activityLogStrategyOnlineCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=strategyOnline --name=strategyOnline"
stream create --name activityLogStrategyOfflineCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=strategyOffline --name=strategyOffline"
我想使用以下部署描述符进行部署:
stream deploy --name activityLog --properties "module.*.count=0"
stream deploy --name activityLogCounterTap --properties "module.*.count=0"
stream deploy --name activityLogEventTypeCounterTap --properties "module.*.count=0"
stream deploy --name activityLogClientTypeCounterTap --properties "module.*.count=0"
stream deploy --name activityLogIndexCounterTap --properties "module.*.count=0"
stream deploy --name activityLogCustomerCounterTap --properties "module.*.count=0"
stream deploy --name activityLogChannelCounterTap --properties "module.*.count=0"
stream deploy --name activityLogStrategyOnlineCounterTap --properties "module.*.count=0"
stream deploy --name activityLogStrategyOfflineCounterTap --properties "module.*.count=0"
消息是 json 对象。
这个想法是为使用来自 apache kafka 的消息的容器设置一个任意编号,但是对于该示例,使用了一个简单的尾部。
我一一部署,我可以看到使用 redis 作为传输的性能在部署第 4 个流后下降了很多。我使用视觉 vm 观察到 inbound.*redis:queue-inbound-channel-adapter1 中存在某种竞争条件。如果我部署更多流(水龙头),它将变得无法使用。
(我尝试在此处添加来自 Visual VM 的线程视图图像,但我需要 10 个声望:s)
它似乎被锁定在这里:
"inbound.activityLog.0-redis:queue-inbound-channel-adapter1" prio=10 tid=0x00007fe3581a1800 nid=0x29c7 waiting on condition [0x00007fe3d28de000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000788b48d48> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at org.apache.commons.pool2.impl.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:524)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:438)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:361)
at redis.clients.util.Pool.getResource(Pool.java:40)
at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:90)
at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:143)
at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:41)
at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:128)
at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:91)
at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:78)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:152)
at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:84)
at org.springframework.data.redis.core.DefaultListOperations.rightPop(DefaultListOperations.java:151)
at org.springframework.data.redis.core.DefaultBoundListOperations.rightPop(DefaultBoundListOperations.java:92)
at org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint.popMessageAndSend(RedisQueueMessageDrivenEndpoint.java:177)
at org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint.access$300(RedisQueueMessageDrivenEndpoint.java:50)
at org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint$ListenerTask.run(RedisQueueMessageDrivenEndpoint.java:290)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
如果部署的流少于 4 个,则永远不会发生等待条件:
"inbound.activityLog.0-redis:queue-inbound-channel-adapter1" prio=10 tid=0x00007ff5101f6800 nid=0x352c runnable [0x00007ff5794a4000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.net.SocketInputStream.read(SocketInputStream.java:108)
at redis.clients.util.RedisInputStream.fill(RedisInputStream.java:109)
at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:45)
at redis.clients.jedis.Protocol.process(Protocol.java:120)
at redis.clients.jedis.Protocol.read(Protocol.java:191)
at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:212)
at redis.clients.jedis.BinaryJedis.brpop(BinaryJedis.java:2068)
at org.springframework.data.redis.connection.jedis.JedisConnection.bRPop(JedisConnection.java:1514)
at org.springframework.data.redis.core.DefaultListOperations$12.inRedis(DefaultListOperations.java:154)
at org.springframework.data.redis.core.AbstractOperations$ValueDeserializingRedisCallback.doInRedis(AbstractOperations.java:50)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:152)
at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:84)
at org.springframework.data.redis.core.DefaultListOperations.rightPop(DefaultListOperations.java:151)
at org.springframework.data.redis.core.DefaultBoundListOperations.rightPop(DefaultBoundListOperations.java:92)
at org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint.popMessageAndSend(RedisQueueMessageDrivenEndpoint.java:177)
at org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint.access$300(RedisQueueMessageDrivenEndpoint.java:50)
at org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint$ListenerTask.run(RedisQueueMessageDrivenEndpoint.java:290)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
我的第一个问题是为什么 spring-xd 不能使用 redis 作为传输来处理更多的水龙头。我尝试使用rabbitMq,它可以工作。
我的第二个问题是为什么它首先使用redis,因此我部署在内存配置中。再次在内存配置中使用,但这次使用rabbitMQ,我可以跟踪传递的消息,我认为这是不应该发生的。
--已编辑--
因此,关于第一个问题,我发现 org.apache.commons.pool2.GenericObjectPool 中的 maxTotal 参数默认为 8。我进入调试模式并在运行时更改了该值(无限制 = -1),这似乎解决了问题。为了能够将其配置为使用该 hack 进行部署,我需要创建一个扩展(来自文档)并覆盖以下 bean:
<bean id="messageBus" class="org.springframework.xd.dirt.integration.redis.RedisMessageBus">
<constructor-arg ref="redisConnectionFactory2" />
<constructor-arg ref="codec"/>
<property name="defaultBackOffInitialInterval" value="${xd.messagebus.redis.default.backOffInitialInterval}" />
<property name="defaultBackOffMaxInterval" value="${xd.messagebus.redis.default.backOffMaxInterval}" />
<property name="defaultBackOffMultiplier" value="${xd.messagebus.redis.default.backOffMultiplier}" />
<property name="defaultConcurrency" value="${xd.messagebus.redis.default.concurrency}" />
<property name="defaultMaxAttempts" value="${xd.messagebus.redis.default.maxAttempts}" />
</bean>
<bean id="counterRepository"
class="org.springframework.xd.analytics.metrics.redis.RedisCounterRepository">
<constructor-arg ref="redisConnectionFactory2" />
</bean>
<bean id="fieldValueCounterRepository"
class="org.springframework.xd.analytics.metrics.redis.RedisFieldValueCounterRepository">
<constructor-arg ref="redisConnectionFactory2" />
</bean>
<bean id="gaugeRepository"
class="org.springframework.xd.analytics.metrics.redis.RedisGaugeRepository">
<constructor-arg ref="redisConnectionFactory2" />
</bean>
<bean id="richGaugeRepository"
class="org.springframework.xd.analytics.metrics.redis.RedisRichGaugeRepository">
<constructor-arg ref="redisConnectionFactory2" />
</bean>
<bean id="aggregateCounterRepository"
class="org.springframework.xd.analytics.metrics.redis.RedisAggregateCounterRepository">
<constructor-arg ref="redisConnectionFactory2" />
</bean>
<bean id="redisConnectionFactory2" lazy-init="false"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="hostName" value="${spring.redis.host}" />
<property name="port" value="${spring.redis.port}" />
<property name="password" value="" />
<property name="poolConfig" ref="jedisPoolConfig"/>
</bean>
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxTotal" value="-1"/>
</bean>
我一开始只是想定义一个 redisConnectionFactory bean,它会覆盖默认的,但是它没有那样工作。
我不确定解决方案是在 servers.yml 中配置 maxTotal 以允许增加该值,或者是否存在无法释放池的错误。无论如何,我会打开一个jira,希望这可以帮助那些没有完成我所做的所有工作的人:-)