Spring Cloud Gateway 源码浅析


1、开门见山

在我们团队中,大多数项目主要使用了 SpringSpring Boot 技术栈,而 Spring Cloud Gateway 网关与现有团队使用的技术栈几乎一致,经过技术选型及对技术成本与技术栈生态的考虑,最终选择使用 Spring Cloud Gateway 作为我们的业务后端网关,统一收敛并统一微服务的流量入口。

2、程序启动入口

使用过 Spring Boot 或者 Spring Cloud Gateway 的人,对如何启动应用程序均不陌生。

 1@SpringBootApplication
 2public class DatanexusGatewayApplication {
 3    public static void main(String[] args) {
 4        SpringApplication.run(GatewayApplication.class, args);
 5    }
 6}
 7
 8//1、构建 SpringApplication 实例
 9public SpringApplication(ResourceLoader resourceLoader, Class<?>... primarySources) {
10    this.resourceLoader = resourceLoader;
11    Assert.notNull(primarySources, "PrimarySources must not be null");
12    this.primarySources = new LinkedHashSet<>(Arrays.asList(primarySources));
13    //1.1、判断classpath是否存在org.springframework.web.reactive.DispatcherHandler类,存在说明是 		  WebApplicationType.REACTIVE应用类型
14    this.webApplicationType = WebApplicationType.deduceFromClasspath();
15    this.bootstrapRegistryInitializers = new ArrayList<>(
16          getSpringFactoriesInstances(BootstrapRegistryInitializer.class));
17    //1.2、从META-INF中获取应用上下文初始化
18    setInitializers((Collection) getSpringFactoriesInstances(ApplicationContextInitializer.class));
19    //1.3、从META-INF中获应用监听器
20    setListeners((Collection) getSpringFactoriesInstances(ApplicationListener.class));
21    //1.4、根据当前stackTrace信息反射获取当前运行的ApplicationClass
22    this.mainApplicationClass = deduceMainApplicationClass();
23}
24
25//2、调用 SpringApplication run()方法,启动 SpringApplication
26public ConfigurableApplicationContext run(String... args) {
27		long startTime = System.nanoTime();
28  	//2.1、创建默认的启动上下文
29		DefaultBootstrapContext bootstrapContext = createBootstrapContext();
30		ConfigurableApplicationContext context = null;
31		configureHeadlessProperty();
32  
33  	//2.2、通过spring boot自动装配获取Spring应用运行监听器并启动(扫描META-INF/factories注册的类)
34		SpringApplicationRunListeners listeners = getRunListeners(args);
35		listeners.starting(bootstrapContext, this.mainApplicationClass);
36  	
37		try {
38      //2.3、封装命令行输入的应用参数
39			ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
40      
41      //2.4、根据webApplication类型构建可配置的环境变量实例
42			ConfigurableEnvironment environment = prepareEnvironment(listeners, bootstrapContext, applicationArguments);
43      //2.5、配置可忽略的Bean信息
44			configureIgnoreBeanInfo(environment);
45      
46      //2.6、构建BannerPrinter打印自定义的信息
47			Banner printedBanner = printBanner(environment);
48      
49      //2.7、根据webApplicationType构建应用上下文,gateway使用的是AnnotationConfigReactiveWebApplicationContext
50			context = createApplicationContext();
51			context.setApplicationStartup(this.applicationStartup);
52      
53      //2.8、完善上下文,把启动上下文、环境实例、应用监听器、BannerPrinter与上下文关联
54			prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);
55      
56      //2.9、重点、重点、重点:刷新上下文,触发初始化Spring Cloud Gateway等组件资源的初始化,下面重点分析
57      //通过模板设计模式,该接口会调用抽象类AbstractApplicationContext的refresh()方法,再根据webApplicationType类型调用不同的子类,此处是ReactiveWebServerApplicationContext.refresh()方法
58			refreshContext(context);
59      
60      //3.0、刷新后操作,目前暂无实现
61			afterRefresh(context, applicationArguments);
62			Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime);
63			if (this.logStartupInfo) {
64				new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), timeTakenToStartup);
65			}
66      //3.1、启动应用监听器
67			listeners.started(context, timeTakenToStartup);
68      
69      //3.2、执行runner的run逻辑
70			callRunners(context, applicationArguments);
71		}
72		catch (Throwable ex) {
73			//省略其余代码......
74		}
75			//省略其余代码......
76		return context;
77	}

3、初始化路由配置

Spring Cloud Gateway 启动时,根据约定大于配置原则,会自动初始化在 GatewayAutoConfiguration 配置的对象实例,其中包括 RouteDefinitionRouteDefinitionLocatorRouteLocatorRouteLocator 接口主要负责获取用户配置的路由信息 Route

  • CachingRouteLocator :缓存定位路由器,负责缓存 Route 配置,并包装代理 CompositeRouteLocator ,同时实现了 ApplicationListener 应用监听器接口、ApplicationEventPublisherAware 应用事件发布通知接口。
  • CompositeRouteLocato:组合路由定位器,负责组合路由定位器。
  • RouteDefinitionRouteLocator :路由定义路由定位器,负责获取用户配置的所有 Route Definition 配置,使得在用户请求时,能根据请求获取匹配的路由。

Spring Cloud Gateway 的作者使用了代理模式设计 RouteLocator 接口,代理关系链:CachingRouteLocator —> CompositeRouteLocator —> RouteDefinitionRouteLocator

**注意:**在 GatewayAutoConfiguration 里会默认创建路由定义定位器 PropertiesRouteDefinitionLocator 实例、基于内存保存的路由定义仓库 InMemoryRouteDefinitionRepository 实例。

 1//代码位置:GatewayAutoConfiguration
 2@Bean
 3public RouteLocatorBuilder routeLocatorBuilder(ConfigurableApplicationContext context) {
 4  return new RouteLocatorBuilder(context);
 5}
 6
 7//默认创建 Properties 配置的路由定义定位器实例
 8@Bean
 9@ConditionalOnMissingBean
10public PropertiesRouteDefinitionLocator propertiesRouteDefinitionLocator(GatewayProperties properties) {
11  return new PropertiesRouteDefinitionLocator(properties);
12}
13
14//默认创建基于内存保存的路由定义仓库实例
15@Bean
16@ConditionalOnMissingBean(RouteDefinitionRepository.class)
17public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
18  return new InMemoryRouteDefinitionRepository();
19}
20
21//创建具有组合能力的路由定义定位器,此处routeDefinitionLocators 列表元素为InMemoryRouteDefinitionRepository、PropertiesRouteDefinitionLocator
22@Bean
23@Primary
24public RouteDefinitionLocator routeDefinitionLocator(List<RouteDefinitionLocator> routeDefinitionLocators) {
25  return new CompositeRouteDefinitionLocator(Flux.fromIterable(routeDefinitionLocators));
26}
27
28//创建路由定位器实例
29@Bean
30public RouteLocator routeDefinitionRouteLocator(GatewayProperties properties,
31      List<GatewayFilterFactory> gatewayFilters, List<RoutePredicateFactory> predicates,
32      RouteDefinitionLocator routeDefinitionLocator, ConfigurationService configurationService) {
33   //初始化用户配置的路由、断言等配置信息
34   return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates, gatewayFilters, properties,
35         configurationService);
36}
37
38//创建具有缓存组合能力的路由定位器
39@Bean
40@Primary
41@ConditionalOnMissingBean(name = "cachedCompositeRouteLocator")
42public RouteLocator cachedCompositeRouteLocator(List<RouteLocator> routeLocators) {
43   //
44   return new CachingRouteLocator(new CompositeRouteLocator(Flux.fromIterable(routeLocators)));
45}

