3

我在本地机器上部署了分布式配置。

1 个 admin(管理节点)、1 个 container(容器)、1 个 ZooKeeper、1 个 HSQL、1 个 Redis

我有以下流:

stream create --name activityLog --definition "tail --name=/home/bruno/randomName | log"
stream create --name activityLogCounterTap --definition "tap:stream:activityLog > json-to-tuple | filter --expression=payload.reportable.equals(true) | counter --name=activitylogcount"
stream create --name activityLogEventTypeCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=status --name=status"
stream create --name activityLogClientTypeCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=clientType --name=clientType"
stream create --name activityLogIndexCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=index --name=index"
stream create --name activityLogCustomerCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=customer --name=customer"
stream create --name activityLogChannelCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=channel --name=channel"
stream create --name activityLogStrategyOnlineCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=strategyOnline --name=strategyOnline"
stream create --name activityLogStrategyOfflineCounterTap --definition "tap:stream:activityLogCounterTap.filter > field-value-counter --fieldName=strategyOffline --name=strategyOffline"

我想使用以下部署描述符进行部署:

stream deploy --name activityLog --properties "module.*.count=0"
stream deploy --name activityLogCounterTap --properties "module.*.count=0"
stream deploy --name activityLogEventTypeCounterTap --properties "module.*.count=0"
stream deploy --name activityLogClientTypeCounterTap --properties "module.*.count=0"
stream deploy --name activityLogIndexCounterTap --properties "module.*.count=0"
stream deploy --name activityLogCustomerCounterTap --properties "module.*.count=0"
stream deploy --name activityLogChannelCounterTap --properties "module.*.count=0"
stream deploy --name activityLogStrategyOnlineCounterTap --properties "module.*.count=0"
stream deploy --name activityLogStrategyOfflineCounterTap --properties "module.*.count=0"

消息是 json 对象。

设想是让任意数量的容器来消费来自 Apache Kafka 的消息,但在本示例中只使用了一个简单的 tail 源。

我逐个部署这些流,可以看到在部署第 4 个流之后,使用 redis 作为传输的性能下降了很多。我用 VisualVM 观察到 inbound.*redis:queue-inbound-channel-adapter1 线程中存在某种竞争。如果我部署更多的流(tap),系统将变得无法使用。

(我尝试在此处添加来自 Visual VM 的线程视图图像,但我需要 10 个声望:s)

它似乎被锁定在这里:

"inbound.activityLog.0-redis:queue-inbound-channel-adapter1" prio=10 tid=0x00007fe3581a1800 nid=0x29c7 waiting on condition [0x00007fe3d28de000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000788b48d48> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
    at org.apache.commons.pool2.impl.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:524)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:438)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:361)
    at redis.clients.util.Pool.getResource(Pool.java:40)
    at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:90)
    at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:143)
    at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:41)
    at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:128)
    at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:91)
    at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:78)
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177)
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:152)
    at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:84)
    at org.springframework.data.redis.core.DefaultListOperations.rightPop(DefaultListOperations.java:151)
    at org.springframework.data.redis.core.DefaultBoundListOperations.rightPop(DefaultBoundListOperations.java:92)
    at org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint.popMessageAndSend(RedisQueueMessageDrivenEndpoint.java:177)
    at org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint.access$300(RedisQueueMessageDrivenEndpoint.java:50)
    at org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint$ListenerTask.run(RedisQueueMessageDrivenEndpoint.java:290)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

如果部署的流少于 4 个,则永远不会发生等待条件:

"inbound.activityLog.0-redis:queue-inbound-channel-adapter1" prio=10 tid=0x00007ff5101f6800 nid=0x352c runnable [0x00007ff5794a4000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at java.net.SocketInputStream.read(SocketInputStream.java:108)
    at redis.clients.util.RedisInputStream.fill(RedisInputStream.java:109)
    at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:45)
    at redis.clients.jedis.Protocol.process(Protocol.java:120)
    at redis.clients.jedis.Protocol.read(Protocol.java:191)
    at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:212)
    at redis.clients.jedis.BinaryJedis.brpop(BinaryJedis.java:2068)
    at org.springframework.data.redis.connection.jedis.JedisConnection.bRPop(JedisConnection.java:1514)
    at org.springframework.data.redis.core.DefaultListOperations$12.inRedis(DefaultListOperations.java:154)
    at org.springframework.data.redis.core.AbstractOperations$ValueDeserializingRedisCallback.doInRedis(AbstractOperations.java:50)
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190)
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:152)
    at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:84)
    at org.springframework.data.redis.core.DefaultListOperations.rightPop(DefaultListOperations.java:151)
    at org.springframework.data.redis.core.DefaultBoundListOperations.rightPop(DefaultBoundListOperations.java:92)
    at org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint.popMessageAndSend(RedisQueueMessageDrivenEndpoint.java:177)
    at org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint.access$300(RedisQueueMessageDrivenEndpoint.java:50)
    at org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint$ListenerTask.run(RedisQueueMessageDrivenEndpoint.java:290)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None
  • 我的第一个问题是:为什么 spring-xd 在使用 redis 作为传输时无法处理更多的 tap。我尝试改用 rabbitMQ,它可以正常工作。

  • 我的第二个问题是:既然我是以内存(in-memory)配置部署的,为什么它一开始还会用到 redis。同样是内存配置,但这次改用 rabbitMQ,我仍然可以跟踪到传递的消息,我认为这是不应该发生的。

