一、概述
本文档详细说明 java ai对话聊天模块中聊天 SSE(Server-Sent Events)功能背后 Spring 框架的核心机制,包括:
Spring WebFlux 如何处理响应式流
核心类如何注入到 IOC 容器
不同聊天类型的对话如何通过策略模式切换
完整的请求处理生命周期
二、技术栈与依赖
2.1 核心依赖
<!-- Spring WebFlux 响应式 Web 框架 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
</dependency>
<!-- Project Reactor 响应式编程库 -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
2.2 关键注解
@SpringBootApplication: 启用自动配置和组件扫描@RestController: 标记 REST 控制器@Component: 标记组件,自动注册到 IOC 容器@RequiredArgsConstructor: Lombok 生成构造函数注入
三、Spring Boot 启动过程
3.1 启动入口
@SpringBootApplication
public class PigxKnowledgeApplication {
public static void main(String[] args) {
SpringApplication.run(PigxKnowledgeApplication.class, args);
}
}
3.2 启动阶段的关键步骤
阶段 1: 应用上下文初始化
创建 ApplicationContext
Spring Boot 创建
AnnotationConfigServletWebServerApplicationContext(如果使用 WebFlux,则是AnnotationConfigReactiveWebServerApplicationContext)但由于项目同时引入了
spring-webflux,Spring Boot 会检测并启用 WebFlux 支持
组件扫描(Component Scanning)
// Spring 扫描 @Component、@Service、@Controller、@RestController 等注解 // 扫描路径:com.pig4cloud.pigx.knowledge 及其子包扫描到的关键类:
AiChatController(@RestController)AiChatServiceImpl(@Service)SimpleChatRule(@Component("simpleChat")) // 简单聊天VectorChatRule(@Component("vectorChat")) // 知识库聊天McpChatRule(@Component("mcpChat")) // mcp聊天JsonChatRule(@Component("jsonChat")) // json聊天DatabaseChatRule(@Component("databaseChat")) // 数据库聊天等等...
阶段 2: Bean 定义注册
Spring 为每个扫描到的组件创建 BeanDefinition:
// 伪代码展示 Spring 内部处理
BeanDefinition aiChatControllerDef = new BeanDefinition();
aiChatControllerDef.setBeanClass(AiChatController.class);
aiChatControllerDef.setScope(Singleton);
// 注册到 BeanDefinitionRegistry
BeanDefinition simpleChatRuleDef = new BeanDefinition();
simpleChatRuleDef.setBeanClass(SimpleChatRule.class);
simpleChatRuleDef.setScope(Singleton);
simpleChatRuleDef.setBeanName("simpleChat"); // @Component("simpleChat")
阶段 3: Bean 实例化与依赖注入
3.3.1 ChatRule 实现类的注册
所有 ChatRule 实现类通过 @Component 注解自动注册:
@Component("simpleChat") // Bean 名称为 "simpleChat"
public class SimpleChatRule implements ChatRule { ... }
@Component("vectorChat") // Bean 名称为 "vectorChat"
public class VectorChatRule implements ChatRule { ... }
@Component("mcpChat") // Bean 名称为 "mcpChat"
public class McpChatRule implements ChatRule { ... }
3.3.2 Map<String, ChatRule> 的自动注入
Spring 的特殊机制:当构造函数参数类型为 Map<String, Interface> 时,Spring 会自动收集所有该接口的实现类,以 bean 名称作为 key 注入:
@Service
@RequiredArgsConstructor // 生成构造函数
public class AiChatServiceImpl implements ChatService {
// Spring 自动注入:收集所有 ChatRule 实现类
// key = bean 名称(@Component 的 value)
// value = ChatRule 实例
private final Map<String, ChatRule> chatRuleMap;
// 等价于:
// public AiChatServiceImpl(Map<String, ChatRule> chatRuleMap) {
// this.chatRuleMap = chatRuleMap;
// }
}
Spring 内部处理逻辑(伪代码):
// Spring 容器初始化时
Map<String, ChatRule> chatRuleMap = new HashMap<>();
// 查找所有 ChatRule 类型的 bean
for (String beanName : beanFactory.getBeanNamesForType(ChatRule.class)) {
ChatRule rule = beanFactory.getBean(beanName, ChatRule.class);
chatRuleMap.put(beanName, rule);
}
// 注入到 AiChatServiceImpl
aiChatServiceImpl.setChatRuleMap(chatRuleMap);
最终注入结果:
chatRuleMap = {
"simpleChat" -> SimpleChatRule 实例,
"vectorChat" -> VectorChatRule 实例,
"mcpChat" -> McpChatRule 实例,
"jsonChat" -> JsonChatRule 实例,
"databaseChat" -> DatabaseChatRule 实例,
...
}
3.3.3 其他核心 Bean 的注入
@Service
@RequiredArgsConstructor
public class AiChatServiceImpl implements ChatService {
// Optional 类型:如果 LiteFlow 未配置,则为空
private final Optional<FlowExecutor> flowExecutorOptional;
// MyBatis Mapper:通过 @Mapper 扫描注册
private final AiDatasetMapper datasetMapper;
private final AiChatRecordMapper recordMapper;
// Spring AI 配置的 Bean
private final ChatMemoryProvider chatMemoryProvider;
private final MessageWindowChatMemory messageWindowChatMemory;
}
阶段 4: WebFlux 自动配置
Spring Boot 检测到 spring-webflux 依赖后,自动配置:
ReactiveWebServerFactory
默认使用 Netty(如果 classpath 中有
reactor-netty)或使用 Tomcat/Undertow(如果配置了相应依赖)
WebFlux 配置类
WebFluxAutoConfiguration: 配置WebFluxConfigurerHttpHandlerAutoConfiguration: 配置 HTTP 处理器ReactiveWebServerFactoryAutoConfiguration: 配置响应式 Web 服务器
消息编解码器(Codec)
// Spring 自动注册的编解码器 - ServerCodecConfigurer: 配置 HTTP 消息编解码 - ServerSentEventHttpMessageWriter: 处理 SSE 格式 - Jackson2JsonEncoder: JSON 编码
阶段 5: 控制器映射注册
Spring 扫描 @RestController 并注册路由:
@RestController
@RequestMapping("/chat")
public class AiChatController {
@RequestMapping(value = "/msg/list",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<AiMessageResultDTO> msg(@RequestParam Long key) {
return chatService.chatList(key);
}
}
Spring 内部注册(伪代码):
// HandlerMapping 注册
RequestMappingInfo mappingInfo = RequestMappingInfo
.paths("/chat/msg/list")
.methods(RequestMethod.POST, RequestMethod.GET)
.produces(MediaType.TEXT_EVENT_STREAM_VALUE)
.build();
HandlerMethod handlerMethod = new HandlerMethod(
aiChatController, // Controller 实例
AiChatController.class.getMethod("msg", Long.class)
);
handlerMapping.register(mappingInfo, handlerMethod);
四、请求处理生命周期
4.1 HTTP 请求到达
客户端请求: GET /chat/msg/list?key=123
Accept: text/event-stream
4.2 WebFlux 请求处理链
步骤 1: Netty 接收请求
// Netty 接收 HTTP 请求
HttpServerRequest request = ...;
HttpServerResponse response = ...;
步骤 2: 路由匹配
// DispatcherHandler 查找匹配的 Handler
HandlerMapping handlerMapping = ...;
HandlerMethod handlerMethod = handlerMapping.getHandler(request);
// 匹配到: AiChatController.msg()
步骤 3: 参数解析
// HandlerMethodArgumentResolver 解析参数
@RequestParam Long key -> 从请求参数中提取 key = 123
步骤 4: 控制器方法调用
// 调用控制器方法
Flux<AiMessageResultDTO> result = aiChatController.msg(123L);
调用链:
AiChatController.msg(123L)
-> chatService.chatList(123L) // AiChatServiceImpl
-> 查询数据库获取聊天记录
-> 风险检测(可选)
-> chatRuleMap.get(chatType).process(chatMessageDTO)
-> SimpleChatRule.process() / VectorChatRule.process() / ...
-> 调用 AI 模型 API
-> 返回 Flux<AiMessageResultDTO>
4.3 Flux 流的处理
4.3.1 Flux 创建
// SimpleChatRule.process() 示例
public Flux<AiMessageResultDTO> process(ChatMessageDTO chatMessageDTO) {
// 1. 获取 AI 流式助手
AiStreamAssistantService assistant = modelProvider
.getAiStreamAssistant(chatMessageDTO.getModelName())
.getValue();
// 2. 调用 AI 模型,返回 TokenStream
TokenStream tokenStream = assistant.chatTokenStream(...);
// 3. 创建 Flux
return Flux.create(emitter -> {
tokenStream
.onPartialResponse(partialResponse -> {
AiMessageResultDTO result = new AiMessageResultDTO();
result.setMessage(partialResponse);
emitter.next(result); // 推送数据到流
})
.onCompleteResponse(completeResponse -> {
emitter.complete(); // 完成流
})
.onError(error -> {
emitter.error(error); // 错误处理
})
.start();
});
}
4.3.2 Flux 转换为 SSE
Spring WebFlux 的自动转换机制:
检测返回类型
// Spring 检测到方法返回类型为 Flux ReturnValueHandler returnValueHandler = ...; // 使用 ResponseBodyResultHandler检测 Content-Type
// produces = MediaType.TEXT_EVENT_STREAM_VALUE // Spring 识别为 SSE 格式选择消息写入器
// ServerSentEventHttpMessageWriter // 负责将 Flux 转换为 SSE 格式SSE 格式转换
SSE 协议格式:
data: {"message":"Hello","finish":false} data: {"message":"World","finish":false} data: {"message":"","finish":true}Spring 内部转换(伪代码):
ServerSentEventHttpMessageWriter writer = ...; Flux<AiMessageResultDTO> flux = ...; // 将每个元素转换为 SSE 事件 Flux<ServerSentEvent<AiMessageResultDTO>> sseFlux = flux.map(data -> { return ServerSentEvent.builder() .data(data) .build(); }); // 写入 HTTP 响应 response.writeWith(sseFlux.map(event -> { // 格式化为 SSE 文本格式 String sseText = formatAsSSE(event); return dataBufferFactory.wrap(sseText.getBytes()); }));
4.3.3 流式传输
// 响应式流式传输
Flux<DataBuffer> responseFlux = flux
.map(dto -> {
// 序列化为 JSON
String json = objectMapper.writeValueAsString(dto);
// 格式化为 SSE 格式
String sse = "data: " + json + "\n\n";
// 转换为 DataBuffer
return dataBufferFactory.wrap(sse.getBytes());
})
.doOnNext(buffer -> {
// 立即写入响应(非阻塞)
response.writeAndFlush(buffer);
});
关键特性:
非阻塞: 使用 Netty 的异步 I/O
背压(Backpressure): Reactor 自动处理流控
流式传输: 数据产生即发送,无需等待全部完成
4.4 完整请求流程图
┌─────────────────────────────────────────────────────────────┐
│ 1. 客户端请求 │
│ GET /chat/msg/list?key=123 │
│ Accept: text/event-stream │
└───────────────────────┬─────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 2. Netty 接收请求 │
│ - HttpServerRequest │
│ - HttpServerResponse │
└───────────────────────┬─────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 3. DispatcherHandler 路由匹配 │
│ - HandlerMapping.getHandler() │
│ - 匹配到: AiChatController.msg() │
└───────────────────────┬─────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 4. 参数解析 │
│ - @RequestParam Long key = 123 │
└───────────────────────┬─────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 5. 控制器方法调用 │
│ aiChatController.msg(123L) │
│ ↓ │
│ chatService.chatList(123L) │
│ ↓ │
│ - 查询数据库 │
│ - 风险检测(可选) │
│ - chatRuleMap.get(type).process() │
│ ↓ │
│ - 调用 AI 模型 API │
│ - 返回 Flux<AiMessageResultDTO> │
└───────────────────────┬─────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 6. ResponseBodyResultHandler 处理返回值 │
│ - 检测返回类型: Flux<AiMessageResultDTO> │
│ - 检测 Content-Type: text/event-stream │
│ - 选择: ServerSentEventHttpMessageWriter │
└───────────────────────┬─────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 7. SSE 格式转换 │
│ Flux<AiMessageResultDTO> │
│ ↓ map() │
│ Flux<ServerSentEvent<AiMessageResultDTO>> │
│ ↓ map() │
│ Flux<DataBuffer> (SSE 格式文本) │
└───────────────────────┬─────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 8. 流式写入响应 │
│ - 每个元素立即写入(非阻塞) │
│ - 格式: data: {...}\n\n │
│ - 客户端实时接收 │
└───────────────────────┬─────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 9. 客户端接收 SSE 流 │
│ - EventSource API │
│ - 实时显示 AI 响应 │
└─────────────────────────────────────────────────────────────┘
五、核心类注入到 IOC 容器的完整过程
5.1 组件扫描阶段
// Spring Boot 启动时
@ComponentScan(basePackages = "com.pig4cloud.pigx.knowledge")
扫描到的类:
Controller 层
@RestController // -> Bean: aiChatController public class AiChatController { ... }Service 层
@Service // -> Bean: aiChatServiceImpl public class AiChatServiceImpl implements ChatService { ... }Rule 层(策略模式)
@Component("simpleChat") // -> Bean: simpleChat public class SimpleChatRule implements ChatRule { ... } @Component("vectorChat") // -> Bean: vectorChat public class VectorChatRule implements ChatRule { ... } @Component("mcpChat") // -> Bean: mcpChat public class McpChatRule implements ChatRule { ... }
5.2 Bean 定义注册
// Spring 内部处理(简化版)
BeanDefinitionRegistry registry = ...;
// 1. 注册 Controller
registry.registerBeanDefinition("aiChatController",
new RootBeanDefinition(AiChatController.class));
// 2. 注册 Service
registry.registerBeanDefinition("aiChatServiceImpl",
new RootBeanDefinition(AiChatServiceImpl.class));
// 3. 注册所有 ChatRule 实现
registry.registerBeanDefinition("simpleChat",
new RootBeanDefinition(SimpleChatRule.class));
registry.registerBeanDefinition("vectorChat",
new RootBeanDefinition(VectorChatRule.class));
// ... 其他 Rule
5.3 Bean 实例化顺序
依赖关系:
AiChatController
└─> ChatService (AiChatServiceImpl)
├─> Map<String, ChatRule> (需要所有 ChatRule 实例)
├─> AiChatRecordMapper
├─> AiDatasetMapper
└─> ChatMemoryProvider
实例化顺序:
第一步:实例化 ChatRule 实现类
// Spring 实例化所有 ChatRule(无依赖或依赖已满足) SimpleChatRule simpleChatRule = new SimpleChatRule(...); VectorChatRule vectorChatRule = new VectorChatRule(...); McpChatRule mcpChatRule = new McpChatRule(...); // ... 注册到容器第二步:收集 ChatRule 到 Map
// Spring 自动收集(在注入 AiChatServiceImpl 时) Map<String, ChatRule> chatRuleMap = new HashMap<>(); chatRuleMap.put("simpleChat", simpleChatRule); chatRuleMap.put("vectorChat", vectorChatRule); chatRuleMap.put("mcpChat", mcpChatRule); // ...第三步:实例化 Service
// Spring 调用构造函数注入 AiChatServiceImpl service = new AiChatServiceImpl( flowExecutorOptional, // Optional<FlowExecutor> chatRuleMap, // Map<String, ChatRule> - 自动收集 datasetMapper, // MyBatis Mapper recordMapper, // MyBatis Mapper chatMemoryProvider, // ChatMemoryProvider messageWindowChatMemory // MessageWindowChatMemory );第四步:实例化 Controller
// Spring 调用构造函数注入 AiChatController controller = new AiChatController( chatService, // AiChatServiceImpl generateService, // AiGenerateService chartGenerateService // AiChartGenerateService );
5.4 Map<String, ChatRule> 注入机制详解
Spring 的特殊注入规则:
当构造函数参数类型为 Map<String, Interface> 时:
查找所有实现该接口的 Bean
// Spring 内部逻辑(简化) Class<?> interfaceType = ChatRule.class; Map<String, ChatRule> beans = new LinkedHashMap<>(); // 获取所有 ChatRule 类型的 bean 名称 String[] beanNames = beanFactory.getBeanNamesForType(interfaceType); for (String beanName : beanNames) { ChatRule bean = beanFactory.getBean(beanName, interfaceType); beans.put(beanName, bean); // key = bean 名称 }Bean 名称来源
@Component("simpleChat")-> key = "simpleChat"@Component("vectorChat")-> key = "vectorChat"如果没有指定名称,使用类名首字母小写:
SimpleChatRule-> "simpleChatRule"
注入到目标 Bean
// Spring 调用构造函数时传入收集好的 Map public AiChatServiceImpl(Map<String, ChatRule> chatRuleMap) { this.chatRuleMap = chatRuleMap; }
六、启动时的关键初始化
6.1 应用上下文刷新
// SpringApplication.run() 内部
ConfigurableApplicationContext context = ...;
context.refresh(); // 关键方法
refresh() 执行流程:
prepareRefresh(): 准备刷新
obtainFreshBeanFactory(): 获取 BeanFactory
prepareBeanFactory(): 准备 BeanFactory
postProcessBeanFactory(): 后处理 BeanFactory
invokeBeanFactoryPostProcessors(): 执行 BeanFactoryPostProcessor
registerBeanPostProcessors(): 注册 BeanPostProcessor
initMessageSource(): 初始化消息源
initApplicationEventMulticaster(): 初始化事件广播器
onRefresh(): 子类刷新(Web 应用创建 WebServer)
registerListeners(): 注册监听器
finishBeanFactoryInitialization(): 完成 Bean 初始化 ⭐
finishRefresh(): 完成刷新
6.2 WebFlux 服务器启动
在 onRefresh() 阶段:
// ReactiveWebServerApplicationContext.onRefresh()
@Override
protected void onRefresh() {
super.onRefresh();
try {
createWebServer(); // 创建 Netty 服务器
} catch (Throwable ex) {
throw new ApplicationContextException(...);
}
}
protected void createWebServer() {
// 获取 ReactiveWebServerFactory(Netty)
ReactiveWebServerFactory factory = getWebServerFactory();
// 创建 HttpHandler(处理 HTTP 请求)
this.webServer = factory.getWebServer(getHttpHandler());
// 启动服务器
this.webServer.start();
}
Netty 服务器启动:
// ReactorNettyWebServer.start()
@Override
public void start() throws WebServerException {
if (this.nettyContext == null) {
this.nettyContext = startHttpServer();
}
}
private NettyContext startHttpServer() {
HttpServer server = HttpServer.create()
.port(this.port)
.handle(this.httpHandler); // WebFlux 的 HttpHandler
return server.bindNow(); // 绑定端口,开始监听
}
6.3 HandlerMapping 注册
在 finishBeanFactoryInitialization() 阶段:
// RequestMappingHandlerMapping 扫描 @RequestMapping
@Override
protected void initHandlerMethods() {
for (String beanName : getCandidateBeanNames()) {
if (isHandler(getBeanType(beanName))) {
detectHandlerMethods(beanName); // 检测处理方法
}
}
}
// 检测到 AiChatController
protected void detectHandlerMethods(Object handler) {
Class<?> handlerType = handler.getClass();
// 扫描所有方法
Map<Method, RequestMappingInfo> methods = MethodIntrospector
.selectMethods(handlerType,
(Method method) -> {
RequestMapping requestMapping =
AnnotatedElementUtils.findMergedAnnotation(
method, RequestMapping.class);
return requestMapping != null ?
createRequestMappingInfo(requestMapping) : null;
});
// 注册方法映射
methods.forEach((method, mapping) -> {
Method invocableMethod = AopUtils.selectInvocableMethod(
method, handlerType);
registerHandlerMethod(handler, invocableMethod, mapping);
});
}
注册结果:
// HandlerMapping 内部存储
Map<RequestMappingInfo, HandlerMethod> handlerMethods = {
RequestMappingInfo(
paths = ["/chat/msg/list"],
methods = [GET, POST],
produces = ["text/event-stream"]
) -> HandlerMethod(
bean = aiChatController,
method = AiChatController.msg(Long)
)
}
七、运行时请求处理详细流程
7.1 请求到达 Netty
// Netty 接收连接
Channel channel = ...;
HttpServerRequest request = ...;
HttpServerResponse response = ...;
// 转换为 Spring WebFlux 的 ServerHttpRequest
ServerHttpRequest serverRequest = new ReactorServerHttpRequest(request, bufferFactory);
ServerHttpResponse serverResponse = new ReactorServerHttpResponse(response, bufferFactory);
// 创建 ServerWebExchange
ServerWebExchange exchange = new DefaultServerWebExchange(
serverRequest, serverResponse, handlerMapping, ...);
7.2 DispatcherHandler 处理
// DispatcherHandler.handle()
public Mono<Void> handle(ServerWebExchange exchange) {
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
.switchIfEmpty(createNotFoundError())
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));
}
步骤分解:
查找 Handler
// RequestMappingHandlerMapping.getHandler() HandlerMethod handlerMethod = this.mappingRegistry .getMappingsByUrl(exchange.getRequest().getPath().value()) .stream() .filter(mapping -> mapping.matches(exchange)) .findFirst() .map(mapping -> mapping.getHandlerMethod()) .orElse(null); // 找到: AiChatController.msg()调用 Handler
// InvocableHandlerMethod.invoke() Object[] args = resolveHandlerMethodArguments( handlerMethod, exchange, ...); // 解析参数: @RequestParam Long key = 123 Object result = handlerMethod.invoke(aiChatController, args); // 返回: Flux<AiMessageResultDTO>处理返回值
// ResponseBodyResultHandler.handleResult() if (result instanceof Publisher) { // 检测 Content-Type MediaType mediaType = exchange.getResponse() .getHeaders() .getContentType(); if (mediaType.equals(MediaType.TEXT_EVENT_STREAM)) { // 使用 ServerSentEventHttpMessageWriter return writeWithMessageWriter( exchange, result, // Flux<AiMessageResultDTO> ServerSentEventHttpMessageWriter.class ); } }
7.3 SSE 消息写入
// ServerSentEventHttpMessageWriter.write()
public Mono<Void> write(Publisher<?> inputStream,
ResolvableType elementType,
MediaType mediaType,
ServerHttpResponse response,
Map<String, Object> hints) {
// 设置响应头
response.getHeaders().setContentType(
new MediaType("text", "event-stream", StandardCharsets.UTF_8));
response.getHeaders().set("Cache-Control", "no-cache");
response.getHeaders().set("Connection", "keep-alive");
// 转换 Flux
Flux<DataBuffer> body = Flux.from(inputStream)
.map(data -> {
// 序列化为 JSON
String json = objectMapper.writeValueAsString(data);
// 格式化为 SSE 格式
StringBuilder sb = new StringBuilder();
sb.append("data: ").append(json).append("\n\n");
// 转换为 DataBuffer
return response.bufferFactory()
.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
});
// 写入响应(流式)
return response.writeWith(body);
}
7.4 流式传输到客户端
// Netty 异步写入
response.writeAndFlush(dataBuffer)
.addListener(future -> {
if (future.isSuccess()) {
// 继续写入下一个数据
} else {
// 处理错误
}
});
客户端接收:
// 浏览器端
const eventSource = new EventSource('/chat/msg/list?key=123');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
// 实时显示: data.message
console.log(data.message);
};
八、关键配置类与自动配置
8.1 WebFlux 自动配置
配置类:WebFluxAutoConfiguration
@Configuration
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
@AutoConfigureAfter({ReactiveWebServerFactoryAutoConfiguration.class})
public class WebFluxAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public DispatcherHandler webHandler(ApplicationContext applicationContext) {
return new DispatcherHandler(applicationContext);
}
@Bean
@ConditionalOnMissingBean
public RequestMappingHandlerMapping requestMappingHandlerMapping(...) {
return new RequestMappingHandlerMapping(...);
}
@Bean
@ConditionalOnMissingBean
public ResponseBodyResultHandler responseBodyResultHandler(...) {
return new ResponseBodyResultHandler(...);
}
}
8.2 消息编解码器配置
配置类:WebFluxCodecAutoConfiguration
@Configuration
@ConditionalOnClass({CodecConfigurer.class})
public class WebFluxCodecAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public ServerCodecConfigurer serverCodecConfigurer() {
ServerCodecConfigurer configurer = new DefaultServerCodecConfigurer();
// 注册 SSE 消息写入器
configurer.writers().add(
new ServerSentEventHttpMessageWriter(
new Jackson2JsonEncoder()
)
);
return configurer;
}
}
九、总结
9.1 核心机制总结
组件扫描与注册
Spring Boot 启动时扫描所有
@Component注解的类自动注册到 IOC 容器
Map 注入机制
Spring 自动收集接口的所有实现类
以 bean 名称作为 key 注入到
Map<String, Interface>
WebFlux 响应式处理
使用 Netty 作为底层服务器
非阻塞异步处理请求
Flux 转 SSE
Spring 自动检测
produces = TEXT_EVENT_STREAM使用
ServerSentEventHttpMessageWriter转换格式流式传输,实时推送数据
9.2 完整生命周期时间线
启动阶段:
T0: SpringApplication.run()
T1: 创建 ApplicationContext
T2: 组件扫描 (@Component, @Service, @RestController)
T3: Bean 定义注册
T4: Bean 实例化(按依赖顺序)
T5: 依赖注入(包括 Map<String, ChatRule>)
T6: WebFlux 自动配置
T7: Netty 服务器启动
T8: HandlerMapping 注册
T9: 应用就绪
运行时:
T10: HTTP 请求到达 Netty
T11: DispatcherHandler 路由匹配
T12: 参数解析
T13: 控制器方法调用
T14: Service 层处理
T15: ChatRule 策略执行
T16: AI 模型 API 调用
T17: Flux 流创建
T18: SSE 格式转换
T19: 流式写入响应
T20: 客户端实时接收
9.3 关键技术点
响应式编程: Project Reactor (Flux/Mono)
非阻塞 I/O: Netty
流式传输: Server-Sent Events (SSE)
依赖注入: Spring IOC 容器
策略模式: ChatRule 接口 + 多个实现
自动配置: Spring Boot AutoConfiguration
十、参考资料
Spring WebFlux 官方文档
Project Reactor 官方文档
Server-Sent Events (SSE) 规范
Spring Boot 自动配置原理