根据自动化配置创建完不同的 RouteLocatorRouteDefinitionLocator 后 ,那路由的断言配置是何时初始化的?

答案:在创建 CachingRouteLocator 缓存路由定位器实例后,该实例具备监听 RefreshRoutesEvent 事件的能力,用于接收 RefreshRoutesEvent 事件通知及发布应用事件。

除此之外,作者还使用订阅发布模式设计了 RouteRefreshListener 路由刷新监听器类,该监听器主要负责监听上下文的 ApplicationEvent 事件,并根据不同条件发布 RefreshRoutesEvent 刷新路由事件。

注意:在 Spring 上下文启动完成后,会调用 finishRefresh 方法发布 ContextRefreshedEvent 事件,详细请阅读下一章节的 刷新 Spring 上下文

 1public class RouteRefreshListener implements ApplicationListener<ApplicationEvent> {
 2
 3  private final ApplicationEventPublisher publisher;
 4
 5  public RouteRefreshListener(ApplicationEventPublisher publisher) {
 6    Assert.notNull(publisher, "publisher may not be null");
 7    this.publisher = publisher;
 8  }
 9
10  @Override
11  public void onApplicationEvent(ApplicationEvent event) {
12    //当前是上下文刷新事件且当前刷新的应用上下文没有 management 命名空间,则发布 RefreshRoutesEvent 事件
13    if (event instanceof ContextRefreshedEvent) {
14      ContextRefreshedEvent refreshedEvent = (ContextRefreshedEvent) event;
15      if (!WebServerApplicationContext.hasServerNamespace(refreshedEvent.getApplicationContext(), "management")) {
16        reset();
17      }
18    }
19    else if (event instanceof RefreshScopeRefreshedEvent || event instanceof InstanceRegisteredEvent) {
20      reset();
21    }
22    //省略部分代码......
23  }
24	
25  //根据参数判断是否需发布 RefreshRoutesEvent 事件
26  private void resetIfNeeded(Object value) {
27    if (this.monitor.update(value)) {
28      reset();
29    }
30  }
31
32  //发布 RefreshRoutesEvent 事件,CachingRouteLocator 监听器会监听到事件
33  private void reset() {
34    this.publisher.publishEvent(new RefreshRoutesEvent(this));
35  }
36}

RouteRefreshListener 实例发布 RefreshRoutesEvent 事件后,CachingRouteLocator 实例接收事件并实时地从获取最新的路由配置信息。

 1public class CachingRouteLocator
 2      implements Ordered, RouteLocator, ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware {
 3  //此处省略部分代码.....
 4
 5  //拉取路由配置信息,并按照 order 优先级排序,此处的 delegate 是 CompositeRouteLocator 实例
 6   private Flux<Route> fetch() {
 7      return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
 8   }
 9
10  //监听 RefreshRoutesEvent 事件
11   @Override 
12   public void onApplicationEvent(RefreshRoutesEvent event) {
13      try {
14         //拉取路由配置信息
15         fetch().collect(Collectors.toList()).subscribe(
16               list -> Flux.fromIterable(list).materialize().collect(Collectors.toList()).subscribe(signals -> {
17                  //发布 RefreshRoutesResultEvent 事件
18                  applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this));
19                  cache.put(CACHE_KEY, signals);
20               }, this::handleRefreshError), this::handleRefreshError);
21      }
22      catch (Throwable e) {
23         handleRefreshError(e);
24      }
25   }
26
27   private void handleRefreshError(Throwable throwable) {
28      //省略部分代码.....
29      applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this, throwable));
30   }
31  
32   //省略部分代码.....
33}

经过层层代理后,程序最终会调用 RouteDefinitionRouteLocator 类的 getRoutes() 方法获取 Route 路由信息。

RouteDefinitionRouteLocator 既是 RouteLocator 类的最下层代理又是 RouteDefinitionLocator 的最上层代理,其底层调用的是 CompositeRouteDefinitionLocatorPropertiesRouteDefinitionLocator 类的方法。

它们之间的调用关系链如下:RouteDefinitionRouteLocator —> CompositeRouteDefinitionLocator —> PropertiesRouteDefinitionLocatorInMemoryRouteDefinitionRepository

 1//代码位置:RouteDefinitionRouteLocator.getRoutes()
 2@Override
 3public Flux<Route> getRoutes() {
 4  //此处的 routeDefinitionLocator 为 CompositeRouteDefinitionLocator 实例
 5   Flux<Route> routes = 	this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute);
 6   //省略部分代码....  
 7   return routes.map(route -> {
 8      if (logger.isDebugEnabled()) {
 9         logger.debug("RouteDefinition matched: " + route.getId());
10      }
11      return route;
12   });
13}
14
15private Route convertToRoute(RouteDefinition routeDefinition) {
16  AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
17  List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
18
19  return Route.async(routeDefinition).asyncPredicate(predicate).replaceFilters(gatewayFilters).build();
20}
  • CompositeRouteDefinitionLocator :主要负责组合 RouteDefinitionRouteLocator 实例,并对 id 为空的 route 设置随机 id ,最终代理将会执行 RouteDefinitionRouteLocator 实例的 getRoutes 方法。
  • PropertiesRouteDefinitionLocator:负责从 GatewayProperties 配置中加载 Route Definition 配置信息。
  • InMemoryRouteDefinitionRepository:主要作用是存储 Route Definition 配置信息在内存。
 1//获取 spring.cloud.gateway 配置的 Route Definitions
 2public class PropertiesRouteDefinitionLocator implements RouteDefinitionLocator {
 3
 4 private final GatewayProperties properties;
 5
 6 @Override
 7 public Flux<RouteDefinition> getRouteDefinitions() {
 8    return Flux.fromIterable(this.properties.getRoutes());
 9 }
10}
11
12// Route Definition 内存仓库
13public class InMemoryRouteDefinitionRepository implements RouteDefinitionRepository {
14	private final Map<String, RouteDefinition> routes = synchronizedMap(new LinkedHashMap<String, RouteDefinition>());
15   	
16  //省略部分代码.....
17	@Override
18	public Flux<RouteDefinition> getRouteDefinitions() {
19		Map<String, RouteDefinition> routesSafeCopy = new LinkedHashMap<>(routes);
20		return Flux.fromIterable(routesSafeCopy.values());
21	}
22}

因此,Spring Cloud Gateway 的路由定义数据实际上来源于配置文件里 spring.cloud.gateway.routes 路由配置、以及保存在内存里路由配置。

4、刷新 Spring 上下文

接下来我们看看 Spring 在更新上下文时,通过模板设计模式在抽象类 AbstractApplicationContext 做了哪些操作:

 1//AbstractRefreshableApplicationContext.onRefresh()
 2@Override
 3public void refresh() throws BeansException, IllegalStateException {
 4  synchronized (this.startupShutdownMonitor) {
 5    StartupStep contextRefresh = this.applicationStartup.start("spring.context.refresh");
 6
 7    // 1、刷新上下文预准备操作,包括设置上下文状态、初始化占位符属性源、校验必要属性
 8    prepareRefresh();
 9
10    // 2、通知子类刷新 bean 工厂类
11    ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
12
13    // 3、准备用于上下文使用的 bean 工厂
14    prepareBeanFactory(beanFactory);
15
16    try {
17      //4、允许在上下文子类中对 bean 工程进行后处理操作
18      postProcessBeanFactory(beanFactory);
19
20      StartupStep beanPostProcess = this.applicationStartup.start("spring.context.beans.post-process");
21      //5、调用并初始化在上下文中注册为 bean 的工厂处理器
22      invokeBeanFactoryPostProcessors(beanFactory);
23
24      // 6、注册拦截 bean 创建的 bean 处理器
25      registerBeanPostProcessors(beanFactory);
26      beanPostProcess.end();
27
28      //7、为上下文初始化 message 源
29      initMessageSource();
30
31      //8、为上下文初始化事件组播期器
32      initApplicationEventMulticaster();
33
34      //9、在特定上下文子类中初始化其他特定的 bean,spring cloud gateway 的路由、过滤器等在该子类中实现
35      onRefresh();
36
37      //10、检查并注册监听器 bean
38      registerListeners();
39
40      //11、实例化所有剩余的(非延迟初始化)的单例
41      finishBeanFactoryInitialization(beanFactory);
42
43      //12、初始化生命周期处理器、发布 ContextRefreshedEvent 事件
44      finishRefresh();
45    }
46    catch (BeansException ex) {
47      //省略部分代码......
48      throw ex;
49    }
50    finally {
51      //省略部分代码......
52    }
53  }
54}

