Reacting to Network Events

In the previous section, we set up the SharedInformerFactory and registered placeholder event handler functions (onAddPod, onUpdatePod, onDeletePod, etc.). Now, we need to implement the actual logic within these functions to react when Pods or NetworkPolicies are created, updated, or deleted in the cluster.

These handlers are the core of our monitoring application. They receive the relevant Kubernetes object involved in the event and allow us to perform actions based on the change.

Key Tasks in Event Handlers:

  1. Type Assertion: The objects passed to the handlers (obj, oldObj, newObj) are of type interface{}. The first step is always to perform a type assertion to the expected Kubernetes resource pointer type (e.g., *v1.Pod, *networkingv1.NetworkPolicy). Always check the ok value returned by the assertion to confirm it succeeded.

  2. Handling Deletion Tombstones: The DeleteFunc may sometimes receive a special object of type cache.DeletedFinalStateUnknown. This happens when the informer's watch connection dropped and it missed the actual DELETE event, but later reconciled its cache and discovered the object is gone. This "tombstone" wraps the last known state of the deleted object, so you need code to detect it and extract the actual deleted object from it (a compact helper combining this with the type assertion from step 1 is sketched after this list).

  3. Extracting Information: Once you have the correctly typed object, you can access its fields (metadata.name, metadata.namespace, status.podIP, spec, etc.) to get the information you need.

  4. Processing Logic: Perform your desired action based on the event. This could be logging, updating metrics, triggering alerts, modifying other Kubernetes resources (carefully!), or adding work items to a queue for more complex processing (common in controllers).
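
To make tasks 1 and 2 concrete, here is a small helper (a sketch for this example, not part of client-go) that combines the type assertion with tombstone unwrapping for Pods; the full handlers below inline the same pattern instead of calling a helper:

// podFromObj unwraps an event handler argument into a *corev1.Pod.
// It handles the cache.DeletedFinalStateUnknown tombstone case and
// returns nil for any unexpected type.
func podFromObj(obj interface{}) *corev1.Pod {
	if pod, ok := obj.(*corev1.Pod); ok {
		return pod
	}
	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		if pod, ok := tombstone.Obj.(*corev1.Pod); ok {
			return pod
		}
	}
	return nil
}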

Implementing the Handlers:

Let's update our previous main.go file by filling in the logic for our placeholder handler functions. We'll focus on logging the events and accessing basic information.

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	// Kubernetes API imports
	corev1 "k8s.io/api/core/v1" // Import actual types
	networkingv1 "k8s.io/api/networking/v1"

	// client-go imports
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	listers "k8s.io/client-go/listers/core/v1" // Import Lister package
	netlisters "k8s.io/client-go/listers/networking/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	// "k8s.io/apimachinery/pkg/labels" // For Lister label selection
)

// Global Listers (assigned in main; safe to read only after the caches have synced)
var (
	podLister           listers.PodLister
	networkPolicyLister netlisters.NetworkPolicyLister
)


func main() {
	log.Println("Starting Informer Setup Example")

	// --- Step 1: Setup Kubeconfig and Create Clientset ---
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) kubeconfig path")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "kubeconfig path")
	}
	namespace := flag.String("namespace", "", "namespace to watch (optional, default is all)")
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		log.Fatalf("Error building kubeconfig: %s", err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Error creating clientset: %s", err.Error())
	}


	// --- Step 2: Instantiate SharedInformerFactory ---
	factory := informers.NewSharedInformerFactoryWithOptions(clientset, time.Minute*10, informers.WithNamespace(*namespace))
	log.Printf("SharedInformerFactory created (watching namespace: '%s')\n", *namespace)


	// --- Step 3: Get Specific Informers ---
	podInformer := factory.Core().V1().Pods().Informer()
	networkPolicyInformer := factory.Networking().V1().NetworkPolicies().Informer()
	log.Println("Specific Informers obtained (Pods, NetworkPolicies)")


	// --- Step 4: Get Listers ---
	// Listers provide efficient read access to the cache
	podLister = factory.Core().V1().Pods().Lister()
	networkPolicyLister = factory.Networking().V1().NetworkPolicies().Lister()
	log.Println("Listers obtained")


	// --- Step 5: Register Event Handlers ---
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onAddPod,
		UpdateFunc: onUpdatePod,
		DeleteFunc: onDeletePod,
	})
	log.Println("Registered Pod event handlers")

	networkPolicyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onAddNetPol,
		UpdateFunc: onUpdateNetPol,
		DeleteFunc: onDeleteNetPol,
	})
	log.Println("Registered NetworkPolicy event handlers")


	// --- Step 6: Start the Factory ---
	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	log.Println("SharedInformerFactory started")


	// --- Step 7: Wait for Cache Sync ---
	log.Println("Waiting for initial cache sync...")
	if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced, networkPolicyInformer.HasSynced) {
		log.Fatal("Error: Timed out waiting for caches to sync")
	}
	log.Println("Caches synced successfully!")


	// --- Main Application Logic (Post-Sync) ---
	log.Println("Informers running. Press Ctrl+C to stop.")
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	log.Println("Termination signal received, shutting down...")
}

// --- Implemented Event Handler Functions ---

func onAddPod(obj interface{}) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		log.Printf("[AddPod] Error: Could not cast object to *v1.Pod: %T", obj)
		return
	}
	log.Printf("[AddPod] Name: %s/%s, IP: %s, Phase: %s\n",
		pod.Namespace, pod.Name, pod.Status.PodIP, pod.Status.Phase)
	// Example: use a Lister to fetch related objects (e.g., the Pod's Node).
	// This is safe only AFTER the caches have synced, and it assumes a Node
	// informer/Lister was also obtained from the factory in main (e.g., a
	// global nodeLister analogous to podLister above).
	/*
		if pod.Spec.NodeName != "" {
			node, err := nodeLister.Get(pod.Spec.NodeName)
			if err == nil {
				log.Printf("    -> Node: %s\n", node.Name)
			}
		}
	*/
}

func onUpdatePod(oldObj, newObj interface{}) {
	oldPod, okOld := oldObj.(*corev1.Pod)
	newPod, okNew := newObj.(*corev1.Pod)
	if !okOld || !okNew {
		log.Printf("[UpdatePod] Error: Could not cast objects to *v1.Pod")
		return
	}

	// Skip periodic resyncs: a resync delivers the same cached object as both
	// old and new (identical ResourceVersion), and here none of the fields we
	// care about (Phase, PodIP) have changed either.
	if oldPod.ResourceVersion == newPod.ResourceVersion &&
		oldPod.Status.Phase == newPod.Status.Phase &&
		oldPod.Status.PodIP == newPod.Status.PodIP {
		return // nothing interesting changed
	}

	log.Printf("[UpdatePod] Name: %s/%s\n    Old RV: %s, New RV: %s\n    Old Phase: %s, New Phase: %s\n    Old IP: %s, New IP: %s\n",
		newPod.Namespace, newPod.Name,
		oldPod.ResourceVersion, newPod.ResourceVersion,
		oldPod.Status.Phase, newPod.Status.Phase,
		oldPod.Status.PodIP, newPod.Status.PodIP)
}

func onDeletePod(obj interface{}) {
	// Handle the cache.DeletedFinalStateUnknown tombstone case
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			log.Printf("[DeletePod] Error: Couldn't get object from tombstone %+v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*corev1.Pod)
		if !ok {
			log.Printf("[DeletePod] Error: Tombstone contained object that is not a Pod %+v", obj)
			return
		}
		log.Printf("[DeletePod] Tombstone: Recovered deleted Pod %s/%s\n", pod.Namespace, pod.Name)
	} else {
		// Object was not a tombstone
		log.Printf("[DeletePod] Name: %s/%s, Final Status: %s\n", pod.Namespace, pod.Name, pod.Status.Phase)
	}
	// Perform cleanup logic if needed
}

func onAddNetPol(obj interface{}) {
	netpol, ok := obj.(*networkingv1.NetworkPolicy)
	if !ok {
		log.Printf("[AddNetPol] Error: Could not cast object to *v1.NetworkPolicy: %T", obj)
		return
	}
	log.Printf("[AddNetPol] Name: %s/%s\n", netpol.Namespace, netpol.Name)
	// Potentially log selector, policy types etc.
}

func onUpdateNetPol(oldObj, newObj interface{}) {
	oldNetPol, okOld := oldObj.(*networkingv1.NetworkPolicy)
	newNetPol, okNew := newObj.(*networkingv1.NetworkPolicy)
	if !okOld || !okNew {
		log.Printf("[UpdateNetPol] Error: Could not cast objects to *v1.NetworkPolicy")
		return
	}
	// Only log if ResourceVersion changed (or potentially compare specs if needed)
	if oldNetPol.ResourceVersion != newNetPol.ResourceVersion {
		log.Printf("[UpdateNetPol] Name: %s/%s, RV: %s -> %s\n",
			newNetPol.Namespace, newNetPol.Name, oldNetPol.ResourceVersion, newNetPol.ResourceVersion)
	}
}

func onDeleteNetPol(obj interface{}) {
	netpol, ok := obj.(*networkingv1.NetworkPolicy)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			log.Printf("[DeleteNetPol] Error: Couldn't get object from tombstone %+v", obj)
			return
		}
		netpol, ok = tombstone.Obj.(*networkingv1.NetworkPolicy)
		if !ok {
			log.Printf("[DeleteNetPol] Error: Tombstone contained object that is not a NetworkPolicy %+v", obj)
			return
		}
		log.Printf("[DeleteNetPol] Tombstone: Recovered deleted NetworkPolicy %s/%s\n", netpol.Namespace, netpol.Name)
	} else {
		log.Printf("[DeleteNetPol] Name: %s/%s\n", netpol.Namespace, netpol.Name)
	}
}

Key Changes and Concepts:

  1. Import API Types: We now import k8s.io/api/core/v1 and k8s.io/api/networking/v1 to work with the specific struct types.

  2. Type Assertions: Each handler now performs pod, ok := obj.(*corev1.Pod) or the equivalent for NetworkPolicy. It logs an error if the assertion fails.

  3. Tombstone Handling: onDeletePod and onDeleteNetPol include the standard pattern for checking if obj is a cache.DeletedFinalStateUnknown and extracting the actual object (tombstone.Obj) if it is.

  4. Accessing Data: We access fields like pod.Name, pod.Namespace, pod.Status.PodIP, pod.Status.Phase, netpol.Name, netpol.Namespace, etc., for logging.

  5. Update Noise Reduction: The onUpdatePod handler includes a basic check to see if relevant fields (like Phase or IP) or the ResourceVersion have changed before logging. This helps reduce noise from periodic resyncs or minor status updates that might trigger the UpdateFunc without a significant change. You might implement more sophisticated checks based on your specific needs (e.g., deep comparison of specs).

  6. Listers: We obtained the podLister and networkPolicyLister and stored them in package-level variables (a simple approach; dependency injection is better for larger apps). While they are not heavily used in these basic logging handlers, the comment in onAddPod shows how a Node Lister's Get(pod.Spec.NodeName) could efficiently fetch a related object from the cache after sync, and the sketch below shows how our own Listers can be used.
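
As an illustration, here is a minimal sketch of reading from the cache through the Listers declared above. It assumes the caches have already synced (Step 7) and that the commented-out k8s.io/apimachinery/pkg/labels import is enabled; the namespace and Pod name ("default", "example-pod") are placeholders:

// dumpCachedState reads from the local informer caches via the Listers.
// Safe to call only after WaitForCacheSync has returned true.
func dumpCachedState() {
	// Get a single Pod from the cache -- no API server round trip.
	pod, err := podLister.Pods("default").Get("example-pod")
	if err == nil {
		log.Printf("Cached pod %s/%s has IP %s", pod.Namespace, pod.Name, pod.Status.PodIP)
	}

	// List all NetworkPolicies currently in the cache.
	policies, err := networkPolicyLister.List(labels.Everything())
	if err == nil {
		log.Printf("Cache currently holds %d NetworkPolicies", len(policies))
	}
}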

Important Considerations for Handler Logic:

  • Keep Handlers Fast: Event handlers should execute quickly and avoid blocking operations (like complex computations or long network calls). The informer framework uses a limited number of goroutines to process events; blocking handlers can delay the processing of subsequent events.

  • Offload Work: For complex tasks triggered by an event, the recommended pattern (especially in controllers) is for the event handler to simply add a work item (e.g., the object's key namespace/name) to a rate-limited work queue. Separate worker goroutines pull items from the queue and perform the actual processing, using the Lister to get the current state of the object(s); a minimal sketch of this pattern follows this list.

  • Use Listers for Reads: When a handler needs information about the current state of the triggering object or related objects, always prefer the Lister (e.g., podLister.Pods(ns).Get(name), podLister.List(selector)) over direct clientset Get/List calls. Listers read from the synchronized local cache and avoid hitting the API server.
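
The following is a minimal sketch of the offload pattern, assuming client-go's k8s.io/client-go/util/workqueue package. The handler only enqueues the object's namespace/name key; a worker goroutine, started after the caches have synced, drains the queue and reads the current state through the Lister. Retries (AddRateLimited), graceful shutdown, and the Update/Delete handlers are omitted for brevity:

// In main, alongside Step 5: enqueue keys instead of processing inline.
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		// DeletionHandlingMetaNamespaceKeyFunc produces a "namespace/name" key
		// and also understands DeletedFinalStateUnknown tombstones.
		if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
			queue.Add(key)
		}
	},
})

// After Step 7 (cache sync): run a worker that drains the queue.
go func() {
	for {
		item, shutdown := queue.Get()
		if shutdown {
			return
		}
		ns, name, err := cache.SplitMetaNamespaceKey(item.(string))
		if err == nil {
			// Read the *current* state from the cache, not the event payload.
			if pod, err := podLister.Pods(ns).Get(name); err == nil {
				log.Printf("worker: processing pod %s/%s (phase %s)", ns, name, pod.Status.Phase)
			}
		}
		queue.Forget(item)
		queue.Done(item)
	}
}()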
