neutron代码解析之l3-agent的启动

每个 L3 Agent 运行在一个 network namespace 中,以 qrouter-命名。网络节点如果不支持 Linux namespace 的,只能运行一个 Virtual Router。通过配置项use_namespaces = True开启namespace。本文只做单纯的分析代码,研究了neutron的l3-agent代码(m版本),代码路径为/neutron/agent/l3/agent.py。主要的类是:

  • class L3PluginApi(object): l3-agent的rpc接口,用于回复和查询l3-plugin的。
  • class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,ha.AgentMixin,dvr.AgentMixin,manager.Manager): l3-agent实现类。

主要来说说L3NATAgent。该类在运行的时候,会启动两个周期任务:

  • periodic_sync_routers_task:周期性获取router列表,并将需要操作的router加入到队列中;
  • _process_routers_loop:从上个任务中获取队列中的router,并进行处理(增删和更新)。

接下来依次分析这两个任务。

periodic_sync_routers_task

注意该类继承了FWaaSL3AgentRpcCallbackManager两个类。Manager类里有个方法periodic_tasks,运行的是父类的方法run_periodic_tasks

class Manager(periodic_task.PeriodicTasks):
target = oslo_messaging.Target(version='1.0')
def periodic_tasks(self, context, raise_on_error=False):
self.run_periodic_tasks(context, raise_on_error=raise_on_error)

在neutron/service.py里的类Service,启动start的时候,会加载Manager的periodic_tasks方法,并设置周期时间:

class Service(n_rpc.Service):
def start(self):
self.manager.init_host()
super(Service, self).start()
if self.report_interval:
pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
pulse.start(interval=self.report_interval,initial_delay=self.report_interval)
self.timers.append(pulse)
if self.periodic_interval:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None
periodic = loopingcall.FixedIntervalLoopingCall(self.periodic_tasks)
periodic.start(interval=self.periodic_interval,initial_delay=initial_delay)
self.timers.append(periodic)
self.manager.after_start()
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
ctxt = context.get_admin_context()
self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)

然后说回L3NATAgent类,它定义了一个方法periodic_sync_routers_task,被@periodic_task.periodic_task(spacing=1, run_immediately=True)进行了装饰,可以理解为间隔时间为1秒运行一次任务。periodic_task是oslo_service/periodic_task.py里的装饰器方法,用于将被装饰的方法声明为一个周期性任务;而该文件里还有一个方法也就是上文提到的run_periodic_tasks,用于运行所有的周期任务:

@periodic_task.periodic_task(spacing=1, run_immediately=True)
def periodic_sync_routers_task(self, context):
...
try:
with self.namespaces_manager as ns_manager:
self.fetch_and_sync_all_routers(context, ns_manager)

periodic_sync_routers_task执行了一个任务fetch_and_sync_all_routers,即是通过l3-plugin RPC获取到router列表,然后将需要进行操作的router加入到 _queue 中。理解了这个,就能明白接下来的一个线程循环任务 _process_routers_loop。

_process_routers_loop

在l3-agent开始启动过程after_start中,还会启动一个线程来循环执行一个任务_process_routers_loop

def after_start(self):
eventlet.spawn_n(self._process_routers_loop)
LOG.info(_LI("L3 agent started"))
def _process_routers_loop(self):
LOG.debug("Starting _process_routers_loop")
pool = eventlet.GreenPool(size=8)
while True:
pool.spawn_n(self._process_router_update)

再来看看方法_process_router_update,方法一开始就会通过_queue.each_update_to_next_router从上一个任务的队列里获取需要进行操作的router信息。然后会针对不同的action,进行一些处理,真正进行router操作的是_process_router_if_compatible:

def _process_router_update(self):
for rp, update in self._queue.each_update_to_next_router():
LOG.debug("Starting router update for %s, action %s, priority %s",
update.id, update.action, update.priority)
if update.action == queue.PD_UPDATE:
self.pd.process_prefix_update()
LOG.debug("Finished a router update for %s", update.id)
continue
router = update.router
......
try:
self._process_router_if_compatible(router)
......
LOG.debug("Finished a router update for %s", update.id)
rp.fetched_and_processed(update.timestamp)

_process_router_if_compatible方法的主要实现,在最后:

def _process_router_if_compatible(self, router):
......
if router['id'] not in self.router_info:
self._process_added_router(router)
else:
self._process_updated_router(router)

以_process_updated_router为例:

def _process_updated_router(self, router):
ri = self.router_info[router['id']]
ri.router = router
registry.notify(resources.ROUTER, events.BEFORE_UPDATE,self, router=ri)
ri.process(self)
registry.notify(resources.ROUTER, events.AFTER_UPDATE, self, router=ri)

针对router的所有操作都在neutron/agent/l3/router_info.py中的方法process里。具体细节在这里就不做分析了,包括对external-port,internal-port和floating ip的一系列处理。

个人分析,欢迎指正,若转载请注明出处!