5、创建 WebServer

 1//ReactiveWebServerApplicationContext.onRefresh()
 2@Override
 3protected void onRefresh() {
 4  super.onRefresh();
 5  try {
 6    createWebServer();
 7  }
 8  //省略部分代码......
 9}
10
11private void createWebServer() {
12		WebServerManager serverManager = this.serverManager;
13  	//第一次创建时,serverManager 是为空的
14		if (serverManager == null) {
15      //1、记录webserver创建的步骤
16			StartupStep createWebServer = this.getApplicationStartup().start("spring.boot.webserver.create");
17      //2、获取 WebServerFactory bean 名称,gateway 对应的 bean 名称 nettyReactiveWebServerFactory
18			String webServerFactoryBeanName = getWebServerFactoryBeanName();
19			ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
20			createWebServer.tag("factory", webServerFactory.getClass().toString());
21     
22			boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();
23      //3、创建 webServerManager 实例,getHttpHandler()方法获取绑定处理 http 请求的 hanlder
24			this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
25      //4、注册 webServerGracefulShutdown bean,优美地关闭 webserver
26			getBeanFactory().registerSingleton("webServerGracefulShutdown",
27					new WebServerGracefulShutdownLifecycle(this.serverManager.getWebServer()));
28      
29      //5、注册 webServerStartStop bean,优美地启动或停止 webserver
30			getBeanFactory().registerSingleton("webServerStartStop",
31					new WebServerStartStopLifecycle(this.serverManager));
32			createWebServer.end();
33		}
34  	//6、用实际的实例,替换任何属性源
35		initPropertySources();
36	}

获取上下文中已注册的 HttpHandler 实例,用于绑定到 WebServer

 1//ReactiveWebServerApplicationContext.getHttpHandler(),返回 httpHandler 请求处理器
 2protected HttpHandler getHttpHandler() {
 3  // 这里使用 bean 名称是可以避免层级结构
 4  String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class);
 5  if (beanNames.length == 0) {
 6    throw new ApplicationContextException(
 7        "Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean.");
 8  }
 9  if (beanNames.length > 1) {
10    throw new ApplicationContextException(
11        "Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : "
12            + StringUtils.arrayToCommaDelimitedString(beanNames));
13  }
14  //委派模式,HttpWebHandlerAdapter -> ExceptionHandlingWebHandler -> FilteringWebHandler -> DispatcherHandler
15  return getBeanFactory().getBean(beanNames[0], HttpHandler.class);
16}

ReactiveWebServerFactoryCustomizer:通过住入 ServerProperties 对象,设置 nettyServer 的地址端口、ssl、http协议、压缩方式等基础字段。

NettyWebServerFactoryCustomizer:注入 ServerProperties 对象,设置 nettyServer initialBufferSize、maxChunkSize、maxInitialLineLength、idleTimeout等字段。

DelayedInitializationHttpHandler:延迟初始化 httpHandler 实例,通过装饰设计模式增加 httpHandler。

 1//	WebServerManager 构造器,在 factory.getWebServer(this.handler) 进一步创建 nettyWebServer
 2// factory 变量实际引用的是 NettyReactiveWebServerFactory
 3WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,
 4    Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
 5		this.applicationContext = applicationContext;
 6		Assert.notNull(factory, "Factory must not be null");
 7		this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
 8		this.webServer = factory.getWebServer(this.handler);
 9}
10
11// NettyReactiveWebServerFactory.getWebServer(HttpHandler httpHandler),该处的 httpHandler 具体是实现为 DelayedInitializationHttpHandler,该 httpHandler 实例化发生在 WebServerManager.start()方法。
12@Override
13public WebServer getWebServer(HttpHandler httpHandler) {
14  	//1、创建 httpServer
15		HttpServer httpServer = createHttpServer();
16  	//2、创建 httpHandler 适配器对象
17		ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
18  	//3、创建 nettyWebServer,把 httpServer、httpHandler 统一封装到这个实例中
19		NettyWebServer webServer = createNettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout,
20				getShutdown());
21		webServer.setRouteProviders(this.routeProviders);
22		return webServer;
23}
24
25// NettyReactiveWebServerFactory.createHttpServer()
26private HttpServer createHttpServer() {
27		HttpServer server = HttpServer.create();
28		if (this.resourceFactory != null) {
29			LoopResources resources = this.resourceFactory.getLoopResources();
30			Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
31      //绑定监听地址,获取 ServerProperties 配置的端口,详细可参考NettyWebServerFactoryCustomizer,此处的 getListenAddress 参数是 Supplier 类型,在使用时才会真正调用方法
32			server = server.runOn(resources).bindAddress(this::getListenAddress);
33		}
34		else {
35			server = server.bindAddress(this::getListenAddress);
36		}
37		if (getSsl() != null && getSsl().isEnabled()) {
38			server = customizeSslConfiguration(server);
39		}
40		if (getCompression() != null && getCompression().getEnabled()) {
41			CompressionCustomizer compressionCustomizer = new CompressionCustomizer(getCompression());
42			server = compressionCustomizer.apply(server);
43		}
44		server = server.protocol(listProtocols()).forwarded(this.useForwardHeaders);
45		return applyCustomizers(server);
46}

6、启动 WebServer

大家还记得吗?在上述的刷新上下文步骤中,有一个逻辑 finishRefresh() 是刷新完上下文后会执行初始化生命周期处理器、发布 ContextRefreshedEvent 事件等。

 1// 代码位置:AbstractApplicationContext.finishRefresh()
 2protected void finishRefresh() {
 3  //1、清理上下文层级的资源缓存,例如来自扫描的 ASM 原数据
 4  clearResourceCaches();
 5
 6  //2、为该上下文初始化生命周期处理器,该处使用的生命周期处理器是 DefaultLifecycleProcessor
 7  initLifecycleProcessor();
 8
 9  //3、将刷新传播到生命周期处理器,重点分析:调用生命周期处理器的 start()方法,
10  getLifecycleProcessor().onRefresh();
11
12  //4、发布上下文刷新事件 ContextRefreshedEvent
13  publishEvent(new ContextRefreshedEvent(this));
14
15  //5、探测 GraalVM native image 环境是否开启,若开启则注册到应用上下文中
16  if (!NativeDetector.inNativeImage()) {
17    LiveBeansView.registerApplicationContext(this);
18  }
19}

