use informers and context cancellation

This commit is contained in:
Antonio Ojea
2024-06-30 10:15:21 +00:00
parent d33e545fb0
commit b26553c42d
3 changed files with 64 additions and 18 deletions

View File

@@ -38,7 +38,7 @@ type CNIConfigInputs struct {
}
// ComputeCNIConfigInputs computes the template inputs for CNIConfigWriter
func ComputeCNIConfigInputs(node corev1.Node) CNIConfigInputs {
func ComputeCNIConfigInputs(node *corev1.Node) CNIConfigInputs {
defaultRoutes := []string{"0.0.0.0/0", "::/0"}
// check if is a dualstack cluster

View File

@@ -22,14 +22,17 @@ import (
"fmt"
"net"
"os"
"os/signal"
"strings"
"syscall"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"golang.org/x/sys/unix"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
@@ -75,6 +78,9 @@ func main() {
if err != nil {
panic(err.Error())
}
// use protobuf to improve performance
config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
config.ContentType = "application/vnd.kubernetes.protobuf"
// override the internal apiserver endpoint to avoid
// waiting for kube-proxy to install the services rules.
@@ -101,6 +107,31 @@ func main() {
}
klog.Infof("connected to apiserver: %s", config.Host)
// trap Ctrl+C and call cancel on the context
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
// Enable signal handler
signalCh := make(chan os.Signal, 2)
defer func() {
close(signalCh)
cancel()
}()
signal.Notify(signalCh, os.Interrupt, unix.SIGINT)
go func() {
select {
case <-signalCh:
klog.Infof("Exiting: received signal")
cancel()
case <-ctx.Done():
}
}()
informersFactory := informers.NewSharedInformerFactory(clientset, 0)
nodeInformer := informersFactory.Core().V1().Nodes()
nodeLister := nodeInformer.Lister()
// obtain the host and pod ip addresses
// if both ips are different we are not using the host network
hostIP, podIP := os.Getenv("HOST_IP"), os.Getenv("POD_IP")
@@ -153,7 +184,7 @@ func main() {
panic(err.Error())
}
go func() {
if err := masqAgentIPv4.SyncRulesForever(time.Second * 60); err != nil {
if err := masqAgentIPv4.SyncRulesForever(ctx, time.Second*60); err != nil {
panic(err)
}
}()
@@ -168,7 +199,7 @@ func main() {
}
go func() {
if err := masqAgentIPv6.SyncRulesForever(time.Second * 60); err != nil {
if err := masqAgentIPv6.SyncRulesForever(ctx, time.Second*60); err != nil {
panic(err)
}
}()
@@ -178,13 +209,15 @@ func main() {
reconcileNodes := makeNodesReconciler(cniConfigWriter, hostIP, ipFamily)
// main control loop
informersFactory.Start(ctx.Done())
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
// Gets the Nodes information from the API
// TODO: use a proper controller instead
var nodes *corev1.NodeList
var nodes []*corev1.Node
var err error
for i := 0; i < 5; i++ {
nodes, err = clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
nodes, err = nodeLister.List(labels.Everything())
if err == nil {
break
}
@@ -209,14 +242,20 @@ func main() {
}
// rate limit
time.Sleep(10 * time.Second)
select {
case <-ctx.Done():
// grace period to cleanup resources
time.Sleep(1 * time.Second)
return
case <-ticker.C:
}
}
}
// nodeNodesReconciler returns a reconciliation func for nodes
func makeNodesReconciler(cniConfig *CNIConfigWriter, hostIP string, ipFamily IPFamily) func(*corev1.NodeList) error {
func makeNodesReconciler(cniConfig *CNIConfigWriter, hostIP string, ipFamily IPFamily) func([]*corev1.Node) error {
// reconciles a node
reconcileNode := func(node corev1.Node) error {
reconcileNode := func(node *corev1.Node) error {
// first get this node's IPs
// we don't support more than one IP address per IP family for simplification
nodeIPs := internalIPs(node)
@@ -252,7 +291,7 @@ func makeNodesReconciler(cniConfig *CNIConfigWriter, hostIP string, ipFamily IPF
// obtain the PodCIDR gateway
var nodeIPv4, nodeIPv6 string
for _, ip := range sets.List(nodeIPs) {
for _, ip := range nodeIPs.UnsortedList() {
if isIPv6String(ip) {
nodeIPv6 = ip
} else {
@@ -274,8 +313,8 @@ func makeNodesReconciler(cniConfig *CNIConfigWriter, hostIP string, ipFamily IPF
}
// return a reconciler for all the nodes
return func(nodes *corev1.NodeList) error {
for _, node := range nodes.Items {
return func(nodes []*corev1.Node) error {
for _, node := range nodes {
if err := reconcileNode(node); err != nil {
return err
}
@@ -285,7 +324,7 @@ func makeNodesReconciler(cniConfig *CNIConfigWriter, hostIP string, ipFamily IPF
}
// internalIPs returns the internal IP addresses for node
func internalIPs(node corev1.Node) sets.Set[string] {
func internalIPs(node *corev1.Node) sets.Set[string] {
ips := sets.New[string]()
// check the node.Status.Addresses
for _, address := range node.Status.Addresses {

View File

@@ -17,6 +17,7 @@ limitations under the License.
package main
import (
"context"
"fmt"
"time"
@@ -55,8 +56,11 @@ type IPMasqAgent struct {
// these rules only needs to be installed once, but we run it periodically to check that are
// not deleted by an external program. It fails if can't sync the rules during 3 iterations
// TODO: aggregate errors
func (ma *IPMasqAgent) SyncRulesForever(interval time.Duration) error {
func (ma *IPMasqAgent) SyncRulesForever(ctx context.Context, interval time.Duration) error {
errs := 0
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
if err := ma.SyncRules(); err != nil {
errs++
@@ -66,7 +70,10 @@ func (ma *IPMasqAgent) SyncRulesForever(interval time.Duration) error {
} else {
errs = 0
}
time.Sleep(interval)
select {
case <-ctx.Done():
case <-ticker.C:
}
}
}