Failsafe 使用指南
什么是FailSafe
Failsafe是一个轻量级、无额外依赖,用于处理故障情况的Java库,它有着简洁的API来处理日常用例,同时也有便捷的灵活性处理其他日常情景,Failsafe是通过用一个或多个弹性的polices策略器包装可执行的逻辑工作的,这些策略可根据自身需要进行组合使用。
通过 maven 引入依赖,即可使用。
1<dependency>
2 <groupId>net.jodah</groupId>
3 <artifactId>failsafe</artifactId>
4 <version>2.4.0</version>
5</dependency>
策略器
- 重试策略器
- 超时策略器
- Fallback 策略器
- 断路器策略器
1、重试策略器
重试策略表是用来定义当发生重试时应执行哪些行为。
-
尝试次数:默认情况下重试策略器最大尝试次数为3次,当然我们可自定义最大尝试次数。
1retryPolicy.withMaxAttempts(3);
或者我们也可以配置最大重试次
1retryPolicy.withMaxRetries(2);
-
延迟配置:默认情况下,重试策略器在每次尝试中是没有设置延迟的,我们可自定义一个固定延迟。
1retryPolicy.withDelay(Duration.ofSeconds(1)); 2 3//配置一个指数降低的延迟 4retryPolicy.withBackoff(1, 30, ChronoUnit.SECONDS); 5 6//随机的延迟 7retryPolicy.withDelay(1, 10, ChronoUnit.SECONDS);
-
抖动配置:为每个重试配置一个抖动因子
1retryPolicy.withJitter(.1); 2 3//基于时间配置抖动 4retryPolicy.withJitter(Duration.ofMillis(100));
-
持续时间:在重试即将结束前,可以为每个执行设置一个最大持续时间,若需要取消或者中断执行,可参考超时策略器。
1retryPolicy.withMaxDuration(Duration.ofMinutes(5));
-
终止:可通过配置具体的结果、故障或条件来终止重试。
1retryPolicy.abortWhen(true).abortOn(NoRouteToHostException.class).abortIf(result -> result == true)
-
故障处理:像FailurePolicy一样,RetryPolicy 也可配置成仅处理某些结果或者故障。
1retryPolicy.handle(ConnectException.class) .handleResult(null);
-
事件监听器:除了标准的策略器监听器外,RetryPolicy还设置在执行尝试失败或重试之前收到通知。
1retryPolicy.onFailedAttempt(e -> log.error("Connection attempt failed", e.getLastFailure())) 2.onRetry(e -> log.warn("Failure #{}. Retrying.", e.getAttemptCount())); 3 4//当执行失败且超过最大重试次数时会收到通知 5retryPolicy.onRetriesExceeded(e -> log.warn("Failed to connect. Max retries exceeded.")); 6 7//或者当重试被终止时也将收到通知 8retryPolicy.onAbort(e -> log.warn("Connection aborted due to {}.", e.getFailure()));
2、超时策略器
当执行时间过长时,超时策略器将会生效于 TimeoutExceededException 并执行设定的逻辑。
1Timeout<Object> timeout = Timeout.of(Duration.ofSeconds(10));
2
3//如果出现超时情况,我们也可以取消执行或可选择性的中断
4timeout.withCancel(shouldInterrupt);
如果取消是被超时触发的,则依然会通过 TimeoutExceededException 完成执行。
-
事件监听器:超时策略器支持标准的策略监听器,当超时发生时将会收到对应的通知。
1timeout.onFailure(e -> log.error("Connection attempt timed out", e.getFailure())); 2 3//当执行逻辑完成时且未出现超时情况 4timeout.onSuccess(e -> log.info("Execution completed on time"));
3、Fallback策略器
Fallback策略器可以为失败的执行提供可选择性的结果,犹如忽略异常并提供默认的结果
1Fallback<Object> fallback = Fallback.of(defaultResult);
2
3//抛出自定义的异常
4Fallback<Object> fallback = Fallback.ofException(e -> new CustomException(e.getLastFailure()));
5
6//从额外的资源中计算可选择性的结果
7Fallback<Object> fallback = Fallback.of(this::connectToBackup);
8
9//CompletionStage可以作为一个fallback提供
10Fallback<Object> fallback = Fallback.ofStage(this::connectToBackup);
11
12//对于阻塞的运算,可以将Fallback配置为异步运行
13Fallback<Object> fallback = Fallback.ofAsync(this::blockingCall);
- 失败处理:像FailurePolicy一样,Fallbacks可配置为仅处理某些结果或故障
1fallback.handle(ConnectException.class).handleResult(null);
- 事件监听器:Fallback策略器支持事件监听器,并在执行尝试失败时将会发出通知。
1fallback.onFailedAttempt(e -> log.error("Connection failed", e.getLastFailure()))
2
3//当执行失败时通知
4fallback.onFailure(e -> log.error("Failed to connect to backup", e.getFailure()));
5
6//当执行或fallback尝试成功时:
7fallback.onSuccess(e -> log.info("Connection established"));
4、Circuit Breaker策略器
Circuit Breaker 允许临时性禁用执行,以防止系统过载的出现,这里提供两种类型的circuit breaker策略器:基于计数以及基于时间的方式。
- 基于计数的circuit breaker策略器是通过追踪最近的结果达到一定的限制来进行操作的。
- 基于时间的circuit breaker策略器是通过追踪一段时间内任何数量的执行结果来操作的。
1创建一个circuit breaker
2CircuitBreaker<Object> breaker = new CircuitBreaker<>() .handle(ConnectException.class) .withFailureThreshold(3, 10) .withSuccessThreshold(5) .withDelay(Duration.ofMinutes(1));
- 它是如何工作的?
当最近执行的故障结果超过配置的阈值时,Circuit Breaker将会被打开以及进一步对后续的请求以CircuitBreakerOpenException的方式返回结果,一段时间延迟后,breaker将会尝试处于半开状态、尝试接收一定的请求,并根据请求的结果决定breaker接下来是继续打开还是关闭,如果尝试的执行达到成功的法制,breaker将再次关闭并且执行将会被正常处理,否则就会重新打开。
- 灵活配置
Circuit Breaker可以灵活地配置表明什么时候它应该被关闭、被打开或者半开状态。
打开状态:可以将基于计数的breaker配置成在连续多次执行失败时打开
1breaker.withFailureThreshold(5);
2
3//当总共执行5次时,有3次时失败的
4breaker.withFailureThreshold(3, 5);
5
6//在连续多次执行失败时打开
7breaker.withFailureThreshold(3, Duration.ofMinutes(1));
8
9//当在某个时间段内连续发生一定次数的失败
10breaker.withFailureThreshold(3, 5, Duration.ofMinutes(1));
11
12//在某个时间段内发生失败的比例
13breaker.withFailureRateThreshold(20, 5, Duration.ofMinutes(1));
半开状态
在breaker打开后,会默认延迟1分钟,然后过渡到半打开的状态,当然可配置不同的延迟,或者基于执行结果来配置延迟
1breaker.withDelay(Duration.ofSeconds(30));
关闭状态
如果一些尝试执行的结果成功了,breaker可被再次设置为关闭状态,否则会将重新打开。
1breaker.withSuccessThreshold(5);
2
3//配置为基于区间的阈值执行关闭
4breaker.withSuccessThreshold(3, 5);
如果成功的阈值没有被配置,那么失败的阈值将会决定breaker是处于打开状态还是关闭、半打开状态
-
失败处理:像FailurePolicy一样,Circuit Breaker也可自定义配置来处理特定的结果或者故障。
1circuitBreaker .handle(ConnectException.class) .handleResult(null);
-
事件监听器:除了标准的策略监听器外,当Circuit Breaker的状态发生变化时也会发起通知
1circuitBreaker.onOpen(() -> log.info("The circuit breaker was opened")) 2.onClose(() -> log.info("The circuit breaker was closed")) 3.onHalfOpen(() -> log.info("The circuit breaker was half-opened"));
-
指标:Circuit Breaker能为当前状态的breaker提供指标统计,包括执行次数、成功次数、失败次数、成功率及失败率,当breaker处于打开状态时还可以返回剩余的延迟数。
1public long getFailureCount() { 2 return state.get().getStats().getFailureCount(); 3}
-
最佳实践:Circuit Breaker 可以且应该在访问代码中被共享,这能保证在 breaker 打开时,共享及使用同一个 breaker 的所有执行将会被阻塞,直到breaker被再次关闭,例如:如果多个连接或者请求服务于同个外部服务器,正常地它们应该全部都经过同一个breaker处理。
-
独立使用:Circuit Breaker在平常中也可以单独地使用。
1breaker.open(); 2breaker.halfOpen(); 3breaker.close(); 4if (breaker.allowsExecution()) { 5 try { 6 breaker.preExecute(); 7 doSomething(); 8 breaker.recordSuccess(); 9 } catch (Exception e) { 10 breaker.recordFailure(e); 11 } 12}
-
基于时间的方案:基于时间的breaker是以滑动窗口机制来汇总执行结果的,随着时间的推移及新结果的产生,旧结果将会被抛弃。为了保证时间与空间上的效率,结果将会被划分为10个分片,每个分片表示配置故障周期的十分之一,当时间片不处于阈值段内,结果将会被抛弃,这个允许circuit breaker根据最近的结果进行操作,而不需要追踪每个单独的时间。
-
性能:Failsafe内部的circuit breaker同时实现了时间与空间的效率,利用单个循环数据结构记录结果,记录执行以及计算是否达到阈值的时间复杂度为O(1)。
Failsafe 特性
-
定时器:默认情况下,Failsafe使用ForkJoinPool的线程池来实现异步执行,但我们也可以配置一个更详细的ScheduledExecutorService,根据业务情景自定义Scheduler、ExecutorService使用。
1Failsafe.with(policy).with(scheduler).getAsync(this::connect);
-
事件监听器:Failsafe在顶级的API中或不同的策略器中都支持事件监听器。
Failsafe执行监听器:对于所有的执行器,当执行完成时将会Failsafe将会发出通知
1Failsafe.with(retryPolicy, circuitBreaker).onComplete(e -> { 2 if (e.getResult() != null) { 3 log.info("Connected to {}", e.getResult()); 4 } else if (e.getFailure() != null) { 5 log.error("Failed to create connection", e.getFailure()); 6 } 7}).get(this::connect);
另外,当执行被成功完成或者对于任一策略器执行失败时,同样会发起通知。
1Failsafe.with(retryPolicy, circuitBreaker) .onSuccess(e -> log.info("Connected to {}", e.getResult())) .get(this::connect); Failsafe.with(retryPolicy, circuitBreaker) .onFailure(e -> log.error("Failed to create connection", e.getFailure())) .get(this::connect);
-
策略器监听器:在策略器方面,对于策略器执行成功或者失败时,Failsafe也会通知到我们。
1policy.onSuccess(e -> log.info("Connected to {}", e.getResult())).onFailure(e -> log.error("Failed to create connection", e.getFailure()));
还有其他的监听器,像retryPolicies、fallbacks、circuit breakers
- 定义执行成功
许多监听器是基于执行的结果是成功还是失败而生效的,每个策略器都会基于对故障处理的配置来对决定对成功的配置。
当执行提供的结果为成功或者结果失败,但策略器能提供一个成功的结果,则会被视为这个策略成功;当执行的结果为失败或策略器不能提供一个成功的结果则这个策略被当失败。
-
执行上下文:Failsafe提供一个执行上下文,包含执行的相关信息,例如:执行尝试的次数、开始与结束时间以及最近一次的结果,这是非常有用的,对于基于上一次执行尝试中的结果重试。
1Failsafe.with(retryPolicy).run(ctx -> { 2 log.debug("Connection attempt #{}", ctx.getAttemptCount()); 3 connect(); 4});
-
取消执行:Failsafe提供取消及可选择性中断执行的功能,当返回Timeout或者通过异步执行器返回Future时,执行可被取消或中断
1Future<Connection> future = Failsafe.with(retryPolicy).getAsync(this::connect); 2future.cancel(shouldInterrupt);
取消后将会导致任何任何异步执行和超时重试终止,而中断则会导致执行线程的interrupt标识设置为中断。
-
处理取消:Executions的取消可配合使用ExecutionContext.isCancelled()来达到效果。
1Failsafe.with(timeout).getAsync(ctx -> { 2 while (!ctx.isCancelled()) { 3 doWork(); 4 } 5});
-
处理中断:执行的中断将会导致某些阻塞接触并在执行块中抛出InterruptedException,非阻塞的执行可通过定期检查Thread.isInterrupted()来配置中断。
1Failsafe.with(timeout).getAsync(ctx -> { 2while (!Thread.isInterrupted()) { 3 doBlockingWork(); 4} 5});
-
异步API的支持:Failsafe可通过异步返回结果的API代码交互,runAsyncExecution, getAsyncExecution及getStageAsyncExecution方法提供异步执行的引用,用于从内部的异步回调中定期的重试或完成执行。
1Failsafe.with(retryPolicy).getAsyncExecution(execution -> service.connect().whenComplete((result, failure) -> { 2 if (execution.complete(result, failure)) { 3 log.info("Connected"); 4 } else if (!execution.retry()) { 5 log.error("Connection attempts failed", failure); 6 } 7}));
Failsafe也可通过Scheduler接口在第三方调度程序上异步执行及重试。
-
CompletionStage的支持:Failsafe在处理故障配置中可接受一个CompletionStage参数且返回新的CompletableFuture。
1Failsafe.with(retryPolicy).getStageAsync(this::connectAsync).thenApplyAsync(value -> value + "bar") 2 .thenAccept(System.out::println));
-
执行跟踪:除了自动执行重试外,Failsafe还可用于跟踪执行的情况并根据需要手动重试。
1Execution execution = new Execution(retryPolicy); 2while (!execution.isComplete()) { 3try { 4 doSomething(); 5 execution.complete(); 6} catch (ConnectException e) { 7 execution.recordFailure(e); 8 } 9}
执行跟踪对于整合一些有自身重试机制的API是非常有用的。
1Execution execution = new Execution(retryPolicy); 2if (execution.canRetryOn(someFailure)) { 3 service.scheduleRetry(execution.getWaitTime().toNanos(), TimeUnit.MILLISECONDS); 4}
-
Policy SPI:Failsafe提供SPI的机制允许我们自己定义策略器并加到Failsafe中,每个策略器的实现都必须返回PolicyExecutor ,此PolicyExecutor 用于处理同步或异步执行、请求预处理、请求后处理的结果,可根据参考现有的策略器来创建自定义的实现。
Failsafe与Hystrix的区别
Failsafe的出现,目的在于成为一个轻量级、用于处理各种类型执行的通用库,而Hystrix是更多专注于远程请求的执行以及为此提供更多的特性,一下是它们之间的几个区别:
1、Hystrix具有多个外部依赖,包括Archais, Guava, ReactiveX,以及Apache Commons的配置,而Failsafe是零外部依赖。
2、Hystrix主要是circuit breaker及隔离模式的主要实现,而Failsafe除了circuit breaker外,还提供了多种策略器供需要自行组合
3、Hystrix要求执行逻辑必须要放置于HystrixCommand的实现中,Failsafe要求执行逻辑是lambda表达式、Runnable任务或Supplier
4、Hystrix的circuit breaker对时间非常敏感,默认情况下每秒记录执行结果,并基于最后一秒的结果决定开关,Failsafe的circuit breaker对时间并不敏感,不管什么时候发生,它都是基于最后N次的打开/关闭状态决定
5、Failsafe的circuit breaker支持执行超时及可配置成功的阈值,Hystrix仅在处于半开状态时执行一次来确定是否关闭breaker
6、Failsafe支持使用者提供自定义线程池及定时器,但在Hystrix里异步的命令是通过额外管理的线程池中执行的
7、在Failsafe中,异步的执行可通过事件监听器及返回Future获取结果,但在Hystrix需要通过RxJava Observables观察异步执行结果
8、Failsafe的circuit breaker可共享于不同的执行中,如果发生故障将停止对组件的所有执行
9、Failsafe的circuit breaker可以以单独模式的使用与操作