到目前为止,我一直专注于如何让消息进出消息代理,也就是RabbitMQ。
实际上,我们可以继续使用
RabbitMQ
和它的 Exchanges 来连接这个应用程序的其他部分,但是我想探索一个稍微不同的模型:我想使用协调器来跟踪哪些类型的消费者得到消息通知。
这样的话,我断开了传感器数据生成器和数据使用者之间的连接。
同时为了处理这些数据通信,我决定使用
事件(
event
)
来通知用户系统中正在发生的事情,并让他们决定是否要处理数据。
其原理大致如下:
- 在协调器内部,我们有构建好的
QueueListener
。
- 我还需要构建另外一个类型,我叫它
EventAggregator
。
- 来自
RabbitMQ
的消息,它将通过一个异步的
goroutine
进入
QueueListener
- goroutine
将把消息传输到一个
事件对象(
event object
)
中,并通过
事件聚合对象(
event aggregation object
)
进行广播。
- 该对象将维护任何对事件感兴趣的使用者的注册表,并向其发送事件对象的副本。
- 这使我们能够通过将数据转储到下游
的
Queue 来为这些事件注册其他应用程序,但它也
可以让
使用者能够在协调器内部
进行
设置,例如日志系统。
- 最后,如果使用者最终要通过
Queue
将数据发送到另一个应用程序,则可以对其进行预处理,以添加有用的附加数据,而最终使用者不必知道这些附加信息是如何到达那里的。
编写代码
创建
EventAggregator
在
coordinator
目录下添加
eventaggregator.go
,代码如下:
- 第
28
行,建立
EventData struct
,目前它的字段碰巧和
SensorMessage
是一样的,但是两个
struct
的职责不同,所以我们不复用
SensorMessage
,而是单独建立
EventData
,以便它们以后可以独立的进化;
- 第
5
行,建立了
EventAggregator struct
,也就是事件聚合,它只有一个
listeners
字段,是一个
map
,它的
key
是事件的名称,它的值是回调函数的集合。当事件发生的时候,
EventAggregator
就轮流调用为该事件注册的回调函数;
- 第
9
行,就是
EventAggregator
的构造函数;
- 第
16
行,
AddListener
方法,使用者通过该方法可以向
EventAggregator
注册回调函数;
- 第
20
行,
PublishEvent
方法用来发布事件。它接收事件名称和事件的数据作为参数。这里需要判断
EventAggregator
里是否已经注册了该事件,如果注册了,那么遍历其对应的回调函数,并使用事件数据进行调用。
- 调用回调函数时,使用的不是
EventData
的指针,而是
EventData
的副本,这可以保证使用者不会把事件数据搞乱,影响其它使用者
- 取消订阅的功能我就不做了。
把
EventAggregator
连接到
QueueListener
打开
queuelistener.go
,添加代码:
- 第
19
行,在
QueueListener struct
里面添加字段
ea
,类型是
*EventAggregator
;
- 第
25
行,在
QueueListener
的构造函数里为
ea
自读赋初始值。
在
AddListener
方法里,原来只是把原始数据打印到控制台。现在添加如下代码:
- 创建一个
EventData
,其字段内容目前和传感器的消息内容一样;
- 使用
QueueListener
上的
EventAggregator
发布事件:
- 事件的名称是
MessageReceived_
传感器名称
- 第二个参数就是事件数据
发现早已运行的传感器
最后我们要做的就是如何让协调器发现在协调器上线前就已经在运行的传感器。
目前我们的做法是这样的:首先协调器先运行,然后传感器在上线的时候立即把它们的数据
Queue
发送过去,使用的是
Fanout Exchange
,这样多个协调器都可以被通知到。
但是,如果传感器先运行,协调器后运行,那么协调器就无法知道传感器的存在,为了解决这个问题,我这样做:
- 我在消息代理中也就是
RabbitMQ
里,建立一个新的
Exchange
,它是一个
Fanout Exchange
,它和其它信息流的方向正好相反。
- 在这里,协调器将会向这个
Fanout Exchange
发出一个“发现”请求,这个信息将会发送给所有的传感器。
- 传感器接收到这个“发现”请