侧边栏壁纸
  • 累计撰写 25 篇文章
  • 累计创建 26 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

Spring WebFlux Netty 服务器启动流程

Yttrium
2024-10-05 / 0 评论 / 0 点赞 / 14 阅读 / 11204 字

本文主要使用Kotlin语言, 基于Spring6.1.13

序言

Spring的WebFlux默认使用Netty, 其初始化流程也比较简单, 各种需要的实现类均在 org.springframework:spring-webflux 包中, 但因为Spring的控制反转特性, 何如初始化则需要细细研究, 网上的教程通常讲的是如何使用WebFlux做应用, 少有涉及到启动流程的, 询问AI也没有得到特别合适的答案. 当然, 本篇文章也许并不优秀, 涉入不深, 就作为在三天时间里不停的查看源代码和调试的交代吧. 本文的主要目标是在仅使用Spring容器, 不使用任何SpringBoot自动装配的情况下, 手动装配一个Reactor Server.

起点

HttpServer

通常在业务中我们使用SpringBoot初始化一个项目, 简单, 方便, 高效. 通过查阅Spring文档, Reactor-Core的初始化竟十分简单. 但很快我将陷入自我怀疑中...

先来看看文档中给出的代码

val handler: HttpHandler = ... 
val adapter = ReactorHttpHandlerAdapter(handler)
HttpServer.create().host(host).port(port).handle(adapter).bindNow()

HttpServer.create() 即可, 简单! 但痛苦的在后面, 当然, 如果研究透之后还是简单. 可以提前剧透, Reactor Server的所有操作均在这个adapter中, 也可以说是handler中. 查看ReactorHttpHandlerAdapter的源代码可以看出, ReactorHttpHandlerAdapter仅仅只是做了简单的适配工作.

首先是看一看HttpServer 的方法, 简单的查看可知主要是配置一些参数, 比如端口, 地址什么的, 心中有数即可, 略过. 我们很容易看到两个方法: handle( BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler)route(Consumer<? super HttpServerRoutes> routesBuilder) . HttpServer#route 方法最终也调用了HttpServer#handle 方法, 所以重点还是在HttpServer#handle 方法上.

HttpServer#handle 方法接受一个BiFunction函数式接口, 方法参数是 HttpServerRequestHttpServerResponse , 返回值为 Publisher<Void> 不难看出, 这个方法的作用是处理HTTP请求, 然后返回一个Publisher<Void> 代表处理完成. 因为我们使用Netty, 所以找到ReactorHttpHandlerAdapter这个类, 创建对象传入handler. (其它服务器Spring文档中也提供了相对应的HandlerAdapter)

HttpHandler

创建ReactorHttpHandlerAdapter类需要一个org.springframework.http.server.reactive.HttpHandler 对象, 查看源代码发现它有两个实现类, 一个是ContextPathCompositeHandler 在注释中看到是实现ContextPath效果的, 所以目光放到另一个实现类HttpWebHandlerAdapter 上, 如你所见, 这又是一个Adaptor, 尽管这个类里的内容有很多, 但目前不是我们关心的. 另外从名字看出这个类又是WebHandler 的Adaptor. 按图索骥, 找到WebHandler 吧.

WebHandler

还是查看源码, HttpHandler 有许多实现类, 其中最引人注目的是DispatcherHandler . 因为在同步模型中, Spring使用的Servlet就叫DispatcherServlet . 查看一下DispatcherHandler 的代码, 果然这个就是实现Controller mapping的类了.

至此, 我们已经了解了Server的启动配置了, 主要流程是 HttpServer.create() -> 添加ReactorHttpHandlerAdapter 对象 -> 添加HttpWebHandlerAdapter 对象 -> 添加 DispatcherHandler 对象. 大体上是这样.

深入

我们查看DispatcherHandler的构造方法 ,发现他有两个构造方法, 一个无参, 一个含有一个ApplicationContext参数, 同时注意到这个类实现了ApplicationContextAware接口, 观察setApplicationContext 方法, 发现内容和有参构造一样, 所以继续查看这里initStrategies(ApplicationContext context)初始化的逻辑.