接下来我们开始重点分析 getLifecycleProcessor().onRefresh() 方法的实现:

 1// 代码位置:DefaultLifecycleProcessor.onRefresh()
 2@Override
 3public void onRefresh() {
 4  startBeans(true);
 5  this.running = true;
 6}
 7
 8private void startBeans(boolean autoStartupOnly) {
 9  //1、从 beanFactory 工厂中获取当前生命周期实例,格式<beanName,生命周期实例>
10  Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
11  Map<Integer, LifecycleGroup> phases = new TreeMap<>();
12	
13  //2、遍历并计算生命周期实例阶段值 phase
14  lifecycleBeans.forEach((beanName, bean) -> {
15    if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
16      int phase = getPhase(bean);
17      phases.computeIfAbsent(
18          phase,
19          p -> new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly)
20      ).add(beanName, bean);
21    }
22  });
23  //3、循环调用生命周期组的 start 方法
24  if (!phases.isEmpty()) {
25    phases.values().forEach(LifecycleGroup::start);
26  }
27}
28
29public void start() {
30  //省略部分代码
31  for (LifecycleGroupMember member : this.members) {
32    doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
33  }
34}
35
36private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
37  Lifecycle bean = lifecycleBeans.remove(beanName);
38  if (bean != null && bean != this) {
39    //1、如果该 lifecycleBean 存在依赖实例,则递归完成该依赖实例的 start()方法
40    String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);
41    for (String dependency : dependenciesForBean) {
42      doStart(lifecycleBeans, dependency, autoStartupOnly);
43    }
44    
45    //2、检测生命周期 bean 实例是否处于 running 状态
46    if (!bean.isRunning() &&
47        (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
48      if (logger.isTraceEnabled()) {
49        logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
50      }
51      try {
52        //通过模版设计模式,调用不同的子类实现 start() 方法
53        bean.start();
54      }
55      catch (Throwable ex) {
56        throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
57      }
58      if (logger.isDebugEnabled()) {
59        logger.debug("Successfully started bean '" + beanName + "'");
60      }
61    }
62  }
63}

具体启动逻辑交由子类实现:org.springframework.boot.web.reactive.context.WebServerStartStopLifecycle

 1// 代码位置:WebServerStartStopLifecycle.start()
 2@Override
 3public void start() {
 4  //调用组合的 WebServerManager 实例启动方法
 5  this.weServerManager.start();
 6  this.running = true;
 7}
 8
 9// 代码位置:WebServerManager.start()
10void start() {
11  //初始化 httpHandler
12  this.handler.initializeHandler();
13  
14  //启动 webServer,最终调用 NettyWebServer.startHttpServer()
15  this.webServer.start();
16  
17  //发布 ReactiveWebServerInitializedEvent 事件
18  this.applicationContext
19      .publishEvent(new ReactiveWebServerInitializedEvent(this.webServer, this.applicationContext));
20}

在默认情况下,Spring Cloud Gateway 使用的 webServer 实现是 NettyWebServer,当然用户也可以自定义选择其他的 Server,例如:TomcatWebServerJettyWebServer 或者 UndertowWebServer

//补充类图

熟悉或使用过 Netty 框架的童鞋,基本对 Netty Server 启动的流程不会很陌生,整体流程是创建 Server、初始化 及绑定 ChannelHandler、监听在端口上绑定的事件,当发生符合条件的事件时,执行 ChannelHandler 的逻辑。

 1//代码位置:NettyWebServer.startHttpServer()
 2DisposableServer startHttpServer() {
 3   HttpServer server = this.httpServer;
 4   if (this.routeProviders.isEmpty()) {
 5     	//绑定 server 对应的 workHandler
 6      server = server.handle(this.handler);
 7   }
 8   else {
 9      server = server.route(this::applyRouteProviders);
10   }
11   if (this.lifecycleTimeout != null) {
12      return server.bindNow(this.lifecycleTimeout);
13   }
14   //将 channel、handler 等绑定到 nettyServer 上,详细实现代码可查看:ServerTransport.bind
15   return server.bindNow();
16}
17
18//代码位置:HttpServer.handle()
19	public final HttpServer handle(
20			BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
21		Objects.requireNonNull(handler, "handler");
22		return childObserve(new HttpServerHandle(handler));
23	}

在完成 NettyWebServer 启动后,NettyServer 将监听 TCP 端口的事件,并调度分发事件到绑定的 ChannelHandler 处理。

7、请求调度分发

现在 Spring Cloud Gateway 已经具备监听端口事件、处理用户请求的能力了,可谓是万事俱备、只欠东风,当东风吹来之际,NettyWebServer 通过 HttpTrafficHandler 类接收、下发数据,它继承 ChannelDuplexHandler 父类,而 ChannelDuplexHandler 类同时实现了ChannelInboundHandlerChannelOutboundHandler 接口,因此具备处理入站事件又处理出站事件的能力。

疑问:出站、入站的语义是什么?

  • 从服务端的角度,数据从客户端发送到服务端,称之为入站,当数据处理完成返回给客户端,称之为出站。

  • 从客户端的角度,数据从服务端发送给客户端,称之为入站,当数据返回给服务端,称之为出站。

HttpTrafficHandler 接收到请求时,它将生成 ConnectionObserver.State.CONFIGURED 事件,而该事件被监听器 ConnectionObserver 消费处理,其中 ConnectionObserver 有多个不同的实现:

  • ServerTransport.ChildObserver:连接监听器,作为 NettyWebServer 监听器第一入口
  • ReactorNetty.CompositeConnectionObserver:组合 ConnectionObserver 连接监听器并遍历监听器传递事件。
  • ServerTransportConfig.ServerTransportDoOnConnection:监听 State.CONNECTEDState.CONFIGURED 状态的事件。
  • HttpServer.HttpServerHandle :监听 HttpServerState.REQUEST_RECEIVED 状态事件,调用 HttpHandler 逻辑的入口处
 1@Override
 2public void channelRead(ChannelHandlerContext ctx, Object msg) {
 3   //省略部分的代码......
 4   // read message and track if it was keepAlive
 5   if (msg instanceof HttpRequest) {
 6      IdleTimeoutHandler.removeIdleTimeoutHandler(ctx.pipeline());
 7
 8      final HttpRequest request = (HttpRequest) msg;
 9    	//省略部分的代码......
10
11       HttpServerOperations ops;
12       try {
13          //构建 HttpServerOperations 对象,后面作为request、response 使用
14          ops = new HttpServerOperations(Connection.from(ctx.channel()),
15                listener,request,compress,
16                ConnectionInfo.from(ctx.channel(),request,secure,remoteAddress,
17         forwardedHeaderHandler),cookieDecoder,cookieEncoder,formDecoderProvider,mapHandle,secure);
18         }
19         catch (RuntimeException e) {
20            sendDecodingFailures(e, msg);
21            return;
22         }
23         ops.bind();
24         //第一次请求时会先传递 ConnectionObserver.State.CONFIGURED 事件给 ServerTransport 类处理,随后传递至 ReactorNetty.CompositeConnectionObserver 类,最后会执行下述的 ctx.fireChannelRead
25         listener.onStateChange(ops, ConnectionObserver.State.CONFIGURED);
26				 //从当前节点往下传播事件
27         ctx.fireChannelRead(msg);
28         return;
29   }
30	//省略部分的代码......
31   ctx.fireChannelRead(msg);
32}

