概述
ML2Plugin的主要工作是管理虚拟网络资源,保证数据正确无误,具体物理设备的设置则由Agent完成。L2 Agent通常运行在Hypervisor上,与neutron-server通过RPC通信,监听并通知设备的变化,创建新的设备来确保网络segment的正确性,应用security groups规则等。例如,OVS Agent使用Open vSwitch,通过VLAN、GRE、VxLAN实现网络隔离,并负责网络流量的转发控制。
初始化
各个组件启动流程图
Agent初始化
Agent启动命令
/usr/bin/python /usr/local/bin/neutron-openvswitch-agent --config-file /etc/neutron/neutron.conf --config-file /etc/neutron/plugins/ml2/ml2_conf.ini
脚本内容:
#!/usr/bin/python
# PBR Generated from u'console_scripts'
# Thin launcher: all real work happens in the imported main().
import sys

from neutron.cmd.eventlet.plugins.ovs_neutron_agent import main


if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
通过查看setup.cfg文件可知,ovs agent的入口位于:
# 根据setup.cfg文件可以看出neutron-openvswitch-agent的代码路径是
# neutron/cmd/eventlet/plugins/ovs_neutron_agent.py
neutron-openvswitch-agent = neutron.cmd.eventlet.plugins.ovs_neutron_agent:main
Ovs agent在初始化阶段根据ml2_conf.ini的配置文件建立基本完整的虚拟网络环境,建立vlan和tunnel转发所需要的主要流表和默认规则。参考配置信息如下:
[ml2]
tenant_network_types = vxlan
extension_drivers = port_security
mechanism_drivers = openvswitch,linuxbridge
[ml2_type_flat]
flat_networks = public,
[ml2_type_geneve]
vni_ranges = 1:1000
[ml2_type_gre]
tunnel_id_ranges = 1:1000
[ml2_type_vlan]
network_vlan_ranges = public
[ml2_type_vxlan]
vni_ranges = 1:1000
[securitygroup]
firewall_driver = iptables_hybrid
[agent]
tunnel_types = vxlan
root_helper_daemon = sudo /usr/local/bin/neutron-rootwrap-daemon /etc/neutron/rootwrap.conf
root_helper = sudo /usr/local/bin/neutron-rootwrap /etc/neutron/rootwrap.conf
[ovs]
datapath_type = system
bridge_mappings = public:br-ex
tunnel_bridge = br-tun
local_ip = 192.168.209.134
启动过程代码分析
neutron/cmd/eventlet/plugins/ovs_neutron_agent.py:main
import neutron.plugins.ml2.drivers.openvswitch.agent.main as agent_main


def main():
    """Console-script entry point; delegates to ``agent_main.main()``.

    The delegate's return value is discarded, so this function returns None.
    """
    agent_main.main()
neutron/plugins/ml2/drivers/openvswitch/agent/main.py:main
LOG = logging.getLogger(__name__)
cfg.CONF.import_group('OVS', 'neutron.plugins.ml2.drivers.openvswitch.agent.'
                      'common.config')

# Maps each accepted value of the OVS ``of_interface`` option to the dotted
# path of the module that provides ``init_config()`` and ``main()`` for it.
_main_modules = {
    'ovs-ofctl': 'neutron.plugins.ml2.drivers.openvswitch.agent.openflow.'
                 'ovs_ofctl.main',
    'native': 'neutron.plugins.ml2.drivers.openvswitch.agent.openflow.'
              'native.main',
}


def main():
    """Initialise configuration and hand over to the selected OpenFlow driver.

    The command-line arguments (minus the program name) are parsed first,
    then the module named by ``cfg.CONF.OVS.of_interface`` is imported and
    its ``init_config()`` and ``main()`` are called, with logging and
    profiler setup in between.

    :raises KeyError: if ``of_interface`` is not a key of ``_main_modules``.
    """
    common_config.init(sys.argv[1:])

    # Resolve and import the driver-specific entry module.
    driver_name = cfg.CONF.OVS.of_interface
    mod_name = _main_modules[driver_name]
    mod = importutils.import_module(mod_name)

    mod.init_config()
    common_config.setup_logging()
    n_utils.log_opt_values(LOG)
    profiler.setup("neutron-ovs-agent", cfg.CONF.host)
    mod.main()
neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/main.py:main
def main():
    """Start the agent with the ovs-ofctl flavoured bridge classes.

    Builds the ``bridge_classes`` mapping expected by
    ``ovs_neutron_agent.main`` (keys ``br_int``, ``br_phys``, ``br_tun``)
    and passes it on.
    """
    bridge_classes = {
        'br_int': br_int.OVSIntegrationBridge,
        'br_phys': br_phys.OVSPhysicalBridge,
        'br_tun': br_tun.OVSTunnelBridge,
    }
    ovs_neutron_agent.main(bridge_classes)
neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py:main
def main(bridge_classes):
    """Build the OVS agent and run its daemon loop.

    :param bridge_classes: dict of bridge classes, forwarded unchanged to
        ``OVSNeutronAgent``.

    If constructing the agent or announcing its initialisation raises
    ``RuntimeError`` or ``ValueError``, the error is logged and the process
    exits with status 1.
    """
    prepare_xen_compute()
    ovs_capabilities.register()
    validate_tunnel_config(cfg.CONF.AGENT.tunnel_types, cfg.CONF.OVS.local_ip)

    try:
        agent = OVSNeutronAgent(bridge_classes, cfg.CONF)
        capabilities.notify_init_event(n_const.AGENT_TYPE_OVS, agent)
    except (RuntimeError, ValueError) as e:
        LOG.error("%s Agent terminated!", e)
        sys.exit(1)
    # Only reached when initialisation succeeded.
    agent.daemon_loop()
neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py:OVSNeutronAgent.__init__
@profiler.trace_cls("rpc")
class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
                      dvr_rpc.DVRAgentRpcCallbackMixin):
    '''Implements OVS-based tunneling, VLANs and flat networks.

    Two local bridges are created: an integration bridge (defaults to
    'br-int') and a tunneling bridge (defaults to 'br-tun'). An
    additional bridge is created for each physical network interface
    used for VLANs and/or flat networks.

    All VM VIFs are plugged into the integration bridge. VM VIFs on a
    given virtual network share a common "local" VLAN (i.e. not
    propagated externally). The VLAN id of this local VLAN is mapped
    to the physical networking details realizing that virtual network.

    For virtual networks realized as GRE tunnels, a Logical Switch
    (LS) identifier is used to differentiate tenant traffic on
    inter-HV tunnels. A mesh of tunnels is created to other
    Hypervisors in the cloud. These tunnels originate and terminate on
    the tunneling bridge of each hypervisor. Port patching is done to
    connect local VLANs on the integration bridge to inter-hypervisor
    tunnels on the tunnel bridge.

    For each virtual network realized as a VLAN or flat network, a
    veth or a pair of patch ports is used to connect the local VLAN on
    the integration bridge with the physical network bridge, with flow
    rules adding, modifying, or stripping VLAN tags as necessary.
    '''

    # Messaging target for this agent's RPC endpoint (version 1.4).
    target = oslo_messaging.Target(version='1.4')

    def __init__(self, bridge_classes, conf=None):
        '''Constructor.

        :param bridge_classes: a dict for bridge classes.
        :param conf: an instance of ConfigOpts
        '''
        super(OVSNeutronAgent, self).__init__()
        # Fall back to the global configuration when none is supplied.
        self.conf = conf or cfg.CONF
        self.ovs = ovs_lib.BaseOVS()
        agent_conf = self.conf.AGENT
        ovs_conf = self.conf.OVS

        self.fullsync = False
        # Pre-bind the configured datapath type into each bridge class so
        # later instantiation only needs the bridge name.
        self.br_int_cls, self.br_phys_cls, self.br_tun_cls = (
            functools.partial(bridge_classes[b],
                              datapath_type=ovs_conf.datapath_type)
            for b in ('br_int', 'br_phys', 'br_tun'))

        self.use_veth_interconnection = ovs_conf.use_veth_interconnection
        self.veth_mtu = agent_conf.veth_mtu
        # Pool of local VLAN ids, MIN_VLAN_TAG..MAX_VLAN_TAG inclusive.
        self.available_local_vlans = set(moves.range(p_const.MIN_VLAN_TAG,
                                                     p_const.MAX_VLAN_TAG + 1))
        self.tunnel_types = agent_conf.tunnel_types or []
        self.l2_pop = agent_conf.l2_population
        self.enable_distributed_routing = agent_conf.enable_distributed_routing
        # The ARP responder is only active together with l2_population.
        self.arp_responder_enabled = agent_conf.arp_responder and self.l2_pop

        host = self.conf.host
        self.agent_id = 'ovs-agent-%s' % host

        # Tunneling is enabled as soon as at least one tunnel type is set.
        self.enable_tunneling = bool(self.tunnel_types)

        self._check_agent_configurations()

        # --- integration bridge -------------------------------------------
        self.int_br_device_count = 0
        self.int_br = self.br_int_cls(ovs_conf.integration_bridge)
        self.setup_integration_br()

        # Bookkeeping sets/maps for port changes.
        self.updated_ports = set()
        self.deleted_ports = set()
        self.network_ports = collections.defaultdict(set)
        self.vifname_to_ofport_map = {}

        # Creates the RPC clients, self.context and self.connection that
        # the steps below rely on.
        self.setup_rpc()

        # --- physical bridges ---------------------------------------------
        self.bridge_mappings = self._parse_bridge_mappings(
            ovs_conf.bridge_mappings)
        # Populates self.phys_brs, self.int_ofports and self.phys_ofports.
        self.setup_physical_bridges(self.bridge_mappings)
        self.vlan_manager = vlanmanager.LocalVlanManager()

        self._reset_tunnel_ofports()

        self.polling_interval = agent_conf.polling_interval
        self.minimize_polling = agent_conf.minimize_polling
        self.ovsdb_monitor_respawn_interval = (
            agent_conf.ovsdb_monitor_respawn_interval or
            constants.DEFAULT_OVSDBMON_RESPAWN)

        # --- tunnel bridge ------------------------------------------------
        self.local_ip = ovs_conf.local_ip
        self.tunnel_count = 0
        self.vxlan_udp_port = agent_conf.vxlan_udp_port
        self.dont_fragment = agent_conf.dont_fragment
        self.tunnel_csum = agent_conf.tunnel_csum
        # Defaults used when tunneling is disabled.
        self.tun_br = None
        self.patch_int_ofport = constants.OFPORT_INVALID
        self.patch_tun_ofport = constants.OFPORT_INVALID
        if self.enable_tunneling:
            self.setup_tunnel_br(ovs_conf.tunnel_bridge)
            self.setup_tunnel_br_flows()

        self.init_extension_manager(self.connection)

        # The DVR helper is always constructed; its flows are only installed
        # when distributed routing is enabled.
        self.dvr_agent = ovs_dvr_neutron_agent.OVSDVRNeutronAgent(
            self.context,
            self.dvr_plugin_rpc,
            self.int_br,
            self.tun_br,
            self.bridge_mappings,
            self.phys_brs,
            self.int_ofports,
            self.phys_ofports,
            self.patch_int_ofport,
            self.patch_tun_ofport,
            host,
            self.enable_tunneling,
            self.enable_distributed_routing)

        if self.enable_distributed_routing:
            self.dvr_agent.setup_dvr_flows()

        self.ancillary_brs = self.setup_ancillary_bridges(
            ovs_conf.integration_bridge, ovs_conf.tunnel_bridge)

        self._restore_local_vlan_map()

        # --- security groups ----------------------------------------------
        self.sg_agent = agent_sg_rpc.SecurityGroupAgentRpc(
            self.context, self.sg_plugin_rpc, defer_refresh_firewall=True,
            integration_bridge=self.int_br)
        self.sg_plugin_rpc.register_legacy_sg_notification_callbacks(
            self.sg_agent)

        # Firewall drivers lacking the attribute are treated as not
        # requiring hybrid plugging.
        hybrid_plug = getattr(self.sg_agent.firewall,
                              'OVS_HYBRID_PLUG_REQUIRED', False)
        # The agent handles ARP spoofing itself only when the firewall
        # driver does not already provide that protection.
        self.prevent_arp_spoofing = (
            not self.sg_agent.firewall.provides_arp_spoofing_protection)

        # --- state report payload -----------------------------------------
        self.agent_state = {
            'binary': 'neutron-openvswitch-agent',
            'host': host,
            'topic': n_const.L2_AGENT_TOPIC,
            'configurations': {'bridge_mappings': self.bridge_mappings,
                               'tunnel_types': self.tunnel_types,
                               'tunneling_ip': self.local_ip,
                               'l2_population': self.l2_pop,
                               'arp_responder_enabled':
                               self.arp_responder_enabled,
                               'enable_distributed_routing':
                               self.enable_distributed_routing,
                               'log_agent_heartbeats':
                               agent_conf.log_agent_heartbeats,
                               'extensions': self.ext_manager.names(),
                               'datapath_type': ovs_conf.datapath_type,
                               'ovs_capabilities': self.ovs.capabilities,
                               'vhostuser_socket_dir':
                               ovs_conf.vhostuser_socket_dir,
                               portbindings.OVS_HYBRID_PLUG: hybrid_plug},
            'resource_versions': resources.LOCAL_RESOURCE_VERSIONS,
            'agent_type': agent_conf.agent_type,
            'start_flag': True}

        # A falsy report_interval disables the periodic heartbeat.
        report_interval = agent_conf.report_interval
        if report_interval:
            heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            heartbeat.start(interval=report_interval)

        # Loop state read by the daemon/rpc loop.
        self.iter_num = 0
        self.run_daemon_loop = True

        self.catch_sigterm = False
        self.catch_sighup = False

        # The consumers were created with start_listening=False in
        # setup_rpc(); start them now that initialisation is complete.
        self.connection.consume_in_threads()
        # NOTE(review): dead_topics is not assigned in this excerpt;
        # presumably created by setup_old_topic_sinkhole() - confirm.
        self.dead_topics.consume_in_threads()

        self.quitting_rpc_timeout = agent_conf.quitting_rpc_timeout
