Let's combine the setup and event handling logic into a practical tool: a basic network event logger. This application will use shared informers to monitor Pods and Network Policies within specified namespaces and print concise log messages when network-relevant events occur, such as Pod IP assignments/changes or Network Policy creations/deletions.
This serves as a concrete example of how informers can be used for real-time cluster monitoring.
The Goal:
Create a command-line Go application that:
Connects to a Kubernetes cluster using kubeconfig.
Optionally targets a specific namespace or monitors all namespaces.
Uses SharedInformerFactory to monitor v1.Pod and networkingv1.NetworkPolicy resources.
Logs messages to the console for the following events:
Pod Added: Log Name, Namespace, Assigned Pod IP, Node Name.
Pod Updated: Log Name, Namespace if Pod IP or Phase changes.
Pod Deleted: Log Name, Namespace.
NetworkPolicy Added: Log Name, Namespace.
NetworkPolicy Updated: Log Name, Namespace (indicating a change).
NetworkPolicy Deleted: Log Name, Namespace.
Handles graceful shutdown on SIGINT/SIGTERM.
The Code:
This code integrates the informer setup with more specific logging logic in the handlers.
// examples/chapter-6/network-event-logger/main.go
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
// Kubernetes API imports
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
// client-go imports
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
log.SetOutput(os.Stdout) // Ensure logs go to stdout
log.Println("Starting Kubernetes Network Event Logger...")
// --- Kubeconfig and Flags ---
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) kubeconfig path")
} else {
kubeconfig = flag.String("kubeconfig", "", "kubeconfig path")
}
namespace := flag.String("namespace", "", "namespace to watch (default: all)")
flag.Parse()
// --- Clientset ---
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %s", err.Error())
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error creating clientset: %s", err.Error())
}
// --- Informer Factory ---
// Use a short resync period for demo purposes, or 0 for production
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 5*time.Minute, informers.WithNamespace(*namespace))
watchNs := *namespace
if watchNs == "" {
watchNs = "ALL"
}
log.Printf("Created Informer Factory (Namespace: %s)\n", watchNs)
// --- Informers and Handlers ---
podInformer := factory.Core().V1().Pods().Informer()
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: podAdded,
UpdateFunc: podUpdated,
DeleteFunc: podDeleted,
})
log.Println("Pod Informer and handlers registered.")
netPolInformer := factory.Networking().V1().NetworkPolicies().Informer()
netPolInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: netPolAdded,
UpdateFunc: netPolUpdated,
DeleteFunc: netPolDeleted,
})
log.Println("NetworkPolicy Informer and handlers registered.")
// --- Start Informers and Wait for Sync ---
stopCh := make(chan struct{})
defer close(stopCh)
factory.Start(stopCh)
log.Println("Started Informer Factory.")
log.Println("Waiting for initial cache sync...")
if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced, netPolInformer.HasSynced) {
log.Fatalln("Error: Timed out waiting for caches to sync")
}
log.Println("Caches synced successfully!")
// --- Wait for Shutdown Signal ---
log.Println("Network Event Logger is running. Press Ctrl+C to stop.")
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("Shutdown signal received, stopping...")
}
// --- Event Handler Implementations ---
func podAdded(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
log.Printf("[PodAdded] ERROR: Invalid object type %T", obj)
return
}
log.Printf("[PodAdded] Pod %s/%s created. IP: %s, Node: %s, Phase: %s\n",
pod.Namespace, pod.Name, pod.Status.PodIP, pod.Spec.NodeName, pod.Status.Phase)
}
func podUpdated(oldObj, newObj interface{}) {
oldPod, okOld := oldObj.(*corev1.Pod)
newPod, okNew := newObj.(*corev1.Pod)
if !okOld || !okNew {
log.Printf("[PodUpdated] ERROR: Invalid object types")
return
}
// Log only if IP or Phase changed
if oldPod.Status.PodIP != newPod.Status.PodIP || oldPod.Status.Phase != newPod.Status.Phase {
log.Printf("[PodUpdated] Pod %s/%s changed.\n Old->New | IP: %s->%s, Phase: %s->%s\n",
newPod.Namespace, newPod.Name,
oldPod.Status.PodIP, newPod.Status.PodIP,
oldPod.Status.Phase, newPod.Status.Phase)
}
}
func podDeleted(obj interface{}) {
pod, err := getPodFromObj(obj)
if err != nil {
log.Printf("[PodDeleted] ERROR: %v", err)
return
}
log.Printf("[PodDeleted] Pod %s/%s deleted.\n", pod.Namespace, pod.Name)
}
func netPolAdded(obj interface{}) {
np, ok := obj.(*networkingv1.NetworkPolicy)
if !ok {
log.Printf("[NetPolAdded] ERROR: Invalid object type %T", obj)
return
}
log.Printf("[NetPolAdded] NetworkPolicy %s/%s created.\n", np.Namespace, np.Name)
}
func netPolUpdated(oldObj, newObj interface{}) {
oldNP, okOld := oldObj.(*networkingv1.NetworkPolicy)
newNP, okNew := newObj.(*networkingv1.NetworkPolicy)
if !okOld || !okNew {
log.Printf("[NetPolUpdated] ERROR: Invalid object types")
return
}
// Log simple update notification based on ResourceVersion change
if oldNP.ResourceVersion != newNP.ResourceVersion {
log.Printf("[NetPolUpdated] NetworkPolicy %s/%s updated (RV: %s -> %s).\n",
newNP.Namespace, newNP.Name, oldNP.ResourceVersion, newNP.ResourceVersion)
}
}
func netPolDeleted(obj interface{}) {
np, err := getNetPolFromObj(obj)
if err != nil {
log.Printf("[NetPolDeleted] ERROR: %v", err)
return
}
log.Printf("[NetPolDeleted] NetworkPolicy %s/%s deleted.\n", np.Namespace, np.Name)
}
// Helper function to handle potential DeletedFinalStateUnknown for Pods
func getPodFromObj(obj interface{}) (*corev1.Pod, error) {
pod, ok := obj.(*corev1.Pod)
if ok {
return pod, nil
}
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("couldn't get object from tombstone %+v", obj)
}
pod, ok = tombstone.Obj.(*corev1.Pod)
if !ok {
return nil, fmt.Errorf("tombstone contained object that is not a Pod %+v", obj)
}
log.Printf(" (Recovered Pod %s/%s from tombstone)\n", pod.Namespace, pod.Name)
return pod, nil
}
// Helper function to handle potential DeletedFinalStateUnknown for NetworkPolicies
func getNetPolFromObj(obj interface{}) (*networkingv1.NetworkPolicy, error) {
np, ok := obj.(*networkingv1.NetworkPolicy)
if ok {
return np, nil
}
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("couldn't get object from tombstone %+v", obj)
}
np, ok = tombstone.Obj.(*networkingv1.NetworkPolicy)
if !ok {
return nil, fmt.Errorf("tombstone contained object that is not a NetworkPolicy %+v", obj)
}
log.Printf(" (Recovered NetworkPolicy %s/%s from tombstone)\n", np.Namespace, np.Name)
return np, nil
}