Reacting to Network Events
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
// Kubernetes API imports
corev1 "k8s.io/api/core/v1" // Import actual types
networkingv1 "k8s.io/api/networking/v1"
// client-go imports
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1" // Import Lister package
netlisters "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
// "k8s.io/apimachinery/pkg/labels" // For Lister label selection
)
// Global variables for Listers (initialize after cache sync)
var (
podLister listers.PodLister
networkPolicyLister netlisters.NetworkPolicyLister
)
func main() {
log.Println("Starting Informer Setup Example")
// --- Step 1: Setup Kubeconfig and Create Clientset ---
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) kubeconfig path")
} else {
kubeconfig = flag.String("kubeconfig", "", "kubeconfig path")
}
namespace := flag.String("namespace", "", "namespace to watch (optional, default is all)")
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %s", err.Error())
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error creating clientset: %s", err.Error())
}
// --- Step 2: Instantiate SharedInformerFactory ---
factory := informers.NewSharedInformerFactoryWithOptions(clientset, time.Minute*10, informers.WithNamespace(*namespace))
log.Printf("SharedInformerFactory created (watching namespace: '%s')\n", *namespace)
// --- Step 3: Get Specific Informers ---
podInformer := factory.Core().V1().Pods().Informer()
networkPolicyInformer := factory.Networking().V1().NetworkPolicies().Informer()
log.Println("Specific Informers obtained (Pods, NetworkPolicies)")
// --- Step 4: Get Listers ---
// Listers provide efficient read access to the cache
podLister = factory.Core().V1().Pods().Lister()
networkPolicyLister = factory.Networking().V1().NetworkPolicies().Lister()
log.Println("Listers obtained")
// --- Step 5: Register Event Handlers ---
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAddPod,
UpdateFunc: onUpdatePod,
DeleteFunc: onDeletePod,
})
log.Println("Registered Pod event handlers")
networkPolicyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAddNetPol,
UpdateFunc: onUpdateNetPol,
DeleteFunc: onDeleteNetPol,
})
log.Println("Registered NetworkPolicy event handlers")
// --- Step 6: Start the Factory ---
stopCh := make(chan struct{})
defer close(stopCh)
factory.Start(stopCh)
log.Println("SharedInformerFactory started")
// --- Step 7: Wait for Cache Sync ---
log.Println("Waiting for initial cache sync...")
if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced, networkPolicyInformer.HasSynced) {
log.Fatal("Error: Timed out waiting for caches to sync")
}
log.Println("Caches synced successfully!")
// --- Main Application Logic (Post-Sync) ---
log.Println("Informers running. Press Ctrl+C to stop.")
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("Termination signal received, shutting down...")
}
// --- Implemented Event Handler Functions ---
func onAddPod(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
log.Printf("[AddPod] Error: Could not cast object to *v1.Pod: %T", obj)
return
}
log.Printf("[AddPod] Name: %s/%s, IP: %s, Phase: %s\n",
pod.Namespace, pod.Name, pod.Status.PodIP, pod.Status.Phase)
// Example: Use Lister to get related info (e.g., node)
// This is safe AFTER cache sync
/*
if pod.Spec.NodeName != "" {
node, err := factory.Core().V1().Nodes().Lister().Get(pod.Spec.NodeName) // Assuming node informer was set up
if err == nil {
log.Printf(" -> Node: %s\n", node.Name)
}
}
*/
}
func onUpdatePod(oldObj, newObj interface{}) {
oldPod, okOld := oldObj.(*corev1.Pod)
newPod, okNew := newObj.(*corev1.Pod)
if !okOld || !okNew {
log.Printf("[UpdatePod] Error: Could not cast objects to *v1.Pod")
return
}
// Avoid logging noise if only ResourceVersion changed (common during resyncs)
if oldPod.ResourceVersion == newPod.ResourceVersion &&
oldPod.Status.Phase == newPod.Status.Phase &&
oldPod.Status.PodIP == newPod.Status.PodIP {
//log.Printf("[UpdatePod] Skipping update for pod %s/%s - only RV changed?", newPod.Namespace, newPod.Name)
return // Skip if nothing interesting changed
}
log.Printf("[UpdatePod] Name: %s/%s\n Old RV: %s, New RV: %s\n Old Phase: %s, New Phase: %s\n Old IP: %s, New IP: %s\n",
newPod.Namespace, newPod.Name,
oldPod.ResourceVersion, newPod.ResourceVersion,
oldPod.Status.Phase, newPod.Status.Phase,
oldPod.Status.PodIP, newPod.Status.PodIP)
}
func onDeletePod(obj interface{}) {
// Handle DeletionFinalStateUnknown
pod, ok := obj.(*corev1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
log.Printf("[DeletePod] Error: Couldn't get object from tombstone %+v", obj)
return
}
pod, ok = tombstone.Obj.(*corev1.Pod)
if !ok {
log.Printf("[DeletePod] Error: Tombstone contained object that is not a Pod %+v", obj)
return
}
log.Printf("[DeletePod] Tombstone: Recovered deleted Pod %s/%s\n", pod.Namespace, pod.Name)
} else {
// Object was not a tombstone
log.Printf("[DeletePod] Name: %s/%s, Final Status: %s\n", pod.Namespace, pod.Name, pod.Status.Phase)
}
// Perform cleanup logic if needed
}
func onAddNetPol(obj interface{}) {
netpol, ok := obj.(*networkingv1.NetworkPolicy)
if !ok {
log.Printf("[AddNetPol] Error: Could not cast object to *v1.NetworkPolicy: %T", obj)
return
}
log.Printf("[AddNetPol] Name: %s/%s\n", netpol.Namespace, netpol.Name)
// Potentially log selector, policy types etc.
}
func onUpdateNetPol(oldObj, newObj interface{}) {
oldNetPol, okOld := oldObj.(*networkingv1.NetworkPolicy)
newNetPol, okNew := newObj.(*networkingv1.NetworkPolicy)
if !okOld || !okNew {
log.Printf("[UpdateNetPol] Error: Could not cast objects to *v1.NetworkPolicy")
return
}
// Only log if ResourceVersion changed (or potentially compare specs if needed)
if oldNetPol.ResourceVersion != newNetPol.ResourceVersion {
log.Printf("[UpdateNetPol] Name: %s/%s, RV: %s -> %s\n",
newNetPol.Namespace, newNetPol.Name, oldNetPol.ResourceVersion, newNetPol.ResourceVersion)
}
}
func onDeleteNetPol(obj interface{}) {
netpol, ok := obj.(*networkingv1.NetworkPolicy)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
log.Printf("[DeleteNetPol] Error: Couldn't get object from tombstone %+v", obj)
return
}
netpol, ok = tombstone.Obj.(*networkingv1.NetworkPolicy)
if !ok {
log.Printf("[DeleteNetPol] Error: Tombstone contained object that is not a NetworkPolicy %+v", obj)
return
}
log.Printf("[DeleteNetPol] Tombstone: Recovered deleted NetworkPolicy %s/%s\n", netpol.Namespace, netpol.Name)
} else {
log.Printf("[DeleteNetPol] Name: %s/%s\n", netpol.Namespace, netpol.Name)
}
}PreviousSetting up Shared Informers for Network-related ResourcesNextBuilding a Simple Network Event Logger
Last updated
Was this helpful?