Agent与Plugin的RPC通信
Agent与Plugin之间通过RPC通道通信。Plugin需要通知Agent删除network以及更新port,Agent则向Plugin汇报端口up/down以及进行tunnel sync。
一个交互通道有一个发送方与一个接收方,ML2Plugin初始化的时候会建立两个RPC通道。其中一个用于通知Agent端口变化或者network被删除,另一个用于接收Agent请求消息,为Agent提供各种所需信息,以及接收端口的up/down事件。Agent同样也需要两个RPC端点分别与Plugin对应。具体流程参考《Neutron-API服务初始化》。
# Call site quoted from OVSNeutronAgent.__init__.
self.setup_rpc()
def setup_rpc(self):
    """Create the agent's RPC clients and (not yet started) consumers.

    Sets ``plugin_rpc``, ``sg_plugin_rpc``, ``dvr_plugin_rpc``,
    ``state_rpc``, ``context`` and ``connection`` on the agent. The
    consumers are created with ``start_listening=False``, so nothing is
    received until the connection is started elsewhere.
    """
    self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
    # Register this agent for the legacy notification callbacks.
    self.plugin_rpc.register_legacy_notification_callbacks(self)
    # The security-group API shim is built on the plugin API's resource cache.
    self.sg_plugin_rpc = sg_rpc.SecurityGroupServerAPIShim(
        self.plugin_rpc.remote_resource_cache)
    self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
    self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)

    self.context = context.get_admin_context_without_session()

    # [resource, operation] pairs this agent listens for.
    consumers = [[constants.TUNNEL, topics.UPDATE],
                 [constants.TUNNEL, topics.DELETE],
                 [topics.DVR, topics.UPDATE]]
    if self.l2_pop:
        consumers.append([topics.L2POPULATION, topics.UPDATE])
    self.connection = agent_rpc.create_consumers([self],
                                                 topics.AGENT,
                                                 consumers,
                                                 start_listening=False)
    self.setup_old_topic_sinkhole()
