玖玖资源网-专注每日资源分享,活动线报,技术教程,网站源码,汇聚全网最新最热的资源网平台

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

admin 值得一看

Stream源码解析

Spring Cloud Stream(简称SCS)提供了一系列预先定义的注解来声明输入型和输出型Channel,业务系统基于这些Channel与消息中间件进行通信,而不是直接与具体的消息中间件进行通信。跟踪SCS的源码就会发现,Stream有很多外部依赖,最主要的就是Messaging和Integration两个项目,所以在讲解SCS源码前,有必要先介绍一下Messaging和Integration与SCS体系的关系。

SCS的目标是建立一套统一的基于注解的消息发送机制,屏蔽开发人员直接与底层消息系统进行细节交互,而Messaging模块正是Spring框架中用来做统一消息编程模型的,在Messaging中最关键的数据结构是Message,代码如下:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

在Messaging模块中消息通道MessageChannel是一个接口类,用于发送Message消息,可以理解为Messaging模块中的标准接口,类似于J2EE中的Servlet接口,具体实现类可以实现具体消息通道。下面是MessageChannel的代码:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

在Messaging模块中,消息通道的子接口SubscribableChannel继承了MessageHandler消息处理器:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

由MessageHandler真正地消费/处理消息:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

Integration基于Spring框架可以实现轻量级的消息传递,也是对Messaging的扩展实现,支持通过声明适配器与SCS集成。它实现了消息 过 滤 、 消 息 转 换 、 消 息 聚 合 和 消 息 分 割 等 功 能 , 提 供 了 对MessageChannel 和 MessageHandler 的 实 现 , 包 括 DirectChannel 、ExecutorChannel、PublishSubscribeChannel,以及MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter等。下面介绍Integration 中 的 两 种 消 息 分 发 器 : DirectChannel 和PublishSubscribeChannel。

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

从代码可知,DirectChannel内部的UnicastingDispatcher类型分发器会发到对应消息通道的MessageChannel中,从名字也可以看出来,UnicastingDispatcher是一个单播的分发器,只能选择一个消息通道。而PublishSubscribeChannel使用BroadcastingDispatcher作为广播消息分发器,会把消息分发给所有的MessageHandler。

SCS在Integration的集成上进行了封装,通过注解的方式和统一的API进行消息的发送和消费,底层消息中间件的实现细节由各个消息中间件的Binder完成,同时,通过与Spring Boot的ExternalizedConfiguration整合,SCS提供了BindingProperties等外部化配置类,这些具体的配置信息将绑定到具体的消息中间件的配置类中。

SCS的架构流程图

下面是SCS的架构流程图,我们会从几个层次分别讲解其中相关联的源码和它们之间的交互关系。

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

应用层

SCS为用户提供了三个绑定消息通道的默认实现。

● Sink:通过指定消费消息的目标来标识消息消费者。

● Source:与Sink相反,用于标识消息生产者。

● Processor:集成了Sink和Source的功能,用于标识消息生产者和消费者。

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

对 应 用 而 言 , 想 要 启 动 SCS 的 功 能 , 需 要 先 启 动 注 解 。

@EnableBinding注解是Stream框架运转的起点,通过这个注解可以实现动态注册BeanDefinition,它会将消息通道绑定到自己修饰的目标实例上,从而让这些实例具备与消息队列进行交互的能力。下面我们看源码:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

● BindingServiceConfiguration的 作 用 是 完 成BindingService、InputBindingLifecycle、OutputBindingLifecycle等重要Bean的初始化及相关配置文件加载。

● BindingBeansRegistrar的作用是注册声明通道的接口类的BeanDefinition,从而获取这些接口类的实例,并使用这些实例进行消息的发送和接收,具体代码实现如下:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

registerBindingTargetBeanDefinitions方法会调用ReflectionUtils类完成扫描所有被注解@Input和@Output标注了的方法,然后注册BeanDefinition。下面是代码示例:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

registerBindingTargetsQualifiedBeanDefinitions 是 在 注 册registerBindingTargetBeanDefinitions 时 使 用 的 工 厂 类BeanDefinition,这个工厂类用来生成registerBindingTargetBeanDefinition注册的Bean实例,如下所示:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

Stream层

