Reacting to Network Events
In the previous section, we set up the SharedInformerFactory and registered placeholder event handler functions (onAddPod, onUpdatePod, onDeletePod, etc.). Now we need to implement the actual logic within these functions to react when Pods or NetworkPolicies are created, updated, or deleted in the cluster.
These handlers are the core of our monitoring application. They receive the relevant Kubernetes object involved in the event and allow us to perform actions based on the change.
Key Tasks in Event Handlers:
Type Assertion: The objects passed to the handlers (obj, oldObj, newObj) are of type interface{}. The first step is always to perform a type assertion to the expected Kubernetes resource pointer type (e.g., *v1.Pod, *networkingv1.NetworkPolicy). Always check the ok value returned by the assertion to ensure it succeeded.

Handling Deletion Tombstones: The DeleteFunc may sometimes receive a special object of type cache.DeletedFinalStateUnknown. This happens when the informer's watch connection dropped and it missed the actual DELETE event, but later reconciled its cache and realized the object is gone. This "tombstone" contains the last known state of the deleted object, and your handler needs to detect this case and extract the deleted object from the tombstone.

Extracting Information: Once you have the correctly typed object, you can access its fields (metadata.name, metadata.namespace, status.podIP, spec, etc.) to get the information you need.

Processing Logic: Perform your desired action based on the event. This could be logging, updating metrics, triggering alerts, modifying other Kubernetes resources (carefully!), or adding work items to a queue for more complex processing (a common pattern in controllers).
Implementing the Handlers:
Let's update our previous main.go file by filling in the logic for our placeholder handler functions. We'll focus on logging the events and accessing basic information.
package main
import (
	"flag"
	"log"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	// Kubernetes API imports
	corev1 "k8s.io/api/core/v1" // Import actual types
	networkingv1 "k8s.io/api/networking/v1"

	// client-go imports
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	listers "k8s.io/client-go/listers/core/v1" // Import Lister package
	netlisters "k8s.io/client-go/listers/networking/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	// "k8s.io/apimachinery/pkg/labels" // For Lister label selection
)
// Global Listers (populated in main; only safe to read from after the caches have synced)
var (
	podLister           listers.PodLister
	networkPolicyLister netlisters.NetworkPolicyLister
)
func main() {
	log.Println("Starting Informer Setup Example")

	// --- Step 1: Setup Kubeconfig and Create Clientset ---
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) kubeconfig path")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "kubeconfig path")
	}
	namespace := flag.String("namespace", "", "namespace to watch (optional, default is all)")
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		log.Fatalf("Error building kubeconfig: %s", err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Error creating clientset: %s", err.Error())
	}

	// --- Step 2: Instantiate SharedInformerFactory ---
	factory := informers.NewSharedInformerFactoryWithOptions(clientset, time.Minute*10, informers.WithNamespace(*namespace))
	log.Printf("SharedInformerFactory created (watching namespace: '%s')\n", *namespace)

	// --- Step 3: Get Specific Informers ---
	podInformer := factory.Core().V1().Pods().Informer()
	networkPolicyInformer := factory.Networking().V1().NetworkPolicies().Informer()
	log.Println("Specific Informers obtained (Pods, NetworkPolicies)")

	// --- Step 4: Get Listers ---
	// Listers provide efficient read access to the cache
	podLister = factory.Core().V1().Pods().Lister()
	networkPolicyLister = factory.Networking().V1().NetworkPolicies().Lister()
	log.Println("Listers obtained")

	// --- Step 5: Register Event Handlers ---
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onAddPod,
		UpdateFunc: onUpdatePod,
		DeleteFunc: onDeletePod,
	})
	log.Println("Registered Pod event handlers")

	networkPolicyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onAddNetPol,
		UpdateFunc: onUpdateNetPol,
		DeleteFunc: onDeleteNetPol,
	})
	log.Println("Registered NetworkPolicy event handlers")

	// --- Step 6: Start the Factory ---
	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	log.Println("SharedInformerFactory started")

	// --- Step 7: Wait for Cache Sync ---
	log.Println("Waiting for initial cache sync...")
	if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced, networkPolicyInformer.HasSynced) {
		log.Fatal("Error: Timed out waiting for caches to sync")
	}
	log.Println("Caches synced successfully!")

	// --- Main Application Logic (Post-Sync) ---
	log.Println("Informers running. Press Ctrl+C to stop.")
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	log.Println("Termination signal received, shutting down...")
}
// --- Implemented Event Handler Functions ---
func onAddPod(obj interface{}) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		log.Printf("[AddPod] Error: Could not cast object to *v1.Pod: %T", obj)
		return
	}
	log.Printf("[AddPod] Name: %s/%s, IP: %s, Phase: %s\n",
		pod.Namespace, pod.Name, pod.Status.PodIP, pod.Status.Phase)

	// Example: use a Lister to get related info (e.g., the Pod's Node).
	// This is safe only AFTER cache sync, and assumes a Node informer was also
	// set up and its Lister stored globally (like podLister), e.g.
	// nodeLister = factory.Core().V1().Nodes().Lister()
	/*
		if pod.Spec.NodeName != "" {
			node, err := nodeLister.Get(pod.Spec.NodeName)
			if err == nil {
				log.Printf(" -> Node: %s\n", node.Name)
			}
		}
	*/
}
func onUpdatePod(oldObj, newObj interface{}) {
	oldPod, okOld := oldObj.(*corev1.Pod)
	newPod, okNew := newObj.(*corev1.Pod)
	if !okOld || !okNew {
		log.Printf("[UpdatePod] Error: Could not cast objects to *v1.Pod")
		return
	}
	// Reduce logging noise: during a periodic resync the informer re-delivers the
	// same cached object, so the ResourceVersion (and every other field) is
	// unchanged between oldPod and newPod.
	if oldPod.ResourceVersion == newPod.ResourceVersion &&
		oldPod.Status.Phase == newPod.Status.Phase &&
		oldPod.Status.PodIP == newPod.Status.PodIP {
		return // Nothing we report has changed; skip.
	}
	log.Printf("[UpdatePod] Name: %s/%s\n Old RV: %s, New RV: %s\n Old Phase: %s, New Phase: %s\n Old IP: %s, New IP: %s\n",
		newPod.Namespace, newPod.Name,
		oldPod.ResourceVersion, newPod.ResourceVersion,
		oldPod.Status.Phase, newPod.Status.Phase,
		oldPod.Status.PodIP, newPod.Status.PodIP)
}
func onDeletePod(obj interface{}) {
	// Handle cache.DeletedFinalStateUnknown tombstones
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			log.Printf("[DeletePod] Error: Couldn't get object from tombstone %+v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*corev1.Pod)
		if !ok {
			log.Printf("[DeletePod] Error: Tombstone contained object that is not a Pod %+v", obj)
			return
		}
		log.Printf("[DeletePod] Tombstone: Recovered deleted Pod %s/%s\n", pod.Namespace, pod.Name)
	} else {
		// Object was not a tombstone
		log.Printf("[DeletePod] Name: %s/%s, Final Status: %s\n", pod.Namespace, pod.Name, pod.Status.Phase)
	}
	// Perform cleanup logic if needed
}
func onAddNetPol(obj interface{}) {
	netpol, ok := obj.(*networkingv1.NetworkPolicy)
	if !ok {
		log.Printf("[AddNetPol] Error: Could not cast object to *v1.NetworkPolicy: %T", obj)
		return
	}
	log.Printf("[AddNetPol] Name: %s/%s\n", netpol.Namespace, netpol.Name)
	// Potentially log selector, policy types etc.
}
func onUpdateNetPol(oldObj, newObj interface{}) {
	oldNetPol, okOld := oldObj.(*networkingv1.NetworkPolicy)
	newNetPol, okNew := newObj.(*networkingv1.NetworkPolicy)
	if !okOld || !okNew {
		log.Printf("[UpdateNetPol] Error: Could not cast objects to *v1.NetworkPolicy")
		return
	}
	// Only log if ResourceVersion changed (or potentially compare specs if needed)
	if oldNetPol.ResourceVersion != newNetPol.ResourceVersion {
		log.Printf("[UpdateNetPol] Name: %s/%s, RV: %s -> %s\n",
			newNetPol.Namespace, newNetPol.Name, oldNetPol.ResourceVersion, newNetPol.ResourceVersion)
	}
}
func onDeleteNetPol(obj interface{}) {
	netpol, ok := obj.(*networkingv1.NetworkPolicy)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			log.Printf("[DeleteNetPol] Error: Couldn't get object from tombstone %+v", obj)
			return
		}
		netpol, ok = tombstone.Obj.(*networkingv1.NetworkPolicy)
		if !ok {
			log.Printf("[DeleteNetPol] Error: Tombstone contained object that is not a NetworkPolicy %+v", obj)
			return
		}
		log.Printf("[DeleteNetPol] Tombstone: Recovered deleted NetworkPolicy %s/%s\n", netpol.Namespace, netpol.Name)
	} else {
		log.Printf("[DeleteNetPol] Name: %s/%s\n", netpol.Namespace, netpol.Name)
	}
}
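With the handlers in place, you can run the program against any cluster your kubeconfig can reach (assuming the file is saved as main.go), for example:

go run main.go -namespace=default

Creating, updating, or deleting a Pod or NetworkPolicy in the watched namespace should then produce the corresponding [AddPod], [UpdatePod], [DeletePod], [AddNetPol], etc. log lines.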
Key Changes and Concepts:
Import API Types: We now import k8s.io/api/core/v1 and k8s.io/api/networking/v1 to work with the specific struct types.

Type Assertions: Each handler now performs pod, ok := obj.(*corev1.Pod) or the equivalent for NetworkPolicy, and logs an error if the assertion fails.

Tombstone Handling: onDeletePod and onDeleteNetPol include the standard pattern for checking whether obj is a cache.DeletedFinalStateUnknown and, if so, extracting the actual object from tombstone.Obj.

Accessing Data: We access fields like pod.Name, pod.Namespace, pod.Status.PodIP, pod.Status.Phase, netpol.Name, netpol.Namespace, etc., for logging.

Update Noise Reduction: The onUpdatePod handler checks whether the ResourceVersion and the fields we report (Phase, PodIP) actually changed before logging. This reduces noise from periodic resyncs, which re-deliver unchanged objects to UpdateFunc. You might implement more sophisticated checks based on your specific needs, such as a deep comparison of specs (see the sketches after this list).

Listers: We obtained the podLister and networkPolicyLister and stored them globally (a simple approach; dependency injection is better for larger apps). While not heavily used in these basic logging handlers, the comment in onAddPod shows how a Lister (there, a node Lister's Get(pod.Spec.NodeName)) could efficiently fetch related objects from the local cache once it has synced; the sketches after this list show the same idea with the Pod and NetworkPolicy Listers.
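To illustrate the more thorough change check mentioned above, here is a minimal sketch of a helper that onUpdatePod could call before logging. The helper name is an assumption for illustration; it uses the standard library's reflect.DeepEqual and would require adding "reflect" to the import block.

// podChangedMeaningfully reports whether an update is worth acting on.
// It skips resyncs (identical ResourceVersion) and updates that leave the
// spec and the reported status fields untouched.
func podChangedMeaningfully(oldPod, newPod *corev1.Pod) bool {
	if oldPod.ResourceVersion == newPod.ResourceVersion {
		return false // Periodic resync: the same cached object was re-delivered.
	}
	return !reflect.DeepEqual(oldPod.Spec, newPod.Spec) ||
		oldPod.Status.Phase != newPod.Status.Phase ||
		oldPod.Status.PodIP != newPod.Status.PodIP
}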
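And here is a minimal sketch of reading from the cache through the Listers stored above. The helper name is an assumption for illustration, the label selector requires uncommenting the k8s.io/apimachinery/pkg/labels import, and these calls are only safe once WaitForCacheSync has returned.

// inspectCache shows read-only lookups against the informer caches via Listers.
func inspectCache(namespace, podName string) {
	// Namespaced Get: go through Pods(namespace) on the PodLister.
	pod, err := podLister.Pods(namespace).Get(podName)
	if err != nil {
		log.Printf("Pod %s/%s not found in cache: %v", namespace, podName, err)
		return
	}
	log.Printf("Cached Pod %s/%s has IP %s", pod.Namespace, pod.Name, pod.Status.PodIP)

	// List all NetworkPolicies in the namespace straight from the cache.
	policies, err := networkPolicyLister.NetworkPolicies(namespace).List(labels.Everything())
	if err != nil {
		log.Printf("Error listing NetworkPolicies: %v", err)
		return
	}
	log.Printf("%d NetworkPolicies cached in namespace %s", len(policies), namespace)
}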
Important Considerations for Handler Logic:
Keep Handlers Fast: Event handlers should execute quickly and avoid blocking operations (like complex computations or long network calls). The informer delivers events to each registered handler sequentially, so a blocking handler delays every event queued behind it.
Offload Work: For complex tasks triggered by an event, the recommended pattern (especially in controllers) is for the event handler to simply add a work item (e.g., the object's namespace/name key) to a rate-limited work queue. Separate worker goroutines pull items from the queue and perform the actual processing, using the Lister to get the current state of the object(s). A sketch of this pattern follows this list.

Use Listers for Reads: When a handler needs the current state of the triggering object or related objects, always prefer the Lister (e.g., podLister.Pods(namespace).Get(name) or podLister.List(selector)) over direct clientset calls. Listers read from the synchronized local cache and avoid hitting the API server.
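Below is a minimal sketch of that offload pattern, kept separate from the program above. The queue variable, enqueuePod, and runPodWorker are illustrative names, and it additionally requires the k8s.io/client-go/util/workqueue import; the handler only derives a key, while the worker re-reads the current state from the Lister.

// A rate-limited queue decouples event delivery from processing.
var queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

// enqueuePod could be registered as AddFunc/DeleteFunc (and wrapped for UpdateFunc):
// it extracts a namespace/name key (tombstone-aware) and enqueues it.
func enqueuePod(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		log.Printf("Error building key: %v", err)
		return
	}
	queue.Add(key)
}

// runPodWorker would be started with `go runPodWorker()` after the caches sync.
// It drains the queue and looks up the current object state through the Lister.
func runPodWorker() {
	for {
		item, shutdown := queue.Get()
		if shutdown {
			return
		}
		key := item.(string)
		namespace, name, err := cache.SplitMetaNamespaceKey(key)
		if err == nil {
			if pod, getErr := podLister.Pods(namespace).Get(name); getErr == nil {
				log.Printf("Worker processed Pod %s/%s (phase %s)", namespace, name, pod.Status.Phase)
			} else {
				// Not in the cache any more (e.g., deleted): nothing to do in this sketch.
				log.Printf("Worker: Pod %s/%s no longer in cache", namespace, name)
			}
		}
		queue.Forget(item)
		queue.Done(item)
	}
}

In a real controller you would also handle retries on failure (for example with AddRateLimited and NumRequeues) instead of always calling Forget, but that is beyond this logging-focused example.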