--已编辑--

因此,关于第一个问题,我发现 org.apache.commons.pool2.GenericObjectPool 中的 maxTotal 参数默认为 8。我进入调试模式并在运行时更改了该值(无限制 = -1),这似乎解决了问题。为了能够将其配置为使用该 hack 进行部署,我需要创建一个扩展(来自文档)并覆盖以下 bean:

<bean id="messageBus" class="org.springframework.xd.dirt.integration.redis.RedisMessageBus">
            <constructor-arg ref="redisConnectionFactory2" />
            <constructor-arg ref="codec"/>
            <property name="defaultBackOffInitialInterval" value="${xd.messagebus.redis.default.backOffInitialInterval}" />
            <property name="defaultBackOffMaxInterval" value="${xd.messagebus.redis.default.backOffMaxInterval}" />
            <property name="defaultBackOffMultiplier" value="${xd.messagebus.redis.default.backOffMultiplier}" />
            <property name="defaultConcurrency" value="${xd.messagebus.redis.default.concurrency}" />
            <property name="defaultMaxAttempts" value="${xd.messagebus.redis.default.maxAttempts}" />
    </bean>


<bean id="counterRepository"
            class="org.springframework.xd.analytics.metrics.redis.RedisCounterRepository">
            <constructor-arg ref="redisConnectionFactory2" />
    </bean>

    <bean id="fieldValueCounterRepository"
            class="org.springframework.xd.analytics.metrics.redis.RedisFieldValueCounterRepository">
            <constructor-arg ref="redisConnectionFactory2" />
    </bean>

    <bean id="gaugeRepository"
            class="org.springframework.xd.analytics.metrics.redis.RedisGaugeRepository">
            <constructor-arg ref="redisConnectionFactory2" />
    </bean>

    <bean id="richGaugeRepository"
            class="org.springframework.xd.analytics.metrics.redis.RedisRichGaugeRepository">
            <constructor-arg ref="redisConnectionFactory2" />
    </bean>

    <bean id="aggregateCounterRepository"
            class="org.springframework.xd.analytics.metrics.redis.RedisAggregateCounterRepository">
            <constructor-arg ref="redisConnectionFactory2" />
    </bean>

<bean id="redisConnectionFactory2" lazy-init="false"
      class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
    <property name="hostName" value="${spring.redis.host}" />
    <property name="port" value="${spring.redis.port}" />
    <property name="password" value="" />
    <property name="poolConfig" ref="jedisPoolConfig"/>
</bean>

<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
    <property name="maxTotal" value="-1"/>
</bean>

我一开始只是想定义一个 redisConnectionFactory bean,它会覆盖默认的,但是它没有那样工作。

我不确定正确的解决办法是在 servers.yml 中提供 maxTotal 配置以便调大该值,还是说存在连接未被释放回池的 bug。无论如何,我会提一个 JIRA,希望这能帮到其他人,让他们不必重复我做过的这些排查工作 :-)

4

1 回答 1

0

这里的问题是 Spring Boot(Spring XD 基于它构建)默认将 redis 连接池大小设置为 8(参见 https://docs.spring.io/spring-boot/docs/current/reference/html/common-application-properties.html 中的 spring.redis.pool.max-active)。这就是问题中提到的 GenericObjectPool 大小为 8 的原因(Jedis 使用 GenericObjectPool)。

另外,如第二个堆栈跟踪所示,Spring XD 会从池中获取一个连接并一直占用它(阻塞在 BRPOP 调用上),直到收到一条消息。池中只有 8 个连接,如果在同一个 XD 容器中部署多个流/tap/队列等,很快就会导致性能非常差。

但是,无需问题中提到的 hack 即可轻松更改默认池大小 - max-active 参数可以直接在 servers.yml 中配置(因为它完全基于 Spring Boot 的配置),例如:

# Redis properties
spring:
  redis:
   port: 6379
   host: 127.0.0.1
   pool:
      maxActive: 100
      maxIdle: 100
#   sentinel:
#     master: mymaster
#     nodes: 127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381

目前 Spring XD 的文档对此说明不足,但下一个版本会在 servers.yml 中写明这些默认值,请参阅 https://jira.spring.io/browse/XD-3733

于 2016-02-02T17:00:05.220 回答