好的,可以通过在RedisRateLimiter
类顶部创建自定义速率限制器。不幸的是,该类尚未针对可扩展性进行架构设计,因此该解决方案有些“hacky”,我只能在其中装饰正常RedisRateLimiter
并复制其中的一些代码:
@Primary
@Component
public class ApiKeyRateLimiter implements RateLimiter {
private Log log = LogFactory.getLog(getClass());
// How many requests per second do you want a user to be allowed to do?
private static final int REPLENISH_RATE = 1;
// How much bursting do you want to allow?
private static final int BURST_CAPACITY = 1;
private final RedisRateLimiter rateLimiter;
private final RedisScript<List<Long>> script;
private final ReactiveRedisTemplate<String, String> redisTemplate;
@Autowired
public ApiKeyRateLimiter(
RedisRateLimiter rateLimiter,
@Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> script,
ReactiveRedisTemplate<String, String> redisTemplate) {
this.rateLimiter = rateLimiter;
this.script = script;
this.redisTemplate = redisTemplate;
}
// These two methods are the core of the rate limiter
// Their purpose is to come up with a rate limits for given API KEY (or user ID)
// It is up to implementor to return limits based up on the api key passed
private int getBurstCapacity(String routeId, String apiKey) {
return BURST_CAPACITY;
}
private int getReplenishRate(String routeId, String apiKey) {
return REPLENISH_RATE;
}
public Mono<Response> isAllowed(String routeId, String apiKey) {
int replenishRate = getReplenishRate(routeId, apiKey);
int burstCapacity = getBurstCapacity(routeId, apiKey);
try {
List<String> keys = getKeys(apiKey);
// The arguments to the LUA script. time() returns unixtime in seconds.
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
Instant.now().getEpochSecond() + "", "1");
Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}) .map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
Response response = new Response(allowed, getHeaders(tokensLeft, replenishRate, burstCapacity));
if (log.isDebugEnabled()) {
log.debug("response: " + response);
}
return response;
});
}
catch (Exception e) {
/*
* We don't want a hard dependency on Redis to allow traffic. Make sure to set
* an alert so you know if this is happening too much. Stripe's observed
* failure rate is 0.01%.
*/
log.error("Error determining if user allowed from redis", e);
}
return Mono.just(new Response(true, getHeaders(-1L, replenishRate, burstCapacity)));
}
private static List<String> getKeys(String id) {
String prefix = "request_rate_limiter.{" + id;
String tokenKey = prefix + "}.tokens";
String timestampKey = prefix + "}.timestamp";
return Arrays.asList(tokenKey, timestampKey);
}
private HashMap<String, String> getHeaders(Long tokensLeft, Long replenish, Long burst) {
HashMap<String, String> headers = new HashMap<>();
headers.put(RedisRateLimiter.REMAINING_HEADER, tokensLeft.toString());
headers.put(RedisRateLimiter.REPLENISH_RATE_HEADER, replenish.toString());
headers.put(RedisRateLimiter.BURST_CAPACITY_HEADER, burst.toString());
return headers;
}
@Override
public Map getConfig() {
return rateLimiter.getConfig();
}
@Override
public Class getConfigClass() {
return rateLimiter.getConfigClass();
}
@Override
public Object newConfig() {
return rateLimiter.newConfig();
}
}
因此,路线将如下所示:
@Component
public class Routes {
@Autowired
ApiKeyRateLimiter rateLimiter;
@Autowired
ApiKeyResolver apiKeyResolver;
@Bean
public RouteLocator theRoutes(RouteLocatorBuilder b) {
return b.routes()
.route(p -> p
.path("/unlimited")
.uri("http://httpbin.org:80/anything?route=unlimited")
)
.route(p -> p
.path("/limited")
.filters(f ->
f.requestRateLimiter(r -> {
r.setKeyResolver(apiKeyResolver);
r.setRateLimiter(rateLimiter);
} )
)
.uri("http://httpbin.org:80/anything?route=limited")
)
.build();
}
}
希望它可以为某人节省一个工作日...