OpenStack Neutron OVS agent (continuously updated)

2023-05-16

Overview

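The main job of the ML2 plugin is to manage virtual network resources and keep their data consistent; configuring the actual devices is left to the agents. An L2 agent usually runs on the hypervisor and talks to neutron-server over RPC: it watches for device changes and reports them, creates new devices to keep network segments correct, and applies security group rules. The OVS agent, for example, uses Open vSwitch to implement VLAN, GRE, and VXLAN network isolation, and it also controls how traffic is forwarded.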

Initialization

Startup flow of the components


[Figure: startup flow of the components; image not available]

Agent initialization

Agent start command

/usr/bin/python /usr/local/bin/neutron-openvswitch-agent --config-file /etc/neutron/neutron.conf --config-file /etc/neutron/plugins/ml2/ml2_conf.ini

Script contents:

#!/usr/bin/python
# PBR Generated from u'console_scripts'

import sys

from neutron.cmd.eventlet.plugins.ovs_neutron_agent import main


if __name__ == "__main__":
    sys.exit(main())

The setup.cfg file shows where the OVS agent entry point lives:

# Per setup.cfg, the code behind neutron-openvswitch-agent is in
# neutron/cmd/eventlet/plugins/ovs_neutron_agent.py
neutron-openvswitch-agent = neutron.cmd.eventlet.plugins.ovs_neutron_agent:main
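
To see what an entry such as `neutron.cmd.eventlet.plugins.ovs_neutron_agent:main` means, here is a minimal sketch that resolves a `module:attribute` string by hand. `resolve_entry_point` is a hypothetical helper, not part of setuptools or Neutron, and `json:dumps` only stands in for the agent's real entry point so that the example runs without Neutron installed.

```python
import importlib


def resolve_entry_point(spec):
    """Resolve a 'package.module:attribute' string to the object it names."""
    module_name, _, attr_path = spec.partition(":")
    if not module_name or not attr_path:
        raise ValueError("expected 'module:attribute', got %r" % spec)
    target = importlib.import_module(module_name)
    # The attribute part may be dotted, e.g. "module:Class.method".
    for part in attr_path.split("."):
        target = getattr(target, part)
    return target


if __name__ == "__main__":
    # json:dumps is a stand-in for
    # neutron.cmd.eventlet.plugins.ovs_neutron_agent:main
    dumps = resolve_entry_point("json:dumps")
    print(dumps({"agent": "ovs"}))
```

The generated console script shown earlier does the same thing statically: it imports `main` from the module on the left of the colon and calls it.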

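During initialization the OVS agent builds a largely complete virtual network environment from the ml2_conf.ini configuration and installs the main flow tables and default rules needed for VLAN and tunnel forwarding. A sample configuration: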

[ml2]
tenant_network_types = vxlan
extension_drivers = port_security
mechanism_drivers = openvswitch,linuxbridge
[ml2_type_flat]
flat_networks = public,
[ml2_type_geneve]
vni_ranges = 1:1000
[ml2_type_gre]
tunnel_id_ranges = 1:1000
[ml2_type_vlan]
network_vlan_ranges = public
[ml2_type_vxlan]
vni_ranges = 1:1000
[securitygroup]
firewall_driver = iptables_hybrid
[agent]
tunnel_types = vxlan
root_helper_daemon = sudo /usr/local/bin/neutron-rootwrap-daemon /etc/neutron/rootwrap.conf
root_helper = sudo /usr/local/bin/neutron-rootwrap /etc/neutron/rootwrap.conf

[ovs]
datapath_type = system
bridge_mappings = public:br-ex
tunnel_bridge = br-tun
local_ip = 192.168.209.134
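
The `bridge_mappings` option is a comma-separated list of `physical_network:bridge` pairs. The sketch below shows one plausible way to turn it into a dictionary and reject duplicates. `parse_bridge_mappings` is a hypothetical function written for this note; the agent's own parsing helper may apply different rules.

```python
def parse_bridge_mappings(entries):
    """Turn ['physnet:bridge', ...] into {'physnet': 'bridge'}."""
    mappings = {}
    for entry in entries:
        entry = entry.strip()
        if not entry:
            continue  # tolerate empty items such as a trailing comma
        parts = [part.strip() for part in entry.split(":")]
        if len(parts) != 2 or not parts[0] or not parts[1]:
            raise ValueError("invalid mapping %r" % entry)
        physnet, bridge = parts
        if physnet in mappings:
            raise ValueError("physical network %r mapped twice" % physnet)
        if bridge in mappings.values():
            raise ValueError("bridge %r mapped twice" % bridge)
        mappings[physnet] = bridge
    return mappings


if __name__ == "__main__":
    # The value from the sample configuration above.
    print(parse_bridge_mappings("public:br-ex".split(",")))
```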

Startup code analysis

neutron/cmd/eventlet/plugins/ovs_neutron_agent.py:main

import neutron.plugins.ml2.drivers.openvswitch.agent.main as agent_main
# A virtual switch (vswitch) has two main jobs:
# 1. Carry traffic between virtual machines (VMs).
# 2. Connect VMs to the outside network.
def main():
    agent_main.main()

neutron/plugins/ml2/drivers/openvswitch/agent/main.py:main

LOG = logging.getLogger(__name__)
cfg.CONF.import_group('OVS', 'neutron.plugins.ml2.drivers.openvswitch.agent.'
                      'common.config')


_main_modules = {
    'ovs-ofctl': 'neutron.plugins.ml2.drivers.openvswitch.agent.openflow.'
                 'ovs_ofctl.main',
    'native': 'neutron.plugins.ml2.drivers.openvswitch.agent.openflow.'
                 'native.main',
}

# neutron/plugins/ml2/drivers/openvswitch/agent/main.py:main
def main():
    common_config.init(sys.argv[1:])
    # driver_name = ovs-ofctl
    driver_name = cfg.CONF.OVS.of_interface
    mod_name = _main_modules[driver_name]
    mod = importutils.import_module(mod_name)
    mod.init_config()
    common_config.setup_logging()
    n_utils.log_opt_values(LOG)
    profiler.setup("neutron-ovs-agent", cfg.CONF.host)
    # Call main() of the selected driver module (ovs-ofctl in this walkthrough)
    mod.main()
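
The lookup in `_main_modules` is a plain registry pattern: a configuration value selects a module path, the module is imported on demand, and its functions are called. A minimal standard-library sketch follows. The registry keys and `load_driver` are made up for illustration, and `importlib.import_module` plays the role that `importutils.import_module` plays in the agent.

```python
import importlib

# Hypothetical registry: stdlib modules stand in for the real driver modules.
_DRIVER_MODULES = {
    "json-driver": "json",
    "pickle-driver": "pickle",
}


def load_driver(name, registry=_DRIVER_MODULES):
    """Import and return the module registered under a configured name."""
    try:
        module_path = registry[name]
    except KeyError:
        raise ValueError("unknown driver %r, expected one of %s"
                         % (name, sorted(registry)))
    return importlib.import_module(module_path)


if __name__ == "__main__":
    mod = load_driver("json-driver")
    print(mod.dumps({"of_interface": "native"}))
```

Unlike this sketch, the agent code indexes the dictionary directly, so an unexpected `of_interface` value would surface as a `KeyError`.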

neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl.main

# neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl.main
def main():
    bridge_classes = {
        'br_int': br_int.OVSIntegrationBridge,
        'br_phys': br_phys.OVSPhysicalBridge,
        'br_tun': br_tun.OVSTunnelBridge,
    }
    # Start the agent with these bridge classes
    ovs_neutron_agent.main(bridge_classes)

neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py:main

# neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py:main
def main(bridge_classes):
    # cfg.CONF holds the agent configuration, mainly the network mappings
    # and the names of the bridges
    prepare_xen_compute()
    ovs_capabilities.register()
    validate_tunnel_config(cfg.CONF.AGENT.tunnel_types, cfg.CONF.OVS.local_ip)
    try:

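        # Create the agent instance. Its constructor does the following:
        # 1. Sets up br-int.
        # 2. Sets up plugin_rpc, used to talk to neutron-server.
        # 3. Sets up state_rpc, used to report agent state.
        # 4. Sets up connection, used to receive messages from neutron-server.
        # 5. Starts the periodic heartbeat report.
        # 6. Sets up the bridges named in bridge_mappings (br-eth*).
        # 7. Sets up the DVR agent.
        # 8. Initializes sg_agent, which handles security groups.
        # 9. Sets run_daemon_loop = True so that the loop can start.
        # The loop then periodically checks br-int for port changes and calls
        # process_network_ports to handle added and removed ports.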
        agent = OVSNeutronAgent(bridge_classes, cfg.CONF)
        capabilities.notify_init_event(n_const.AGENT_TYPE_OVS, agent)
    except (RuntimeError, ValueError) as e:
        LOG.error("%s Agent terminated!", e)
        sys.exit(1)
    # Loop, checking state and reacting whenever something changes
    agent.daemon_loop()
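
Before the agent is constructed, `validate_tunnel_config` is given the tunnel types and `local_ip`. The following sketch shows the kind of check such a function can make. It is an illustration under assumptions: `check_tunnel_config` and `SUPPORTED_TUNNEL_TYPES` are hypothetical names, and the real function may check more (for example, whether the address is actually configured on the host).

```python
import ipaddress

# Assumed set of tunnel types for this sketch.
SUPPORTED_TUNNEL_TYPES = ("gre", "vxlan", "geneve")


def check_tunnel_config(tunnel_types, local_ip):
    """Raise ValueError when tunnelling is requested but misconfigured."""
    if not tunnel_types:
        return  # tunnelling is disabled, nothing to validate
    if not local_ip:
        raise ValueError("tunnel_types is set but local_ip is empty")
    # ip_address() raises ValueError for a malformed address.
    ipaddress.ip_address(local_ip)
    for tunnel_type in tunnel_types:
        if tunnel_type not in SUPPORTED_TUNNEL_TYPES:
            raise ValueError("unsupported tunnel type %r" % tunnel_type)


if __name__ == "__main__":
    # Values from the sample configuration: tunnel_types and local_ip.
    check_tunnel_config(["vxlan"], "192.168.209.134")
    print("tunnel configuration looks consistent")
```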

neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py:OVSNeutronAgent.__init__

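# The OVSNeutronAgent docstring summarizes how the agent implements virtual
# networking:
# 1) br-int and br-tun are created, plus one bridge per physical network
#    interface.
# 2) Every VM virtual NIC is plugged into br-int.
#    VIFs on the same virtual network share one "local" VLAN (unrelated to
#    VLANs on the external network, so ids may overlap with them).
#    The local VLAN id is mapped to the physical networking details that
#    realize the virtual network.
# 3) For networks whose network_type is VLAN or FLAT, a veth pair or a pair
#    of patch ports connects br-int to the physical network bridge, with
#    flow rules that add, modify, or strip VLAN tags as needed.
# 4) For GRE networks, tenant traffic between hypervisors is told apart by
#    a Logical Switch identifier, and a mesh of tunnels is built between the
#    br-tun bridges of the hypervisors.
#    Port patching connects the local VLANs on br-int to the
#    inter-hypervisor tunnels on br-tun.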
@profiler.trace_cls("rpc")
class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
                      dvr_rpc.DVRAgentRpcCallbackMixin):
    '''Implements OVS-based tunneling, VLANs and flat networks.

    Two local bridges are created: an integration bridge (defaults to
    'br-int') and a tunneling bridge (defaults to 'br-tun'). An
    additional bridge is created for each physical network interface
    used for VLANs and/or flat networks.

    All VM VIFs are plugged into the integration bridge. VM VIFs on a
    given virtual network share a common "local" VLAN (i.e. not
    propagated externally). The VLAN id of this local VLAN is mapped
    to the physical networking details realizing that virtual network.

    For virtual networks realized as GRE tunnels, a Logical Switch
    (LS) identifier is used to differentiate tenant traffic on
    inter-HV tunnels. A mesh of tunnels is created to other
    Hypervisors in the cloud. These tunnels originate and terminate on
    the tunneling bridge of each hypervisor. Port patching is done to
    connect local VLANs on the integration bridge to inter-hypervisor
    tunnels on the tunnel bridge.

    For each virtual network realized as a VLAN or flat network, a
    veth or a pair of patch ports is used to connect the local VLAN on
    the integration bridge with the physical network bridge, with flow
    rules adding, modifying, or stripping VLAN tags as necessary.
    '''

    # history
    #   1.0 Initial version
    #   1.1 Support Security Group RPC
    #   1.2 Support DVR (Distributed Virtual Router) RPC
    #   1.3 Added param devices_to_update to security_groups_provider_updated
    #   1.4 Added support for network_update
    target = oslo_messaging.Target(version='1.4')

    def __init__(self, bridge_classes, conf=None):
        '''Constructor.

        :param bridge_classes: a dict for bridge classes.
        :param conf: an instance of ConfigOpts
        '''
        super(OVSNeutronAgent, self).__init__()
        self.conf = conf or cfg.CONF
        self.ovs = ovs_lib.BaseOVS()
        agent_conf = self.conf.AGENT
        ovs_conf = self.conf.OVS

        self.fullsync = False
        # init bridge classes with configured datapath type.
        self.br_int_cls, self.br_phys_cls, self.br_tun_cls = (
            functools.partial(bridge_classes[b],
                              datapath_type=ovs_conf.datapath_type)
            for b in ('br_int', 'br_phys', 'br_tun'))

        self.use_veth_interconnection = ovs_conf.use_veth_interconnection
        self.veth_mtu = agent_conf.veth_mtu
        # Local VLAN ids range over [1, 4094]
        self.available_local_vlans = set(moves.range(p_const.MIN_VLAN_TAG,
                                                     p_const.MAX_VLAN_TAG + 1))
        self.tunnel_types = agent_conf.tunnel_types or []
        self.l2_pop = agent_conf.l2_population
        # TODO(ethuleau): Change ARP responder so it's not dependent on the
        #                 ML2 l2 population mechanism driver.
        # Whether distributed routing (DVR) is enabled
        self.enable_distributed_routing = agent_conf.enable_distributed_routing
        self.arp_responder_enabled = agent_conf.arp_responder and self.l2_pop

        host = self.conf.host
        self.agent_id = 'ovs-agent-%s' % host

        self.enable_tunneling = bool(self.tunnel_types)

        # Validate agent configurations
        self._check_agent_configurations()

        # Keep track of int_br's device count for use by _report_state()
        self.int_br_device_count = 0
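        # Instantiate the br-int bridge object; the bridge itself is created
        # and reset in setup_integration_br() below
        self.int_br = self.br_int_cls(ovs_conf.integration_bridge)
        # setup_integration_br sets up the integration bridge (int_br):
        # create it if needed, optionally remove the patch port and existing
        # flows (drop_flows_on_start), then install the basic flows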
        self.setup_integration_br()
        # Stores port update notifications for processing in main rpc loop
        self.updated_ports = set()
        # Stores port delete notifications
        self.deleted_ports = set()

        self.network_ports = collections.defaultdict(set)
        # keeps association between ports and ofports to detect ofport change
        self.vifname_to_ofport_map = {}
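        # Set up the RPC API towards the plugin (topic 'q-plugin', interface
        # neutron/agent/rpc.py:PluginApi) and listen for RPC calls made to
        # the agent by other services (topic 'q-agent-notifier').
        # setup_rpc does the following:
        # - sets plugin_rpc, used to talk to neutron-server
        # - sets state_rpc, used to report agent state
        # - sets connection, used to receive messages from neutron-server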

        self.setup_rpc()
        # Values passed in through the configuration file
        self.bridge_mappings = self._parse_bridge_mappings(
            ovs_conf.bridge_mappings)
        # For each mapping, set up the physical bridge and connect it to
        # br-int with a veth pair or patch ports
        self.setup_physical_bridges(self.bridge_mappings)
        self.vlan_manager = vlanmanager.LocalVlanManager()

        self._reset_tunnel_ofports()

        self.polling_interval = agent_conf.polling_interval
        self.minimize_polling = agent_conf.minimize_polling
        self.ovsdb_monitor_respawn_interval = (
            agent_conf.ovsdb_monitor_respawn_interval or
            constants.DEFAULT_OVSDBMON_RESPAWN)
        self.local_ip = ovs_conf.local_ip
        self.tunnel_count = 0
        self.vxlan_udp_port = agent_conf.vxlan_udp_port
        self.dont_fragment = agent_conf.dont_fragment
        self.tunnel_csum = agent_conf.tunnel_csum
        self.tun_br = None
        self.patch_int_ofport = constants.OFPORT_INVALID
        self.patch_tun_ofport = constants.OFPORT_INVALID
        if self.enable_tunneling:
            # The patch_int_ofport and patch_tun_ofport are updated
            # here inside the call to setup_tunnel_br()
            self.setup_tunnel_br(ovs_conf.tunnel_bridge)
            self.setup_tunnel_br_flows()

        self.init_extension_manager(self.connection)

        self.dvr_agent = ovs_dvr_neutron_agent.OVSDVRNeutronAgent(
            self.context,
            self.dvr_plugin_rpc,
            self.int_br,
            self.tun_br,
            self.bridge_mappings,
            self.phys_brs,
            self.int_ofports,
            self.phys_ofports,
            self.patch_int_ofport,
            self.patch_tun_ofport,
            host,
            self.enable_tunneling,
            self.enable_distributed_routing)

        if self.enable_distributed_routing:
            self.dvr_agent.setup_dvr_flows()

        # Collect additional bridges to monitor
        self.ancillary_brs = self.setup_ancillary_bridges(
            ovs_conf.integration_bridge, ovs_conf.tunnel_bridge)

        # In order to keep existed device's local vlan unchanged,
        # restore local vlan mapping at start
        self._restore_local_vlan_map()

        # (tunnel creation code omitted)
        # Security group agent support
        self.sg_agent = agent_sg_rpc.SecurityGroupAgentRpc(
            self.context, self.sg_plugin_rpc, defer_refresh_firewall=True,
            integration_bridge=self.int_br)
        self.sg_plugin_rpc.register_legacy_sg_notification_callbacks(
            self.sg_agent)

        # we default to False to provide backward compat with out of tree
        # firewall drivers that expect the logic that existed on the Neutron
        # server which only enabled hybrid plugging based on the use of the
        # hybrid driver.
        hybrid_plug = getattr(self.sg_agent.firewall,
                              'OVS_HYBRID_PLUG_REQUIRED', False)
        self.prevent_arp_spoofing = (
            not self.sg_agent.firewall.provides_arp_spoofing_protection)

        #TODO(mangelajo): optimize resource_versions to only report
        #                 versions about resources which are common,
        #                 or which are used by specific extensions.
        self.agent_state = {
            'binary': 'neutron-openvswitch-agent',
            'host': host,
            'topic': n_const.L2_AGENT_TOPIC,
            'configurations': {'bridge_mappings': self.bridge_mappings,
                               'tunnel_types': self.tunnel_types,
                               'tunneling_ip': self.local_ip,
                               'l2_population': self.l2_pop,
                               'arp_responder_enabled':
                               self.arp_responder_enabled,
                               'enable_distributed_routing':
                               self.enable_distributed_routing,
                               'log_agent_heartbeats':
                               agent_conf.log_agent_heartbeats,
                               'extensions': self.ext_manager.names(),
                               'datapath_type': ovs_conf.datapath_type,
                               'ovs_capabilities': self.ovs.capabilities,
                               'vhostuser_socket_dir':
                               ovs_conf.vhostuser_socket_dir,
                               portbindings.OVS_HYBRID_PLUG: hybrid_plug},
            'resource_versions': resources.LOCAL_RESOURCE_VERSIONS,
            'agent_type': agent_conf.agent_type,
            'start_flag': True} 
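        # Start the periodic heartbeat; the default interval is 30s.
        # neutron-server starts rpc_listeners to receive messages from agents.
        # Each heartbeat it receives updates a timestamp in the database.
        # If (current time - last heartbeat timestamp) exceeds
        # agent_down_time (75s by default), the agent is considered down.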

        report_interval = agent_conf.report_interval
        if report_interval:
            heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            heartbeat.start(interval=report_interval)
        # Initialize iteration counter
        self.iter_num = 0
        # Set run_daemon_loop to True so that the main loop (rpc_loop) runs
        self.run_daemon_loop = True

        self.catch_sigterm = False
        self.catch_sighup = False

        # The initialization is complete; we can start receiving messages
        self.connection.consume_in_threads()
        self.dead_topics.consume_in_threads()

        self.quitting_rpc_timeout = agent_conf.quitting_rpc_timeout
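
The constructor prepares a pool of local VLAN ids (`available_local_vlans`) and later restores existing mappings with `_restore_local_vlan_map()`. The sketch below illustrates the idea of one local VLAN per network, drawn from a pool of 1 to 4094. `LocalVlanPool` is a hypothetical class written for this note, not the agent's `LocalVlanManager`, and picking the smallest free id is only done to keep the example deterministic.

```python
MIN_VLAN_TAG = 1
MAX_VLAN_TAG = 4094


class LocalVlanPool(object):
    """Hypothetical helper: one local VLAN id per Neutron network."""

    def __init__(self):
        self.available = set(range(MIN_VLAN_TAG, MAX_VLAN_TAG + 1))
        self.by_network = {}

    def restore(self, network_id, vlan_id):
        """Re-adopt a mapping found on existing ports after a restart."""
        if self.by_network.get(network_id) == vlan_id:
            return  # several ports of one network report the same id
        if vlan_id not in self.available:
            raise ValueError("local VLAN %d already in use" % vlan_id)
        self.available.remove(vlan_id)
        self.by_network[network_id] = vlan_id

    def allocate(self, network_id):
        """Return the network's local VLAN, allocating one if needed."""
        if network_id in self.by_network:
            return self.by_network[network_id]
        if not self.available:
            raise RuntimeError("no local VLAN ids left")
        vlan_id = min(self.available)  # deterministic for the example
        self.available.remove(vlan_id)
        self.by_network[network_id] = vlan_id
        return vlan_id

    def release(self, network_id):
        """Return a network's local VLAN id to the pool."""
        self.available.add(self.by_network.pop(network_id))


if __name__ == "__main__":
    pool = LocalVlanPool()
    pool.restore("net-a", 5)         # mapping that survived a restart
    print(pool.allocate("net-a"))    # reuses the restored id
    print(pool.allocate("net-b"))    # takes a free id from the pool
```

Restoring before allocating is what keeps the local VLAN of already-wired devices unchanged across an agent restart.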

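RPC communication between the Agent and the Plugin

The Agent and the Plugin talk to each other directly over RPC channels. The Plugin needs to tell the Agent about network deletions and port updates, while the Agent reports port up/down to the Plugin and performs tunnel sync.

Each channel has one sender and one receiver, so ML2Plugin sets up two RPC channels when it initializes. One notifies the Agent about port changes or deleted networks; the other receives requests from the Agent, serves whatever information it asks for, and accepts port up/down events. The Agent needs two matching RPC endpoints. For the detailed flow, see the notes on Neutron API service initialization.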

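        # Set up the RPC API towards the plugin (topic 'q-plugin', interface
        # neutron/agent/rpc.py:PluginApi) and listen for RPC calls made to
        # the agent by other services (topic 'q-agent-notifier').
        # setup_rpc does the following:
        # - sets plugin_rpc, used to talk to neutron-server
        # - sets state_rpc, used to report agent state
        # - sets connection, used to receive messages from neutron-server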
        self.setup_rpc()
def setup_rpc(self):
        # Set plugin_rpc, used to talk to neutron-server
        self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
        # allow us to receive port_update/delete callbacks from the cache
        self.plugin_rpc.register_legacy_notification_callbacks(self)
        self.sg_plugin_rpc = sg_rpc.SecurityGroupServerAPIShim(
            self.plugin_rpc.remote_resource_cache)
        self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
        # Set state_rpc, used to report agent state
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)

        # Set connection and add consumers to receive messages from
        # neutron-server
        # RPC network init
        self.context = context.get_admin_context_without_session()
        # Define the listening consumers for the agent
        consumers = [[constants.TUNNEL, topics.UPDATE],
                     [constants.TUNNEL, topics.DELETE],
                     [topics.DVR, topics.UPDATE]]
        if self.l2_pop:
            consumers.append([topics.L2POPULATION, topics.UPDATE])
        self.connection = agent_rpc.create_consumers([self],
                                                     topics.AGENT,
                                                     consumers,
                                                     start_listening=False)
        self.setup_old_topic_sinkhole()
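
Each `[resource, operation]` pair in `consumers` ends up as a message topic under the agent prefix. The sketch below shows the naming scheme as I understand it, `prefix-resource-operation`. Only the prefix `q-agent-notifier` comes from the comments in this walkthrough; the string values used for the resources and operations, and the helper names, are assumptions for illustration.

```python
AGENT_PREFIX = "q-agent-notifier"  # topic named in the setup_rpc comments


def get_topic_name(prefix, resource, operation):
    """Build a topic such as 'q-agent-notifier-tunnel-update'."""
    return "%s-%s-%s" % (prefix, resource, operation)


def consumer_topics(consumers, l2_pop=False, prefix=AGENT_PREFIX):
    """List the topics the agent would listen on for the given pairs."""
    pairs = list(consumers)
    if l2_pop:
        # Mirrors the conditional consumers.append(...) in setup_rpc.
        pairs.append(("l2population", "update"))
    return [get_topic_name(prefix, res, op) for res, op in pairs]


# Assumed string values for constants.TUNNEL, topics.DVR, and so on.
BASE_CONSUMERS = [("tunnel", "update"), ("tunnel", "delete"),
                  ("dvr", "update")]

if __name__ == "__main__":
    for topic in consumer_topics(BASE_CONSUMERS, l2_pop=True):
        print(topic)
```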

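Creating and initializing br-int

br-int does the layer-2 switching for local VLANs, and its setup is fairly simple: create the bridge, optionally clear its existing flows, and initialize it to behave as a standard L2 switch. The relevant code:

# Instantiate the br-int bridge object, then create and reset the bridge
# through OVS (for example ovs-vsctl)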
self.int_br = self.br_int_cls(ovs_conf.integration_bridge)
self.setup_integration_br()
# init bridge classes with configured datapath type.
self.br_int_cls, self.br_phys_cls, self.br_tun_cls = (
        functools.partial(bridge_classes[b],
                          datapath_type=ovs_conf.datapath_type)
        for b in ('br_int', 'br_phys', 'br_tun'))
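
The generator expression above uses `functools.partial` to pre-bind `datapath_type` on each bridge class, so later code can write `self.br_int_cls(name)` without repeating the option. A self-contained sketch of the same trick follows; `FakeBridge` is a stand-in class invented for the example.

```python
import functools


class FakeBridge(object):
    """Stand-in for a bridge class; it only records constructor arguments."""

    def __init__(self, br_name, datapath_type="system"):
        self.br_name = br_name
        self.datapath_type = datapath_type


bridge_classes = {
    "br_int": FakeBridge,
    "br_phys": FakeBridge,
    "br_tun": FakeBridge,
}

# Unpack three partially applied constructors from one generator expression.
br_int_cls, br_phys_cls, br_tun_cls = (
    functools.partial(bridge_classes[name], datapath_type="netdev")
    for name in ("br_int", "br_phys", "br_tun"))

int_br = br_int_cls("br-int")
print(int_br.br_name, int_br.datapath_type)  # br-int netdev
```

A keyword passed at call time still overrides the pre-bound value, which is standard `functools.partial` behaviour.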

# Set up the integration bridge:
    # create it if needed, optionally delete the patch port and existing flows,
    # then install the basic flows
    def setup_integration_br(self):
        '''Setup the integration bridge.

        '''
        # Ensure the integration bridge is created.
        # ovs_lib.OVSBridge.create() will run
        #   ovs-vsctl -- --may-exist add-br BRIDGE_NAME
        # which does nothing if bridge already exists.
        # int_br is created by running ovs-vsctl add-br
        self.int_br.create()
        self.int_br.set_secure_mode()
        self.int_br.setup_controllers(self.conf)

        if self.conf.AGENT.drop_flows_on_start:
            # Delete the patch port between br-int and br-tun if we're deleting
            # the flows on br-int, so that traffic doesn't get flooded over
            # while flows are missing.
            # Remove the patch port to br-tun and all flows
            self.int_br.delete_port(self.conf.OVS.int_peer_patch_port)
            self.int_br.uninstall_flows(cookie=ovs_lib.COOKIE_ANY)
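        # Switch all traffic using L2 learning: install a flow whose action
        # is NORMAL, so that br-int behaves like an ordinary learning switch
        # (it does not simply flood traffic to every attached device).
        # Add a canary flow (action drop) to int_br to track OVS restarts:
        # flows do not survive an ovs-vswitchd restart, so a missing canary
        # flow tells the agent that OVS has restarted.
        # These notes list priority 1 for the NORMAL flow and priority 0 for
        # the canary flow; the exact values may differ between releases.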
        self.int_br.setup_default_table()
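
The canary flow makes restart detection a three-way check: OVS unreachable, canary missing, or canary present. The sketch below captures that logic with plain Python. The state names echo `OVS_RESTARTED` and `OVS_DEAD` from the agent code, but `classify_ovs_status` and its `dump_canary_flows` callback are hypothetical, and the convention that `None` means "could not query OVS" is an assumption.

```python
OVS_NORMAL = "normal"
OVS_DEAD = "dead"
OVS_RESTARTED = "restarted"


def classify_ovs_status(dump_canary_flows):
    """Classify OVS state from the contents of the canary table.

    dump_canary_flows is a hypothetical callable that returns a list of
    flow strings, or None when ovs-vswitchd cannot be queried.
    """
    flows = dump_canary_flows()
    if flows is None:
        return OVS_DEAD        # no answer at all from OVS
    if not flows:
        return OVS_RESTARTED   # OVS answers, but the canary flow is gone
    return OVS_NORMAL


if __name__ == "__main__":
    print(classify_ovs_status(lambda: ["table=23, priority=0 actions=drop"]))
    print(classify_ovs_status(lambda: []))
    print(classify_ovs_status(lambda: None))
```

On a restart the agent has to reinstall every flow, which is why `rpc_loop` calls `setup_integration_br()` again in that case.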

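Initializing br-eth (physical bridges)

For the physical network mappings, the administrator has to create every physical bridge and attach the physical interface to it. Sample configuration: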

bridge_mappings = public:br-ex
ovs-vsctl add-port br-ex eth0

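This bridge translates between physical-network VLANs and local VLANs. That does not need the complex flow tables a tunnel does; the bridge only has to act as a layer-2 switch. Setting up br-eth is therefore mostly a matter of initializing the default flow table and creating the port that connects it to br-int. The relevant code: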

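# Values passed in through the configuration file
        self.bridge_mappings = self._parse_bridge_mappings(
            ovs_conf.bridge_mappings)
        # For each mapping, set up the physical bridge and connect it to
        # br-int with a veth pair or patch ports
        self.setup_physical_bridges(self.bridge_mappings)
# This function sets up each physical bridge (br-eth*). The bridge must
    # already exist; otherwise the agent exits. Existing flows are removed
    # when drop_flows_on_start is set, then the default flows are installed.
    # use_veth_interconnection selects how the bridge is linked to br-int
    # (veth pair or patch ports). Drop flows block traffic between the
    # bridges first; only then are the veth or patch ports enabled.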
    def setup_physical_bridges(self, bridge_mappings):
        '''Setup the physical network bridges.

        Creates physical network bridges and links them to the
        integration bridge using veths or patch ports.

        :param bridge_mappings: map physical network names to bridge names.
        '''
        # Set up the physical network bridges and connect each one to
        # br-int using veths or patch ports
        self.phys_brs = {}
        self.int_ofports = {}
        self.phys_ofports = {}
        ip_wrapper = ip_lib.IPWrapper()
        ovs = ovs_lib.BaseOVS()
        ovs_bridges = ovs.get_bridges()
        # For each physical network in bridge_mappings
        for physical_network, bridge in bridge_mappings.items():
            LOG.info("Mapping physical network %(physical_network)s to "
                     "bridge %(bridge)s",
                     {'physical_network': physical_network,
                      'bridge': bridge})
            # setup physical bridge
            if bridge not in ovs_bridges:
                LOG.error("Bridge %(bridge)s for physical network "
                          "%(physical_network)s does not exist. Agent "
                          "terminated!",
                          {'physical_network': physical_network,
                           'bridge': bridge})
                sys.exit(1)
            br = self.br_phys_cls(bridge)
            # The bridge already exists, so create won't recreate it, but will
            # handle things like changing the datapath_type
            # Set up br-eth (the bridge itself already exists at this point)
            br.create()
            br.set_secure_mode()
            br.setup_controllers(self.conf)
            if cfg.CONF.AGENT.drop_flows_on_start:
                br.uninstall_flows(cookie=ovs_lib.COOKIE_ANY)
            br.setup_default_table()
            self.phys_brs[physical_network] = br

            # Interconnect br-eth1 and br-int using a veth pair or patch ports:
            # remove ports of the wrong type, then create int-br-eth1 and
            # phy-br-eth1 (visible in the output of ovs-vsctl show)
            # interconnect physical and integration bridges using veth/patches
            int_if_name = p_utils.get_interface_name(
                bridge, prefix=constants.PEER_INTEGRATION_PREFIX)
            phys_if_name = p_utils.get_interface_name(
                bridge, prefix=constants.PEER_PHYSICAL_PREFIX)
            # Interface type of port for physical and integration bridges must
            # be same, so check only one of them.
            # Not logging error here, as the interface may not exist yet.
            # Type check is done to cleanup wrong interface if any.
            int_type = self.int_br.db_get_val("Interface",
                int_if_name, "type", log_errors=False)
            if self.use_veth_interconnection:
                # Drop ports if the interface types doesn't match the
                # configuration value.
                if int_type == 'patch':
                    self.int_br.delete_port(int_if_name)
                    br.delete_port(phys_if_name)
                device = ip_lib.IPDevice(int_if_name)
                if device.exists():
                    device.link.delete()
                    # Give udev a chance to process its rules here, to avoid
                    # race conditions between commands launched by udev rules
                    # and the subsequent call to ip_wrapper.add_veth
                    utils.execute(['udevadm', 'settle', '--timeout=10'])
                # Create the veth pair (ip link add ... type veth peer name ...)
                int_veth, phys_veth = ip_wrapper.add_veth(int_if_name,
                                                          phys_if_name)
                int_ofport = self.int_br.add_port(int_if_name)
                phys_ofport = br.add_port(phys_if_name)
            else:
                # Drop ports if the interface type doesn't match the
                # configuration value
                if int_type == 'veth':
                    self.int_br.delete_port(int_if_name)
                    br.delete_port(phys_if_name)

                # Setup int_br to physical bridge patches.  If they already
                # exist we leave them alone, otherwise we create them but don't
                # connect them until after the drop rules are in place.
                if self.int_br.port_exists(int_if_name):
                    int_ofport = self.int_br.get_port_ofport(int_if_name)
                else:
                    int_ofport = self.int_br.add_patch_port(
                        int_if_name, constants.NONEXISTENT_PEER)
                if br.port_exists(phys_if_name):
                    phys_ofport = br.get_port_ofport(phys_if_name)
                else:
                    phys_ofport = br.add_patch_port(
                        phys_if_name, constants.NONEXISTENT_PEER)

            self.int_ofports[physical_network] = int_ofport
            self.phys_ofports[physical_network] = phys_ofport
            # block all untranslated traffic between bridges
            self.int_br.drop_port(in_port=int_ofport)
            br.drop_port(in_port=phys_ofport)

            if self.use_veth_interconnection:
                # enable veth to pass traffic
                int_veth.link.set_up()
                phys_veth.link.set_up()
                if self.veth_mtu:
                    # set up mtu size for veth interfaces
                    int_veth.link.set_mtu(self.veth_mtu)
                    phys_veth.link.set_mtu(self.veth_mtu)
            else:
                # associate patch ports to pass traffic
                self.int_br.set_db_attribute('Interface', int_if_name,
                                             'options', {'peer': phys_if_name})
                br.set_db_attribute('Interface', phys_if_name,
                                    'options', {'peer': int_if_name})
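
The ordering inside `setup_physical_bridges` matters: ports of the wrong type are removed, new ports are created without being connected, drop flows go in, and only then is the link enabled. The sketch below restates that ordering as data so it can be checked. `plan_interconnect` is a hypothetical helper that only describes the steps; it does not touch OVS.

```python
def plan_interconnect(use_veth, existing_type):
    """Return the ordered steps for linking br-int to one physical bridge.

    existing_type is the interface type currently found on br-int for this
    link ('patch', 'veth', or None when nothing exists yet).
    """
    stale_type = "patch" if use_veth else "veth"
    steps = []
    if existing_type == stale_type:
        steps.append("delete stale %s ports on both bridges" % stale_type)
    if use_veth:
        steps.append("recreate the veth pair and add each end as a port")
    else:
        steps.append("add missing patch ports with a placeholder peer")
    # Traffic is blocked before the two sides can talk to each other.
    steps.append("install drop flows on both ports")
    if use_veth:
        steps.append("bring both veth ends up")
    else:
        steps.append("point each patch port at its real peer")
    return steps


if __name__ == "__main__":
    for step in plan_interconnect(use_veth=False, existing_type="veth"):
        print(step)
```

Installing the drop flows before enabling the link avoids a window in which untagged or untranslated traffic could leak between the bridges.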

Initializing br-tun

Creating tunnel ports

Running rpc_loop

rpc_loop is reached through the following call in neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py:main:

# Loop, checking state and reacting whenever something changes
agent.daemon_loop()
    def daemon_loop(self):
        # Start everything.
        LOG.info("Agent initialized successfully, now running... ")
        signal.signal(signal.SIGTERM, self._handle_sigterm)
        if hasattr(signal, 'SIGHUP'):
            signal.signal(signal.SIGHUP, self._handle_sighup)
        with polling.get_polling_manager(
            self.minimize_polling,
            self.ovsdb_monitor_respawn_interval) as pm:

            self.rpc_loop(polling_manager=pm)
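
`daemon_loop` installs signal handlers, and `rpc_loop` keeps running while `_check_and_handle_signal()` returns true. The usual shape of this pattern is that the handler only sets a flag and the loop reacts at a safe point. Below is a minimal sketch of that idea; `LoopController` is a hypothetical class, and the real agent does more in its check (for example handling SIGHUP).

```python
import signal


class LoopController(object):
    """Hypothetical flag-based loop control, modelled on the agent's names."""

    def __init__(self):
        self.catch_sigterm = False
        self.iterations = 0

    def install_handlers(self):
        # Must be called from the main thread.
        signal.signal(signal.SIGTERM, self._handle_sigterm)

    def _handle_sigterm(self, signum, frame):
        # Only record the request; the loop decides when to stop.
        self.catch_sigterm = True

    def _check_and_handle_signal(self):
        return not self.catch_sigterm

    def run(self, work):
        """Call work(self) repeatedly until a stop has been requested."""
        while self._check_and_handle_signal():
            work(self)
            self.iterations += 1


if __name__ == "__main__":
    def work(ctl):
        # Simulate SIGTERM arriving during the third iteration.
        if ctl.iterations == 2:
            ctl._handle_sigterm(signal.SIGTERM, None)

    controller = LoopController()
    controller.run(work)
    print(controller.iterations)
```

Because the flag is only examined between iterations, an iteration that is already in progress always runs to completion.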

rpc_loop code



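    # The two most important functions called from rpc_loop() are tunnel_sync
    # (query and establish tunnels) and process_network_ports (handle port
    # and security group changes).
    # rpc_loop repeatedly checks a number of states and acts on them; the
    # key step is scanning the port information held by OVS and processing
    # whatever changed.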
    def rpc_loop(self, polling_manager=None):
        if not polling_manager:
            polling_manager = polling.get_polling_manager(
                minimize_polling=False)
        # Initial state
        sync = False
        ports = set()
        updated_ports_copy = set()
        ancillary_ports = set()
        tunnel_sync = True
        ovs_restarted = False
        consecutive_resyncs = 0
        need_clean_stale_flow = True
        ports_not_ready_yet = set()
        failed_devices = {'added': set(), 'removed': set()}
        failed_ancillary_devices = {'added': set(), 'removed': set()}
        failed_devices_retries_map = {}
        while self._check_and_handle_signal():
            if self.fullsync:
                LOG.info("rpc_loop doing a full sync.")
                sync = True
                self.fullsync = False
            port_info = {}
            ancillary_port_info = {}
            start = time.time()
            LOG.debug("Agent rpc_loop - iteration:%d started",
                      self.iter_num)
            # Decide whether OVS has restarted by checking for the canary
            # flow installed on br-int earlier
            ovs_status = self.check_ovs_status()
            if ovs_status == constants.OVS_RESTARTED:
                self.setup_integration_br()
                self.setup_physical_bridges(self.bridge_mappings)
                if self.enable_tunneling:
                    self._reset_tunnel_ofports()
                    self.setup_tunnel_br()
                    self.setup_tunnel_br_flows()
                    tunnel_sync = True
                if self.enable_distributed_routing:
                    self.dvr_agent.reset_ovs_parameters(self.int_br,
                                                 self.tun_br,
                                                 self.patch_int_ofport,
                                                 self.patch_tun_ofport)
                    self.dvr_agent.reset_dvr_parameters()
                    self.dvr_agent.setup_dvr_flows()
                # notify that OVS has restarted
                registry.notify(
                    callback_resources.AGENT,
                    callback_events.OVS_RESTARTED,
                    self)
                # restart the polling manager so that it will signal as added
                # all the current ports
                # REVISIT (rossella_s) Define a method "reset" in
                # BasePollingManager that will be implemented by AlwaysPoll as
                # no action and by InterfacePollingMinimizer as start/stop
                if isinstance(
                    polling_manager, polling.InterfacePollingMinimizer):
                    polling_manager.stop()
                    polling_manager.start()
            elif ovs_status == constants.OVS_DEAD:
                # Agent doesn't apply any operations when ovs is dead, to
                # prevent unexpected failure or crash. Sleep and continue
                # loop in which ovs status will be checked periodically.
                port_stats = self.get_port_stats({}, {})
                self.loop_count_and_wait(start, port_stats)
                continue
            # Notify the plugin of tunnel IP
            if self.enable_tunneling and tunnel_sync:
                try:
                    tunnel_sync = self.tunnel_sync()
                except Exception:
                    LOG.exception("Error while configuring tunnel endpoints")
                    tunnel_sync = True
            ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
            devices_need_retry = (any(failed_devices.values()) or
                any(failed_ancillary_devices.values()) or
                ports_not_ready_yet)
            if (self._agent_has_updates(polling_manager) or sync
                    or devices_need_retry):
                try:
                    LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
                              "starting polling. Elapsed:%(elapsed).3f",
                              {'iter_num': self.iter_num,
                               'elapsed': time.time() - start})
                    # Save updated ports dict to perform rollback in
                    # case resync would be needed, and then clear
                    # self.updated_ports. As the greenthread should not yield
                    # between these two statements, this will be thread-safe
                    updated_ports_copy = self.updated_ports
                    self.updated_ports = set()
                    # Work out from br-int which ports were added, updated,
                    # or removed; port_info is built via scan_ports
                    (port_info, ancillary_port_info, consecutive_resyncs,
                     ports_not_ready_yet) = (self.process_port_info(
                            start, polling_manager, sync, ovs_restarted,
                            ports, ancillary_ports, updated_ports_copy,
                            consecutive_resyncs, ports_not_ready_yet,
                            failed_devices, failed_ancillary_devices))
                    sync = False
                    self.process_deleted_ports(port_info)
                    ofport_changed_ports = self.update_stale_ofport_rules()
                    if ofport_changed_ports:
                        port_info.setdefault('updated', set()).update(
                            ofport_changed_ports)
                    LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
                              "port information retrieved. "
                              "Elapsed:%(elapsed).3f",
                              {'iter_num': self.iter_num,
                               'elapsed': time.time() - start})
                    # Secure and wire/unwire VIFs and update their status
                    # on Neutron server
                    if (self._port_info_has_changes(port_info) or
                        self.sg_agent.firewall_refresh_needed() or
                        ovs_restarted):
                        LOG.debug("Starting to process devices in:%s",
                                  port_info)
                        # If treat devices fails - must resync with plugin
                        # process_network_ports queries the plugin for the
                        # details of each port and, depending on the port's
                        # admin_state_up, calls self.port_bound() or
                        # self.port_dead(). It then reports the port status
                        # through the plugin RPC methods update_device_up
                        # or update_device_down.
                        failed_devices = self.process_network_ports(
                            port_info, ovs_restarted)
                        if need_clean_stale_flow:
                            self.cleanup_stale_flows()
                            need_clean_stale_flow = False
                        LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
                                  "ports processed. Elapsed:%(elapsed).3f",
                                  {'iter_num': self.iter_num,
                                   'elapsed': time.time() - start})

                    ports = port_info['current']

                    if self.ancillary_brs:
                        failed_ancillary_devices = (
                            self.process_ancillary_network_ports(
                                ancillary_port_info))
                        LOG.debug("Agent rpc_loop - iteration: "
                                  "%(iter_num)d - ancillary ports "
                                  "processed. Elapsed:%(elapsed).3f",
                                  {'iter_num': self.iter_num,
                                   'elapsed': time.time() - start})
                        ancillary_ports = ancillary_port_info['current']

                    polling_manager.polling_completed()
                    failed_devices_retries_map = (
                        self.update_retries_map_and_remove_devs_not_to_retry(
                            failed_devices, failed_ancillary_devices,
                            failed_devices_retries_map))
                    # Keep this flag in the last line of "try" block,
                    # so we can be sure that no other Exception occurred.
                    ovs_restarted = False
                    self._dispose_local_vlan_hints()
                except Exception:
                    LOG.exception("Error while processing VIF ports")
                    # Put the ports back in self.updated_ports
                    self.updated_ports |= updated_ports_copy
                    sync = True
            port_stats = self.get_port_stats(port_info, ancillary_port_info)
            self.loop_count_and_wait(start, port_stats)
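
The way the loop above swaps out self.updated_ports and restores it on failure can be looked at in isolation. The following is a minimal sketch, not Neutron code: FakeAgent, run_iteration and failing_process are hypothetical names made up for this illustration, and the real loop does far more work between the swap and the except branch.

```python
class FakeAgent:
    """Hypothetical stand-in for the agent; it only holds updated_ports."""

    def __init__(self, updated_ports):
        self.updated_ports = set(updated_ports)


def run_iteration(agent, process):
    """Run one simplified iteration; return True if a resync is needed."""
    # Take the pending updates and start a fresh set for new notifications.
    updated_ports_copy = agent.updated_ports
    agent.updated_ports = set()
    try:
        process(updated_ports_copy)
    except Exception:
        # Put the unprocessed ports back, keeping any that arrived meanwhile.
        agent.updated_ports |= updated_ports_copy
        return True
    return False


def failing_process(ports):
    """Hypothetical processing step that always fails."""
    raise RuntimeError("simulated failure while processing %s" % sorted(ports))


agent = FakeAgent({"port-a", "port-b"})
needs_sync = run_iteration(agent, failing_process)
print(needs_sync, sorted(agent.updated_ports))
```

Because the restore uses a set union rather than an assignment, ports that were reported while the failed iteration was running are not lost.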

process_port_info:scan_ports


    def process_port_info(self, start, polling_manager, sync, ovs_restarted,
                       ports, ancillary_ports, updated_ports_copy,
                       consecutive_resyncs, ports_not_ready_yet,
                       failed_devices, failed_ancillary_devices):
        # There are polling managers that don't have get_events, e.g.
        # AlwaysPoll used by windows implementations
        # REVISIT (rossella_s) This needs to be reworked to hide implementation
        # details regarding polling in BasePollingManager subclasses
        if sync or not (hasattr(polling_manager, 'get_events')):
            if sync:
                LOG.info("Agent out of sync with plugin!")
                consecutive_resyncs = consecutive_resyncs + 1
                if (consecutive_resyncs >=
                        constants.MAX_DEVICE_RETRIES):
                    LOG.warning(
                        "Clearing cache of registered ports,"
                        " retries to resync were > %s",
                        constants.MAX_DEVICE_RETRIES)
                    ports.clear()
                    ancillary_ports.clear()
                    consecutive_resyncs = 0
            else:
                consecutive_resyncs = 0
                # TODO(rossella_s): For implementations that use AlwaysPoll
                # resync if a device failed. This can be improved in future
                sync = (any(failed_devices.values()) or
                    any(failed_ancillary_devices.values()))

            # NOTE(rossella_s) don't empty the queue of events
            # calling polling_manager.get_events() since
            # the agent might miss some event (for example a port
            # deletion)
            reg_ports = (set() if ovs_restarted else ports)
            # Get the port info from the OVS database
            port_info = self.scan_ports(reg_ports, sync,
                                        updated_ports_copy)
            # Treat ancillary devices if they exist
            if self.ancillary_brs:
                ancillary_port_info = self.scan_ancillary_ports(
                    ancillary_ports, sync)
                LOG.debug("Agent rpc_loop - iteration:%(iter_num)d"
                          " - ancillary port info retrieved. "
                          "Elapsed:%(elapsed).3f",
                          {'iter_num': self.iter_num,
                           'elapsed': time.time() - start})
            else:
                ancillary_port_info = {}

        else:
            consecutive_resyncs = 0
            events = polling_manager.get_events()
            port_info, ancillary_port_info, ports_not_ready_yet = (
                self.process_ports_events(events, ports, ancillary_ports,
                                          ports_not_ready_yet,
                                          failed_devices,
                                          failed_ancillary_devices,
                                          updated_ports_copy))
            registry.notify(
                constants.OVSDB_RESOURCE,
                callback_events.AFTER_READ,
                self,
                ovsdb_events=events)

        return (port_info, ancillary_port_info, consecutive_resyncs,
                ports_not_ready_yet)
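
The resync bookkeeping at the top of process_port_info can be sketched on its own. This is a simplified illustration under stated assumptions: track_resync is a hypothetical helper, and the value 5 for MAX_DEVICE_RETRIES is assumed here purely for the demonstration.

```python
MAX_DEVICE_RETRIES = 5  # assumed value, for illustration only


def track_resync(sync, consecutive_resyncs, ports, ancillary_ports):
    """Return the new resync counter, clearing the caches at the limit."""
    if not sync:
        return 0
    consecutive_resyncs += 1
    if consecutive_resyncs >= MAX_DEVICE_RETRIES:
        # Too many resyncs in a row: forget all registered ports so the
        # next scan starts from an empty cache.
        ports.clear()
        ancillary_ports.clear()
        consecutive_resyncs = 0
    return consecutive_resyncs


ports = {"port-a", "port-b"}
ancillary_ports = set()
count = 0
history = []
for _ in range(MAX_DEVICE_RETRIES):
    count = track_resync(True, count, ports, ancillary_ports)
    history.append((count, len(ports)))
print(history)
```

The counter grows while resyncs keep happening back to back; once it reaches the limit, the port caches are emptied in place and the counter starts again from zero.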

    def scan_ports(self, registered_ports, sync, updated_ports=None):
        # Query the OVS database for the VIF ports currently on br-int
        cur_ports = self.int_br.get_vif_port_set()
        self.int_br_device_count = len(cur_ports)
        port_info = self._get_port_info(registered_ports, cur_ports, sync)
        if updated_ports is None:
            updated_ports = set()
        # Also pick up registered ports whose VLAN has changed
        updated_ports.update(self.check_changed_vlans())
        if updated_ports:
            # Some updated ports might have been removed in the
            # meanwhile, and therefore should not be processed.
            # In this case the updated port won't be found among
            # current ports.
            updated_ports &= cur_ports
            if updated_ports:
                port_info['updated'] = updated_ports
        return port_info

    def _get_port_info(self, registered_ports, cur_ports,
                       readd_registered_ports):
        port_info = {'current': cur_ports}
        # FIXME(salv-orlando): It's not really necessary to return early
        # if nothing has changed.
        if not readd_registered_ports and cur_ports == registered_ports:
            return port_info

        # Compute the set of added ports
        if readd_registered_ports:
            port_info['added'] = cur_ports
        else:
            port_info['added'] = cur_ports - registered_ports
        # Update port_info with ports not found on the integration bridge:
        # every known port that is no longer on br-int counts as removed
        port_info['removed'] = registered_ports - cur_ports
        return port_info
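
The set arithmetic in scan_ports and _get_port_info is easier to follow with concrete values. The sketch below folds both into one function that needs no bridge; diff_ports is a hypothetical name, and the check_changed_vlans step is left out on purpose.

```python
def diff_ports(registered_ports, cur_ports, updated_ports=(), sync=False):
    """Bridge-free sketch of scan_ports combined with _get_port_info."""
    port_info = {'current': cur_ports}
    if sync or cur_ports != registered_ports:
        # On resync every current port is treated as added again.
        port_info['added'] = (cur_ports if sync
                              else cur_ports - registered_ports)
        port_info['removed'] = registered_ports - cur_ports
    # Updated ports that have since left the bridge are dropped.
    still_present = set(updated_ports) & cur_ports
    if still_present:
        port_info['updated'] = still_present
    return port_info


registered = {"port-a", "port-b"}
current = {"port-b", "port-c"}
info = diff_ports(registered, current, updated_ports={"port-b", "port-z"})
for key in sorted(info):
    print(key, sorted(info[key]))
```

Here port-c is new on the bridge, port-a has disappeared, and the update for port-z is ignored because that port is no longer among the current ports.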

process_network_ports


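    # Handles the port information collected above.
    # process_network_ports adds, removes and updates ports.
    # Afterwards rpc_loop checks whether the polling interval has elapsed;
    # if not, it sleeps until then and starts the next iteration.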
    def process_network_ports(self, port_info, ovs_restarted):
        failed_devices = {'added': set(), 'removed': set()}
        # TODO(salv-orlando): consider a solution for ensuring notifications
        # are processed exactly in the same order in which they were
        # received. This is tricky because there are two notification
        # sources: the neutron server, and the ovs db monitor process
        # If there is an exception while processing security groups ports
        # will not be wired anyway, and a resync will be triggered
        # VIF wiring needs to be performed always for 'new' devices.
        # For updated ports, re-wiring is not needed in most cases, but needs
        # to be performed anyway when the admin state of a device is changed.
        # A device might be both in the 'added' and 'updated'
        # list at the same time; avoid processing it twice.
        devices_added_updated = (port_info.get('added', set()) |
                                 port_info.get('updated', set()))
        need_binding_devices = []
        skipped_devices = set()
        if devices_added_updated:
            start = time.time()
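            # treat_devices_added_or_updated handles each device according
            # to whether its port is still present on br-int.
            # Present: treat_vif_port binds the port to its
            # net_uuid/lsw_id and sets up flow rules for traffic of ports
            # that are not bound.
            # Missing: the device is appended to skipped_devices and is
            # not processed further in this iteration.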
            (skipped_devices, need_binding_devices,
            failed_devices['added']) = (
                self.treat_devices_added_or_updated(
                    devices_added_updated, ovs_restarted))
            LOG.debug("process_network_ports - iteration:%(iter_num)d - "
                      "treat_devices_added_or_updated completed. "
                      "Skipped %(num_skipped)d devices of "
                      "%(num_current)d devices currently available. "
                      "Time elapsed: %(elapsed).3f",
                      {'iter_num': self.iter_num,
                       'num_skipped': len(skipped_devices),
                       'num_current': len(port_info['current']),
                       'elapsed': time.time() - start})
            # Update the list of current ports storing only those which
            # have been actually processed.
            skipped_devices = set(skipped_devices)
            port_info['current'] = (port_info['current'] - skipped_devices)

        # TODO(salv-orlando): Optimize avoiding applying filters
        # unnecessarily, (eg: when there are no IP address changes)
        added_ports = port_info.get('added', set()) - skipped_devices
        self._add_port_tag_info(need_binding_devices)
        self.sg_agent.setup_port_filters(added_ports,
                                         port_info.get('updated', set()))
        failed_devices['added'] |= self._bind_devices(need_binding_devices)

        if 'removed' in port_info and port_info['removed']:
            start = time.time()
            # Handle the removed ports; the Neutron server is notified
            # over RPC
            failed_devices['removed'] |= self.treat_devices_removed(
                port_info['removed'])
            LOG.debug("process_network_ports - iteration:%(iter_num)d - "
                      "treat_devices_removed completed in %(elapsed).3f",
                      {'iter_num': self.iter_num,
                       'elapsed': time.time() - start})
        return failed_devices
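
One consequence of removing skipped devices from port_info['current'] is that they are picked up again later, because rpc_loop stores port_info['current'] as the registered ports for the next scan. A minimal sketch of that effect, with prune_skipped as a hypothetical helper and made-up port names:

```python
def prune_skipped(port_info, skipped_devices):
    """Keep in 'current' only the ports that were actually processed."""
    port_info['current'] = port_info['current'] - set(skipped_devices)
    return port_info


# Iteration 1: port-b is reported as added but gets skipped.
port_info = {'current': {"port-a", "port-b"}, 'added': {"port-b"}}
prune_skipped(port_info, ["port-b"])
registered = port_info['current']  # becomes `ports` for the next iteration

# Iteration 2: the bridge lists port-b, so it shows up as added again.
cur_ports = {"port-a", "port-b"}
added_again = cur_ports - registered
print(sorted(registered), sorted(added_again))
```

A skipped device is therefore never recorded as registered, so it is treated as a new port as soon as a later scan finds it on br-int.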

Initialization flowchart


[Figure: initialization flowchart; image not preserved]

Call sequence