br-int创建与初始化
br-int用于local vlan的二层交换,功能比较简单。主要是创建bridge并初始化到标准二层交换机模式,清空流表。代码分析如下:
# Create br-int and reset its flow rules; per this walkthrough the work is
# carried out through external commands such as brctl, ovs-vsctl and ip.
self.int_br = self.br_int_cls(ovs_conf.integration_bridge)
self.setup_integration_br()
# init bridge classes with configured datapath type.
# NOTE: in __init__ this assignment runs before the two statements above.
self.br_int_cls, self.br_phys_cls, self.br_tun_cls = (
    functools.partial(bridge_classes[b],
                      datapath_type=ovs_conf.datapath_type)
    for b in ('br_int', 'br_phys', 'br_tun'))
def setup_integration_br(self):
    '''Setup the integration bridge.

    Creates the bridge, puts it into secure mode, configures its
    controllers and installs the default table. When the
    ``AGENT.drop_flows_on_start`` option is set, the peer patch port named
    by ``OVS.int_peer_patch_port`` is deleted and all flows are removed
    before the default table is installed.
    '''
    self.int_br.create()
    self.int_br.set_secure_mode()
    self.int_br.setup_controllers(self.conf)

    if self.conf.AGENT.drop_flows_on_start:
        # Remove the patch port first, then wipe flows for every cookie.
        self.int_br.delete_port(self.conf.OVS.int_peer_patch_port)
        self.int_br.uninstall_flows(cookie=ovs_lib.COOKIE_ANY)
    self.int_br.setup_default_table()
br-eth初始化
对于物理网络设备的映射,每个物理bridge都需要管理员创建,并将物理接口attach到这个bridge上,参考配置
bridge_mappings = public:br-ex
ovs-vsctl add-port br-ex eth0
这个bridge完成物理网络vlan到local vlan的转换,转换过程不需要像tunnel那样复杂的流表,只需要作为一个二层交换机,因此br-eth的设置主要是初始化默认流表,建立与br-int 相连的端口,参考代码如下:
# Call site quoted from OVSNeutronAgent.__init__: parse the configured
# mapping, then set up one physical bridge per entry.
self.bridge_mappings = self._parse_bridge_mappings(
    ovs_conf.bridge_mappings)