Stream 层 的 BindableProxyFactory 被 初 始 化 为 一 个rootBeanDefinition,并注册为一个FactoryBean,这样Spring容器就可 以 获 得 registerBindingTargetBeanDefinitions 方 法 中 所 注 册 的Bean实例(MessageChannel对象实例)。BindableProxyFactory可以说是SCS实现通道接口类声明及相关类型的核心类,代码如下:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

afterPropertiesSet方法会处理所有被@Input和@Output注解的函数 , 并 将 生 成 函 数 返 回 类 型 实 例 存 储 在 BoundTargetHolder 中 ,getBindingTargetName方法会返回SubscribableChannelBindingTargetFactory 实 例 , 它 会 在createOutput方法中返回一个DirectChannel实例,该实例会被存储起来供BindableProxyFactory使用。

名称为output的BeanDefinition将BindableProxyFactory设置成其实例工厂类,并将outputMessagefunction方法设置成其实例的工厂函数(BeanFactoryMethod)。当Spring容器创建该实例时,会调用BindableProxyFactory 的 outputMessagefunction 方 法 , 由 于BindableProxyFactory实现了Methodlnterceptor接口,所以就调用了其invoke方法。invoke方法会从BindableProxyFactory缓存的Channel实例中匹配符合的实例方法,并反射调用。

BindingService是Stream层获取绑定器和执行绑定任务的一个重要类,首先我们看BindingService的bindProducer方法,代码如下:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

在 BindingService 实 现 中 , getBinder 方 法 最 终 会 调 用DefaultBinderFactory中的getBinder方法实现,我们可以看到,DefaultBinderFactory的作用就是获取具体的Binder实现并提供给相应的MessageChannel实例。DefaultBinderFactory的初始化依赖于BinderTypeRegistry获得的BinderType列表。DefaultBinderFactory的getBinder实现中会调用BinderConfiguration获取对应的Binder实例 , 通 过 跟 踪 BinderConfiguration 的 初 始 化 过 程 , 可 以 发 现BinderConfiguration 是 在 BinderFactoryConfiguration 执 行getBinderConfiguration方法时将bindingServiceProperties变量中的BinderProperties与BinderTypeRegistry中的BinderType结合,封装成BinderConfiguration对象。BinderProperties封装了Stream从application.yml文件中读取的关于Binder的配置信息,而BinderType则 是 具 体 Binder 的 实 现 类 信 息 。 DefaultBinderFactory 的getBinderInstance实现如下:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

这 里 的 getBinderInstance 方 法 中 会 生 成 一 个ConfigurableApplicationContext 来 创 建 Binder 实 例 , 在 创 建ConfigurableApplicationContext实例时,它会将BinderConfiguration设置到SpringApplicationBuilder中。

ConfigurableApplicationContext调用getBinder方法时,会使用BinderConfiguration的属性和配置生成BinderConfiguration中设置的具体类型的Binder实现。如果你使用的Binder是RabbitMQ,那么对应 的 RabbitServiceAutoConfiguration 会 自 动 初 始 化 并 加 载RabbitMessageChannelBinder实例。

在 Stream 层 对 Binder 实 例 的 初 始 化 工 作 都 完 成 后 , 再 回 到BindingService 的 bindProducer 方 法 实 现 , 它 会 调 用AbstractMessagChannlBinder 的 doBindProducer 方 法 , 关 键 代 码 如下:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

从源码可知,ProvisioningProvider是一个接口,不同的Binder实 现 可 以 根 据 接 口 实 现 各 自 不 同 的 ProducerDestination 和ConsumerDestination,代码如下:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

doBindProducer会调用createProducerMessageHandler方法创建MessageHandler实例,MessageChannel会使用SendingHandler封装后的MessageHandler实例,当有output消息时,将消息发送给最终的Binder实例。

通过上面的步骤,基本上在Stream层就完成了对生产者的绑定操作,消费者的绑定就是将SubscribableChannel与具体的消息队列实现连接,doBindConsumer与doBindProducer流程类似。

首先通过ProvisioningProvider的provisionConsumerDestination方法创建ConsumerDestination,然后调用createConsumerEndpoint方法创建MessageProducer实例,最后生成DefaultBinding实例,代码如下:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