随后调用 ctx.fireChannelRead(msg) 方法将事件从当前节点事件传播下去,触发 ChannelOperationsHandler 类的channelRead(ChannelHandlerContext ctx,Object msg) 方法,并在该方法中再调用 HttpServerOperations 类的 onInboundNext(ChannelHandlerContext ctx, Object msg) 方法生成 HttpServerState.REQUEST_RECEIVED 事件。

 1//代码位置:ChannelOperationsHandler.channelRead(ChannelHandlerContext ctx, Object msg)
 2@Override
 3final public void channelRead(ChannelHandlerContext ctx, Object msg) {
 4	//省略部分代码.....
 5  ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
 6  if (ops != null) {
 7    //调用 HttpServerOperations 类的 onInboundNext 方法
 8    ops.onInboundNext(ctx, msg);
 9  }
10    //省略部分代码.....
11}
12
13//代码位置:HttpServerOperations.onInboundNext(ChannelHandlerContext ctx, Object msg)
14@Override
15protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
16   if (msg instanceof HttpRequest) {
17      //生成 HttpServerState.REQUEST_RECEIVED 事件,调用 ConnectionObserver 的 onStateChange 
18       listener().onStateChange(this, HttpServerState.REQUEST_RECEIVED);
19      //省略部分代码.....
20      return;
21   }
22			//省略部分代码.....
23}

根据上述 ConnectionObserver 实现类功能职责判断,REQUEST_RECEIVED 事件会被 HttpServer.HttpServerHandle 消费处理,并调用 ReactorHttpHandlerAdapter.apply() 方法把请求传递给后链路的 Handler

 1public void onStateChange(Connection connection, State newState) {
 2   if (newState == HttpServerState.REQUEST_RECEIVED) {
 3      try {
 4				 //忽略部分代码.....
 5         HttpServerOperations ops = (HttpServerOperations) connection;
 6         //在此处把请求传递到 ReactorHttpHandlerAdapter 适配器
 7         Publisher<Void> publisher = handler.apply(ops, ops);
 8         Mono<Void> mono = Mono.deferContextual(ctx -> {
 9            ops.currentContext = Context.of(ctx);
10            return Mono.fromDirect(publisher);
11         });
12         if (ops.mapHandle != null) {
13            mono = ops.mapHandle.apply(mono, connection);
14         }
15         mono.subscribe(ops.disposeSubscriber());
16      }
17      //忽略部分代码.....
18   }
19}

到这里为止,NettyWebServer 已经完成请求的整体的度分发,接下来会把请求交给 ReactorHttpHandlerAdapter 适配器处理了。

8、处理器映射

NettyWebServer 完成调度分发后, ReactorHttpHandlerAdapter 类把接收到的请求、响应转换为 HttpHandler 能识别的 ReactorServerHttpRequestReactorServerHttpResponse

在处理器映射方面上,Spring 作者使用了委派模式设计 HttpHandler,其关系链:ReactorHttpHandlerAdapter -> DelayedInitializationHttpHandler -> HttpWebHandlerAdapter -> ExceptionHandlingWebHandler -> FilteringWebHandler -> DispatcherHandler

其中 ReactorHttpHandlerAdapterHttpWebHandlerAdapter 使用适配设计模式Request/Response 进行适配 ,ExceptionHandlingWebHandleFilteringWebHandler 通过装饰模式对其功能包装增强。

  • ReactorHttpHandlerAdapter: Reactor Http 处理适配器,负责把 HttpHandler 适配到 Reactor Netty Channel ,将 HttpServerOperations 转换为 ReactorServerHttpRequestReactorServerHttpResponse

  • DelayedInitializationHttpHandler:延迟初始化 Http 处理器,负责惰性创建 LazyHttpHandler

  • HttpWebHandlerAdapter:默认的 WebHandler 适配器,负责将 WebHandler 适配到 HttpHandler,并转换请求中的 Forwarded Header 等字段、以及将 ReactorServerHttpRequestReactorServerHttpResponse 转换为 ServerWebExchange 对象,传递到下游处理器。

  • WebHandlerDecoratorWebHandler 装饰器,负责装饰并委托另外的 WebHandler

  • ExceptionHandlingWebHandle :负责遍历 WebExceptionHandler 拦截处理执行异常的请求

  • FilteringWebHandler :负责过滤请求的 WebHandler,通过组合 DefaultWebFilterChain 类及使用责任链模式串联 MetricsWebFilterWeightCalculatorWebFilter 串联起来,对请求执行的过滤操作。

    • DefaultWebFilterChainWebFilterChain 的默认实现,使用责任链模式负责允许 WebFilter 委托给链的下一个 WebFilter
    • MetricsWebFilter: 负责拦截由 Spring WebFlux 处理的 HTTP 请求并记录有关执行时间与结果的指标。
    • WeightCalculatorWebFilter:负责拦截记录权重的计算指标。
  • DispatcherHandler:中央调度器,调度处理器或控制器处理 http 请求。

创建 HttpWebHandlerAdapter 实例的代码位置位于: WebHttpHandlerBuilder.build

 1//代码位置:WebHttpHandlerBuilder.build
 2public HttpHandler build() {
 3		WebHandler decorated = new FilteringWebHandler(this.webHandler, this.filters);
 4		decorated = new ExceptionHandlingWebHandler(decorated,  this.exceptionHandlers);
 5		HttpWebHandlerAdapter adapted = new HttpWebHandlerAdapter(decorated);
 6  	//省略部分代码.....
 7		return (this.httpHandlerDecorator != null ? this.httpHandlerDecorator.apply(adapted) : adapted);
 8}
 9	
10	//代码位置:HttpHandlerAutoConfiguration.httpHandler
11	@Bean
12	public HttpHandler httpHandler(ObjectProvider<WebFluxProperties> propsProvider) {
13		HttpHandler httpHandler = 	WebHttpHandlerBuilder.applicationContext(this.applicationContext).build();
14  //省略部分代码.....
15		return httpHandler;
16	}

DefaultWebFilterChain 作为 WebFilterChain 的默认实现,会遍历执行责任链中的全部 WebFilter,直至为空后才执行 Spring Web Flux调度处理器 DispatcherHandler

1@Override
2public Mono<Void> filter(ServerWebExchange exchange) {
3   //判断当前责任链里是否还有过滤器,存在过滤器则执行 filter 方法,没有则执行 DispatcherHandler 的 handle
4   return Mono.defer(() ->
5         this.currentFilter != null && this.chain != null ?
6               invokeFilter(this.currentFilter, this.chain, exchange) :
7               this.handler.handle(exchange));
8}

接下来重点分析调度处理器 DispatcherHandler的处理流程, DispatcherHandler 主要负责调度分发请求,通过组合 HandlerMapping 处理器映射实例、ResultHandler 结果处理器完成 Request—> Response 的整体流程。

DispatcherHanlder 除了实现 WebHandler 接口外,还实现了 ApplicationContextAware 接口(在实例对象初始化时,设置该对象的应用上下文),分析源码后 DispatcherHanlder 组合的 HandlerMappingResultHandler 是在设置应用上下文 setApplicationContext(ApplicationContext applicationContext) 初始化,详细源码如下:

 1//DispatcherHandler
 2@Override
 3public void setApplicationContext(ApplicationContext applicationContext) {
 4  initStrategies(applicationContext);
 5}
 6
 7protected void initStrategies(ApplicationContext context) {
 8   //1、获取指定 HandlerMapping 类型的所有处理器映射 Bean
 9   Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
10         context, HandlerMapping.class, true, false);
11	 
12   ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
13   //2、根据 HandlerMapping 处理器映射(实现 Ordered接口) 优先级排序
14   AnnotationAwareOrderComparator.sort(mappings);
15   this.handlerMappings = Collections.unmodifiableList(mappings);
16	
17   //3、获取指定 HandlerAdapter 类型的所有处理器适配器 Bean
18   Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
19         context, HandlerAdapter.class, true, false);
20	
21   this.handlerAdapters = new ArrayList<>(adapterBeans.values());
22   //4、根据 HandlerAdapter 处理器适配器(实现 Ordered接口) 优先级排序
23   AnnotationAwareOrderComparator.sort(this.handlerAdapters);
24	
25   //5、获取指定 HandlerResultHandler 类型的所有结果处理器 Bean
26   Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
27         context, HandlerResultHandler.class, true, false);
28
29   this.resultHandlers = new ArrayList<>(beans.values());
30   AnnotationAwareOrderComparator.sort(this.resultHandlers);
31}