self.setup_physical_bridges(self.bridge_mappings)
def setup_physical_bridges(self, bridge_mappings):
    '''Setup the physical network bridges.

    Creates physical network bridges and links them to the
    integration bridge using veths or patch ports.

    Resets and fills ``self.phys_brs``, ``self.int_ofports`` and
    ``self.phys_ofports``, each keyed by physical network name. If a mapped
    bridge is not present in OVS, the error is logged and the process exits
    with status 1.

    :param bridge_mappings: map physical network names to bridge names.
    '''
    self.phys_brs = {}
    self.int_ofports = {}
    self.phys_ofports = {}
    ip_wrapper = ip_lib.IPWrapper()
    ovs = ovs_lib.BaseOVS()
    ovs_bridges = ovs.get_bridges()
    for physical_network, bridge in bridge_mappings.items():
        LOG.info("Mapping physical network %(physical_network)s to "
                 "bridge %(bridge)s",
                 {'physical_network': physical_network,
                  'bridge': bridge})
        # The mapped bridge must already exist; the agent refuses to start
        # otherwise.
        if bridge not in ovs_bridges:
            LOG.error("Bridge %(bridge)s for physical network "
                      "%(physical_network)s does not exist. Agent "
                      "terminated!",
                      {'physical_network': physical_network,
                       'bridge': bridge})
            sys.exit(1)

        br = self.br_phys_cls(bridge)
        br.create()
        br.set_secure_mode()
        br.setup_controllers(self.conf)
        # Read the option from self.conf, as setup_integration_br() does,
        # so an injected configuration is honoured consistently.
        if self.conf.AGENT.drop_flows_on_start:
            br.uninstall_flows(cookie=ovs_lib.COOKIE_ANY)
        br.setup_default_table()
        self.phys_brs[physical_network] = br

        # Names of the two interconnect endpoints, derived from the bridge
        # name with the integration/physical prefixes.
        int_if_name = p_utils.get_interface_name(
            bridge, prefix=constants.PEER_INTEGRATION_PREFIX)
        phys_if_name = p_utils.get_interface_name(
            bridge, prefix=constants.PEER_PHYSICAL_PREFIX)

        # Current type of the integration-side interface as recorded in
        # the OVS database.
        int_type = self.int_br.db_get_val("Interface",
                                          int_if_name, "type",
                                          log_errors=False)
        if self.use_veth_interconnection:
            # Existing patch ports do not match the veth configuration:
            # remove them.
            if int_type == 'patch':
                self.int_br.delete_port(int_if_name)
                br.delete_port(phys_if_name)
            # Remove a pre-existing device and wait for udev to settle
            # before creating a new veth pair.
            device = ip_lib.IPDevice(int_if_name)
            if device.exists():
                device.link.delete()
                utils.execute(['udevadm', 'settle', '--timeout=10'])
            int_veth, phys_veth = ip_wrapper.add_veth(int_if_name,
                                                      phys_if_name)
            int_ofport = self.int_br.add_port(int_if_name)
            phys_ofport = br.add_port(phys_if_name)
        else:
            # Existing veth ports do not match the patch configuration:
            # remove them.
            if int_type == 'veth':
                self.int_br.delete_port(int_if_name)
                br.delete_port(phys_if_name)

            # Reuse patch ports that already exist; otherwise create them
            # with a placeholder peer. The real peer is set at the end of
            # this iteration, after the drop rules are installed.
            if self.int_br.port_exists(int_if_name):
                int_ofport = self.int_br.get_port_ofport(int_if_name)
            else:
                int_ofport = self.int_br.add_patch_port(
                    int_if_name, constants.NONEXISTENT_PEER)
            if br.port_exists(phys_if_name):
                phys_ofport = br.get_port_ofport(phys_if_name)
            else:
                phys_ofport = br.add_patch_port(
                    phys_if_name, constants.NONEXISTENT_PEER)

        self.int_ofports[physical_network] = int_ofport
        self.phys_ofports[physical_network] = phys_ofport

        # Install drop rules for traffic entering through the interconnect
        # ports on both bridges.
        self.int_br.drop_port(in_port=int_ofport)
        br.drop_port(in_port=phys_ofport)

        if self.use_veth_interconnection:
            # Bring the veth pair up and apply the configured MTU, if any.
            int_veth.link.set_up()
            phys_veth.link.set_up()
            if self.veth_mtu:
                int_veth.link.set_mtu(self.veth_mtu)
                phys_veth.link.set_mtu(self.veth_mtu)
        else:
            # Point the two patch ports at each other.
            self.int_br.set_db_attribute('Interface', int_if_name,
                                         'options', {'peer': phys_if_name})
            br.set_db_attribute('Interface', phys_if_name,
                                'options', {'peer': int_if_name})
