Spring Cloud Hystrix 服务容错保护

Posted by 彭楷淳 on 2021-02-05
Estimated Reading Time 11 Minutes
Words 2.6k In Total
Viewed Times

Hystrix 简介


Spring Cloud Hystrix 是 Spring Cloud Netflix 子项目的核心组件之一,具有服务容错及线程隔离等一系列服务保护功能,本文将对其用法进行详细介绍。

在微服务架构中,服务与服务之间通过远程调用的方式进行通信,一旦某个被调用的服务发生了故障,其依赖服务也会发生故障,此时就会发生故障的蔓延,最终导致系统瘫痪。Hystrix实现了断路器模式,当某个服务发生故障时,通过断路器的监控,给调用方返回一个错误响应,而不是长时间的等待,这样就不会使得调用方由于长时间得不到响应而占用线程,从而防止故障的蔓延。Hystrix 具备服务降级、服务熔断、线程隔离、请求缓存、请求合并及服务监控等强大功能。

创建模块


这里我们创建一个 hystrix-service 模块来演示 hystrix 的常用功能。

引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

相关配置

主要是配置了端口、注册中心地址及 user-service 的调用路径。

1
2
3
4
5
6
7
8
9
10
11
12
13
server:
port: 8401
spring:
application:
name: hystrix-service
eureka:
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:8001/eureka/
service-url:
user-service: http://user-service

在启动类上添加 @EnableCircuitBreaker 来开启 Hystrix 的断路器功能

1
2
3
4
5
6
7
8
9
10
@EnableCircuitBreaker
@EnableDiscoveryClient
@SpringBootApplication
public class HystrixServiceApplication {

public static void main(String[] args) {
SpringApplication.run(HystrixServiceApplication.class, args);
}

}

创建 Controller

服务降级演示

在 UserHystrixController 中添加用于测试服务降级的接口:

1
2
3
4
@GetMapping("/testFallback/{id}")
public CommonResult testFallback(@PathVariable Long id) {
return userService.getUser(id);
}

在 UserService 中添加调用方法与服务降级方法,方法上需要添加 @HystrixCommand 注解:

1
2
3
4
5
6
7
8
9
@HystrixCommand(fallbackMethod = "getDefaultUser")
public CommonResult getUser(Long id) {
return restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id);
}

public CommonResult getDefaultUser(@PathVariable Long id) {
User defaultUser = new User(-1L, "defaultUser", "123456");
return new CommonResult<>(defaultUser);
}

启动 eureka-server、user-service、hystrix-service 服务,调用接口进行测试:http://localhost:8401/user/testFallback/1,关闭 user-service 服务重新测试该接口,发现已经发生了服务降级。

@HystrixCommand 详解

@HystrixCommand 中的常用参数

  • fallbackMethod:指定服务降级处理方法;
  • ignoreExceptions:忽略某些异常,不发生服务降级;
  • commandKey:命令名称,用于区分不同的命令;
  • groupKey:分组名称,Hystrix会根据不同的分组来统计命令的告警及仪表盘信息;
  • threadPoolKey:线程池名称,用于划分线程池。

设置命令、分组及线程池名称

在 UserHystrixController 中添加测试接口

1
2
3
4
@GetMapping("/testCommand/{id}")
public CommonResult testCommand(@PathVariable Long id) {
return userService.getUserCommand(id);
}

在 UserService 中添加方式实现功能

1
2
3
4
5
6
7
 @HystrixCommand(fallbackMethod = "getDefaultUser",
commandKey = "getUserCommand",
groupKey = "getUserGroup",
threadPoolKey = "getUserThreadPool")
public CommonResult getUserCommand(@PathVariable Long id) {
return restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id);
}

使用 ignoreExceptions 忽略某些异常降级

在 UserHystrixController 中添加测试接口:

1
2
3
4
@GetMapping("/testException/{id}")
public CommonResult testException(@PathVariable Long id) {
return userService.getUserException(id);
}

在 UserService 中添加实现方法,这里忽略了 NullPointerException,当id为1时抛出 IndexOutOfBoundsException,id为2时抛出 NullPointerException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@HystrixCommand(fallbackMethod = "getDefaultUser2", ignoreExceptions = {NullPointerException.class})
public CommonResult getUserException(Long id) {
if (id == 1) {
throw new IndexOutOfBoundsException();
} else if (id == 2) {
throw new NullPointerException();
}
return restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id);
}

public CommonResult getDefaultUser2(@PathVariable Long id, Throwable e) {
LOGGER.error("getDefaultUser2 id:{},throwable class:{}", id, e.getClass());
User defaultUser = new User(-2L, "defaultUser2", "123456");
return new CommonResult<>(defaultUser);
}

