[Kubernetes source analysis] Calico node Felix source analysis, part 1


github: https://github.com/projectcalico/felix

 

    This article analyzes the data plane.

 

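      Felix is a daemon that runs on every node that hosts endpoints. It programs routes, ACL rules, and anything else the host needs so that the endpoints on that host have working network connectivity.

     Felix is responsible for the following tasks:

- Managing network interfaces. Felix writes some information about each interface into the kernel so that the kernel handles the traffic sent by that endpoint correctly. In particular, it makes sure the host responds correctly to ARP requests from each workload, and it enables IP forwarding on the interfaces it manages. It also watches for interfaces appearing and disappearing so that the programming for those interfaces is applied at the right time.
- Programming routes. Felix writes routes to the endpoints on its host into the Linux kernel FIB (forwarding information base). This ensures that packets destined for those endpoints are forwarded correctly once they reach the host.
- Programming ACLs. Felix also programs ACLs into the Linux kernel. These ACLs ensure that only valid traffic can flow between endpoints and that endpoints cannot bypass Calico's security measures.
- Reporting status. Felix provides data about the health of the network. In particular, it reports errors and problems encountered while configuring its host. This data is written to etcd so that it is visible to other components and to the operators of the network.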

 

Run

    -->  StartDataplaneDriver

    -->  newConnector

 

1. The Run function

     This function does quite a lot, so we go through it piece by piece.

    // Initialising early logging config (log format and early debug settings).
    //
    // Parsing command line parameters.
    //
    // Loading datastore configuration from the environment or config file.
    //
    // Loading more configuration from the datastore (this is retried until success).
    //
    // Starting the configured internal (golang) or external dataplane driver.
    //
    // Starting the background processing goroutines, which load and keep in sync with the
    // state from the datastore, the "calculation graph".
    //
    // Starting the usage reporting and prometheus metrics endpoint threads (if configured).
    //
    // Then, it defers to monitorAndManageShutdown(), which blocks until one of the components
    // fails, then attempts a graceful shutdown.  At that point, all the processing is in
    // background goroutines.
    //
    // To avoid having to maintain rarely-used code paths, Felix handles updates to its
    // main config parameters by exiting and allowing itself to be restarted by the init
    // daemon.
    func Run(configFile string) {
        // Go's RNG is not seeded by default.  Do that now.
        rand.Seed(time.Now().UTC().UnixNano())

    1.1 LoadConfigFromEnvironment

"ipv6support"="false"

"felixhostname"="master-node"

"etcdcertfile"=""

"logseverityscreen"="info"

"etcdscheme"=""

"defaultendpointtohostaction"="ACCEPT"

"etcdaddr"=""

"datastoretype"="kubernetes"

"etcdkeyfile"=""

"healthenabled"="true"

"ipinipmtu"="1440"

"etcdendpoints"=""

"etcdcafile"=""

    // LoadConfigFromEnvironment extracts raw config parameters (identified by
    // case-insensitive prefix "felix_") from the given OS environment variables.
    // An environment entry of "FELIX_FOO=bar" is translated to "foo": "bar".
    func LoadConfigFromEnvironment(environ []string) map[string]string {
        result := make(map[string]string)
        for _, kv := range environ {
            splits := strings.SplitN(kv, "=", 2)
            if len(splits) < 2 {
                log.Warningf("Ignoring malformed environment variable: %#v", kv)
                continue
            }
            key := strings.ToLower(splits[0])
            value := splits[1]
            if strings.Index(key, "felix_") == 0 {
                splits = strings.SplitN(key, "_", 2)
                paramName := splits[1]
                log.Infof("Found felix environment variable: %#v=%#v", paramName, value)
                result[paramName] = value
            }
        }
        return result
    }

    1.2 Reading the config file /etc/calico/felix.cfg

    [global]
    MetadataAddr = None
    LogFilePath = None
    LogSeverityFile = None
    LogSeveritySys = None

    func LoadConfigFile(filename string) (map[string]string, error) {
        if _, err := os.Stat(filename); os.IsNotExist(err) {
            log.Infof("Ignoring absent config file: %v", filename)
            return nil, nil
        }
        data, err := ioutil.ReadFile(filename)
        if err != nil {
            return nil, err
        }
        return LoadConfigFileData(data)
    }
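The article does not show LoadConfigFileData. As a rough illustration of what turning the [global] section into a map could look like, here is a minimal sketch. The helper name parseGlobalSection is hypothetical, and the lower-casing of keys is an assumption made so that the names line up with the environment-derived ones; Felix's real parser may behave differently.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseGlobalSection is a hypothetical helper, not Felix's LoadConfigFileData.
// It collects key/value pairs from the [global] section of INI-style text.
// Keys are lower-cased (an assumption) to match the environment-derived names.
func parseGlobalSection(data string) map[string]string {
	result := make(map[string]string)
	inGlobal := false
	scanner := bufio.NewScanner(strings.NewReader(data))
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		// Skip blank lines and comments.
		if line == "" || strings.HasPrefix(line, "#") || strings.HasPrefix(line, ";") {
			continue
		}
		// A section header switches collection on or off.
		if strings.HasPrefix(line, "[") && strings.HasSuffix(line, "]") {
			inGlobal = strings.EqualFold(line, "[global]")
			continue
		}
		if !inGlobal {
			continue
		}
		parts := strings.SplitN(line, "=", 2)
		if len(parts) != 2 {
			continue // Ignore malformed lines.
		}
		key := strings.ToLower(strings.TrimSpace(parts[0]))
		result[key] = strings.TrimSpace(parts[1])
	}
	return result
}

func main() {
	cfg := parseGlobalSection("[global]\nMetadataAddr = None\nLogFilePath = None\n")
	fmt.Println(cfg["metadataaddr"], cfg["logfilepath"])
}
```

Only the [global] section is read; anything under another section header is ignored.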

    The rest of the function is also about gathering configuration, so it is skipped here.

 

    1.3 StartDataplaneDriver

      This calls netlink.LinkByName to look up the kube-ipvs0 interface, which tells Felix whether kube-proxy is running in IPVS mode.

    4: dummy0: mtu 1500 qdisc noop state DOWN qlen 1000
        link/ether c2:cb:fa:27:5e:c0 brd ff:ff:ff:ff:ff:ff
    5: kube-ipvs0: mtu 1500 qdisc noop state DOWN
        link/ether 62:ef:f4:5d:a5:02 brd ff:ff:ff:ff:ff:ff
        inet 10.200.254.254/32 brd 10.200.254.254 scope global kube-ipvs0
           valid_lft forever preferred_lft forever
        inet 10.200.32.204/32 brd 10.200.32.204 scope global kube-ipvs0
           valid_lft forever preferred_lft forever
        inet 10.200.112.119/32 brd 10.200.112.119 scope global kube-ipvs0
           valid_lft forever preferred_lft forever
        inet 10.200.114.89/32 brd 10.200.114.89 scope global kube-ipvs0
           valid_lft forever preferred_lft forever
        inet 10.200.0.1/32 brd 10.200.0.1 scope global kube-ipvs0
           valid_lft forever preferred_lft forever
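The presence check can be approximated with the Go standard library alone. The sketch below uses net.InterfaceByName as a stand-in for the netlink lookup; the function name interfacePresent is hypothetical, and treating every lookup error as "absent" is a simplification of this sketch, not necessarily what Felix does.

```go
package main

import (
	"fmt"
	"net"
)

// kubeIPVSInterface is the dummy device shown in the listing above.
const kubeIPVSInterface = "kube-ipvs0"

// interfacePresent reports whether an interface with the given name exists.
// Hypothetical helper: any lookup error is treated as "not present".
func interfacePresent(name string) bool {
	_, err := net.InterfaceByName(name)
	return err == nil
}

func main() {
	fmt.Println("kube-proxy IPVS mode detected:", interfacePresent(kubeIPVSInterface))
}
```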

    func StartDataplaneDriver(configParams *config.Config,
        healthAggregator *health.HealthAggregator,
        configChangedRestartCallback func()) (DataplaneDriver, *exec.Cmd) {
        if configParams.UseInternalDataplaneDriver {
            log.Info("Using internal (linux) dataplane driver.")
            // If kube ipvs interface is present, enable ipvs support.
            kubeIPVSSupportEnabled := ifacemonitor.IsInterfacePresent(intdataplane.KubeIPVSInterface)
            if kubeIPVSSupportEnabled {
                log.Info("Kube-proxy in ipvs mode, enabling felix kube-proxy ipvs support.")
            }
            if configChangedRestartCallback == nil {
                log.Panic("Starting dataplane with nil callback func.")
            }

    1.3.1 mark bit

     acceptMark=0x10000
     endpointMark=0xfff00000
     endpointMarkNonCali=0x100000
     passMark=0x20000
     scratch0Mark=0x40000
     scratch1Mark=0x80000

    -A cali-PREROUTING -i cali+ -m comment --comment "cali:EWMPb0zVROM-woQp" -j MARK --set-xmark 0x40000/0x40000
    -A cali-FORWARD -m comment --comment "cali:vjrMJCRpqwy5oRoX" -j MARK --set-xmark 0x0/0xe0000
    -A cali-INPUT -m comment --comment "cali:d4znnv6_6rx6sE6M" -j MARK --set-xmark 0x0/0xfff00000
    -A cali-INPUT -m comment --comment "cali:czgL26xl8reOnh13" -j MARK --set-xmark 0x0/0xf0000
    -A cali-OUTPUT -m comment --comment "cali:qO3aVIhjZ5EawFCC" -j MARK --set-xmark 0x0/0xf0000
    -A cali-forward-endpoint-mark -m comment --comment "cali:96HaP1sFtb-NYoYA" -j MARK --set-xmark 0x0/0xfff00000
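To read these rules it helps to recall what --set-xmark value/mask does: the MARK target clears the bits selected by the mask and then XORs the value into the packet mark. The small sketch below reproduces that arithmetic for two of the rules; setXMark is a hypothetical helper name, and the starting mark value is made up for illustration.

```go
package main

import "fmt"

// setXMark mirrors the documented behaviour of "-j MARK --set-xmark value/mask":
// zero the bits given by mask, then XOR value into the mark.
func setXMark(mark, value, mask uint32) uint32 {
	return (mark &^ mask) ^ value
}

func main() {
	// Illustrative starting mark: pass (0x20000) and scratch1 (0x80000) set.
	var mark uint32 = 0xa0000

	// cali-FORWARD rule: 0x0/0xe0000 clears the pass, scratch0 and scratch1 bits.
	mark = setXMark(mark, 0x0, 0xe0000)
	fmt.Printf("%#x\n", mark)

	// cali-PREROUTING rule: 0x40000/0x40000 sets the scratch0 bit.
	mark = setXMark(mark, 0x40000, 0x40000)
	fmt.Printf("%#x\n", mark)
}
```

Seen this way, 0x0/0xf0000 resets the accept, pass and both scratch bits, while 0x0/0xfff00000 resets the endpoint mark.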

    markBitsManager := markbits.NewMarkBitsManager(configParams.IptablesMarkMask, "felix-iptables")
    // Dedicated mark bits for accept and pass actions.  These are long lived bits
    // that we use for communicating between chains.
    markAccept, _ := markBitsManager.NextSingleBitMark()
    markPass, _ := markBitsManager.NextSingleBitMark()
    // Short-lived mark bits for local calculations within a chain.
    markScratch0, _ := markBitsManager.NextSingleBitMark()
    markScratch1, _ := markBitsManager.NextSingleBitMark()
    if markAccept == 0 || markPass == 0 || markScratch0 == 0 || markScratch1 == 0 {
        log.WithFields(log.Fields{
            "Name":     "felix-iptables",
            "MarkMask": configParams.IptablesMarkMask,
        }).Panic("Not enough mark bits available.")
    }
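The mark values listed at the top of this subsection are consistent with handing out the lowest free bit of the mask on each call. The following is a minimal sketch of such an allocator, not Felix's markbits package: the type bitAllocator and method nextSingleBit are hypothetical, and the mask 0xffff0000 is an assumption chosen because it yields the values shown.

```go
package main

import "fmt"

// bitAllocator is a simplified, hypothetical stand-in for a mark bits manager.
type bitAllocator struct {
	mask uint32 // bits we are allowed to use
	used uint32 // bits already handed out
}

// nextSingleBit returns the lowest free bit in the mask, or 0 if none is left.
func (a *bitAllocator) nextSingleBit() uint32 {
	free := a.mask &^ a.used
	if free == 0 {
		return 0
	}
	bit := free & -free // isolate the lowest set bit
	a.used |= bit
	return bit
}

func main() {
	// Assumed mask; with it the four allocations match the values listed above.
	a := &bitAllocator{mask: 0xffff0000}
	for _, name := range []string{"accept", "pass", "scratch0", "scratch1"} {
		fmt.Printf("%s=%#x\n", name, a.nextSingleBit())
	}
}
```

A return value of 0 plays the same role as in the excerpt above: it signals that the mask has run out of bits.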

    1.3.2 NewIntDataplaneDriver

      Covered in section 2.

    intDP := intdataplane.NewIntDataplaneDriver(dpConfig)
    intDP.Start()

 

2. NewIntDataplaneDriver

     This function mainly sets up the iptables rules.

      Path: dataplane/linux/int_dataplane.go

    func NewIntDataplaneDriver(config Config) *InternalDataplane {
        log.WithField("config", config).Info("Creating internal dataplane driver.")
        ruleRenderer := config.RuleRendererOverride
        if ruleRenderer == nil {
            ruleRenderer = rules.NewRenderer(config.RulesConfig)
        }

    2.1 The NewRenderer function

      2.1.1 With EndpointToHostAction:"ACCEPT", traffic from workloads to the host is allowed

    func NewRenderer(config Config) RuleRenderer {
        log.WithField("config", config).Info("Creating rule renderer.")
        config.validate()
        // Convert configured actions to rule slices.
        // First, what should we do with packets that come from workloads to the host itself.
        var inputAcceptActions []iptables.Action
        switch config.EndpointToHostAction {
        case "DROP":
            log.Info("Workload to host packets will be dropped.")
            inputAcceptActions = []iptables.Action{iptables.DropAction{}}
        case "ACCEPT":
            log.Info("Workload to host packets will be accepted.")
            inputAcceptActions = []iptables.Action{iptables.AcceptAction{}}
        default:
            log.Info("Workload to host packets will be returned to INPUT chain.")
            inputAcceptActions = []iptables.Action{iptables.ReturnAction{}}
        }

      2.1.2 Based on IptablesFilterAllowAction:"ACCEPT"

       Packets allowed in the filter table are accepted immediately.

        // What should we do with packets that are accepted in the forwarding chain
        var filterAllowAction, mangleAllowAction iptables.Action
        switch config.IptablesFilterAllowAction {
        case "RETURN":
            log.Info("filter table allowed packets will be returned to FORWARD chain.")
            filterAllowAction = iptables.ReturnAction{}
        default:
            log.Info("filter table allowed packets will be accepted immediately.")
            filterAllowAction = iptables.AcceptAction{}
        }

      2.1.3 Based on IptablesMangleAllowAction:"ACCEPT"

      Packets allowed in the mangle table are accepted immediately.

        switch config.IptablesMangleAllowAction {
        case "RETURN":
            log.Info("mangle table allowed packets will be returned to PREROUTING chain.")
            mangleAllowAction = iptables.ReturnAction{}
        default:
            log.Info("mangle table allowed packets will be accepted immediately.")
            mangleAllowAction = iptables.AcceptAction{}
        }

        return &DefaultRuleRenderer{
            Config:             config,
            inputAcceptActions: inputAcceptActions,
            filterAllowAction:  filterAllowAction,
            mangleAllowAction:  mangleAllowAction,
        }
    }

    2.2 Options needed by the iptables tables

    // AllHistoricChainNamePrefixes lists all the prefixes that we've used for chains.  Keeping
    // track of the old names lets us clean them up.
    AllHistoricChainNamePrefixes = []string{
        // Current.
        "cali-",

        // Early RCs of Felix 2.1 used "cali" as the prefix for some chains rather than
        // "cali-".  This led to name clashes with the DHCP agent, which uses "calico-" as
        // its prefix.  We need to explicitly list these exceptions.
        "califw-",
        "calitw-",
        "califh-",
        "calith-",
        "calipi-",
        "calipo-",

        // Pre Felix v2.1.
        "felix-",
    }

    // Most iptables tables need the same options.
    iptablesOptions := iptables.TableOptions{
        HistoricChainPrefixes: rules.AllHistoricChainNamePrefixes,
        InsertMode:            config.IptablesInsertMode,
        RefreshInterval:       config.IptablesRefreshInterval,
        PostWriteInterval:     config.IptablesPostWriteCheckInterval,
        LockTimeout:           config.IptablesLockTimeout,
        LockProbeInterval:     config.IptablesLockProbeInterval,
        BackendMode:           config.IptablesBackend,
        LookPathOverride:      config.LookPathOverride,
    }

    2.3 Instantiating the mangle, nat, raw, and filter tables

    mangleTableV4 := iptables.NewTable(
        "mangle",
        4,
        rules.RuleHashPrefix,
        iptablesLock,
        featureDetector,
        iptablesOptions)
    natTableV4 := iptables.NewTable(
        "nat",
        4,
        rules.RuleHashPrefix,
        iptablesLock,
        featureDetector,
        iptablesNATOptions,
    )
    rawTableV4 := iptables.NewTable(
        "raw",
        4,
        rules.RuleHashPrefix,
        iptablesLock,
        featureDetector,
        iptablesOptions)
    filterTableV4 := iptables.NewTable(
        "filter",
        4,
        rules.RuleHashPrefix,
        iptablesLock,
        featureDetector,
        iptablesOptions)

    2.4 VXLAN: not enabled in this environment, skipped for now

    if config.RulesConfig.VXLANEnabled {
        routeTableVXLAN := routetable.New([]string{"vxlan.calico"}, 4, true, config.NetlinkTimeout)
        dp.routeTables = append(dp.routeTables, routeTableVXLAN)
        vxlanManager := newVXLANManager(
            ipSetsV4,
            config.MaxIPSetSize,
            config.Hostname,
            routeTableVXLAN,
            "vxlan.calico",
            config.RulesConfig.VXLANVNI,
            config.RulesConfig.VXLANPort,
            config.ExternalNodesCidrs,
        )
        go vxlanManager.KeepVXLANDeviceInSync(config.VXLANMTU)
        dp.RegisterManager(vxlanManager)
    }

    2.5 XDPEnabled is false, skipped for now

    if config.XDPEnabled {
        if err := bpf.SupportsXDP(); err != nil {
            log.WithError(err).Warn("Can't enable XDP acceleration.")
        } else {
            st, err := NewXDPState(config.XDPAllowGeneric)
            if err != nil {
                log.WithError(err).Warn("Can't enable XDP acceleration.")
            } else {
                dp.xdpState = st
                dp.xdpState.PopulateCallbacks(callbacks)
                log.Info("XDP acceleration enabled.")
            }
        }
    } else {
        log.Info("XDP acceleration disabled.")
    }

    2.6 Registering the managers

     These include the ipset, hostip, policy, endpoint, floatingip, and masq managers.

    ipsetsManager := newIPSetsManager(ipSetsV4, config.MaxIPSetSize, callbacks)
    dp.RegisterManager(ipsetsManager)
    dp.ipsetsSourceV4 = ipsetsManager
    dp.RegisterManager(newHostIPManager(
        config.RulesConfig.WorkloadIfacePrefixes,
        rules.IPSetIDThisHostIPs,
        ipSetsV4,
        config.MaxIPSetSize))
    dp.RegisterManager(newPolicyManager(rawTableV4, mangleTableV4, filterTableV4, ruleRenderer, 4, callbacks))
    epManager := newEndpointManager(
        rawTableV4,
        mangleTableV4,
        filterTableV4,
        ruleRenderer,
        routeTableV4,
        4,
        epMarkMapper,
        config.RulesConfig.KubeIPVSSupportEnabled,
        config.RulesConfig.WorkloadIfacePrefixes,
        dp.endpointStatusCombiner.OnEndpointStatusUpdate,
        callbacks)
    dp.RegisterManager(epManager)
    dp.endpointsSourceV4 = epManager
    dp.RegisterManager(newFloatingIPManager(natTableV4, ruleRenderer, 4))
    dp.RegisterManager(newMasqManager(ipSetsV4, natTableV4, ruleRenderer, config.MaxIPSetSize, 4))

    In summary, this function is mostly iptables initialization: the four tables plus a large assortment of rules.
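The registration pattern can be illustrated with a toy version. The sketch below infers a two-method interface from the calls mgr.OnUpdate(msg) and mgr.CompleteDeferredWork() that appear in this article; the dataplane struct, the dispatch method, and countingManager are hypothetical simplifications, not Felix's types.

```go
package main

import (
	"errors"
	"fmt"
)

// Manager is a simplified interface inferred from the calls used in this article.
type Manager interface {
	OnUpdate(msg interface{})
	CompleteDeferredWork() error
}

// dataplane is a toy stand-in for the internal dataplane driver.
type dataplane struct {
	allManagers []Manager
	needsSync   bool
}

// RegisterManager appends a manager to the fan-out list.
func (d *dataplane) RegisterManager(m Manager) {
	d.allManagers = append(d.allManagers, m)
}

// dispatch hands one message to every registered manager.
func (d *dataplane) dispatch(msg interface{}) {
	for _, m := range d.allManagers {
		m.OnUpdate(msg)
	}
}

// apply asks every manager to flush its queued work and remembers failures.
func (d *dataplane) apply() {
	d.needsSync = false
	for _, m := range d.allManagers {
		if err := m.CompleteDeferredWork(); err != nil {
			d.needsSync = true
		}
	}
}

// countingManager queues updates and, optionally, fails its first flush.
type countingManager struct {
	pending, applied int
	failOnce         bool
}

func (m *countingManager) OnUpdate(msg interface{}) { m.pending++ }

func (m *countingManager) CompleteDeferredWork() error {
	if m.failOnce {
		m.failOnce = false
		return errors.New("simulated dataplane failure")
	}
	m.applied += m.pending
	m.pending = 0
	return nil
}

func main() {
	d := &dataplane{}
	m := &countingManager{failOnce: true}
	d.RegisterManager(m)
	d.dispatch("update-1")
	d.dispatch("update-2")
	d.apply()
	fmt.Println("needs sync after first apply:", d.needsSync)
	d.apply()
	fmt.Println("needs sync after retry:", d.needsSync, "applied:", m.applied)
}
```

Updates are only queued in OnUpdate; the actual work happens when the dataplane flushes, and a failed flush leaves the work pending for a retry.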

    2.6.1 The hostip_mgr OnUpdate function

    Path: dataplane/linux/hostip_mgr.go. It updates the hostIPManager cache hostIfaceToAddrs.

    func (m *hostIPManager) OnUpdate(msg interface{}) {
        switch msg := msg.(type) {
        case *ifaceAddrsUpdate:
            log.WithField("update", msg).Info("Interface addrs changed.")
            if m.nonHostIfacesRegexp.MatchString(msg.Name) {
                log.WithField("update", msg).Debug("Not a real host interface, ignoring.")
                return
            }
            if msg.Addrs != nil {
                m.hostIfaceToAddrs[msg.Name] = msg.Addrs
            } else {
                delete(m.hostIfaceToAddrs, msg.Name)
            }
            // Host ip update is a relative rare event. Flush entire ipsets to make it simple.
            metadata := ipsets.IPSetMetadata{
                Type:    ipsets.IPSetTypeHashIP,
                SetID:   m.hostIPSetID,
                MaxSize: m.maxSize,
            }
            m.ipsetsDataplane.AddOrReplaceIPSet(metadata, m.getCurrentMembers())
        }
    }
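The body of getCurrentMembers is not shown in the article. One plausible shape is to flatten the per-interface cache into a single de-duplicated member list, as sketched below. This is a guess for illustration only: currentMembers is a hypothetical name, and plain string slices are used here in place of whatever set type Felix uses.

```go
package main

import (
	"fmt"
	"sort"
)

// currentMembers flattens an interface -> addresses cache into a sorted,
// de-duplicated list. Hypothetical sketch, using []string instead of a set type.
func currentMembers(hostIfaceToAddrs map[string][]string) []string {
	seen := make(map[string]struct{})
	for _, addrs := range hostIfaceToAddrs {
		for _, a := range addrs {
			seen[a] = struct{}{}
		}
	}
	members := make([]string, 0, len(seen))
	for a := range seen {
		members = append(members, a)
	}
	sort.Strings(members) // stable output for logging and comparison
	return members
}

func main() {
	cache := map[string][]string{
		"eth0":  {"10.0.0.5"},
		"tunl0": {"192.168.1.1", "10.0.0.5"},
	}
	fmt.Println(currentMembers(cache))
}
```

Rebuilding the whole list on every change matches the comment in the excerpt above: host IP updates are rare, so replacing the entire IP set keeps the logic simple.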

 

3. The Start function

    Short and clear; we look at each step in turn.

    func (d *InternalDataplane) Start() {
        // Do our start-of-day configuration.
        d.doStaticDataplaneConfig()

        // Then, start the worker threads.
        go d.loopUpdatingDataplane()
        go d.loopReportingStatus()
        go d.ifaceMonitor.MonitorInterfaces()
    }

    3.1 The doStaticDataplaneConfig function

     This configures kernel parameters; see the rp_filter discussion later on for details.

     /proc/sys/net/ipv4/conf/default/rp_filter is set to 1.

    // doStaticDataplaneConfig sets up the kernel and our static iptables chains.  Should be called
    // once at start of day before starting the main loop.  The actual iptables programming is deferred
    // to the main loop.
    func (d *InternalDataplane) doStaticDataplaneConfig() {
        // Check/configure global kernel parameters.
        d.configureKernel()

        // Ensure that the default value of rp_filter is set to "strict" for newly-created
        // interfaces.  This is required to prevent a race between starting an interface and
        // Felix being able to configure it.
        writeProcSys("/proc/sys/net/ipv4/conf/default/rp_filter", "1")

         (Figure borrowed from another author online.)

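    3.1.1 Rule chains in the raw table

        The raw table has two chains, OUTPUT and PREROUTING. Purpose: decide whether a packet is handled by connection tracking. Kernel module: iptable_raw.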

        for _, t := range d.iptablesRawTables {
            rawChains := d.ruleRenderer.StaticRawTableChains(t.IPVersion)
            t.UpdateChains(rawChains)
            t.SetRuleInsertions("PREROUTING", []iptables.Rule{{
                Action: iptables.JumpAction{Target: rules.ChainRawPrerouting},
            }})
            t.SetRuleInsertions("OUTPUT", []iptables.Rule{{
                Action: iptables.JumpAction{Target: rules.ChainRawOutput},
            }})
        }

    3.1.2 Rule chains in the filter table

       The filter table has three chains: INPUT, FORWARD, and OUTPUT. Purpose: filter packets. Kernel module: iptable_filter.

        for _, t := range d.iptablesFilterTables {
            filterChains := d.ruleRenderer.StaticFilterTableChains(t.IPVersion)
            t.UpdateChains(filterChains)
            t.SetRuleInsertions("FORWARD", []iptables.Rule{{
                Action: iptables.JumpAction{Target: rules.ChainFilterForward},
            }})
            t.SetRuleInsertions("INPUT", []iptables.Rule{{
                Action: iptables.JumpAction{Target: rules.ChainFilterInput},
            }})
            t.SetRuleInsertions("OUTPUT", []iptables.Rule{{
                Action: iptables.JumpAction{Target: rules.ChainFilterOutput},
            }})
        }

    3.2 loopUpdatingDataplane

    func (d *InternalDataplane) loopUpdatingDataplane() {
        log.Info("Started internal iptables dataplane driver loop")
        healthTicks := time.NewTicker(healthInterval).C
        d.reportHealth()

    3.2.1 Defining the processMsgFromCalcGraph function

        processMsgFromCalcGraph := func(msg interface{}) {
            log.WithField("msg", proto.MsgStringer{Msg: msg}).Infof(
                "Received %T update from calculation graph", msg)
            d.recordMsgStat(msg)
            for _, mgr := range d.allManagers {
                mgr.OnUpdate(msg)
            }
            switch msg.(type) {
            case *proto.InSync:
                log.WithField("timeSinceStart", time.Since(processStartTime)).Info(
                    "Datastore in sync, flushing the dataplane for the first time...")
                datastoreInSync = true
            }
        }

    3.2.2 Defining the processIfaceUpdate function

        processIfaceUpdate := func(ifaceUpdate *ifaceUpdate) {
            log.WithField("msg", ifaceUpdate).Info("Received interface update")
            if ifaceUpdate.Name == KubeIPVSInterface {
                d.checkIPVSConfigOnStateUpdate(ifaceUpdate.State)
                return
            }

            for _, mgr := range d.allManagers {
                mgr.OnUpdate(ifaceUpdate)
            }

            for _, routeTable := range d.routeTables {
                routeTable.OnIfaceStateChanged(ifaceUpdate.Name, ifaceUpdate.State)
            }
        }

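      The for loop that follows selects over a number of channels; these simply pass messages between the goroutines.

      The important part is that it calls the apply function, which is covered in section 4.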
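The overall shape of such a loop can be sketched as follows. This is a deliberately reduced model, not Felix's loop: runLoop and its parameters are hypothetical, messages are plain strings, the "in-sync" message stands in for *proto.InSync, and throttling, timers, and health reporting are all left out.

```go
package main

import "fmt"

// runLoop is a simplified event loop: every message marks the dataplane dirty,
// and apply is only called once the datastore has reported that it is in sync.
// The loop exits when both input channels have been closed.
func runLoop(calcGraphMsgs <-chan string, ifaceUpdates <-chan string, apply func()) {
	inSync, dirty := false, false
	for calcGraphMsgs != nil || ifaceUpdates != nil {
		select {
		case msg, ok := <-calcGraphMsgs:
			if !ok {
				calcGraphMsgs = nil // a nil channel disables this case
				continue
			}
			if msg == "in-sync" {
				inSync = true
			}
			dirty = true
		case _, ok := <-ifaceUpdates:
			if !ok {
				ifaceUpdates = nil
				continue
			}
			dirty = true
		}
		if inSync && dirty {
			apply()
			dirty = false
		}
	}
}

func main() {
	calc := make(chan string, 2)
	iface := make(chan string, 1)
	calc <- "policy-update"
	calc <- "in-sync"
	close(calc)
	iface <- "eth0 up"
	close(iface)

	applies := 0
	runLoop(calc, iface, func() { applies++ })
	fmt.Println("apply was called", applies, "time(s)")
}
```

Because select picks among ready channels at random, the number of apply calls in the demo depends on whether the interface update is seen before or after the in-sync message.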

     

    3.3 The MonitorInterfaces function

      Path: iface_monitor/iface_monitor.go

      It uses the updates and addrUpdates channels to watch for changes to interfaces and their addresses.

    func (m *InterfaceMonitor) MonitorInterfaces() {
        log.Info("Interface monitoring thread started.")

        updates := make(chan netlink.LinkUpdate, 10)
        addrUpdates := make(chan netlink.AddrUpdate, 10)
        if err := m.netlinkStub.Subscribe(updates, addrUpdates); err != nil {
            log.WithError(err).Panic("Failed to subscribe to netlink stub")
        }
        log.Info("Subscribed to netlink updates.")

        // Start of day, do a resync to notify all our existing interfaces.  We also do periodic
        // resyncs because it's not clear what the ordering guarantees are for our netlink
        // subscription vs a list operation as used by resync().
        err := m.resync()
        if err != nil {
            log.WithError(err).Panic("Failed to read link states from netlink.")
        }

    3.4 The onIfaceAddrsChange function

     When an interface's addresses change, this function is called and pushes the update onto the ifaceAddrUpdates channel.

    // onIfaceAddrsChange is our interface address monitor callback.  It gets called
    // from the monitor's thread.
    func (d *InternalDataplane) onIfaceAddrsChange(ifaceName string, addrs set.Set) {
        log.WithFields(log.Fields{
            "ifaceName": ifaceName,
            "addrs":     addrs,
        }).Info("Linux interface addrs changed.")
        d.ifaceAddrUpdates <- ...

apply

    -->  CompleteDeferredWork

 

4. The apply function: updating the data plane

    func (d *InternalDataplane) apply() {
        // Update sequencing is important here because iptables rules have dependencies on ipsets.
        // Creating a rule that references an unknown IP set fails, as does deleting an IP set that
        // is in use.

        // Unset the needs-sync flag, we'll set it again if something fails.
        d.dataplaneNeedsSync = false

    4.1 The registered managers update ipsets and iptables

       These are the ipset, hostip, policy, endpoint, floatingip, and masq managers.

        // First, give the managers a chance to update IP sets and iptables.
        for _, mgr := range d.allManagers {
            err := mgr.CompleteDeferredWork()
            if err != nil {
                d.dataplaneNeedsSync = true
            }
        }

    4.2 Marking every route table as out of sync

        if d.forceRouteRefresh {
            // Refresh timer popped.
            for _, r := range d.routeTables {
                // Queue a resync on the next Apply().
                r.QueueResync()
            }
            d.forceRouteRefresh = false
        }

    4.3 ipSets ApplyUpdates

       This mainly runs the ipset list and ipset restore commands to synchronize and update the IP sets.

       See section 2 of the article https://blog.csdn.net/zhonglinzhang/article/details/98174781 for the analysis.

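    4.4 The routetable Apply function

       Its main job is to add routes, in the manner of route add. For IPv4, when the target has a MAC address, it also adds a static ARP entry.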

        // Update the routing table in parallel with the other updates.  We'll wait for it to finish
        // before we return.
        var routesWG sync.WaitGroup
        for _, r := range d.routeTables {
            routesWG.Add(1)
            go func(r *routetable.RouteTable) {
                err := r.Apply()
                if err != nil {
                    log.Warn("Failed to synchronize routing table, will retry...")
                    d.dataplaneNeedsSync = true
                }
                routesWG.Done()
            }(r)
        }
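The fan-out-and-wait pattern used here can be tried in isolation. The sketch below is not Felix code: the applier interface, fakeTable, and applyAll are hypothetical, and recording failures through an atomic flag is a choice made for this sketch so that concurrent writers stay well-defined.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// applier is a hypothetical stand-in for anything with an Apply() error method.
type applier interface {
	Apply() error
}

// fakeTable simulates a route table whose Apply may fail.
type fakeTable struct{ fail bool }

func (t fakeTable) Apply() error {
	if t.fail {
		return errors.New("simulated netlink failure")
	}
	return nil
}

// applyAll runs every Apply in its own goroutine, waits for all of them,
// and reports whether any failed (that is, whether a resync is needed).
func applyAll(tables []applier) (needsSync bool) {
	var wg sync.WaitGroup
	var failed int32
	for _, t := range tables {
		wg.Add(1)
		go func(t applier) {
			defer wg.Done()
			if err := t.Apply(); err != nil {
				atomic.StoreInt32(&failed, 1)
			}
		}(t)
	}
	wg.Wait()
	return atomic.LoadInt32(&failed) == 1
}

func main() {
	fmt.Println(applyAll([]applier{fakeTable{}, fakeTable{}}))
	fmt.Println(applyAll([]applier{fakeTable{}, fakeTable{fail: true}}))
}
```

Passing the loop variable as a goroutine argument, as the excerpt above also does, gives each goroutine its own copy of the table it works on.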

 

5. Instantiating the DataplaneConnector

    func newConnector(configParams *config.Config, configUpdChan chan