br-tun初始化
创建tunnel port
执行rpc_loop
通过neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py:main中的如下代码,调用rpc_loop
# Call site quoted from main(bridge_classes).
agent.daemon_loop()
def daemon_loop(self):
    """Install signal handlers and run ``rpc_loop`` under a polling manager.

    SIGTERM is always hooked; SIGHUP only where the ``signal`` module
    exposes it. The polling manager is used as a context manager around
    the whole loop.
    """
    LOG.info("Agent initialized successfully, now running... ")
    signal.signal(signal.SIGTERM, self._handle_sigterm)
    if hasattr(signal, 'SIGHUP'):
        signal.signal(signal.SIGHUP, self._handle_sighup)
    with polling.get_polling_manager(
            self.minimize_polling,
            self.ovsdb_monitor_respawn_interval) as pm:
        self.rpc_loop(polling_manager=pm)
rpc_loop代码
def rpc_loop(self, polling_manager=None):
    """Main processing loop of the agent.

    Each iteration checks the OVS status, re-creates bridges and flows
    after an OVS restart, synchronises tunnels when needed, and processes
    port changes. The loop runs for as long as
    ``self._check_and_handle_signal()`` returns a truthy value.

    :param polling_manager: polling manager to use; when falsy, one is
        obtained with ``minimize_polling=False``.
    """
    if not polling_manager:
        polling_manager = polling.get_polling_manager(
            minimize_polling=False)

    # State carried across iterations.
    sync = False
    ports = set()
    updated_ports_copy = set()
    ancillary_ports = set()
    tunnel_sync = True
    ovs_restarted = False
    consecutive_resyncs = 0
    need_clean_stale_flow = True
    ports_not_ready_yet = set()
    failed_devices = {'added': set(), 'removed': set()}
    failed_ancillary_devices = {'added': set(), 'removed': set()}
    failed_devices_retries_map = {}
    while self._check_and_handle_signal():
        # A full sync requested elsewhere is consumed here as a resync.
        if self.fullsync:
            LOG.info("rpc_loop doing a full sync.")
            sync = True
            self.fullsync = False
        port_info = {}
        ancillary_port_info = {}
        start = time.time()
        LOG.debug("Agent rpc_loop - iteration:%d started",
                  self.iter_num)
        ovs_status = self.check_ovs_status()
        if ovs_status == constants.OVS_RESTARTED:
            # Rebuild bridges, tunnel setup and DVR flows from scratch.
            self.setup_integration_br()
            self.setup_physical_bridges(self.bridge_mappings)
            if self.enable_tunneling:
                self._reset_tunnel_ofports()
                self.setup_tunnel_br()
                self.setup_tunnel_br_flows()
                tunnel_sync = True
            if self.enable_distributed_routing:
                self.dvr_agent.reset_ovs_parameters(self.int_br,
                                                    self.tun_br,
                                                    self.patch_int_ofport,
                                                    self.patch_tun_ofport)
                self.dvr_agent.reset_dvr_parameters()
                self.dvr_agent.setup_dvr_flows()
            # Let registered callbacks know that OVS has restarted.
            registry.notify(
                callback_resources.AGENT,
                callback_events.OVS_RESTARTED,
                self)
            # Only the minimizing polling manager is stopped and started
            # again.
            if isinstance(
                    polling_manager, polling.InterfacePollingMinimizer):
                polling_manager.stop()
                polling_manager.start()
        elif ovs_status == constants.OVS_DEAD:
            # Do nothing while OVS is dead: wait and re-check on the next
            # iteration.
            port_stats = self.get_port_stats({}, {})
            self.loop_count_and_wait(start, port_stats)
            continue

        if self.enable_tunneling and tunnel_sync:
            try:
                tunnel_sync = self.tunnel_sync()
            except Exception:
                LOG.exception("Error while configuring tunnel endpoints")
                # Retry the tunnel sync on the next iteration.
                tunnel_sync = True

        # Sticky until a full pass over the ports completes without error.
        ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
        devices_need_retry = (any(failed_devices.values()) or
                              any(failed_ancillary_devices.values()) or
                              ports_not_ready_yet)
        if (self._agent_has_updates(polling_manager) or sync
                or devices_need_retry):
            try:
                LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
                          "starting polling. Elapsed:%(elapsed).3f",
                          {'iter_num': self.iter_num,
                           'elapsed': time.time() - start})
                # Keep a reference to the pending updates so they can be
                # put back if this pass fails, then start a fresh set.
                updated_ports_copy = self.updated_ports
                self.updated_ports = set()
                (port_info, ancillary_port_info, consecutive_resyncs,
                 ports_not_ready_yet) = (self.process_port_info(
                     start, polling_manager, sync, ovs_restarted,
                     ports, ancillary_ports, updated_ports_copy,
                     consecutive_resyncs, ports_not_ready_yet,
                     failed_devices, failed_ancillary_devices))
                sync = False
                self.process_deleted_ports(port_info)
                # Ports whose ofport changed are treated as updated.
                ofport_changed_ports = self.update_stale_ofport_rules()
                if ofport_changed_ports:
                    port_info.setdefault('updated', set()).update(
                        ofport_changed_ports)
                LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
                          "port information retrieved. "
                          "Elapsed:%(elapsed).3f",
                          {'iter_num': self.iter_num,
                           'elapsed': time.time() - start})
                if (self._port_info_has_changes(port_info) or
                        self.sg_agent.firewall_refresh_needed() or
                        ovs_restarted):
                    LOG.debug("Starting to process devices in:%s",
                              port_info)
                    failed_devices = self.process_network_ports(
                        port_info, ovs_restarted)
                    # Stale flows are cleaned once, after the first
                    # successful processing pass.
                    if need_clean_stale_flow:
                        self.cleanup_stale_flows()
                        need_clean_stale_flow = False
                    LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
                              "ports processed. Elapsed:%(elapsed).3f",
                              {'iter_num': self.iter_num,
                               'elapsed': time.time() - start})

                ports = port_info['current']

                if self.ancillary_brs:
                    failed_ancillary_devices = (
                        self.process_ancillary_network_ports(
                            ancillary_port_info))
                    LOG.debug("Agent rpc_loop - iteration: "
                              "%(iter_num)d - ancillary ports "
                              "processed. Elapsed:%(elapsed).3f",
                              {'iter_num': self.iter_num,
                               'elapsed': time.time() - start})
                    ancillary_ports = ancillary_port_info['current']

                polling_manager.polling_completed()
                failed_devices_retries_map = (
                    self.update_retries_map_and_remove_devs_not_to_retry(
                        failed_devices, failed_ancillary_devices,
                        failed_devices_retries_map))
                # Cleared only at the end of the try block, i.e. when no
                # exception interrupted the pass.
                ovs_restarted = False
                self._dispose_local_vlan_hints()
            except Exception:
                LOG.exception("Error while processing VIF ports")
                # Restore the pending updates and force a resync.
                self.updated_ports |= updated_ports_copy
                sync = True
        port_stats = self.get_port_stats(port_info, ancillary_port_info)
        self.loop_count_and_wait(start, port_stats)