HandlerMapping 用于定义请求与处理器对象之间的映射关系,通俗易懂的理解就是负责根据用户请求找到 Handler 处理器(日常工作中写的 ControllerSpring WebFluxSpring MVC 均提供了不同映射器实现不同的映射方式,例如:配置文件方式,实现接口方式,注解方式等,该接口有以下实现类:

//补充类关系图

  • AbstractHandlerMapping:抽象处理器映射基类,作者通过模板设计模式,定义处理器映射类的整体流程,把具体实现下沉到子类实现。

  • AbstractHandlerMethodMapping:实现 HandlerMapping 接口的抽象基类,定义请求与 HandlerMethod 的映射。

  • AbstractUrlHandlerMapping:实现 HandlerMapping 接口的抽象基类,定义请求与 URL 的映射。

  • RequestMappingInfoHandlerMappingAbstractHandlerMethodMapping 抽象子类,RequestMappingInfo 定义请求与handler 处理器方法之间的映射,其内部管理和注册的是 RequestMappingInfo 类型对象。

  • AbstractWebFluxEndpointHandlerMappingRequestMappingInfoHandlerMapping 子类,该 HandlerMapping 使得 Web 端点可通过 HTTP 被访问。

  • AdditionalHealthEndpointPathsWebFluxHandlerMapping:自定义 HandlerMapping 实现,允许将健康组映射到额外路径,可简单理解为允许自定义健康检查接口

  • CloudFoundryWebFluxEndpointHandlerMappingAbstractWebFluxEndpointHandlerMapping 子类, 该 HandlerMapping 使得 WEB 端点能够在 Cloud Foundry 特定 URL 上通过 HTTP 方式被访问。

  • ControllerEndpointHandlerMapping: 该实现类的作用是通过 Spring WebFlux 暴露被 @ControllerEndpoint@RestControllerEndpoint 注解修饰的端点。

  • EmptyHandlerMapping:空处理器映射,如果没有注册其他类型的处理器映射类,则使用默认的空处理器映射。

  • RequestMappingHandlerMappingRequestMappingInfoHandlerMapping 的一个拓展实现,该实现类将根据扫描类级别和方法级别上的 @RequestMapping 注解创建 RequestMappingInfo 实例。

  • RoutePredicateHandlerMapping:路由断言处理器映射,负责根据用户请求找到对应的路由断言信息。

  • RouterFunctionMapping:支持路由函数的处理器映射,如果在构造时没有路由函数提供,则该映射将会在应用上下文中探测所有函数路由并按照顺序访问他们。

  • SimpleUrlHandlerMapping:实现了 org.springframework.web.reactive.HandlerMapping 接口的简单 URL 处理器映射,负责从 URL 映射到请求处理器 bean,映射的方式同时支持映射到 bean 实例与 bean 的名称,而后者则要求必须是非单实例的处理器 bean

  • WebFluxEndpointHandlerMapping:自定义的 HandlerMapping,使得 Web 端点可通过 HTTP 被访问。

 1@Override
 2public Mono<Void> handle(ServerWebExchange exchange) {
 3   //1、如果处理器映射为空直接返回错误
 4   if (this.handlerMappings == null) {
 5      return createNotFoundError();
 6   }
 7   //2、判断是否 cors 预请求并处理
 8   if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
 9      return handlePreFlight(exchange);
10   }
11   //3、遍历处理器映射,并获取处理器映射关联的 webHandler 处理请求、返回结果
12   return Flux.fromIterable(this.handlerMappings)
13         .concatMap(mapping -> mapping.getHandler(exchange))
14         //取出找到的第一个 handler,实际上由 RoutePredicateHandlerMapping 返回 FilteringWebHandler
15         .next()
16     		 //如果为空则返回 not found error
17         .switchIfEmpty(createNotFoundError())
18          //调用 handler 
19         .flatMap(handler -> invokeHandler(exchange, handler))
20         .flatMap(result -> handleResult(exchange, result));
21}

在调度分发处理器的 handle 方法中,通过流式遍历处理器映射实例,根据当前 ServerWebExchange 请求获取对应的 handler 处理器,如果 handler 处理器为空则返回 not found 错误提示,否则遍历流元素,调用 invokeHandler 方法返回结果。

下面将从 根据 ServerWebExchange 请求获取 handler 处理器、执行 handler 处理器逻辑、处理 handler 处理器返回的结果等三个步骤进行逐一分析。

  • 根据 ServerWebExchange 请求获取 handler 处理器

在调度分发器中会调用 HandlerMapping 接口的 getHandler(ServerWebExchange exchange) 方法,该接口有多个抽象子类及实现子类,AbstractHandlerMappingAbstractHandlerMethodMappingAbstractUrlHandlerMappingRoutePredicateHandlerMapping 等多个实现类,细心的读者观察其子类名称,会发现其使用了模板设计模式,提取抽象或共性的逻辑,提高代码的拓展性与可读性。

 1//代码位置 AbstractHandlerMapping.getHandler(ServerWebExchange exchange) ,根据
 2@Override
 3public Mono<Object> getHandler(ServerWebExchange exchange) {
 4   // getHandlerInternal(exchange) 是由各子类实现的,我们重点分析网关路由的实现
 5   return getHandlerInternal(exchange).map(handler -> {
 6     	//检查当前的 handler 是否为 CorsConfiguration 或者 当前请求是 cors 预请求
 7      ServerHttpRequest request = exchange.getRequest();
 8      if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
 9         CorsConfiguration config = (this.corsConfigurationSource != null ?
10               this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
11         CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
12         config = (config != null ? config.combine(handlerConfig) : handlerConfig);
13         if (config != null) {
14            config.validateAllowCredentials();
15         }
16         // corsProcessor 处理器处理当前请求
17         if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
18            return NO_OP_HANDLER;
19         }
20      }
21      return handler;
22   });
23}

在此处我们主要对 RoutePredicateHandlerMapping 路由断言处理器映射展开详细分析,挖掘 Spring Cloud Gateway 是如何根据配置的路由信息转发处理请求的,对于其余的处理器映射,感兴趣的请读者自行研读分析。

 1//代码位置:RoutePredicateHandlerMapping.getHandlerInternal(ServerWebExchange exchange)
 2@Override
 3protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
 4 //1、如果设置的 management 端口与服务器端口不同,则不处理在 manager 端口上的请求
 5 if (this.managementPortType == DIFFERENT && this.managementPort != null
 6       && exchange.getRequest().getURI().getPort() == this.managementPort) {
 7    return Mono.empty();
 8 }
 9 //2、根据请求查找对应的 Route 路由信息,如果没有找到路由信息则返回空,即 Mono.empty()
10 return lookupRoute(exchange)
11       .flatMap((Function<Route, Mono<?>>) r -> {
12          exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
13          //省略部分代码......
14          return Mono.just(webHandler);
15       }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
16          //省略部分代码......
17       })));
18}
19
20//根据请求查找对应的路由
21protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
22  return this.routeLocator.getRoutes()
23      .concatMap(route -> Mono.just(route).filterWhen(r -> {
24        exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
25        //找到路由后获取它的 predicates 配置信息,并判断当前请求是否匹配
26        return r.getPredicate().apply(exchange);
27      })
28          .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
29          .onErrorResume(e -> Mono.empty()))
30    	//省略部分代码......
31      .next()
32      .map(route -> {
33    	//省略部分代码......
34        validateRoute(route, exchange);
35        return route;
36      });
37}