initStrategies 方法中主要是取出了三类对象, HandlerMapping , HandlerAdapter, HandlerResultHandler , 看名字就能看出点端倪, 有点像是路径匹配和处理返回值的意思. 还是一样, 看源码, 找实现类.

HandlerMapping

很快啊, 一眼就看到了RequestMappingHandlerMapping , 它的作用是找到RequestMapping 的方法, 给出一个handler用于后面的执行. That's all. DispatcherServlet不仅可以处理RequestMapping 相关的逻辑, 还能通过SimpleUrlHandlerMapping将请求转发到ResourceWebHandler 实现文件访问.

HandlerAdapter

很明显了, 它用到的实现类就是RequestMappingHandlerAdapter , 这个Adaptor的作用是: 经过HandlerMapping之后得到的handler, 判断是否可被这个adaptor使用并执行它. 一个直观的逻辑是: 相信一眼就能够看出来, 如果是WebHandler , 则直接执行handle方法. RequestMappingHandlerAdapter 的要复杂些: RequestMappingHandlerMapping 返回的是一个HandlerMethod 对象, 它则包装了最终执行的Controller方法, HandlerAdapter需要处理HTTP请求的格式, 参数等等...然后返回一个HandlerResult

public class SimpleHandlerAdapter implements HandlerAdapter {

    @Override
    public boolean supports(Object handler) {
       return WebHandler.class.isAssignableFrom(handler.getClass());
    }

    @Override
    public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
       WebHandler webHandler = (WebHandler) handler;
       Mono<Void> mono = webHandler.handle(exchange);
       return mono.then(Mono.empty());
    }

}

HandlerResultHandler

还是查看源码, 不难发现一个ResponseBodyResultHandler, 我们平常使用RestController的时候, RequestMapping方法就默认加上了ResponseBody. 观察其构造函数 ResponseBodyResultHandler(List<HttpMessageWriter<?>> writers, RequestedContentTypeResolver resolver), 参数resolver 能容易看到RequestedContentTypeResolver 其三个实现类的作用, 按照需求选择即可. 但HttpMessageWriter 其实现类仅定义了一些FormData, Multipart等相应体handler, 并没有编码JSON的. 在SpringBoot源代码中也没有找到相应的实现类. 经过一段时间的寻找后发现了EncoderHttpMessageWriter 中其实可以添加许多org.springframework.core.codec.Encoder , Encoder的实现类就有一个Jackson2JsonEncoder . 当然Spring贴心的为我们准备了工具类, org.springframework.http.codec.support.DefaultServerCodecConfigurer , 它会检测你的类加载器中是否含有指定的类, 然后并添加相应的Coder. 不仅有Encoder, Decoder也准备好了, Decoder这部分是在HandlerAdapter 中使用的. 这下我们的拼图终于完整了.

复盘

在研究这项内容的时候, 期间走了很多的坑, 比如, 当时没有发现EncoderHttpMessageWriter Encoder 关系的时候, 尝试手动实现一个HttpMessageWriter , 调试的时候看到序列化代码已经执行了, 但最终就是没有输出到响应体中. ResourceWebHandler 需要添加一个ResourceResolver 才能匹配Resource

在最后给出我奋斗了三天的结果, 在缺少资料的情况下, 还是还原了Reactor server的启动过程.


import io.netty.channel.group.DefaultChannelGroup
import io.netty.util.concurrent.DefaultEventExecutor
import ng.i.sav.qdroid.log.Slf4kt
import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
import org.springframework.core.io.ClassPathResource
import org.springframework.http.client.ReactorResourceFactory
import org.springframework.http.codec.support.DefaultServerCodecConfigurer
import org.springframework.http.server.reactive.HttpHandler
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter
import org.springframework.web.reactive.DispatcherHandler
import org.springframework.web.reactive.accept.HeaderContentTypeResolver
import org.springframework.web.reactive.function.server.support.RouterFunctionMapping
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
import org.springframework.web.reactive.handler.WebFluxResponseStatusExceptionHandler
import org.springframework.web.reactive.resource.PathResourceResolver
import org.springframework.web.reactive.resource.ResourceWebHandler
import org.springframework.web.reactive.result.SimpleHandlerAdapter
import org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter
import org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping
import org.springframework.web.reactive.result.method.annotation.ResponseBodyResultHandler
import org.springframework.web.server.adapter.WebHttpHandlerBuilder
import reactor.netty.DisposableServer
import reactor.netty.http.HttpProtocol
import reactor.netty.http.server.HttpServer
import java.time.Duration