process_port_info:scan_ports
def process_port_info(self, start, polling_manager, sync, ovs_restarted,
                      ports, ancillary_ports, updated_ports_copy,
                      consecutive_resyncs, ports_not_ready_yet,
                      failed_devices, failed_ancillary_devices):
    """Collect port information either by a full scan or from events.

    A full scan is used when ``sync`` is truthy or when the polling
    manager has no ``get_events`` attribute; otherwise the polling
    manager's events are processed.

    Note that ``ports`` and ``ancillary_ports`` are cleared in place when
    the number of consecutive resyncs reaches
    ``constants.MAX_DEVICE_RETRIES``.

    :returns: tuple ``(port_info, ancillary_port_info,
        consecutive_resyncs, ports_not_ready_yet)``.
    """
    if sync or not (hasattr(polling_manager, 'get_events')):
        if sync:
            LOG.info("Agent out of sync with plugin!")
            consecutive_resyncs = consecutive_resyncs + 1
            if (consecutive_resyncs >=
                    constants.MAX_DEVICE_RETRIES):
                LOG.warning(
                    "Clearing cache of registered ports,"
                    " retries to resync were > %s",
                    constants.MAX_DEVICE_RETRIES)
                ports.clear()
                ancillary_ports.clear()
                consecutive_resyncs = 0
        else:
            consecutive_resyncs = 0
            # No resync was requested: rescan as a resync only if an
            # earlier pass left failed devices behind.
            sync = (any(failed_devices.values()) or
                    any(failed_ancillary_devices.values()))

        # After an OVS restart, compare against an empty set rather than
        # the previously registered ports.
        reg_ports = (set() if ovs_restarted else ports)
        port_info = self.scan_ports(reg_ports, sync,
                                    updated_ports_copy)
        if self.ancillary_brs:
            ancillary_port_info = self.scan_ancillary_ports(
                ancillary_ports, sync)
            LOG.debug("Agent rpc_loop - iteration:%(iter_num)d"
                      " - ancillary port info retrieved. "
                      "Elapsed:%(elapsed).3f",
                      {'iter_num': self.iter_num,
                       'elapsed': time.time() - start})
        else:
            ancillary_port_info = {}
    else:
        consecutive_resyncs = 0
        events = polling_manager.get_events()
        port_info, ancillary_port_info, ports_not_ready_yet = (
            self.process_ports_events(events, ports, ancillary_ports,
                                      ports_not_ready_yet,
                                      failed_devices,
                                      failed_ancillary_devices,
                                      updated_ports_copy))
        # Publish the events that were just read to registered callbacks.
        registry.notify(
            constants.OVSDB_RESOURCE,
            callback_events.AFTER_READ,
            self,
            ovsdb_events=events)

    return (port_info, ancillary_port_info, consecutive_resyncs,
            ports_not_ready_yet)