RoutePredicateHandlerMapping.lookupRoute() 方法查找请求对应的路由过程中,需要使用到 RouteLocator 接口查询 CachingRouteLocator 缓存的路由(关于 RouteLocator 接口的初始化及创建过程请阅读过第二节的《初始化路由配置》),并返回路由关联的断言,通过断言请求是否为真,断言为真则说明当前请求有匹配到路由

Spring Cloud Gateway 网关框架里,默认自带 14 种路由断言,当默认的路由断言不满足场景需求,可根据使用场景自行拓展 AbstractRoutePredicateFactory 类按需实现。

  • BeforeRoutePredicateFactory: 请求时间是否满足在配置时间之前的断言。
  • AfterRoutePredicateFactory:请求时间是否满足在配置时间之后的断言。
  • BetweenRoutePredicateFactory:时间范围路由断言,判断请求时间是否满足在配置时间范围内。
  • CloudFoundryRouteServiceRoutePredicateFactory:请求是否用于 Cloud Foundry 路由服务的断言。
  • CookieRoutePredicateFactoryCookie 路由断言,判断请求的 Cookie 是否满足配置的 Cookie
  • HeaderRoutePredicateFactoryHeader 路由断言,判断请求头 Header 是否满足配置的 Header
  • HostRoutePredicateFactoryHost 路由断言,判断请求 Host 是否满足配置的 Host
  • MethodRoutePredicateFactory 请求方法路由断言,判断请求的 Method 是否满足配置的一个或多个。
  • PathRoutePredicateFactoryPath 路径路由断言,判断请求的路径 Path 是否满足配置的一个或多个。
  • QueryRoutePredicateFactoryQuery 参数路由断言,判断请求 Query 参数是否满足配置的参数名称、参数值。
  • ReadBodyRoutePredicateFactoryRequestBody 路由断言,读取请求中的 requestBody 内容,并判断是否满足配置的自定义断言处理规则。
  • RemoteAddrRoutePredicateFactory:远程地址路由断言,读取请求的主机地址,判断请求主机地址是否在指定的 IP 地址段中。
  • WeightRoutePredicateFactoryWeight 权重路由断言,根据配置的权重路由请求。
  • XForwardedRemoteAddrRoutePredicateFactory:基于 XForwarded 实现的远程地址路由断言,判断请求主机地址是否在指定的 IP 地址段中。

以下述路由配置为例,路由配置的断言器为 Path 即 PathRoutePredicateFactory ,它根据请求的路径判断与配置文件中的断言内容是否满足条件。

