Setting up Shared Informers for Network-related Resources

Using the Shared Informer framework involves several key steps:

  1. Create a Clientset: As always, you need a kubernetes.Clientset configured to talk to your cluster.

  2. Instantiate a SharedInformerFactory: Create an instance of informers.SharedInformerFactory. This factory will manage the underlying watches and caches.

  3. Get Specific Informers: Request informers for the specific resource types you want to monitor (e.g., Pods, Services, Endpoints, NetworkPolicies) from the factory.

  4. Get Listers: Obtain "Listers" from the specific informers. Listers provide read-only access to the informer's local cache (the Indexer), allowing efficient retrieval of objects without hitting the API server directly (a short sketch of Lister usage follows this list).

  5. Register Event Handlers: Add callback functions (AddFunc, UpdateFunc, DeleteFunc) to the specific informers. These functions contain your application logic and will be executed when the informer detects changes.

  6. Start the Factory: Call the Start() method on the factory. This initiates all the underlying reflectors and starts populating the caches and processing events. It's crucial to handle stop signals (like SIGINT, SIGTERM) gracefully.

  7. Wait for Cache Sync: Before your main application logic relies on the Listers, you must wait for the informers' caches to be fully synchronized with the cluster state. The factory provides methods to check this.
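
Step 4 is the one the full example below does not exercise directly, so here is a minimal sketch of Lister usage first. It assumes a SharedInformerFactory like the one built in the example, already started and with its caches synced, plus an import of k8s.io/apimachinery/pkg/labels; the dumpCachedPods helper and its namespace argument are purely illustrative.

// dumpCachedPods is an illustrative helper, not part of the example program below.
// It reads Pods from the informer's local cache via a Lister; no API server call is made.
// It assumes the factory's informers have been started and their caches have synced.
func dumpCachedPods(factory informers.SharedInformerFactory, namespace string) {
	podLister := factory.Core().V1().Pods().Lister()

	// List every cached Pod in the given namespace
	// (use podLister.List directly to cover all namespaces).
	pods, err := podLister.Pods(namespace).List(labels.Everything())
	if err != nil {
		log.Printf("error listing pods from cache: %v", err)
		return
	}
	for _, pod := range pods {
		log.Printf("cached pod: %s/%s (IP: %s)", pod.Namespace, pod.Name, pod.Status.PodIP)
	}
}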

Let's illustrate this by setting up informers for Pods (v1.Pod) and NetworkPolicies (networkingv1.NetworkPolicy).

package main

import (
	"flag"
	"log"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	// Kubernetes API imports
	// corev1 "k8s.io/api/core/v1" // Types used in handlers
	// networkingv1 "k8s.io/api/networking/v1" // Types used in handlers

	// client-go imports
	"k8s.io/client-go/informers" // Informer factory
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache" // For event handlers and cache sync
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"

	// For graceful shutdown context
	// "context"
)

func main() {
	log.Println("Starting Informer Setup Example")

	// --- Step 1: Setup Kubeconfig and Create Clientset ---
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) kubeconfig path")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "kubeconfig path")
	}
	// Optional: filter informers by namespace ("" means all namespaces)
	namespace := flag.String("namespace", "", "namespace to watch (optional, default is all)")
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		log.Fatalf("Error building kubeconfig: %s", err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Error creating clientset: %s", err.Error())
	}


	// --- Step 2: Instantiate SharedInformerFactory ---
	// We can specify a namespace to watch, or "" for all namespaces.
	// We can also specify a resync period (e.g., 10*time.Minute): on each resync the
	// informer re-delivers every object already in its cache to the UpdateFunc handlers
	// (it does not re-list from the API server). A period of 0 disables time-based
	// resync, which is often fine if the event handlers are written robustly.
	factory := informers.NewSharedInformerFactoryWithOptions(clientset, time.Minute*10, informers.WithNamespace(*namespace))
	log.Printf("SharedInformerFactory created (watching namespace: '%s')\n", *namespace)


	// --- Step 3: Get Specific Informers ---
	// Get informers for the resources we care about
	podInformer := factory.Core().V1().Pods().Informer()
	networkPolicyInformer := factory.Networking().V1().NetworkPolicies().Informer()
	// Could also get informers for Services, Endpoints, etc.
	// serviceInformer := factory.Core().V1().Services().Informer()
	// endpointsInformer := factory.Core().V1().Endpoints().Informer()
	log.Println("Specific Informers obtained (Pods, NetworkPolicies)")


	// --- Step 4: Get Listers (Optional but Recommended for Handlers) ---
	// Listers provide efficient read access to the cache
	// podLister := factory.Core().V1().Pods().Lister()
	// networkPolicyLister := factory.Networking().V1().NetworkPolicies().Lister()
	// We'll use Listers within the event handlers in the next section.


	// --- Step 5: Register Event Handlers ---
	// Define functions to be called on Add, Update, Delete events
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onAddPod,
		UpdateFunc: onUpdatePod,
		DeleteFunc: onDeletePod,
	})
	log.Println("Registered Pod event handlers")

	networkPolicyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onAddNetPol,
		UpdateFunc: onUpdateNetPol,
		DeleteFunc: onDeleteNetPol,
	})
	log.Println("Registered NetworkPolicy event handlers")


	// --- Step 6: Start the Factory ---
	// Use a channel to handle termination signals gracefully
	stopCh := make(chan struct{})
	defer close(stopCh) // Ensure channel is closed on exit

	// Start the factory's informers
	// This starts all the informers listed above in background goroutines
	// They will begin listing and watching resources.
	factory.Start(stopCh)
	log.Println("SharedInformerFactory started")


	// --- Step 7: Wait for Cache Sync ---
	// It's crucial to wait for the initial cache synchronization before
	// proceeding with logic that relies on the listers being populated.
	log.Println("Waiting for initial cache sync...")
	// Pass the HasSynced function of every informer you started.
	if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced, networkPolicyInformer.HasSynced) {
		log.Fatal("Error: Timed out waiting for caches to sync")
	}
	log.Println("Caches synced successfully!")


	// --- Main Application Logic (Post-Sync) ---
	// At this point, the informers are running, caches are synced,
	// and event handlers will be called asynchronously.
	// Your main goroutine could do other work, or simply wait for termination.

	log.Println("Informers running. Press Ctrl+C to stop.")

	// Wait for termination signal (Ctrl+C)
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh // Block until a signal is received

	log.Println("Termination signal received, shutting down...")
	// Stopping the factory happens implicitly when stopCh is closed by defer

}

// --- Placeholder Event Handler Functions ---
// We'll implement the actual logic in the next section.

func onAddPod(obj interface{}) {
	// Cast obj to *v1.Pod
	// pod, ok := obj.(*corev1.Pod)
	// if !ok { return }
	// log.Printf("[AddPod] Name: %s/%s, IP: %s\n", pod.Namespace, pod.Name, pod.Status.PodIP)
	log.Printf("[AddPod] Received Add event\n")
}

func onUpdatePod(oldObj, newObj interface{}) {
	// Cast oldObj and newObj to *v1.Pod
	// oldPod, okOld := oldObj.(*corev1.Pod)
	// newPod, okNew := newObj.(*corev1.Pod)
	// if !okOld || !okNew { return }
	// // Only log if something relevant changed (e.g., IP, phase, readiness)
	// if oldPod.ResourceVersion != newPod.ResourceVersion {
	//	 log.Printf("[UpdatePod] Name: %s/%s, RV: %s -> %s\n", newPod.Namespace, newPod.Name, oldPod.ResourceVersion, newPod.ResourceVersion)
	// }
	log.Printf("[UpdatePod] Received Update event\n")
}

func onDeletePod(obj interface{}) {
	// Handle cases where the object might be a DeletionFinalStateUnknown marker
	// pod, ok := obj.(*corev1.Pod)
	// if !ok {
	// 	 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
	// 	 if !ok { log.Println("DeletePod: Could not get object from tombstone"); return }
	//	 pod, ok = tombstone.Obj.(*corev1.Pod)
	//	 if !ok { log.Println("DeletePod: Tombstone contained object that is not a Pod"); return }
	// }
	// log.Printf("[DeletePod] Name: %s/%s\n", pod.Namespace, pod.Name)
	log.Printf("[DeletePod] Received Delete event\n")
}

func onAddNetPol(obj interface{}) {
	// netpol, ok := obj.(*networkingv1.NetworkPolicy)
	// if !ok { return }
	// log.Printf("[AddNetPol] Name: %s/%s\n", netpol.Namespace, netpol.Name)
	log.Printf("[AddNetPol] Received Add event\n")
}

func onUpdateNetPol(oldObj, newObj interface{}) {
	// oldNetPol, okOld := oldObj.(*networkingv1.NetworkPolicy)
	// newNetPol, okNew := newObj.(*networkingv1.NetworkPolicy)
	// if !okOld || !okNew { return }
	// if oldNetPol.ResourceVersion != newNetPol.ResourceVersion {
	//	 log.Printf("[UpdateNetPol] Name: %s/%s, RV: %s -> %s\n", newNetPol.Namespace, newNetPol.Name, oldNetPol.ResourceVersion, newNetPol.ResourceVersion)
	// }
	log.Printf("[UpdateNetPol] Received Update event\n")
}

func onDeleteNetPol(obj interface{}) {
	// Handle DeletedFinalStateUnknown similarly to onDeletePod
	// netpol, ok := obj.(*networkingv1.NetworkPolicy)
	// // ... tombstone logic ...
	// log.Printf("[DeleteNetPol] Name: %s/%s\n", netpol.Namespace, netpol.Name)
	log.Printf("[DeleteNetPol] Received Delete event\n")
}

Explanation:

  1. Factory Creation: informers.NewSharedInformerFactoryWithOptions creates the factory, configured here with a 10-minute resync period and optionally scoped to a namespace (further factory options are sketched after this list).

  2. Informer Retrieval: factory.Core().V1().Pods().Informer() and factory.Networking().V1().NetworkPolicies().Informer() retrieve the specific shared informers for Pods and NetworkPolicies respectively. Note the chaining: factory -> Group() -> Version() -> Resource() -> Informer().

  3. Event Handlers: We define simple placeholder functions (onAddPod, etc.) and register them using AddEventHandler. The cache.ResourceEventHandlerFuncs struct conveniently bundles the add, update, and delete callbacks.

  4. Starting: factory.Start(stopCh) launches background goroutines for all requested informers. These goroutines handle the listing, watching, cache updates, and calling event handlers. The stopCh channel allows for graceful shutdown.

  5. Cache Sync: cache.WaitForCacheSync(stopCh, ...) is essential. It blocks until the initial list operation for each specified informer is complete and the local cache is considered synchronized with the cluster's state at that point in time. You must wait for sync before relying on Lister data (a factory-level alternative is sketched after this list).

  6. Graceful Shutdown: The code sets up signal handling (SIGINT, SIGTERM). When a signal is received, the main goroutine unblocks and returns; the deferred close(stopCh) then closes the stop channel, signaling the factory and its informers to stop their background processing.
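
Two variations on points 1 and 5 are worth a quick sketch. The factory accepts further options, for example informers.WithTweakListOptions to narrow every list/watch with a label selector, and the factory itself exposes a WaitForCacheSync helper that waits for all informers it started and reports the result per resource type, so you do not have to collect HasSynced functions by hand. The label selector below is purely illustrative, and the fragment assumes the clientset, namespace flag, and stopCh from the example, plus metav1 = k8s.io/apimachinery/pkg/apis/meta/v1.

// Factory with extra options: namespace scoping plus an (illustrative)
// label selector applied to every list/watch the factory performs.
factory := informers.NewSharedInformerFactoryWithOptions(
	clientset,
	10*time.Minute,
	informers.WithNamespace(*namespace),
	informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
		opts.LabelSelector = "app=example"
	}),
)

// Request the informers before Start() so the factory knows to run them
// (event handlers would be registered here, as in the example above).
factory.Core().V1().Pods().Informer()
factory.Networking().V1().NetworkPolicies().Informer()

factory.Start(stopCh)

// Factory-level sync: waits for every informer started above and reports
// success or failure per resource type.
for informerType, synced := range factory.WaitForCacheSync(stopCh) {
	if !synced {
		log.Fatalf("cache for %v failed to sync", informerType)
	}
}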

With this setup, the informers are now running, maintaining local caches, and ready to invoke our handler functions whenever Pods or NetworkPolicies are created, updated, or deleted in the target namespace(s). The next step is to implement the actual logic within those event handler functions.