def scan_ports(self, registered_ports, sync, updated_ports=None):
    """Scan the integration bridge and report port changes.

    Also records the number of VIF ports found in
    ``self.int_br_device_count``.

    :param registered_ports: set of ports known from the previous scan.
    :param sync: passed to ``_get_port_info`` as its
        ``readd_registered_ports`` argument.
    :param updated_ports: optional set of ports flagged as updated. It is
        modified in place: ports returned by ``check_changed_vlans()``
        are added, then ports absent from the bridge are dropped.
    :returns: the dict built by ``_get_port_info``, with an ``'updated'``
        key added when at least one updated port is still on the bridge.
    """
    cur_ports = self.int_br.get_vif_port_set()
    self.int_br_device_count = len(cur_ports)
    port_info = self._get_port_info(registered_ports, cur_ports, sync)
    if updated_ports is None:
        updated_ports = set()
    updated_ports.update(self.check_changed_vlans())
    if updated_ports:
        # Keep only updated ports that are still on the bridge.
        updated_ports &= cur_ports
        if updated_ports:
            port_info['updated'] = updated_ports
    return port_info
def _get_port_info(self, registered_ports, cur_ports,
                   readd_registered_ports):
    """Build the added/removed view of the ports on the bridge.

    :param registered_ports: set of ports known from the previous scan.
    :param cur_ports: set of ports currently present.
    :param readd_registered_ports: when truthy, every current port is
        reported as added, not only the new ones.
    :returns: dict with ``'current'``; ``'added'`` and ``'removed'`` are
        included unless nothing changed and no re-add was requested.
    """
    port_info = {'current': cur_ports}
    # Nothing changed and no re-add requested: report current ports only.
    if not readd_registered_ports and cur_ports == registered_ports:
        return port_info

    if readd_registered_ports:
        port_info['added'] = cur_ports
    else:
        port_info['added'] = cur_ports - registered_ports
    # Ports known before but no longer present.
    port_info['removed'] = registered_ports - cur_ports
    return port_info
process_network_ports
def process_network_ports(self, port_info, ovs_restarted):
    """Process added, updated and removed ports from one scan.

    :param port_info: dict with a ``'current'`` set and optional
        ``'added'``, ``'updated'`` and ``'removed'`` sets. Its
        ``'current'`` entry is rewritten to exclude skipped devices.
    :param ovs_restarted: forwarded to ``treat_devices_added_or_updated``.
    :returns: dict ``{'added': set, 'removed': set}`` of devices whose
        processing failed.
    """
    failed_devices = {'added': set(), 'removed': set()}
    # Union, so a device in both 'added' and 'updated' is handled once.
    devices_added_updated = (port_info.get('added', set()) |
                             port_info.get('updated', set()))
    need_binding_devices = []
    skipped_devices = set()
    if devices_added_updated:
        start = time.time()
        (skipped_devices, need_binding_devices,
         failed_devices['added']) = (
            self.treat_devices_added_or_updated(
                devices_added_updated, ovs_restarted))
        LOG.debug("process_network_ports - iteration:%(iter_num)d - "
                  "treat_devices_added_or_updated completed. "
                  "Skipped %(num_skipped)d devices of "
                  "%(num_current)d devices currently available. "
                  "Time elapsed: %(elapsed).3f",
                  {'iter_num': self.iter_num,
                   'num_skipped': len(skipped_devices),
                   'num_current': len(port_info['current']),
                   'elapsed': time.time() - start})
        # Remove skipped devices from the current ports.
        skipped_devices = set(skipped_devices)
        port_info['current'] = (port_info['current'] - skipped_devices)

    # These steps run even when nothing was added or updated.
    added_ports = port_info.get('added', set()) - skipped_devices
    self._add_port_tag_info(need_binding_devices)
    self.sg_agent.setup_port_filters(added_ports,
                                     port_info.get('updated', set()))
    failed_devices['added'] |= self._bind_devices(need_binding_devices)

    if 'removed' in port_info and port_info['removed']:
        start = time.time()
        failed_devices['removed'] |= self.treat_devices_removed(
            port_info['removed'])
        LOG.debug("process_network_ports - iteration:%(iter_num)d - "
                  "treat_devices_removed completed in %(elapsed).3f",
                  {'iter_num': self.iter_num,
                   'elapsed': time.time() - start})
    return failed_devices
初始化流程图
调用过程
参考资料
Neutron-server初始化 — Neutron L2 Agent服务初始化
OpenStack Neutron源码分析:ovs-neutron-agent启动源码解析
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)