JunJiang江骏 bio photo

JunJiang江骏

Staff Engineer @ Ant Group. Machine Learning GDE(Google Developer Expert). Focus on machine learning platform and training framework optimization.

Twitter Google+ LinkedIn Instagram Github Weibo

Nova RPC

tags: OpenStack Nova

继之前介绍 oslo.messaging 笔记,这次专门来看看 Nova 里 RPC 的使用和实现。

Overview

每个 Nova 服务都会在初始化的时候创建 2 个 queue,一个是以 NODE-TYPE.NODE-ID (比如 compute.hostname) 为 key 来接受消息,另一个是以总称 NODE-TYPE (for example compute) 为 key 来接受消息。前者,是当 Nova-API 需要将命令进行重定向的时候用的,比如 euca-terminate instance 这个命令只有传递给运行着指定 vm 的 compute 节点才能生效。

当 RPC 是 ‘请求/相应’ 模式(call)的时候,API 的行为就是一个 ‘消费者(consumer)’,其它情况下,所有的 API 行为都是一个 Publisher。

一个 RabbitMQ 节点就是一个 message broker(中间人)。每一个 Nova 组件都连接到一个 message broker,然后根据它们自己的属性 (比如是 compute node 或是 network node),就可以使用对应的 queue,这样它自己就变成了一个 Invoker(调用者, 比如 API 或者 Scheduler) 或者是一个 Worker(真正做事的人,比如 Compute 或者 Network)。

在 Nova 的对象里,是没有 Invoker 和 Worker 的,这两个名字只是为了方便理解。

  • Invoker: 就是一个在消息队列系统中发消息的组件。通过 rpc.call 或 rpc.cast 发送。
  • Worker: 就是一个从消息队列系统中接受消息的组件,如果消息是 rpc.call 发出的,它然后还要回复这个消息。

Topic Publisher

生命周期从 rpc.call 或 rpc.cast 开始。它被生成用来 push 一个消息到消息队列。每个 Publisher 连的 topic-based exchange 是不会变的。它的生命周期仅限于这个消息的传递。

Direct Consumer

生命周期只会从一个 rpc.call 被执行开始。它用来从消息队列接收消息。每一个这种 consumer 通过一个唯一指定的 queue 连接到一个唯一指定的 direct-based exchange。它的生命周期也仅限于这个消息的传递,exchange 和 queue 身份的识别是通过一个 UUID 生成器,这些都封装在 Topic Publisher 发出的这个 call 消息里。

Topic Consumer

它的生命周期会跟某个 Worker 是同步的。它用来从队列接收信息,并启动定义在 Worker 里正确对应的方法。一个 Topic Consumer 通过一个共享的 queue 或是一个独立的 queue 连到一个对应 topic-based exchange 上。

每个 Worker 会有两个 topic consumer:

  • 一个是处理 rpc.cast,它是连接到一个共享的 queue,exchange 的 key 是 topic
  • 另一个是处理 rpc.call,它连接到一个唯一的 queue,exchange 的 key 是 topic.host

Direct Publisher

rpc.call 的行为产生一个 Direct Publisher,它实例化后就返回在 “request/response” 操作里需要的消息。它连到一个 direct-based exchange,在传来的消息里保存着字典化后的身份信息。

Topic Exchange

Exchange 是一张路由表,它存放在一个 virtual host 的 context 里,它的多租户机制由 Qpid 或者 RabbitMQ 本身来提供。它的类型(是 topic 还是 direct)决定了路由的策略。在 Nova 里,一个 message broker 节点为每一个(or 所有?) topic 只会提供一个 topic-based exchange。

Direct Exchange

这张路由 表是指 rpc.call 的操作过程中被创建。在一个 message broker 节点的生命周期中会生成很多个这样的 Direct Exchange,一个 rpc.call 就会生成一个。

Queue Element

队列就是一个消息筒,消息一直被保存着,直到一个 Consumer(无论是 Topic Consumer 还是 Direct Consumer) 连接到这个 queue,然后把消息拿走。Queue 可以是被共享的,也可以是独立的。相同功能(personality)的 Workers 共享着 queues,它们的 routing key 都是 topic

Call, Cast Flow

用 OpenStack 官方文档里的两张图

Detail implementation

nova/openstack/common/rpc/impl_kombu.py

通过 nova/openstack/common/rpc/__init__.py 最后的 _get_impl() 方法,从 nova.conf 中读出 rpc_backend 的类路径,比如, nova.openstack.common.rpc.impl_kombu ,然后载入。

而基于 AMQP 的 rpc 实现,比如 impl_kombu 和 impl_qpid,都会用到 nova/openstack/common/rpc/amqp.py ,由它共享一些通用的方法和类,比如连接池、Callback 的一系列封装。

每一种 rpc_backend 都会实现以下类:

    ConsumerBase   # 在一个 amqp channel 上声明一个 queue
    DirectConsumer # 一个 direct queue
    TopicConsumer  # 一个 topic queue
    FanoutConsumer # 一个 fanout queue
    Publisher      # 它提供与 rabbit 的 reconnect 和 send 方法
    DirectPublisher
    TopicPublisher
    FanoutPublisher
    NotifyPublisher
    Connection     # 最重要的类

其中 Connection 类的 reconnect(包含了第一次的 connect)、close、reset、declare_consumer 这几个方法,对整个 rpc 通讯的过程都至关重要。

Connection 中的 ensure 方法是一个比较巧妙的方法。它接收 2 个方法作为参数,一个是要执行的方法,另一个是前一个方法执行出错时调用的方法。这样就可以根据不同的错误执行不同的操作了,比如记录不同的 log。

而上面的几个 xxxConsumer 类,也都在 Connection 中有对应的方法来创建出这类 consumer,比如 declare_topic_consumer(...)

那么,在这个 impl_kombu 的最后,提供了对外最关键的一系列 rpc 方法:

    create_connection
    multicall      # Make a call that returns multiple times.
    call           # Sends a message on a topic and wait for a response.
    cast           # Sends a message on a topic without waiting for a response.
    fanout_cast    # Sends a message on a fanout exchange without waiting for a response.
    cast_to_server # Sends a message on a topic to a specific server.
    fanout_cast_to_server # Sends a message on a fanout exchange to a specific server.
    notify         # Sends a notification event on a topic.
    cleanup

comments powered by Disqus