Building a Simple Network Event Logger

Let's combine the setup and event handling logic into a practical tool: a basic network event logger. This application will use shared informers to monitor Pods and Network Policies within specified namespaces and print concise log messages when network-relevant events occur, such as Pod IP assignments/changes or Network Policy creations/deletions.

This serves as a concrete example of how informers can be used for real-time cluster monitoring.

The Goal:

Create a command-line Go application that:

  1. Connects to a Kubernetes cluster using kubeconfig.

  2. Optionally targets a specific namespace or monitors all namespaces.

  3. Uses SharedInformerFactory to monitor v1.Pod and networkingv1.NetworkPolicy resources.

  4. Logs messages to the console for the following events:

    • Pod Added: Log Name, Namespace, Assigned Pod IP, Node Name.

    • Pod Updated: Log Name, Namespace if Pod IP or Phase changes.

    • Pod Deleted: Log Name, Namespace.

    • NetworkPolicy Added: Log Name, Namespace.

    • NetworkPolicy Updated: Log Name, Namespace (indicating a change).

    • NetworkPolicy Deleted: Log Name, Namespace.

  5. Handles graceful shutdown on SIGINT/SIGTERM.

The Code:

This code integrates the informer setup with more specific logging logic in the handlers.

// examples/chapter-6/network-event-logger/main.go
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	// Kubernetes API imports
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"

	// client-go imports
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func main() {
	log.SetOutput(os.Stdout) // Ensure logs go to stdout
	log.Println("Starting Kubernetes Network Event Logger...")

	// --- Kubeconfig and Flags ---
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) kubeconfig path")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "kubeconfig path")
	}
	namespace := flag.String("namespace", "", "namespace to watch (default: all)")
	flag.Parse()

	// --- Clientset ---
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		log.Fatalf("Error building kubeconfig: %s", err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Error creating clientset: %s", err.Error())
	}

	// --- Informer Factory ---
	// Use a short resync period for demo purposes, or 0 for production
	factory := informers.NewSharedInformerFactoryWithOptions(clientset, 5*time.Minute, informers.WithNamespace(*namespace))
	watchNs := *namespace
	if watchNs == "" {
		watchNs = "ALL"
	}
	log.Printf("Created Informer Factory (Namespace: %s)\n", watchNs)

	// --- Informers and Handlers ---
	podInformer := factory.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    podAdded,
		UpdateFunc: podUpdated,
		DeleteFunc: podDeleted,
	})
	log.Println("Pod Informer and handlers registered.")

	netPolInformer := factory.Networking().V1().NetworkPolicies().Informer()
	netPolInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    netPolAdded,
		UpdateFunc: netPolUpdated,
		DeleteFunc: netPolDeleted,
	})
	log.Println("NetworkPolicy Informer and handlers registered.")

	// --- Start Informers and Wait for Sync ---
	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	log.Println("Started Informer Factory.")

	log.Println("Waiting for initial cache sync...")
	if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced, netPolInformer.HasSynced) {
		log.Fatalln("Error: Timed out waiting for caches to sync")
	}
	log.Println("Caches synced successfully!")

	// --- Wait for Shutdown Signal ---
	log.Println("Network Event Logger is running. Press Ctrl+C to stop.")
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	log.Println("Shutdown signal received, stopping...")
}

// --- Event Handler Implementations ---

func podAdded(obj interface{}) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		log.Printf("[PodAdded] ERROR: Invalid object type %T", obj)
		return
	}
	log.Printf("[PodAdded] Pod %s/%s created. IP: %s, Node: %s, Phase: %s\n",
		pod.Namespace, pod.Name, pod.Status.PodIP, pod.Spec.NodeName, pod.Status.Phase)
}

func podUpdated(oldObj, newObj interface{}) {
	oldPod, okOld := oldObj.(*corev1.Pod)
	newPod, okNew := newObj.(*corev1.Pod)
	if !okOld || !okNew {
		log.Printf("[PodUpdated] ERROR: Invalid object types")
		return
	}

	// Log only if IP or Phase changed
	if oldPod.Status.PodIP != newPod.Status.PodIP || oldPod.Status.Phase != newPod.Status.Phase {
		log.Printf("[PodUpdated] Pod %s/%s changed.\n    Old->New | IP: %s->%s, Phase: %s->%s\n",
			newPod.Namespace, newPod.Name,
			oldPod.Status.PodIP, newPod.Status.PodIP,
			oldPod.Status.Phase, newPod.Status.Phase)
	}
}

func podDeleted(obj interface{}) {
	pod, err := getPodFromObj(obj)
	if err != nil {
		log.Printf("[PodDeleted] ERROR: %v", err)
		return
	}
	log.Printf("[PodDeleted] Pod %s/%s deleted.\n", pod.Namespace, pod.Name)
}

func netPolAdded(obj interface{}) {
	np, ok := obj.(*networkingv1.NetworkPolicy)
	if !ok {
		log.Printf("[NetPolAdded] ERROR: Invalid object type %T", obj)
		return
	}
	log.Printf("[NetPolAdded] NetworkPolicy %s/%s created.\n", np.Namespace, np.Name)
}

func netPolUpdated(oldObj, newObj interface{}) {
	oldNP, okOld := oldObj.(*networkingv1.NetworkPolicy)
	newNP, okNew := newObj.(*networkingv1.NetworkPolicy)
	if !okOld || !okNew {
		log.Printf("[NetPolUpdated] ERROR: Invalid object types")
		return
	}
	// Log simple update notification based on ResourceVersion change
	if oldNP.ResourceVersion != newNP.ResourceVersion {
		log.Printf("[NetPolUpdated] NetworkPolicy %s/%s updated (RV: %s -> %s).\n",
			newNP.Namespace, newNP.Name, oldNP.ResourceVersion, newNP.ResourceVersion)
	}
}

func netPolDeleted(obj interface{}) {
	np, err := getNetPolFromObj(obj)
	if err != nil {
		log.Printf("[NetPolDeleted] ERROR: %v", err)
		return
	}
	log.Printf("[NetPolDeleted] NetworkPolicy %s/%s deleted.\n", np.Namespace, np.Name)
}


// Helper function to handle potential DeletedFinalStateUnknown for Pods
func getPodFromObj(obj interface{}) (*corev1.Pod, error) {
	pod, ok := obj.(*corev1.Pod)
	if ok {
		return pod, nil
	}
	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
	if !ok {
		return nil, fmt.Errorf("couldn't get object from tombstone %+v", obj)
	}
	pod, ok = tombstone.Obj.(*corev1.Pod)
	if !ok {
		return nil, fmt.Errorf("tombstone contained object that is not a Pod %+v", obj)
	}
	log.Printf("    (Recovered Pod %s/%s from tombstone)\n", pod.Namespace, pod.Name)
	return pod, nil
}

// Helper function to handle potential DeletedFinalStateUnknown for NetworkPolicies
func getNetPolFromObj(obj interface{}) (*networkingv1.NetworkPolicy, error) {
	np, ok := obj.(*networkingv1.NetworkPolicy)
	if ok {
		return np, nil
	}
	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
	if !ok {
		return nil, fmt.Errorf("couldn't get object from tombstone %+v", obj)
	}
	np, ok = tombstone.Obj.(*networkingv1.NetworkPolicy)
	if !ok {
		return nil, fmt.Errorf("tombstone contained object that is not a NetworkPolicy %+v", obj)
	}
	log.Printf("    (Recovered NetworkPolicy %s/%s from tombstone)\n", np.Namespace, np.Name)
	return np, nil
}

Last updated

Was this helpful?