每个 L3 Agent 运行在一个 network namespace 中,以 qrouter-命名。网络节点如果不支持 Linux namespace 的,只能运行一个 Virtual Router。通过配置项use_namespaces = True开启namespace。本文只做单纯的分析代码,研究了neutron的l3-agent代码(m版本),代码路径为/neutron/agent/l3/agent.py。主要的类是:
- class L3PluginApi(object): l3-agent的rpc接口,用于回复和查询l3-plugin的。
- class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,ha.AgentMixin,dvr.AgentMixin,manager.Manager): l3-agent实现类。
主要来说说L3NATAgent。该类在运行的时候,会启动两个周期任务:
- periodic_sync_routers_task:周期性获取router列表,并将需要操作的router加入到队列中;
- _process_routers_loop:从上个任务中获取队列中的router,并进行处理(增删和更新)。
接下来依次分析这两个任务。
periodic_sync_routers_task
注意该类继承了FWaaSL3AgentRpcCallback
和Manager
两个类。Manager类里有个方法periodic_tasks
,运行的是父类的方法run_periodic_tasks
:
class Manager(periodic_task.PeriodicTasks): target = oslo_messaging.Target(version='1.0') def periodic_tasks(self, context, raise_on_error=False): self.run_periodic_tasks(context, raise_on_error=raise_on_error)
|
在neutron/service.py里的类Service,启动start的时候,会加载Manager的periodic_tasks方法,并设置周期时间:
class Service(n_rpc.Service): def start(self): self.manager.init_host() super(Service, self).start() if self.report_interval: pulse = loopingcall.FixedIntervalLoopingCall(self.report_state) pulse.start(interval=self.report_interval,initial_delay=self.report_interval) self.timers.append(pulse) if self.periodic_interval: if self.periodic_fuzzy_delay: initial_delay = random.randint(0, self.periodic_fuzzy_delay) else: initial_delay = None periodic = loopingcall.FixedIntervalLoopingCall(self.periodic_tasks) periodic.start(interval=self.periodic_interval,initial_delay=initial_delay) self.timers.append(periodic) self.manager.after_start() def periodic_tasks(self, raise_on_error=False): """Tasks to be run at a periodic interval.""" ctxt = context.get_admin_context() self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
|
然后说回L3NATAgent类,它定义了一个方法periodic_sync_routers_task
,被@periodic_task.periodic_task(spacing=1, run_immediately=True)进行了装饰,可以理解为间隔时间为1秒运行一次任务。periodic_task是oslo_service/periodic_task.py里的装饰器方法,用于将被装饰的方法声明为一个周期性任务;而该文件里还有一个方法也就是上文提到的run_periodic_tasks
,用于运行所有的周期任务:
@periodic_task.periodic_task(spacing=1, run_immediately=True) def periodic_sync_routers_task(self, context): ... try: with self.namespaces_manager as ns_manager: self.fetch_and_sync_all_routers(context, ns_manager)
|
periodic_sync_routers_task执行了一个任务fetch_and_sync_all_routers
,即是通过l3-plugin RPC获取到router列表,然后将需要进行操作的router加入到 _queue 中。理解了这个,就能明白接下来的一个线程循环任务 _process_routers_loop。
_process_routers_loop
在l3-agent开始启动过程after_start中,还会启动一个线程来循环执行一个任务_process_routers_loop
:
def after_start(self): eventlet.spawn_n(self._process_routers_loop) LOG.info(_LI("L3 agent started")) def _process_routers_loop(self): LOG.debug("Starting _process_routers_loop") pool = eventlet.GreenPool(size=8) while True: pool.spawn_n(self._process_router_update)
|
再来看看方法_process_router_update
,方法一开始就会通过_queue.each_update_to_next_router从上一个任务的队列里获取需要进行操作的router信息。然后会针对不同的action,进行一些处理,真正进行router操作的是_process_router_if_compatible
:
def _process_router_update(self): for rp, update in self._queue.each_update_to_next_router(): LOG.debug("Starting router update for %s, action %s, priority %s", update.id, update.action, update.priority) if update.action == queue.PD_UPDATE: self.pd.process_prefix_update() LOG.debug("Finished a router update for %s", update.id) continue router = update.router ...... try: self._process_router_if_compatible(router) ...... LOG.debug("Finished a router update for %s", update.id) rp.fetched_and_processed(update.timestamp)
|
_process_router_if_compatible方法的主要实现,在最后:
def _process_router_if_compatible(self, router): ...... if router['id'] not in self.router_info: self._process_added_router(router) else: self._process_updated_router(router)
|
以_process_updated_router为例:
def _process_updated_router(self, router): ri = self.router_info[router['id']] ri.router = router registry.notify(resources.ROUTER, events.BEFORE_UPDATE,self, router=ri) ri.process(self) registry.notify(resources.ROUTER, events.AFTER_UPDATE, self, router=ri)
|
针对router的所有操作都在neutron/agent/l3/router_info.py中的方法process
里。具体细节在这里就不做分析了,包括对external-port,internal-port和floating ip的一系列处理。
个人分析,欢迎指正,若转载请注明出处!