@Configuration
open class NettyServerInitializer {
    @Bean
    open fun webHandler(): DispatcherHandler {
        return DispatcherHandler()
    }

    @Bean
    open fun reactorResourceFactory(): ReactorResourceFactory {
        return ReactorResourceFactory()
    }

    @Bean
    open fun httpHandler(context: ApplicationContext): HttpHandler {
        return WebHttpHandlerBuilder.applicationContext(context).exceptionHandler(WebFluxResponseStatusExceptionHandler()).build()
    }

    @Bean
    open fun resourceWebHandler(): ResourceWebHandler {
        return ResourceWebHandler().apply {
            setLocations(listOf(ClassPathResource("static/")))
            resourceResolvers.add(PathResourceResolver().apply { setAllowedLocations(ClassPathResource("static/")) })
        }
    }

    @Bean
    open fun disposableServer(
        httpHandler: HttpHandler,
        reactorResourceFactory: ReactorResourceFactory,
    ): DisposableServer {
        return HttpServer.create().accessLog(true)
            .bindAddress { java.net.InetSocketAddress(18089) }.protocol(*listProtocols())
            .channelGroup(DefaultChannelGroup(DefaultEventExecutor()))
            .handle(ReactorHttpHandlerAdapter(httpHandler))
            .runOn(reactorResourceFactory.loopResources).bindNow(Duration.ofSeconds(10))
    }

    @Bean
    open fun webFluxResponseStatusExceptionHandler(): WebFluxResponseStatusExceptionHandler {
        return WebFluxResponseStatusExceptionHandler()
    }

    @Bean
    open fun requestMappingHandlerMapping(): RequestMappingHandlerMapping {
        return RequestMappingHandlerMapping()
    }

    @Bean
    open fun simpleUrlHandlerMapping(): SimpleUrlHandlerMapping {
        return SimpleUrlHandlerMapping(hashMapOf("/static/**" to "resourceWebHandler"))
    }

    @Bean
    open fun simpleHandlerAdapter(): SimpleHandlerAdapter {
        return SimpleHandlerAdapter()
    }

    @Bean
    open fun routerFunctionMapping(): RouterFunctionMapping {
        return RouterFunctionMapping()
    }

    @Bean
    open fun requestMappingHandlerAdapter(defaultServerCodecConfigurer: DefaultServerCodecConfigurer): RequestMappingHandlerAdapter {
        return RequestMappingHandlerAdapter().apply { messageReaders = defaultServerCodecConfigurer.readers }
    }

    @Bean
    open fun responseBodyResultHandler(defaultServerCodecConfigurer: DefaultServerCodecConfigurer): ResponseBodyResultHandler {
        return ResponseBodyResultHandler(
            defaultServerCodecConfigurer.writers, HeaderContentTypeResolver()
        )
    }

    @Bean
    open fun defaultServerCodecConfigurer(): DefaultServerCodecConfigurer {
        return DefaultServerCodecConfigurer()
    }

    private fun listProtocols(): Array<HttpProtocol> {
        return arrayOf(HttpProtocol.HTTP11, HttpProtocol.H2C)
    }

    companion object {
        private val log = Slf4kt.getLogger(NettyServerInitializer::class.java)
    }
}

DisposableServer其实没有什么用了, 可以不加入容器. 代码中包含了使用RequestMapping的初始化和static目录的静态文件服务.

其实还有很多想写的没有写出来, 等什么时候理通顺了更新.

0

评论区