Spring Cloud Gateway 源码浅析
1、开门见山
在我们团队中,大多数项目主要使用了 Spring
或 Spring Boot
技术栈,而 Spring Cloud Gateway
网关与现有团队使用的技术栈几乎一致,经过技术选型及对技术成本与技术栈生态的考虑,最终选择使用 Spring Cloud Gateway
作为我们的业务后端网关,统一收敛并统一微服务的流量入口。
2、程序启动入口
使用过 Spring Boot
或者 Spring Cloud Gateway
的人,对如何启动应用程序均不陌生。
1@SpringBootApplication
2public class DatanexusGatewayApplication {
3 public static void main(String[] args) {
4 SpringApplication.run(GatewayApplication.class, args);
5 }
6}
7
8//1、构建 SpringApplication 实例
9public SpringApplication(ResourceLoader resourceLoader, Class<?>... primarySources) {
10 this.resourceLoader = resourceLoader;
11 Assert.notNull(primarySources, "PrimarySources must not be null");
12 this.primarySources = new LinkedHashSet<>(Arrays.asList(primarySources));
13 //1.1、判断classpath是否存在org.springframework.web.reactive.DispatcherHandler类,存在说明是 WebApplicationType.REACTIVE应用类型
14 this.webApplicationType = WebApplicationType.deduceFromClasspath();
15 this.bootstrapRegistryInitializers = new ArrayList<>(
16 getSpringFactoriesInstances(BootstrapRegistryInitializer.class));
17 //1.2、从META-INF中获取应用上下文初始化
18 setInitializers((Collection) getSpringFactoriesInstances(ApplicationContextInitializer.class));
19 //1.3、从META-INF中获应用监听器
20 setListeners((Collection) getSpringFactoriesInstances(ApplicationListener.class));
21 //1.4、根据当前stackTrace信息反射获取当前运行的ApplicationClass
22 this.mainApplicationClass = deduceMainApplicationClass();
23}
24
25//2、调用 SpringApplication run()方法,启动 SpringApplication
26public ConfigurableApplicationContext run(String... args) {
27 long startTime = System.nanoTime();
28 //2.1、创建默认的启动上下文
29 DefaultBootstrapContext bootstrapContext = createBootstrapContext();
30 ConfigurableApplicationContext context = null;
31 configureHeadlessProperty();
32
33 //2.2、通过spring boot自动装配获取Spring应用运行监听器并启动(扫描META-INF/factories注册的类)
34 SpringApplicationRunListeners listeners = getRunListeners(args);
35 listeners.starting(bootstrapContext, this.mainApplicationClass);
36
37 try {
38 //2.3、封装命令行输入的应用参数
39 ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
40
41 //2.4、根据webApplication类型构建可配置的环境变量实例
42 ConfigurableEnvironment environment = prepareEnvironment(listeners, bootstrapContext, applicationArguments);
43 //2.5、配置可忽略的Bean信息
44 configureIgnoreBeanInfo(environment);
45
46 //2.6、构建BannerPrinter打印自定义的信息
47 Banner printedBanner = printBanner(environment);
48
49 //2.7、根据webApplicationType构建应用上下文,gateway使用的是AnnotationConfigReactiveWebApplicationContext
50 context = createApplicationContext();
51 context.setApplicationStartup(this.applicationStartup);
52
53 //2.8、完善上下文,把启动上下文、环境实例、应用监听器、BannerPrinter与上下文关联
54 prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);
55
56 //2.9、重点、重点、重点:刷新上下文,触发初始化Spring Cloud Gateway等组件资源的初始化,下面重点分析
57 //通过模板设计模式,该接口会调用抽象类AbstractApplicationContext的refresh()方法,再根据webApplicationType类型调用不同的子类,此处是ReactiveWebServerApplicationContext.refresh()方法
58 refreshContext(context);
59
60 //3.0、刷新后操作,目前暂无实现
61 afterRefresh(context, applicationArguments);
62 Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime);
63 if (this.logStartupInfo) {
64 new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), timeTakenToStartup);
65 }
66 //3.1、启动应用监听器
67 listeners.started(context, timeTakenToStartup);
68
69 //3.2、执行runner的run逻辑
70 callRunners(context, applicationArguments);
71 }
72 catch (Throwable ex) {
73 //省略其余代码......
74 }
75 //省略其余代码......
76 return context;
77 }
3、初始化路由配置
在 Spring Cloud Gateway
启动时,根据约定大于配置原则,会自动初始化在 GatewayAutoConfiguration
配置的对象实例,其中包括 RouteDefinition
、RouteDefinitionLocator
、 RouteLocator
,RouteLocator
接口主要负责获取用户配置的路由信息 Route
。
CachingRouteLocator
:缓存定位路由器,负责缓存Route
配置,并包装代理CompositeRouteLocator
,同时实现了ApplicationListener
应用监听器接口、ApplicationEventPublisherAware
应用事件发布通知接口。CompositeRouteLocato
:组合路由定位器,负责组合路由定位器。RouteDefinitionRouteLocator
:路由定义路由定位器,负责获取用户配置的所有Route Definition
配置,使得在用户请求时,能根据请求获取匹配的路由。
Spring Cloud Gateway
的作者使用了代理模式设计 RouteLocator
接口,代理关系链:CachingRouteLocator
—> CompositeRouteLocator
—> RouteDefinitionRouteLocator
。
**注意:**在 GatewayAutoConfiguration
里会默认创建路由定义定位器 PropertiesRouteDefinitionLocator
实例、基于内存保存的路由定义仓库 InMemoryRouteDefinitionRepository
实例。
1//代码位置:GatewayAutoConfiguration
2@Bean
3public RouteLocatorBuilder routeLocatorBuilder(ConfigurableApplicationContext context) {
4 return new RouteLocatorBuilder(context);
5}
6
7//默认创建 Properties 配置的路由定义定位器实例
8@Bean
9@ConditionalOnMissingBean
10public PropertiesRouteDefinitionLocator propertiesRouteDefinitionLocator(GatewayProperties properties) {
11 return new PropertiesRouteDefinitionLocator(properties);
12}
13
14//默认创建基于内存保存的路由定义仓库实例
15@Bean
16@ConditionalOnMissingBean(RouteDefinitionRepository.class)
17public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
18 return new InMemoryRouteDefinitionRepository();
19}
20
21//创建具有组合能力的路由定义定位器,此处routeDefinitionLocators 列表元素为InMemoryRouteDefinitionRepository、PropertiesRouteDefinitionLocator
22@Bean
23@Primary
24public RouteDefinitionLocator routeDefinitionLocator(List<RouteDefinitionLocator> routeDefinitionLocators) {
25 return new CompositeRouteDefinitionLocator(Flux.fromIterable(routeDefinitionLocators));
26}
27
28//创建路由定位器实例
29@Bean
30public RouteLocator routeDefinitionRouteLocator(GatewayProperties properties,
31 List<GatewayFilterFactory> gatewayFilters, List<RoutePredicateFactory> predicates,
32 RouteDefinitionLocator routeDefinitionLocator, ConfigurationService configurationService) {
33 //初始化用户配置的路由、断言等配置信息
34 return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates, gatewayFilters, properties,
35 configurationService);
36}
37
38//创建具有缓存组合能力的路由定位器
39@Bean
40@Primary
41@ConditionalOnMissingBean(name = "cachedCompositeRouteLocator")
42public RouteLocator cachedCompositeRouteLocator(List<RouteLocator> routeLocators) {
43 //
44 return new CachingRouteLocator(new CompositeRouteLocator(Flux.fromIterable(routeLocators)));
45}
根据自动化配置创建完不同的 RouteLocator
与 RouteDefinitionLocator
后 ,那路由的断言配置是何时初始化的?
答案:在创建 CachingRouteLocator
缓存路由定位器实例后,该实例具备监听 RefreshRoutesEvent
事件的能力,用于接收 RefreshRoutesEvent
事件通知及发布应用事件。
除此之外,作者还使用订阅发布模式设计了 RouteRefreshListener
路由刷新监听器类,该监听器主要负责监听上下文的 ApplicationEvent
事件,并根据不同条件发布 RefreshRoutesEvent
刷新路由事件。
注意:在 Spring
上下文启动完成后,会调用 finishRefresh
方法发布 ContextRefreshedEvent
事件,详细请阅读下一章节的 刷新 Spring 上下文 。
1public class RouteRefreshListener implements ApplicationListener<ApplicationEvent> {
2
3 private final ApplicationEventPublisher publisher;
4
5 public RouteRefreshListener(ApplicationEventPublisher publisher) {
6 Assert.notNull(publisher, "publisher may not be null");
7 this.publisher = publisher;
8 }
9
10 @Override
11 public void onApplicationEvent(ApplicationEvent event) {
12 //当前是上下文刷新事件且当前刷新的应用上下文没有 management 命名空间,则发布 RefreshRoutesEvent 事件
13 if (event instanceof ContextRefreshedEvent) {
14 ContextRefreshedEvent refreshedEvent = (ContextRefreshedEvent) event;
15 if (!WebServerApplicationContext.hasServerNamespace(refreshedEvent.getApplicationContext(), "management")) {
16 reset();
17 }
18 }
19 else if (event instanceof RefreshScopeRefreshedEvent || event instanceof InstanceRegisteredEvent) {
20 reset();
21 }
22 //省略部分代码......
23 }
24
25 //根据参数判断是否需发布 RefreshRoutesEvent 事件
26 private void resetIfNeeded(Object value) {
27 if (this.monitor.update(value)) {
28 reset();
29 }
30 }
31
32 //发布 RefreshRoutesEvent 事件,CachingRouteLocator 监听器会监听到事件
33 private void reset() {
34 this.publisher.publishEvent(new RefreshRoutesEvent(this));
35 }
36}
当 RouteRefreshListener
实例发布 RefreshRoutesEvent
事件后,CachingRouteLocator
实例接收事件并实时地从获取最新的路由配置信息。
1public class CachingRouteLocator
2 implements Ordered, RouteLocator, ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware {
3 //此处省略部分代码.....
4
5 //拉取路由配置信息,并按照 order 优先级排序,此处的 delegate 是 CompositeRouteLocator 实例
6 private Flux<Route> fetch() {
7 return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
8 }
9
10 //监听 RefreshRoutesEvent 事件
11 @Override
12 public void onApplicationEvent(RefreshRoutesEvent event) {
13 try {
14 //拉取路由配置信息
15 fetch().collect(Collectors.toList()).subscribe(
16 list -> Flux.fromIterable(list).materialize().collect(Collectors.toList()).subscribe(signals -> {
17 //发布 RefreshRoutesResultEvent 事件
18 applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this));
19 cache.put(CACHE_KEY, signals);
20 }, this::handleRefreshError), this::handleRefreshError);
21 }
22 catch (Throwable e) {
23 handleRefreshError(e);
24 }
25 }
26
27 private void handleRefreshError(Throwable throwable) {
28 //省略部分代码.....
29 applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this, throwable));
30 }
31
32 //省略部分代码.....
33}
经过层层代理后,程序最终会调用 RouteDefinitionRouteLocator
类的 getRoutes()
方法获取 Route
路由信息。
而 RouteDefinitionRouteLocator
既是 RouteLocator 类的最下层代理,又是 RouteDefinitionLocator
的最上层代理,其底层调用的是 CompositeRouteDefinitionLocator
、PropertiesRouteDefinitionLocator
类的方法。
它们之间的调用关系链如下:RouteDefinitionRouteLocator
—> CompositeRouteDefinitionLocator
—> PropertiesRouteDefinitionLocator
、InMemoryRouteDefinitionRepository
1//代码位置:RouteDefinitionRouteLocator.getRoutes()
2@Override
3public Flux<Route> getRoutes() {
4 //此处的 routeDefinitionLocator 为 CompositeRouteDefinitionLocator 实例
5 Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute);
6 //省略部分代码....
7 return routes.map(route -> {
8 if (logger.isDebugEnabled()) {
9 logger.debug("RouteDefinition matched: " + route.getId());
10 }
11 return route;
12 });
13}
14
15private Route convertToRoute(RouteDefinition routeDefinition) {
16 AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
17 List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
18
19 return Route.async(routeDefinition).asyncPredicate(predicate).replaceFilters(gatewayFilters).build();
20}
CompositeRouteDefinitionLocator
:主要负责组合RouteDefinitionRouteLocator
实例,并对id
为空的route
设置随机id
,最终代理将会执行RouteDefinitionRouteLocator
实例的getRoutes
方法。PropertiesRouteDefinitionLocator
:负责从GatewayProperties
配置中加载Route Definition
配置信息。InMemoryRouteDefinitionRepository
:主要作用是存储Route Definition
配置信息在内存。
1//获取 spring.cloud.gateway 配置的 Route Definitions
2public class PropertiesRouteDefinitionLocator implements RouteDefinitionLocator {
3
4 private final GatewayProperties properties;
5
6 @Override
7 public Flux<RouteDefinition> getRouteDefinitions() {
8 return Flux.fromIterable(this.properties.getRoutes());
9 }
10}
11
12// Route Definition 内存仓库
13public class InMemoryRouteDefinitionRepository implements RouteDefinitionRepository {
14 private final Map<String, RouteDefinition> routes = synchronizedMap(new LinkedHashMap<String, RouteDefinition>());
15
16 //省略部分代码.....
17 @Override
18 public Flux<RouteDefinition> getRouteDefinitions() {
19 Map<String, RouteDefinition> routesSafeCopy = new LinkedHashMap<>(routes);
20 return Flux.fromIterable(routesSafeCopy.values());
21 }
22}
因此,Spring Cloud Gateway
的路由定义数据实际上来源于配置文件里 spring.cloud.gateway.routes
路由配置、以及保存在内存里路由配置。
4、刷新 Spring 上下文
接下来我们看看 Spring 在更新上下文时,通过模板设计模式在抽象类 AbstractApplicationContext 做了哪些操作:
1//AbstractRefreshableApplicationContext.onRefresh()
2@Override
3public void refresh() throws BeansException, IllegalStateException {
4 synchronized (this.startupShutdownMonitor) {
5 StartupStep contextRefresh = this.applicationStartup.start("spring.context.refresh");
6
7 // 1、刷新上下文预准备操作,包括设置上下文状态、初始化占位符属性源、校验必要属性
8 prepareRefresh();
9
10 // 2、通知子类刷新 bean 工厂类
11 ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
12
13 // 3、准备用于上下文使用的 bean 工厂
14 prepareBeanFactory(beanFactory);
15
16 try {
17 //4、允许在上下文子类中对 bean 工程进行后处理操作
18 postProcessBeanFactory(beanFactory);
19
20 StartupStep beanPostProcess = this.applicationStartup.start("spring.context.beans.post-process");
21 //5、调用并初始化在上下文中注册为 bean 的工厂处理器
22 invokeBeanFactoryPostProcessors(beanFactory);
23
24 // 6、注册拦截 bean 创建的 bean 处理器
25 registerBeanPostProcessors(beanFactory);
26 beanPostProcess.end();
27
28 //7、为上下文初始化 message 源
29 initMessageSource();
30
31 //8、为上下文初始化事件组播期器
32 initApplicationEventMulticaster();
33
34 //9、在特定上下文子类中初始化其他特定的 bean,spring cloud gateway 的路由、过滤器等在该子类中实现
35 onRefresh();
36
37 //10、检查并注册监听器 bean
38 registerListeners();
39
40 //11、实例化所有剩余的(非延迟初始化)的单例
41 finishBeanFactoryInitialization(beanFactory);
42
43 //12、初始化生命周期处理器、发布 ContextRefreshedEvent 事件
44 finishRefresh();
45 }
46 catch (BeansException ex) {
47 //省略部分代码......
48 throw ex;
49 }
50 finally {
51 //省略部分代码......
52 }
53 }
54}
5、创建 WebServer
1//ReactiveWebServerApplicationContext.onRefresh()
2@Override
3protected void onRefresh() {
4 super.onRefresh();
5 try {
6 createWebServer();
7 }
8 //省略部分代码......
9}
10
11private void createWebServer() {
12 WebServerManager serverManager = this.serverManager;
13 //第一次创建时,serverManager 是为空的
14 if (serverManager == null) {
15 //1、记录webserver创建的步骤
16 StartupStep createWebServer = this.getApplicationStartup().start("spring.boot.webserver.create");
17 //2、获取 WebServerFactory bean 名称,gateway 对应的 bean 名称 nettyReactiveWebServerFactory
18 String webServerFactoryBeanName = getWebServerFactoryBeanName();
19 ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
20 createWebServer.tag("factory", webServerFactory.getClass().toString());
21
22 boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();
23 //3、创建 webServerManager 实例,getHttpHandler()方法获取绑定处理 http 请求的 hanlder
24 this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
25 //4、注册 webServerGracefulShutdown bean,优美地关闭 webserver
26 getBeanFactory().registerSingleton("webServerGracefulShutdown",
27 new WebServerGracefulShutdownLifecycle(this.serverManager.getWebServer()));
28
29 //5、注册 webServerStartStop bean,优美地启动或停止 webserver
30 getBeanFactory().registerSingleton("webServerStartStop",
31 new WebServerStartStopLifecycle(this.serverManager));
32 createWebServer.end();
33 }
34 //6、用实际的实例,替换任何属性源
35 initPropertySources();
36 }
获取上下文中已注册的 HttpHandler
实例,用于绑定到 WebServer
中
1//ReactiveWebServerApplicationContext.getHttpHandler(),返回 httpHandler 请求处理器
2protected HttpHandler getHttpHandler() {
3 // 这里使用 bean 名称是可以避免层级结构
4 String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class);
5 if (beanNames.length == 0) {
6 throw new ApplicationContextException(
7 "Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean.");
8 }
9 if (beanNames.length > 1) {
10 throw new ApplicationContextException(
11 "Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : "
12 + StringUtils.arrayToCommaDelimitedString(beanNames));
13 }
14 //委派模式,HttpWebHandlerAdapter -> ExceptionHandlingWebHandler -> FilteringWebHandler -> DispatcherHandler
15 return getBeanFactory().getBean(beanNames[0], HttpHandler.class);
16}
ReactiveWebServerFactoryCustomizer:通过住入 ServerProperties 对象,设置 nettyServer 的地址端口、ssl、http协议、压缩方式等基础字段。
NettyWebServerFactoryCustomizer:注入 ServerProperties 对象,设置 nettyServer initialBufferSize、maxChunkSize、maxInitialLineLength、idleTimeout等字段。
DelayedInitializationHttpHandler:延迟初始化 httpHandler 实例,通过装饰设计模式增加 httpHandler。
1// WebServerManager 构造器,在 factory.getWebServer(this.handler) 进一步创建 nettyWebServer
2// factory 变量实际引用的是 NettyReactiveWebServerFactory
3WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,
4 Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
5 this.applicationContext = applicationContext;
6 Assert.notNull(factory, "Factory must not be null");
7 this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
8 this.webServer = factory.getWebServer(this.handler);
9}
10
11// NettyReactiveWebServerFactory.getWebServer(HttpHandler httpHandler),该处的 httpHandler 具体是实现为 DelayedInitializationHttpHandler,该 httpHandler 实例化发生在 WebServerManager.start()方法。
12@Override
13public WebServer getWebServer(HttpHandler httpHandler) {
14 //1、创建 httpServer
15 HttpServer httpServer = createHttpServer();
16 //2、创建 httpHandler 适配器对象
17 ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
18 //3、创建 nettyWebServer,把 httpServer、httpHandler 统一封装到这个实例中
19 NettyWebServer webServer = createNettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout,
20 getShutdown());
21 webServer.setRouteProviders(this.routeProviders);
22 return webServer;
23}
24
25// NettyReactiveWebServerFactory.createHttpServer()
26private HttpServer createHttpServer() {
27 HttpServer server = HttpServer.create();
28 if (this.resourceFactory != null) {
29 LoopResources resources = this.resourceFactory.getLoopResources();
30 Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
31 //绑定监听地址,获取 ServerProperties 配置的端口,详细可参考NettyWebServerFactoryCustomizer,此处的 getListenAddress 参数是 Supplier 类型,在使用时才会真正调用方法
32 server = server.runOn(resources).bindAddress(this::getListenAddress);
33 }
34 else {
35 server = server.bindAddress(this::getListenAddress);
36 }
37 if (getSsl() != null && getSsl().isEnabled()) {
38 server = customizeSslConfiguration(server);
39 }
40 if (getCompression() != null && getCompression().getEnabled()) {
41 CompressionCustomizer compressionCustomizer = new CompressionCustomizer(getCompression());
42 server = compressionCustomizer.apply(server);
43 }
44 server = server.protocol(listProtocols()).forwarded(this.useForwardHeaders);
45 return applyCustomizers(server);
46}
6、启动 WebServer
大家还记得吗?在上述的刷新上下文步骤中,有一个逻辑 finishRefresh()
是刷新完上下文后会执行初始化生命周期处理器、发布 ContextRefreshedEvent
事件等。
1// 代码位置:AbstractApplicationContext.finishRefresh()
2protected void finishRefresh() {
3 //1、清理上下文层级的资源缓存,例如来自扫描的 ASM 原数据
4 clearResourceCaches();
5
6 //2、为该上下文初始化生命周期处理器,该处使用的生命周期处理器是 DefaultLifecycleProcessor
7 initLifecycleProcessor();
8
9 //3、将刷新传播到生命周期处理器,重点分析:调用生命周期处理器的 start()方法,
10 getLifecycleProcessor().onRefresh();
11
12 //4、发布上下文刷新事件 ContextRefreshedEvent
13 publishEvent(new ContextRefreshedEvent(this));
14
15 //5、探测 GraalVM native image 环境是否开启,若开启则注册到应用上下文中
16 if (!NativeDetector.inNativeImage()) {
17 LiveBeansView.registerApplicationContext(this);
18 }
19}
接下来我们开始重点分析 getLifecycleProcessor().onRefresh()
方法的实现:
1// 代码位置:DefaultLifecycleProcessor.onRefresh()
2@Override
3public void onRefresh() {
4 startBeans(true);
5 this.running = true;
6}
7
8private void startBeans(boolean autoStartupOnly) {
9 //1、从 beanFactory 工厂中获取当前生命周期实例,格式<beanName,生命周期实例>
10 Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
11 Map<Integer, LifecycleGroup> phases = new TreeMap<>();
12
13 //2、遍历并计算生命周期实例阶段值 phase
14 lifecycleBeans.forEach((beanName, bean) -> {
15 if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
16 int phase = getPhase(bean);
17 phases.computeIfAbsent(
18 phase,
19 p -> new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly)
20 ).add(beanName, bean);
21 }
22 });
23 //3、循环调用生命周期组的 start 方法
24 if (!phases.isEmpty()) {
25 phases.values().forEach(LifecycleGroup::start);
26 }
27}
28
29public void start() {
30 //省略部分代码
31 for (LifecycleGroupMember member : this.members) {
32 doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
33 }
34}
35
36private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
37 Lifecycle bean = lifecycleBeans.remove(beanName);
38 if (bean != null && bean != this) {
39 //1、如果该 lifecycleBean 存在依赖实例,则递归完成该依赖实例的 start()方法
40 String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);
41 for (String dependency : dependenciesForBean) {
42 doStart(lifecycleBeans, dependency, autoStartupOnly);
43 }
44
45 //2、检测生命周期 bean 实例是否处于 running 状态
46 if (!bean.isRunning() &&
47 (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
48 if (logger.isTraceEnabled()) {
49 logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
50 }
51 try {
52 //通过模版设计模式,调用不同的子类实现 start() 方法
53 bean.start();
54 }
55 catch (Throwable ex) {
56 throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
57 }
58 if (logger.isDebugEnabled()) {
59 logger.debug("Successfully started bean '" + beanName + "'");
60 }
61 }
62 }
63}
具体启动逻辑交由子类实现:org.springframework.boot.web.reactive.context.WebServerStartStopLifecycle
1// 代码位置:WebServerStartStopLifecycle.start()
2@Override
3public void start() {
4 //调用组合的 WebServerManager 实例启动方法
5 this.weServerManager.start();
6 this.running = true;
7}
8
9// 代码位置:WebServerManager.start()
10void start() {
11 //初始化 httpHandler
12 this.handler.initializeHandler();
13
14 //启动 webServer,最终调用 NettyWebServer.startHttpServer()
15 this.webServer.start();
16
17 //发布 ReactiveWebServerInitializedEvent 事件
18 this.applicationContext
19 .publishEvent(new ReactiveWebServerInitializedEvent(this.webServer, this.applicationContext));
20}
在默认情况下,Spring Cloud Gateway
使用的 webServer
实现是 NettyWebServer
,当然用户也可以自定义选择其他的 Server
,例如:TomcatWebServer
、JettyWebServer
或者 UndertowWebServer
。
//补充类图
熟悉或使用过 Netty
框架的童鞋,基本对 Netty Server
启动的流程不会很陌生,整体流程是创建 Server
、初始化 及绑定 ChannelHandler
、监听在端口上绑定的事件,当发生符合条件的事件时,执行 ChannelHandler
的逻辑。
1//代码位置:NettyWebServer.startHttpServer()
2DisposableServer startHttpServer() {
3 HttpServer server = this.httpServer;
4 if (this.routeProviders.isEmpty()) {
5 //绑定 server 对应的 workHandler
6 server = server.handle(this.handler);
7 }
8 else {
9 server = server.route(this::applyRouteProviders);
10 }
11 if (this.lifecycleTimeout != null) {
12 return server.bindNow(this.lifecycleTimeout);
13 }
14 //将 channel、handler 等绑定到 nettyServer 上,详细实现代码可查看:ServerTransport.bind
15 return server.bindNow();
16}
17
18//代码位置:HttpServer.handle()
19 public final HttpServer handle(
20 BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
21 Objects.requireNonNull(handler, "handler");
22 return childObserve(new HttpServerHandle(handler));
23 }
在完成 NettyWebServer
启动后,NettyServer
将监听 TCP
端口的事件,并调度分发事件到绑定的 ChannelHandler
处理。
7、请求调度分发
现在 Spring Cloud Gateway
已经具备监听端口事件、处理用户请求的能力了,可谓是万事俱备、只欠东风,当东风吹来之际,NettyWebServer
通过 HttpTrafficHandler
类接收、下发数据,它继承 ChannelDuplexHandler
父类,而 ChannelDuplexHandler
类同时实现了ChannelInboundHandler
和 ChannelOutboundHandler
接口,因此具备处理入站事件又处理出站事件的能力。
疑问:出站、入站的语义是什么?
-
从服务端的角度,数据从客户端发送到服务端,称之为入站,当数据处理完成返回给客户端,称之为出站。
-
从客户端的角度,数据从服务端发送给客户端,称之为入站,当数据返回给服务端,称之为出站。
当 HttpTrafficHandler
接收到请求时,它将生成 ConnectionObserver.State.CONFIGURED
事件,而该事件被监听器 ConnectionObserver
消费处理,其中 ConnectionObserver
有多个不同的实现:
ServerTransport.ChildObserver
:连接监听器,作为NettyWebServer
监听器第一入口。ReactorNetty.CompositeConnectionObserver
:组合ConnectionObserver
连接监听器并遍历监听器传递事件。ServerTransportConfig.ServerTransportDoOnConnection
:监听State.CONNECTED
与State.CONFIGURED
状态的事件。HttpServer.HttpServerHandle
:监听HttpServerState.REQUEST_RECEIVED
状态事件,调用HttpHandler
逻辑的入口处。
1@Override
2public void channelRead(ChannelHandlerContext ctx, Object msg) {
3 //省略部分的代码......
4 // read message and track if it was keepAlive
5 if (msg instanceof HttpRequest) {
6 IdleTimeoutHandler.removeIdleTimeoutHandler(ctx.pipeline());
7
8 final HttpRequest request = (HttpRequest) msg;
9 //省略部分的代码......
10
11 HttpServerOperations ops;
12 try {
13 //构建 HttpServerOperations 对象,后面作为request、response 使用
14 ops = new HttpServerOperations(Connection.from(ctx.channel()),
15 listener,request,compress,
16 ConnectionInfo.from(ctx.channel(),request,secure,remoteAddress,
17 forwardedHeaderHandler),cookieDecoder,cookieEncoder,formDecoderProvider,mapHandle,secure);
18 }
19 catch (RuntimeException e) {
20 sendDecodingFailures(e, msg);
21 return;
22 }
23 ops.bind();
24 //第一次请求时会先传递 ConnectionObserver.State.CONFIGURED 事件给 ServerTransport 类处理,随后传递至 ReactorNetty.CompositeConnectionObserver 类,最后会执行下述的 ctx.fireChannelRead
25 listener.onStateChange(ops, ConnectionObserver.State.CONFIGURED);
26 //从当前节点往下传播事件
27 ctx.fireChannelRead(msg);
28 return;
29 }
30 //省略部分的代码......
31 ctx.fireChannelRead(msg);
32}
随后调用 ctx.fireChannelRead(msg)
方法将事件从当前节点事件传播下去,触发 ChannelOperationsHandler
类的channelRead(ChannelHandlerContext ctx,Object msg)
方法,并在该方法中再调用 HttpServerOperations
类的 onInboundNext(ChannelHandlerContext ctx, Object msg)
方法生成 HttpServerState.REQUEST_RECEIVED
事件。
1//代码位置:ChannelOperationsHandler.channelRead(ChannelHandlerContext ctx, Object msg)
2@Override
3final public void channelRead(ChannelHandlerContext ctx, Object msg) {
4 //省略部分代码.....
5 ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
6 if (ops != null) {
7 //调用 HttpServerOperations 类的 onInboundNext 方法
8 ops.onInboundNext(ctx, msg);
9 }
10 //省略部分代码.....
11}
12
13//代码位置:HttpServerOperations.onInboundNext(ChannelHandlerContext ctx, Object msg)
14@Override
15protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
16 if (msg instanceof HttpRequest) {
17 //生成 HttpServerState.REQUEST_RECEIVED 事件,调用 ConnectionObserver 的 onStateChange
18 listener().onStateChange(this, HttpServerState.REQUEST_RECEIVED);
19 //省略部分代码.....
20 return;
21 }
22 //省略部分代码.....
23}
根据上述 ConnectionObserver
实现类功能职责判断,REQUEST_RECEIVED
事件会被 HttpServer.HttpServerHandle
消费处理,并调用 ReactorHttpHandlerAdapter.apply()
方法把请求传递给后链路的 Handler
。
1public void onStateChange(Connection connection, State newState) {
2 if (newState == HttpServerState.REQUEST_RECEIVED) {
3 try {
4 //忽略部分代码.....
5 HttpServerOperations ops = (HttpServerOperations) connection;
6 //在此处把请求传递到 ReactorHttpHandlerAdapter 适配器
7 Publisher<Void> publisher = handler.apply(ops, ops);
8 Mono<Void> mono = Mono.deferContextual(ctx -> {
9 ops.currentContext = Context.of(ctx);
10 return Mono.fromDirect(publisher);
11 });
12 if (ops.mapHandle != null) {
13 mono = ops.mapHandle.apply(mono, connection);
14 }
15 mono.subscribe(ops.disposeSubscriber());
16 }
17 //忽略部分代码.....
18 }
19}
到这里为止,NettyWebServer
已经完成请求的整体的度分发,接下来会把请求交给 ReactorHttpHandlerAdapter
适配器处理了。
8、处理器映射
当 NettyWebServer
完成调度分发后, ReactorHttpHandlerAdapter
类把接收到的请求、响应转换为 HttpHandler
能识别的 ReactorServerHttpRequest
、ReactorServerHttpResponse
。
在处理器映射方面上,Spring 作者使用了委派模式设计 HttpHandler
,其关系链:ReactorHttpHandlerAdapter
-> DelayedInitializationHttpHandler
-> HttpWebHandlerAdapter
-> ExceptionHandlingWebHandler
-> FilteringWebHandler
-> DispatcherHandler
。
其中 ReactorHttpHandlerAdapter
、 HttpWebHandlerAdapter
使用适配设计模式对 Request/Response
进行适配 ,ExceptionHandlingWebHandle
、FilteringWebHandler
通过装饰模式对其功能包装增强。
-
ReactorHttpHandlerAdapter
:Reactor Http
处理适配器,负责把HttpHandler
适配到Reactor Netty Channel
,将HttpServerOperations
转换为ReactorServerHttpRequest
、ReactorServerHttpResponse
。 -
DelayedInitializationHttpHandler
:延迟初始化Http
处理器,负责惰性创建LazyHttpHandler
。 -
HttpWebHandlerAdapter
:默认的WebHandler
适配器,负责将WebHandler
适配到HttpHandler
,并转换请求中的Forwarded Header
等字段、以及将ReactorServerHttpRequest
、ReactorServerHttpResponse
转换为ServerWebExchange
对象,传递到下游处理器。 -
WebHandlerDecorator
:WebHandler
装饰器,负责装饰并委托另外的WebHandler
。 -
ExceptionHandlingWebHandle
:负责遍历WebExceptionHandler
拦截处理执行异常的请求 -
FilteringWebHandler
:负责过滤请求的WebHandler
,通过组合DefaultWebFilterChain
类及使用责任链模式串联MetricsWebFilter
、WeightCalculatorWebFilter
串联起来,对请求执行的过滤操作。DefaultWebFilterChain
:WebFilterChain
的默认实现,使用责任链模式负责允许WebFilter
委托给链的下一个WebFilter
。MetricsWebFilter
: 负责拦截由 Spring WebFlux 处理的 HTTP 请求并记录有关执行时间与结果的指标。- WeightCalculatorWebFilter:负责拦截记录权重的计算指标。
-
DispatcherHandler
:中央调度器,调度处理器或控制器处理 http 请求。
创建 HttpWebHandlerAdapter
实例的代码位置位于: WebHttpHandlerBuilder.build
1//代码位置:WebHttpHandlerBuilder.build
2public HttpHandler build() {
3 WebHandler decorated = new FilteringWebHandler(this.webHandler, this.filters);
4 decorated = new ExceptionHandlingWebHandler(decorated, this.exceptionHandlers);
5 HttpWebHandlerAdapter adapted = new HttpWebHandlerAdapter(decorated);
6 //省略部分代码.....
7 return (this.httpHandlerDecorator != null ? this.httpHandlerDecorator.apply(adapted) : adapted);
8}
9
10 //代码位置:HttpHandlerAutoConfiguration.httpHandler
11 @Bean
12 public HttpHandler httpHandler(ObjectProvider<WebFluxProperties> propsProvider) {
13 HttpHandler httpHandler = WebHttpHandlerBuilder.applicationContext(this.applicationContext).build();
14 //省略部分代码.....
15 return httpHandler;
16 }
DefaultWebFilterChain
作为 WebFilterChain
的默认实现,会遍历执行责任链中的全部 WebFilter
,直至为空后才执行 Spring Web Flux
的调度处理器 DispatcherHandler
。
1@Override
2public Mono<Void> filter(ServerWebExchange exchange) {
3 //判断当前责任链里是否还有过滤器,存在过滤器则执行 filter 方法,没有则执行 DispatcherHandler 的 handle
4 return Mono.defer(() ->
5 this.currentFilter != null && this.chain != null ?
6 invokeFilter(this.currentFilter, this.chain, exchange) :
7 this.handler.handle(exchange));
8}
接下来重点分析调度处理器 DispatcherHandler
的处理流程, DispatcherHandler
主要负责调度分发请求,通过组合 HandlerMapping
处理器映射实例、ResultHandler
结果处理器完成 Request
—> Response
的整体流程。
DispatcherHanlder 除了实现 WebHandler
接口外,还实现了 ApplicationContextAware
接口(在实例对象初始化时,设置该对象的应用上下文),分析源码后 DispatcherHanlder
组合的 HandlerMapping
与 ResultHandler
是在设置应用上下文 setApplicationContext(ApplicationContext applicationContext)
初始化,详细源码如下:
1//DispatcherHandler
2@Override
3public void setApplicationContext(ApplicationContext applicationContext) {
4 initStrategies(applicationContext);
5}
6
7protected void initStrategies(ApplicationContext context) {
8 //1、获取指定 HandlerMapping 类型的所有处理器映射 Bean
9 Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
10 context, HandlerMapping.class, true, false);
11
12 ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
13 //2、根据 HandlerMapping 处理器映射(实现 Ordered接口) 优先级排序
14 AnnotationAwareOrderComparator.sort(mappings);
15 this.handlerMappings = Collections.unmodifiableList(mappings);
16
17 //3、获取指定 HandlerAdapter 类型的所有处理器适配器 Bean
18 Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
19 context, HandlerAdapter.class, true, false);
20
21 this.handlerAdapters = new ArrayList<>(adapterBeans.values());
22 //4、根据 HandlerAdapter 处理器适配器(实现 Ordered接口) 优先级排序
23 AnnotationAwareOrderComparator.sort(this.handlerAdapters);
24
25 //5、获取指定 HandlerResultHandler 类型的所有结果处理器 Bean
26 Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
27 context, HandlerResultHandler.class, true, false);
28
29 this.resultHandlers = new ArrayList<>(beans.values());
30 AnnotationAwareOrderComparator.sort(this.resultHandlers);
31}
HandlerMapping
用于定义请求与处理器对象之间的映射关系,通俗易懂的理解就是负责根据用户请求找到 Handler
处理器(日常工作中写的 Controller
),Spring WebFlux
与 Spring MVC
均提供了不同映射器实现不同的映射方式,例如:配置文件方式,实现接口方式,注解方式等,该接口有以下实现类:
//补充类关系图
-
AbstractHandlerMapping
:抽象处理器映射基类,作者通过模板设计模式,定义处理器映射类的整体流程,把具体实现下沉到子类实现。 -
AbstractHandlerMethodMapping
:实现HandlerMapping
接口的抽象基类,定义请求与HandlerMethod
的映射。 -
AbstractUrlHandlerMapping
:实现HandlerMapping
接口的抽象基类,定义请求与 URL 的映射。 -
RequestMappingInfoHandlerMapping
:AbstractHandlerMethodMapping
抽象子类,RequestMappingInfo
定义请求与handler
处理器方法之间的映射,其内部管理和注册的是 RequestMappingInfo 类型对象。 -
AbstractWebFluxEndpointHandlerMapping
:RequestMappingInfoHandlerMapping
子类,该HandlerMapping
使得Web
端点可通过HTTP
被访问。 -
AdditionalHealthEndpointPathsWebFluxHandlerMapping
:自定义HandlerMapping
实现,允许将健康组映射到额外路径,可简单理解为允许自定义健康检查接口。 -
CloudFoundryWebFluxEndpointHandlerMapping
:AbstractWebFluxEndpointHandlerMapping
子类, 该HandlerMapping
使得 WEB 端点能够在Cloud Foundry
特定URL
上通过HTTP
方式被访问。 -
ControllerEndpointHandlerMapping
: 该实现类的作用是通过Spring WebFlux
暴露被@ControllerEndpoint
与@RestControllerEndpoint
注解修饰的端点。 -
EmptyHandlerMapping
:空处理器映射,如果没有注册其他类型的处理器映射类,则使用默认的空处理器映射。 -
RequestMappingHandlerMapping
:RequestMappingInfoHandlerMapping
的一个拓展实现,该实现类将根据扫描类级别和方法级别上的@RequestMapping
注解创建RequestMappingInfo
实例。 -
RoutePredicateHandlerMapping
:路由断言处理器映射,负责根据用户请求找到对应的路由断言信息。 -
RouterFunctionMapping
:支持路由函数的处理器映射,如果在构造时没有路由函数提供,则该映射将会在应用上下文中探测所有函数路由并按照顺序访问他们。 -
SimpleUrlHandlerMapping
:实现了org.springframework.web.reactive.HandlerMapping
接口的简单URL
处理器映射,负责从URL
映射到请求处理器bean
,映射的方式同时支持映射到bean
实例与bean
的名称,而后者则要求必须是非单实例的处理器bean
。 -
WebFluxEndpointHandlerMapping
:自定义的HandlerMapping
,使得Web
端点可通过HTTP
被访问。
1@Override
2public Mono<Void> handle(ServerWebExchange exchange) {
3 //1、如果处理器映射为空直接返回错误
4 if (this.handlerMappings == null) {
5 return createNotFoundError();
6 }
7 //2、判断是否 cors 预请求并处理
8 if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
9 return handlePreFlight(exchange);
10 }
11 //3、遍历处理器映射,并获取处理器映射关联的 webHandler 处理请求、返回结果
12 return Flux.fromIterable(this.handlerMappings)
13 .concatMap(mapping -> mapping.getHandler(exchange))
14 //取出找到的第一个 handler,实际上由 RoutePredicateHandlerMapping 返回 FilteringWebHandler
15 .next()
16 //如果为空则返回 not found error
17 .switchIfEmpty(createNotFoundError())
18 //调用 handler
19 .flatMap(handler -> invokeHandler(exchange, handler))
20 .flatMap(result -> handleResult(exchange, result));
21}
在调度分发处理器的 handle
方法中,通过流式遍历处理器映射实例,根据当前 ServerWebExchange
请求获取对应的 handler
处理器,如果 handler
处理器为空则返回 not found
错误提示,否则遍历流元素,调用 invokeHandler
方法返回结果。
下面将从 根据 ServerWebExchange
请求获取 handler
处理器、执行 handler
处理器逻辑、处理 handler
处理器返回的结果等三个步骤进行逐一分析。
- 根据
ServerWebExchange
请求获取handler
处理器
在调度分发器中会调用 HandlerMapping
接口的 getHandler(ServerWebExchange exchange)
方法,该接口有多个抽象子类及实现子类,AbstractHandlerMapping
、AbstractHandlerMethodMapping
、AbstractUrlHandlerMapping
、 RoutePredicateHandlerMapping
等多个实现类,细心的读者观察其子类名称,会发现其使用了模板设计模式,提取抽象或共性的逻辑,提高代码的拓展性与可读性。
1//代码位置 AbstractHandlerMapping.getHandler(ServerWebExchange exchange) ,根据
2@Override
3public Mono<Object> getHandler(ServerWebExchange exchange) {
4 // getHandlerInternal(exchange) 是由各子类实现的,我们重点分析网关路由的实现
5 return getHandlerInternal(exchange).map(handler -> {
6 //检查当前的 handler 是否为 CorsConfiguration 或者 当前请求是 cors 预请求
7 ServerHttpRequest request = exchange.getRequest();
8 if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
9 CorsConfiguration config = (this.corsConfigurationSource != null ?
10 this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
11 CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
12 config = (config != null ? config.combine(handlerConfig) : handlerConfig);
13 if (config != null) {
14 config.validateAllowCredentials();
15 }
16 // corsProcessor 处理器处理当前请求
17 if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
18 return NO_OP_HANDLER;
19 }
20 }
21 return handler;
22 });
23}
在此处我们主要对 RoutePredicateHandlerMapping
路由断言处理器映射展开详细分析,挖掘 Spring Cloud Gateway
是如何根据配置的路由信息转发处理请求的,对于其余的处理器映射,感兴趣的请读者自行研读分析。
1//代码位置:RoutePredicateHandlerMapping.getHandlerInternal(ServerWebExchange exchange)
2@Override
3protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
4 //1、如果设置的 management 端口与服务器端口不同,则不处理在 manager 端口上的请求
5 if (this.managementPortType == DIFFERENT && this.managementPort != null
6 && exchange.getRequest().getURI().getPort() == this.managementPort) {
7 return Mono.empty();
8 }
9 //2、根据请求查找对应的 Route 路由信息,如果没有找到路由信息则返回空,即 Mono.empty()
10 return lookupRoute(exchange)
11 .flatMap((Function<Route, Mono<?>>) r -> {
12 exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
13 //省略部分代码......
14 return Mono.just(webHandler);
15 }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
16 //省略部分代码......
17 })));
18}
19
20//根据请求查找对应的路由
21protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
22 return this.routeLocator.getRoutes()
23 .concatMap(route -> Mono.just(route).filterWhen(r -> {
24 exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
25 //找到路由后获取它的 predicates 配置信息,并判断当前请求是否匹配
26 return r.getPredicate().apply(exchange);
27 })
28 .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
29 .onErrorResume(e -> Mono.empty()))
30 //省略部分代码......
31 .next()
32 .map(route -> {
33 //省略部分代码......
34 validateRoute(route, exchange);
35 return route;
36 });
37}
在 RoutePredicateHandlerMapping.lookupRoute()
方法查找请求对应的路由过程中,需要使用到 RouteLocator
接口查询 CachingRouteLocator
缓存的路由(关于 RouteLocator
接口的初始化及创建过程请阅读过第二节的《初始化路由配置》),并返回路由关联的断言,通过断言请求是否为真,断言为真则说明当前请求有匹配到路由。
在 Spring Cloud Gateway
网关框架里,默认自带 14 种路由断言,当默认的路由断言不满足场景需求,可根据使用场景自行拓展 AbstractRoutePredicateFactory
类按需实现。
BeforeRoutePredicateFactory
: 请求时间是否满足在配置时间之前的断言。AfterRoutePredicateFactory
:请求时间是否满足在配置时间之后的断言。BetweenRoutePredicateFactory
:时间范围路由断言,判断请求时间是否满足在配置时间范围内。CloudFoundryRouteServiceRoutePredicateFactory
:请求是否用于Cloud Foundry
路由服务的断言。CookieRoutePredicateFactory
:Cookie
路由断言,判断请求的Cookie
是否满足配置的Cookie
。HeaderRoutePredicateFactory
:Header
路由断言,判断请求头Header
是否满足配置的Header
。HostRoutePredicateFactory
:Host
路由断言,判断请求Host
是否满足配置的Host
。MethodRoutePredicateFactory
请求方法路由断言,判断请求的Method
是否满足配置的一个或多个。PathRoutePredicateFactory
:Path
路径路由断言,判断请求的路径Path
是否满足配置的一个或多个。QueryRoutePredicateFactory
:Query
参数路由断言,判断请求 Query 参数是否满足配置的参数名称、参数值。ReadBodyRoutePredicateFactory
:RequestBody
路由断言,读取请求中的requestBody
内容,并判断是否满足配置的自定义断言处理规则。RemoteAddrRoutePredicateFactory
:远程地址路由断言,读取请求的主机地址,判断请求主机地址是否在指定的IP
地址段中。WeightRoutePredicateFactory
:Weight
权重路由断言,根据配置的权重路由请求。XForwardedRemoteAddrRoutePredicateFactory
:基于XForwarded
实现的远程地址路由断言,判断请求主机地址是否在指定的IP
地址段中。
以下述路由配置为例,路由配置的断言器为 Path 即 PathRoutePredicateFactory
,它根据请求的路径判断与配置文件中的断言内容是否满足条件。
注意:Spring Cloud Gateway
约定断言器简称为实现类名中的 RoutePredicateFactory
前缀部分,具体可查看工具类 NameUtils
。
1spring:
2 cloud:
3 gateway:
4 routes:
5 - id: route-test
6 uri: http://127.0.0.1:8080
7 predicates:
8 - Path=/user/**
9 filters:
10 - Authentication
因此对于上述的路由配置,实际会调用 PathRoutePredicateFactory
类 apply(Config config)
方法断言匹配与否。
1//代码位置:PathRoutePredicateFactory.apply(Config config)
2@Override
3public Predicate<ServerWebExchange> apply(Config config) {
4 //解析 yaml 配置中 path predicate 的值,并保存到 pathPatterns 变量
5 final ArrayList<PathPattern> pathPatterns = new ArrayList<>();
6 synchronized (this.pathPatternParser) {
7 pathPatternParser.setMatchOptionalTrailingSeparator(config.isMatchTrailingSlash());
8 config.getPatterns().forEach(pattern -> {
9 PathPattern pathPattern = this.pathPatternParser.parse(pattern);
10 pathPatterns.add(pathPattern);
11 });
12 }
13 return new GatewayPredicate() {
14 @Override
15 public boolean test(ServerWebExchange exchange) {
16 //获取当前请求的路径 path
17 PathContainer path = parsePath(exchange.getRequest().getURI().getRawPath());
18 PathPattern match = null;
19 //判断当前请求的路径是否与配置的匹配
20 for (int i = 0; i < pathPatterns.size(); i++) {
21 PathPattern pathPattern = pathPatterns.get(i);
22 if (pathPattern.matches(path)) {
23 match = pathPattern;
24 break;
25 }
26 }
27
28 if (match != null) {
29 traceMatch("Pattern", match.getPatternString(), path, true);
30 //如果存在环境变量,则解析变量值并放置到请求的 attribute 属性
31 PathMatchInfo pathMatchInfo = match.matchAndExtract(path);
32 putUriTemplateVariables(exchange, pathMatchInfo.getUriVariables());
33 exchange.getAttributes().put(GATEWAY_PREDICATE_MATCHED_PATH_ATTR, match.getPatternString());
34 String routeId = (String) exchange.getAttributes().get(GATEWAY_PREDICATE_ROUTE_ATTR);
35 if (routeId != null) {
36 // populated in RoutePredicateHandlerMapping
37 exchange.getAttributes().put(GATEWAY_PREDICATE_MATCHED_PATH_ROUTE_ID_ATTR, routeId);
38 }
39 return true;
40 }
41 //省略部分代码.....
42 }
43 //省略部分代码.....
44 };
45}
根据请求找到对应的 Route
后,设置 Route
信息到 ServerWebExchange
上下文,并返回 FilteringWebHandler
,随后根据该 WebHandler
寻找 SimpleHandlerAdapter
适配器,并执行全局过滤器的拦截逻辑。
SimpleHandlerAdapter
:负责将 DispatcherHandler
分发与 具体 Handler
的执行分离(单一职责原则) ,使得 DispatcherHandler
类能支持并拓展任何类型的 Handler
。
注意:此处的 FilteringWebHandler
实例与处理器映流程中的并不是同一个,该 FilteringWebHandler
类的详细路径:org.springframework.cloud.gateway.handler.FilteringWebHandler
,它关联了多个全局过滤器,而处理器映流程中的是 org.springframework.web.server.handler.FilteringWebHandler
。
9、过滤器拦截
对于 Spring Cloud Gateway
的功能 ,基本 80% 功能是由过滤器实现的,这么说并不为过,因为负载均衡、实际请求转发、请求/响应重写等均是由过滤器完成的,因此熟悉 Spring Cloud Gateway
的过滤器拦截原理尤为重要。
在上一章节中,程序完成查找 Route
后将委托 SimpleHandlerAdapter
适配器执行 FilteringWebHandler
的逻辑,包括获取 ServerWebExchange
上下文关联的 Route
路由、路由绑定的 GatewayFilter
过滤器、系统默认的全局过滤器等。
FilteringWebHandler
:负责将请求委托给 GlobalFilter
全局过滤器、GatewayFilterFactory
网关过滤器链,然后再委托给目标的 WebHandler
。
GatewayFilter
:用于拦截式、链式处理 Web
请求,实现切面、应用无关性的的特性,例如安全性、超时及其他功能。
GatewayFilterChain
:负责构建拦截器链,负责将 WebFilter
委托给责任链中的另外一个 WebFilter
。
注意:此处 FilteringWebHandler
类路径org.springframework.cloud.gateway.handler.FilteringWebHandler
。
1//代码位置:FilteringWebHandler.handle(ServerWebExchange exchange)
2@Override
3public Mono<Void> handle(ServerWebExchange exchange) {
4 //从 ServerWebExchange 上下文查询当前的 Route 路由实例
5 Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
6 //获取路由配置的过滤器
7 List<GatewayFilter> gatewayFilters = route.getFilters();
8
9 List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
10 //组合系统默认的过滤器
11 combined.addAll(gatewayFilters);
12 //根据实现 Ordered 优先级接口排序
13 AnnotationAwareOrderComparator.sort(combined);
14 //省略部分代码......
15
16 //构建默认 WebFilter 过滤器责任链并执行 filter 方法
17 return new DefaultGatewayFilterChain(combined).filter(exchange);
18}
构建完过滤器链后,在 DefaultGatewayFilterChain
遍历每一个过滤器并执行 filter
过滤方法,其中全局默认的过滤器有以下:
RemoveCachedBodyFilter
:在 ForwardRoutingFilter
过滤器执行完后,移除 ServerWebExchange
上下文缓存的 requestBody
并释放资源。
AdaptCachedBodyGlobalFilter
:缓存 requestBody
、ServerHttpRequest
到 ServerWebExchange
上下文。
NettyWriteResponseFilter
:在 ForwardRoutingFilter
过滤器执行完后,负责拦截并重写 ServerWebExchange
上下文关联的 Response Body
,该过滤器的优先级顺序并不是最后一个,实际上是通过 Mono.defer()
方法将逻辑延迟执行。
ForwardPathFilter
:提取 Route
路由的 forward://
前缀协议的 URI
信息,将其设置到 Request PATH
变量 。
GatewayMetricsFilter
:用于提供监控指标的过滤器,通过配置 spring.cloud.gateway.metrics.enabled:true
开启统计监控指标,在 NettyRoutingFilter
过滤器执行完后才会真正执行过滤器的逻辑。
RouteToRequestUrlFilter
:合并原始请求中的 URI
与 Route URI
等资源,组成代理下游 URI
,并将该 URI
配置到 ServerWebExchange
上下文供后链路使用。
NoLoadBalancerClientFilter
:当 Spring Cloud Gateway
未使用负载均衡能力时,将创建该过滤器,注意:缺失 ReactorLoadBalancer
、ReactiveLoadBalancerClientFilter
时才会创建 NoLoadBalancerClientFilter
生效。
WebsocketRoutingFilter
:拦截处理 ws
、wss
协议开头的 WebSocket
请求的过滤器。
NettyRoutingFilter
:核心过滤器,负责将 HTTP
、HTTPS
请求通过转发到实际代理的下游处理的过滤器。
ForwardRoutingFilter
:负责将 forward://
前缀格式的请求转发给 DispatcherHandler
分发器处理
1//代码位置:DefaultGatewayFilterChain.filter(ServerWebExchange exchange)
2@Override
3public Mono<Void> filter(ServerWebExchange exchange) {
4 return Mono.defer(() -> {
5 if (this.index < filters.size()) {
6 GatewayFilter filter = filters.get(this.index);
7 DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
8 return filter.filter(exchange, chain);
9 }
10 else {
11 return Mono.empty(); // complete
12 }
13 });
14}
注意: GatewayMetricsFilter
、NettyWriteResponseFilter
、RemoveCachedBodyFilter
三个过滤器真正执行过滤逻辑的顺序是与其优先级顺序相反的,只有等真正调用了代理下游后才能重写 Response Body
内容,其原理分别是通过响应式编程的 **Mono.doOnSuccess()
、Mono.defer()
、Mono.doFinally()
**等延迟执行或回调方法等触发真正的逻辑。
上述的每个过滤器各司其职,并以责任链的方式执行工作,完成请求转发代理、响应重写等功能,如果默认的过滤器不场景,可根据实际的需求自定义 GatewayFilter
过滤器。
10、释放关闭资源
到这里为止,整个流程已经 over
了 ?no
,接下来还有一件重要的事情:释放资源,从接收请求、路由寻址、转发代理下游、再到成功地返回响应后,将回调入站 onInboundNext
方法,判断是否为最后一个带有标识头的 HttpContent
消息 ,若是则调用 terminate
方法发出多个事件包括:响应完成 HttpClientState.RESPONSE_COMPLETED
、断开连接 State.DISCONNECTING
、释放连接 State.RELEASED
等状态事件。
注意:站在 NettyServer
角度,对代理下游发请求是 Inbound
出站操作,返回响应则是 Inbound
入站操作。
1//代码位置:HttpClientOperations.onInboundNext(ChannelHandlerContext ctx, Object msg)
2@Override
3protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
4 if (msg instanceof HttpResponse) {
5 HttpResponse response = (HttpResponse) msg;
6 //省略部分代码.....
7
8 //判断是否为最后一个带有标识头的 HttpContent 消息
9 if (msg instanceof LastHttpContent) {
10 //省略部分代码.....
11 channel().config().setAutoRead(true);
12 if (markSentBody()) {
13 markPersistent(false);
14 }
15 //执行终止逻辑
16 terminate();
17 return;
18 }
19 //省略部分代码.....
20 super.onInboundNext(ctx, msg);
21}
在入站完成后,将释放出 HttpClientState.RESPONSE_COMPLETED
状态事件。
1//代码位置:HttpClientOperations.afterInboundComplete
2@Override
3protected void afterInboundComplete() {
4 if (redirecting != null) {
5 listener().onUncaughtException(this, redirecting);
6 }
7 else {
8 //触发 RESPONSE_COMPLETED 状态事件
9 listener().onStateChange(this, HttpClientState.RESPONSE_COMPLETED);
10 }
11}
紧接着释放断开连接 State.DISCONNECTING
状态事件,监听器监听到 State.DISCONNECTING
事件后释放连接池的资源 pooledRef.release()
,同时进一步发出 State.RELEASED
状态事件,触发该事件的回调执行。
1//代码位置:DefaultPooledConnectionProvider.onStateChange(Connection connection, State newState)
2@Override
3public void onStateChange(Connection connection, State newState) {
4 if (newState == State.DISCONNECTING) {
5 //省略部分代码......
6 ConnectionObserver obs = channel.attr(OWNER)
7 .getAndSet(ConnectionObserver.emptyListener());
8 pooledRef.release()
9 .subscribe(null,t -> {
10 //省略部分代码......
11 onTerminate.tryEmitEmpty();
12 obs.onStateChange(connection, State.RELEASED);
13 },() -> {
14 //省略部分代码......
15 onTerminate.tryEmitEmpty();
16 obs.onStateChange(connection, State.RELEASED);
17 });
18 return;
19 }
20 owner().onStateChange(connection, newState);
21}
在完成 TCP
连接资源释放后,返回接口的请求数据,至此整个请求—响应的流程已经结束了。
11、最后的总结
架构没有银弹,每一种技术或框架的出现是为了解决某场景的问题,Spring Cloud Gateway
是 Spring Cloud
官方推出的第二代网关框架,取代 Zuul
网关,它的主要作用是收敛并统一微服务的流量入口,并提供路由寻址、协议转换、鉴权认证、熔断、限流等基础功能。
业界中存在很多网关种类,它们都能满足我们的功能需求,选择一种合适的网关对项目或团队而言尤其重要。在我们日常工作中,大多数项目主要以 Spring
或者 Spring Boot
生态为主,因此使用 Spring Cloud Gateway
更为合适,同时学习了解它的源码及流程原理是必不可少的,在熟悉掌握使用该网关的基础上(注意:不建议没有使用经验的基础上,直接阅读框架源码,这不仅会加大学习成本,效果也会适得其反),通过分析 Spring Cloud Gateway
的源码,我们不仅可以了解到 Spring Cloud Gateway
的整体执行流程,更重要的是学习到其中的设计思想及使用到的设计模式等,以便日后在某些场景中可借鉴应用。