Message/Integrate/消息中间件Binder层

从@Output注解可以看到,Stream框架会使用MessageChannel发送消 息 。 通 过 BindingService 的 doBindProducer 方 法 创 建 并 绑 定SendingHandler对象,然后调用handleMessageInternal方法,它会将消息再发送给delegate对象处理。下面是SendingHandler对象的handleMessageInternal方法的代码实现:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

delegate是之前在BindingServer中抽象类AbstractMessageChannelBinder执行的createProducerMessageHandler方法返回的生产者MessageHandler实例。对于RabbitMQ Binder来说,就是rmqpOutboundEndpoint对象,该实 例 将 最 终 调 用 其 handlerMessage 方 法 , 该 方 法 进 一 步 调 用RabbitTemplate的send方法。消息发送流程如下图所示。

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

消息的接收过程

消息的接收过程可以分为两个阶段:第一个阶段是从RabbitMQ到SubscribableChannel的过程。我们从@Input注解可以看到,Stream框架 会 使 用 SubscribableChannel 接 收 消 息 。 第 二 个 阶 段 是 注 解@StreamListener告诉SubscribableChannel如何将消息发送给对应的Sink接收端对应的回调方法。

Spring的RabbitMQ使用InternalConsumer作为默认的消息消费方,当接收到对应消息后,会调用handleDelivery方法将RabbitMQ消息发送给BlockingQueueConsumer中的队列。下面是handleDelivery的源码实现。

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

AsyncMessageProcessingConsumer类是Runnable类型的,它会消费 阻 塞 队 列 , 并 将 消 息 传 给 AmqpInboundChannelAdapter 。

AmqpInboundChannelAdapter 实 例 是 在 BindingService 构 造createConsumerEndpoint时创建的consumerEndpoint,并将它与对应的Channel绑定。下面是AmqpInboundChannelAdapter的关键代码,即processMessage方法,它会调用MessagingTemplate对象的send方法将消息发送给SubscribableChannel模块。

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

下面就是消息处理的第二个阶段,就是将SubscribableChannel中的 消 息 发 送 给 指 定 的 方 法 , 主 要 靠 @StreamListener 注 解 实 现 。

@StreamListener是注释在消费方法上的注解,用来接收输入型通道的消 息 , Stream 定 义 了 StreamListenerAnnotationBeanPostProcessor类,用来处理项目中的@SteamListener注解。

StreamListenerAnnotationBeanPostProcessor实现了BeanPostProcessor接口,用来在Bean初始化之前和之后两个时间点对Bean实例进行处理。

postProcessAfterlnitialization是在Bean实例初始化之后被调用 的 方 法 , 它 会 遍 历 Bean 实 例 中 的 所 有 函 数 , 处 理 那 些 被@StreamListener注解修饰的函数。

afterSingletonsInstantiated方法会遍历mappedListenerMethods 对 应 的 所 有 Entry 对 象 , 为 每 一 个StreamListenerHandlerMethodMapping 创 建 一 个 MessageHandler 实例。然后根据条件生成DispatchingStreamListenerMessageHandler并注册给SubscribableChannel。

下 面 是 StreamListenerAnnotationBeanPostProcessor 的 代 码 实现:

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

当 SubscribableChannel 接 收 到 消 息 后 , 会 调 用DispatchingStreamListenerMessageHandler类的handleRequestMessage方法,该方法会调用ConditionalStreamListenerHandler的handleMessage方法。

findMatchingHandlers方法根据ConditionalStreamListenerHandler 的 Expression 实 例 来 判 断ConditionalStreamListenerHandler是否适合处理当前这个消息,最终消息经过InvocableHandlerMethod传递给对应的函数。SCS消费消息的整体流程如下图所示。

不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

本文给大家讲解的内容是MOM异步通信,Stream源码解析

标签: 暂无标签

免责声明:

本站提供的资源,都来自网络,版权争议与本站无关,所有内容及软件的文章仅限用于学习和研究目的。不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负,我们不保证内容的长久可用性,通过使用本站内容随之而来的风险与本站无关,您必须在下载后的24个小时之内,从您的电脑/手机中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。侵删请致信E-mail:2848481868@qq.com

同类推荐
评论列表