注意Spring Cloud Gateway 约定断言器简称为实现类名中的 RoutePredicateFactory 前缀部分,具体可查看工具类 NameUtils

 1spring:
 2  cloud:
 3    gateway:
 4      routes:
 5        - id: route-test
 6          uri: http://127.0.0.1:8080
 7          predicates:
 8            - Path=/user/**
 9          filters:
10            - Authentication

因此对于上述的路由配置,实际会调用 PathRoutePredicateFactoryapply(Config config) 方法断言匹配与否。

 1//代码位置:PathRoutePredicateFactory.apply(Config config)
 2@Override
 3public Predicate<ServerWebExchange> apply(Config config) {
 4  //解析 yaml 配置中 path predicate 的值,并保存到 pathPatterns 变量
 5  final ArrayList<PathPattern> pathPatterns = new ArrayList<>();
 6  synchronized (this.pathPatternParser) {
 7    pathPatternParser.setMatchOptionalTrailingSeparator(config.isMatchTrailingSlash());
 8    config.getPatterns().forEach(pattern -> {
 9      PathPattern pathPattern = this.pathPatternParser.parse(pattern);
10      pathPatterns.add(pathPattern);
11    });
12  }
13 return new GatewayPredicate() {
14    @Override
15    public boolean test(ServerWebExchange exchange) {
16       //获取当前请求的路径 path
17       PathContainer path = parsePath(exchange.getRequest().getURI().getRawPath());
18       PathPattern match = null;
19       //判断当前请求的路径是否与配置的匹配
20       for (int i = 0; i < pathPatterns.size(); i++) {
21          PathPattern pathPattern = pathPatterns.get(i);
22          if (pathPattern.matches(path)) {
23             match = pathPattern;
24             break;
25          }
26       }
27
28       if (match != null) {
29          traceMatch("Pattern", match.getPatternString(), path, true);
30         	//如果存在环境变量,则解析变量值并放置到请求的 attribute 属性
31          PathMatchInfo pathMatchInfo = match.matchAndExtract(path);
32          putUriTemplateVariables(exchange, pathMatchInfo.getUriVariables());
33          exchange.getAttributes().put(GATEWAY_PREDICATE_MATCHED_PATH_ATTR, match.getPatternString());
34          String routeId = (String) exchange.getAttributes().get(GATEWAY_PREDICATE_ROUTE_ATTR);
35          if (routeId != null) {
36             // populated in RoutePredicateHandlerMapping
37             exchange.getAttributes().put(GATEWAY_PREDICATE_MATCHED_PATH_ROUTE_ID_ATTR, routeId);
38          }
39          return true;
40       }
41       //省略部分代码.....
42    }
43   		//省略部分代码.....
44 };
45}

根据请求找到对应的 Route 后,设置 Route 信息到 ServerWebExchange 上下文,并返回 FilteringWebHandler,随后根据该 WebHandler 寻找 SimpleHandlerAdapter 适配器,并执行全局过滤器的拦截逻辑

SimpleHandlerAdapter:负责将 DispatcherHandler 分发与 具体 Handler 的执行分离(单一职责原则) ,使得 DispatcherHandler 类能支持并拓展任何类型的 Handler

注意:此处的 FilteringWebHandler 实例与处理器映流程中的并不是同一个,该 FilteringWebHandler 类的详细路径:org.springframework.cloud.gateway.handler.FilteringWebHandler,它关联了多个全局过滤器,而处理器映流程中的是 org.springframework.web.server.handler.FilteringWebHandler

9、过滤器拦截

对于 Spring Cloud Gateway 的功能 ,基本 80% 功能是由过滤器实现的,这么说并不为过,因为负载均衡实际请求转发请求/响应重写等均是由过滤器完成的,因此熟悉 Spring Cloud Gateway 的过滤器拦截原理尤为重要。

在上一章节中,程序完成查找 Route 后将委托 SimpleHandlerAdapter 适配器执行 FilteringWebHandler 的逻辑,包括获取 ServerWebExchange 上下文关联的 Route 路由、路由绑定的 GatewayFilter 过滤器、系统默认的全局过滤器等。

FilteringWebHandler:负责将请求委托给 GlobalFilter 全局过滤器、GatewayFilterFactory 网关过滤器链,然后再委托给目标的 WebHandler

GatewayFilter:用于拦截式、链式处理 Web 请求,实现切面、应用无关性的的特性,例如安全性、超时及其他功能。

GatewayFilterChain:负责构建拦截器链,负责将 WebFilter 委托给责任链中的另外一个 WebFilter

注意:此处 FilteringWebHandler 类路径org.springframework.cloud.gateway.handler.FilteringWebHandler

 1//代码位置:FilteringWebHandler.handle(ServerWebExchange exchange)
 2@Override
 3public Mono<Void> handle(ServerWebExchange exchange) {
 4   //从 ServerWebExchange 上下文查询当前的 Route 路由实例
 5   Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
 6   //获取路由配置的过滤器
 7   List<GatewayFilter> gatewayFilters = route.getFilters();
 8		
 9   List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
10   //组合系统默认的过滤器
11   combined.addAll(gatewayFilters);
12   //根据实现 Ordered 优先级接口排序
13   AnnotationAwareOrderComparator.sort(combined);
14   //省略部分代码......
15  
16   //构建默认 WebFilter 过滤器责任链并执行 filter 方法
17   return new DefaultGatewayFilterChain(combined).filter(exchange);
18}

构建完过滤器链后,在 DefaultGatewayFilterChain 遍历每一个过滤器并执行 filter 过滤方法,其中全局默认的过滤器有以下:

RemoveCachedBodyFilter:在 ForwardRoutingFilter 过滤器执行完后,移除 ServerWebExchange 上下文缓存的 requestBody 并释放资源。

AdaptCachedBodyGlobalFilter:缓存 requestBodyServerHttpRequestServerWebExchange 上下文。

NettyWriteResponseFilter:在 ForwardRoutingFilter 过滤器执行完后,负责拦截并重写 ServerWebExchange 上下文关联的 Response Body,该过滤器的优先级顺序并不是最后一个,实际上是通过 Mono.defer() 方法将逻辑延迟执行

ForwardPathFilter:提取 Route 路由的 forward://前缀协议的 URI 信息,将其设置到 Request PATH 变量 。

GatewayMetricsFilter:用于提供监控指标的过滤器,通过配置 spring.cloud.gateway.metrics.enabled:true开启统计监控指标,在 NettyRoutingFilter 过滤器执行完后才会真正执行过滤器的逻辑。

RouteToRequestUrlFilter:合并原始请求中的 URIRoute URI 等资源,组成代理下游 URI,并将该 URI 配置到 ServerWebExchange 上下文供后链路使用。

NoLoadBalancerClientFilter:当 Spring Cloud Gateway 未使用负载均衡能力时,将创建该过滤器,注意:缺失 ReactorLoadBalancerReactiveLoadBalancerClientFilter 时才会创建 NoLoadBalancerClientFilter 生效。

WebsocketRoutingFilter:拦截处理 wswss 协议开头的 WebSocket 请求的过滤器。

NettyRoutingFilter核心过滤器,负责将 HTTPHTTPS 请求通过转发到实际代理的下游处理的过滤器。

ForwardRoutingFilter:负责将 forward:// 前缀格式的请求转发给 DispatcherHandler 分发器处理

 1//代码位置:DefaultGatewayFilterChain.filter(ServerWebExchange exchange)
 2@Override
 3public Mono<Void> filter(ServerWebExchange exchange) {
 4   return Mono.defer(() -> {
 5      if (this.index < filters.size()) {
 6         GatewayFilter filter = filters.get(this.index);
 7         DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
 8         return filter.filter(exchange, chain);
 9      }
10      else {
11         return Mono.empty(); // complete
12      }
13   });
14}

注意: GatewayMetricsFilterNettyWriteResponseFilterRemoveCachedBodyFilter 三个过滤器真正执行过滤逻辑的顺序是与其优先级顺序相反的,只有等真正调用了代理下游后才能重写 Response Body 内容,其原理分别是通过响应式编程的 **Mono.doOnSuccess()Mono.defer()Mono.doFinally()**等延迟执行或回调方法等触发真正的逻辑。

上述的每个过滤器各司其职,并以责任链的方式执行工作,完成请求转发代理响应重写等功能,如果默认的过滤器不场景,可根据实际的需求自定义 GatewayFilter 过滤器。

10、释放关闭资源

到这里为止,整个流程已经 over 了 ?no ,接下来还有一件重要的事情:释放资源,从接收请求、路由寻址、转发代理下游、再到成功地返回响应后,将回调入站 onInboundNext 方法,判断是否为最后一个带有标识头的 HttpContent 消息 ,若是则调用 terminate 方法发出多个事件包括:响应完成 HttpClientState.RESPONSE_COMPLETED 、断开连接 State.DISCONNECTING释放连接 State.RELEASED 等状态事件。

注意:站在 NettyServer 角度,对代理下游发请求是 Inbound 出站操作,返回响应则是 Inbound 入站操作。

 1//代码位置:HttpClientOperations.onInboundNext(ChannelHandlerContext ctx, Object msg)
 2@Override
 3protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
 4  if (msg instanceof HttpResponse) {
 5    HttpResponse response = (HttpResponse) msg;
 6    //省略部分代码.....
 7    
 8   //判断是否为最后一个带有标识头的 HttpContent 消息
 9  if (msg instanceof LastHttpContent) {
10    //省略部分代码.....
11    channel().config().setAutoRead(true);
12    if (markSentBody()) {
13      markPersistent(false);
14    }
15    //执行终止逻辑
16    terminate();
17    return;
18  }
19  //省略部分代码.....
20  super.onInboundNext(ctx, msg);
21}

在入站完成后,将释放出 HttpClientState.RESPONSE_COMPLETED 状态事件。

 1//代码位置:HttpClientOperations.afterInboundComplete
 2@Override
 3protected void afterInboundComplete() {
 4  if (redirecting != null) {
 5    listener().onUncaughtException(this, redirecting);
 6  }
 7  else {
 8    //触发 RESPONSE_COMPLETED 状态事件
 9    listener().onStateChange(this, HttpClientState.RESPONSE_COMPLETED);
10  }
11}

紧接着释放断开连接 State.DISCONNECTING 状态事件,监听器监听到 State.DISCONNECTING 事件后释放连接池的资源 pooledRef.release() ,同时进一步发出 State.RELEASED 状态事件,触发该事件的回调执行。

 1//代码位置:DefaultPooledConnectionProvider.onStateChange(Connection connection, State newState)
 2@Override
 3public void onStateChange(Connection connection, State newState) {
 4 if (newState == State.DISCONNECTING) {
 5		//省略部分代码......
 6    ConnectionObserver obs = channel.attr(OWNER)
 7                                    .getAndSet(ConnectionObserver.emptyListener());
 8    pooledRef.release()
 9             .subscribe(null,t -> {
10                         //省略部分代码......
11                         onTerminate.tryEmitEmpty();
12                         obs.onStateChange(connection, State.RELEASED);
13                     },() -> {
14                         //省略部分代码......
15                         onTerminate.tryEmitEmpty();
16                         obs.onStateChange(connection, State.RELEASED);
17                     });
18    return;
19 }
20 owner().onStateChange(connection, newState);
21}

在完成 TCP 连接资源释放后,返回接口的请求数据,至此整个请求—响应的流程已经结束了。

11、最后的总结

架构没有银弹,每一种技术或框架的出现是为了解决某场景的问题,Spring Cloud GatewaySpring Cloud 官方推出的第二代网关框架,取代 Zuul 网关,它的主要作用是收敛并统一微服务的流量入口,并提供路由寻址协议转换、鉴权认证、熔断、限流等基础功能。

业界中存在很多网关种类,它们都能满足我们的功能需求,选择一种合适的网关对项目或团队而言尤其重要。在我们日常工作中,大多数项目主要以 Spring 或者 Spring Boot 生态为主,因此使用 Spring Cloud Gateway 更为合适,同时学习了解它的源码及流程原理是必不可少的,在熟悉掌握使用该网关的基础上(注意:不建议没有使用经验的基础上,直接阅读框架源码,这不仅会加大学习成本,效果也会适得其反),通过分析 Spring Cloud Gateway 的源码,我们不仅可以了解到 Spring Cloud Gateway 的整体执行流程,更重要的是学习到其中的设计思想及使用到的设计模式等,以便日后在某些场景中可借鉴应用。