调用接口进行测试:http://localhost:8401/user/tesException/1http://localhost:8401/user/tesException/1

Hytrix 的请求缓存


当系统并发量越来越大时,我们需要使用缓存来优化系统,达到减轻并发请求线程数,提供响应速度的效果。

相关注解

  • @CacheResult:开启缓存,默认所有参数作为缓存的 key,cacheKeyMethod 可以通过返回String类型的方法指定 key
  • @CacheKey:指定缓存的 key,可以指定参数或指定参数中的属性值为缓存key,cacheKeyMethod 还可以通过返回 String 类型的方法指定
  • @CacheRemove:移除缓存,需要指定 commandKey

使用缓存

在 UserHystrixController 中添加使用缓存的测试接口,直接调用三次 getUserCache 方法

1
2
3
4
5
6
7
@GetMapping("/testCache/{id}")
public CommonResult testCache(@PathVariable Long id) {
userService.getUserCache(id);
userService.getUserCache(id);
userService.getUserCache(id);
return new CommonResult("操作成功", 200);
}

在 UserService 中添加具有缓存功能的 getUserCache 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
@CacheResult(cacheKeyMethod = "getCacheKey")
@HystrixCommand(fallbackMethod = "getDefaultUser", commandKey = "getUserCache")
public CommonResult getUserCache(Long id) {
LOGGER.info("getUserCache id:{}", id);
return restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id);
}

/**
* 为缓存生成key的方法
*/
public String getCacheKey(Long id) {
return String.valueOf(id);
}

调用接口测试 http://localhost:8401/user/testCache/1,这个接口中调用了三次getUserCache方法,但是只打印了一次日志,说明有两次走的是缓存

移除缓存

在 UserHystrixController 中添加移除缓存的测试接口,调用一次 removeCache 方法:

1
2
3
4
5
6
7
@GetMapping("/testRemoveCache/{id}")
public CommonResult testRemoveCache(@PathVariable Long id) {
userService.getUserCache(id);
userService.removeCache(id);
userService.getUserCache(id);
return new CommonResult("操作成功", 200);
}

在 UserService 中添加具有移除缓存功能的 removeCache 方法:

1
2
3
4
5
6
@CacheRemove(commandKey = "getUserCache", cacheKeyMethod = "getCacheKey")
@HystrixCommand
public CommonResult removeCache(Long id) {
LOGGER.info("removeCache id:{}", id);
return restTemplate.postForObject(userServiceUrl + "/user/delete/{1}", null, CommonResult.class, id);
}

调用接口测试 http://localhost:8401/user/testRemoveCache/1,可以发现有两次查询都走的是接口

缓存使用过程中的问题

在缓存使用过程中,我们需要在每次使用缓存的请求前后对 HystrixRequestContext 进行初始化和关闭,否则会出现如下异常

1
2
3
4
java.lang.IllegalStateException: Request caching is not available. Maybe you need to initialize the HystrixRequestContext?
at com.netflix.hystrix.HystrixRequestCache.get(HystrixRequestCache.java:104) ~[hystrix-core-1.5.18.jar:1.5.18]
at com.netflix.hystrix.AbstractCommand$7.call(AbstractCommand.java:478) ~[hystrix-core-1.5.18.jar:1.5.18]
at com.netflix.hystrix.AbstractCommand$7.call(AbstractCommand.java:454) ~[hystrix-core-1.5.18.jar:1.5.18]

这里我们通过使用过滤器,在每个请求前后初始化和关闭 HystrixRequestContext 来解决该问题

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@WebFilter(urlPatterns = "/*",asyncSupported = true)
public class HystrixRequestContextFilter implements Filter {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
filterChain.doFilter(servletRequest, servletResponse);
} finally {
context.close();
}
}
}

请求合并


微服务系统中的服务间通信,需要通过远程调用来实现,随着调用次数越来越多,占用线程资源也会越来越多。Hystrix中提供了@HystrixCollapser用于合并请求,从而达到减少通信消耗及线程数量的效果。

@HystrixCollapser 的常用属性

  • batchMethod:用于设置请求合并的方法;
  • collapserProperties:请求合并属性,用于控制实例属性,有很多;
  • timerDelayInMilliseconds:collapserProperties中的属性,用于控制每隔多少时间合并一次请求;

功能演示

在 UserHystrixController 中添加 testCollapser 方法,这里我们先进行两次服务调用,再间隔 200ms 以后进行第三次服务调用:

1
2
3
4
5
6
7
8
9
10
11
@GetMapping("/testCollapser")
public CommonResult testCollapser() throws ExecutionException, InterruptedException {
Future<User> future1 = userService.getUserFuture(1L);
Future<User> future2 = userService.getUserFuture(2L);
future1.get();
future2.get();
ThreadUtil.safeSleep(200);
Future<User> future3 = userService.getUserFuture(3L);
future3.get();
return new CommonResult("操作成功", 200);
}

使用 @HystrixCollapser 实现请求合并,所有对 getUserFuture 的多次调用都会转化为对 getUserByIds 的单次调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@HystrixCollapser(batchMethod = "getUserByIds",collapserProperties = {
@HystrixProperty(name = "timerDelayInMilliseconds", value = "100")
})
public Future<User> getUserFuture(Long id) {
return new AsyncResult<User>(){
@Override
public User invoke() {
CommonResult commonResult = restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id);
Map data = (Map) commonResult.getData();
User user = BeanUtil.mapToBean(data,User.class,true);
LOGGER.info("getUserById username:{}", user.getUsername());
return user;
}
};
}

@HystrixCommand
public List<User> getUserByIds(List<Long> ids) {
LOGGER.info("getUserByIds:{}", ids);
CommonResult commonResult = restTemplate.getForObject(userServiceUrl + "/user/getUserByIds?ids={1}", CommonResult.class, CollUtil.join(ids,","));
return (List<User>) commonResult.getData();
}

访问接口测试 http://localhost:8401/user/testCollapser,由于我们设置了 100ms 进行一次请求合并,前两次被合并,最后一次自己单独合并了。

Hystrix的常用配置


全局配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
hystrix:
command: #用于控制HystrixCommand的行为
default:
execution:
isolation:
strategy: THREAD #控制HystrixCommand的隔离策略,THREAD->线程池隔离策略(默认),SEMAPHORE->信号量隔离策略
thread:
timeoutInMilliseconds: 1000 #配置HystrixCommand执行的超时时间,执行超过该时间会进行服务降级处理
interruptOnTimeout: true #配置HystrixCommand执行超时的时候是否要中断
interruptOnCancel: true #配置HystrixCommand执行被取消的时候是否要中断
timeout:
enabled: true #配置HystrixCommand的执行是否启用超时时间
semaphore:
maxConcurrentRequests: 10 #当使用信号量隔离策略时,用来控制并发量的大小,超过该并发量的请求会被拒绝
fallback:
enabled: true #用于控制是否启用服务降级
circuitBreaker: #用于控制HystrixCircuitBreaker的行为
enabled: true #用于控制断路器是否跟踪健康状况以及熔断请求
requestVolumeThreshold: 20 #超过该请求数的请求会被拒绝
forceOpen: false #强制打开断路器,拒绝所有请求
forceClosed: false #强制关闭断路器,接收所有请求
requestCache:
enabled: true #用于控制是否开启请求缓存
collapser: #用于控制HystrixCollapser的执行行为
default:
maxRequestsInBatch: 100 #控制一次合并请求合并的最大请求数
timerDelayinMilliseconds: 10 #控制多少毫秒内的请求会被合并成一个
requestCache:
enabled: true #控制合并请求是否开启缓存
threadpool: #用于控制HystrixCommand执行所在线程池的行为
default:
coreSize: 10 #线程池的核心线程数
maximumSize: 10 #线程池的最大线程数,超过该线程数的请求会被拒绝
maxQueueSize: -1 #用于设置线程池的最大队列大小,-1采用SynchronousQueue,其他正数采用LinkedBlockingQueue
queueSizeRejectionThreshold: 5 #用于设置线程池队列的拒绝阀值,由于LinkedBlockingQueue不能动态改版大小,使用时需要用该参数来控制线程数

实例配置

实例配置只需要将全局配置中的 default 换成与之对应的 key 即可。

1
2
3
4
5
6
7
8
9
10
11
12
hystrix:
command:
HystrixComandKey: #将default换成HystrixComrnandKey
execution:
isolation:
strategy: THREAD
collapser:
HystrixCollapserKey: #将default换成HystrixCollapserKey
maxRequestsInBatch: 100
threadpool:
HystrixThreadPoolKey: #将default换成HystrixThreadPoolKey
coreSize: 10
  • HystrixComandKey 对应 @HystrixCommand 中的 commandKey 属性;
  • HystrixCollapserKey 对应 @HystrixCollapser 注解中的 collapserKey 属性;
  • HystrixThreadPoolKey 对应 @HystrixCommand 中的 threadPoolKey 属性。

更多干货请移步:https://antoniopeng.com


If you like this blog or find it useful for you, you are welcome to comment on it. You are also welcome to share this blog, so that more people can participate in it. If the images used in the blog infringe your copyright, please contact the author to delete them. Thank you !