// Setting up Shared Informers for Network-related Resources
package main
import (
"flag"
"log"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
// Kubernetes API imports
// corev1 "k8s.io/api/core/v1" // Types used in handlers
// networkingv1 "k8s.io/api/networking/v1" // Types used in handlers
// client-go imports
"k8s.io/client-go/informers" // Informer factory
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache" // For event handlers and cache sync
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
// For graceful shutdown context
// "context"
)
// main wires up a SharedInformerFactory that watches Pods and
// NetworkPolicies, registers event handlers on both informers, waits for
// the initial cache sync, and then blocks until SIGINT/SIGTERM arrives.
func main() {
	log.Println("Starting Informer Setup Example")

	// --- Step 1: Setup kubeconfig flags and create the clientset ---
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) kubeconfig path")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "kubeconfig path")
	}
	// An empty namespace means "watch all namespaces".
	namespace := flag.String("namespace", "", "namespace to watch (optional, default is all)")
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		log.Fatalf("Error building kubeconfig: %s", err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Error creating clientset: %s", err.Error())
	}

	// --- Step 2: Instantiate the SharedInformerFactory ---
	// The resync period (10m) periodically re-delivers cached objects to the
	// handlers as Update events; a period of 0 disables time-based resync,
	// which is often fine when the handlers are event-driven.
	factory := informers.NewSharedInformerFactoryWithOptions(clientset, time.Minute*10, informers.WithNamespace(*namespace))
	log.Printf("SharedInformerFactory created (watching namespace: '%s')\n", *namespace)

	// --- Step 3: Obtain informers for the resources we care about ---
	podInformer := factory.Core().V1().Pods().Informer()
	networkPolicyInformer := factory.Networking().V1().NetworkPolicies().Informer()
	// Could also get informers for Services, Endpoints, etc.:
	// serviceInformer := factory.Core().V1().Services().Informer()
	// endpointsInformer := factory.Core().V1().Endpoints().Informer()
	log.Println("Specific Informers obtained (Pods, NetworkPolicies)")

	// --- Step 4: Listers (optional but recommended for handlers) ---
	// Listers provide efficient read access to the informer caches; they are
	// used inside the event handlers in the next section.
	// podLister := factory.Core().V1().Pods().Lister()
	// networkPolicyLister := factory.Networking().V1().NetworkPolicies().Lister()

	// --- Step 5: Register event handlers ---
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onAddPod,
		UpdateFunc: onUpdatePod,
		DeleteFunc: onDeletePod,
	})
	log.Println("Registered Pod event handlers")
	networkPolicyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onAddNetPol,
		UpdateFunc: onUpdateNetPol,
		DeleteFunc: onDeleteNetPol,
	})
	log.Println("Registered NetworkPolicy event handlers")

	// --- Step 6: Start the factory ---
	// Closing stopCh tells every informer goroutine started by the factory
	// to shut down; the deferred close fires when main returns normally.
	stopCh := make(chan struct{})
	defer close(stopCh)
	// Start launches all requested informers in background goroutines; they
	// begin listing and watching their resources immediately.
	factory.Start(stopCh)
	log.Println("SharedInformerFactory started")

	// --- Step 7: Wait for the initial cache sync ---
	// Listers must not be consulted before the caches are populated.
	log.Println("Waiting for initial cache sync...")
	if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced, networkPolicyInformer.HasSynced) {
		// NOTE: WaitForCacheSync has no internal timeout — it returns false
		// only when stopCh is closed before all caches finish syncing, so
		// report that instead of a "timeout". Add HasSynced funcs here for
		// any other informers you start.
		log.Fatal("Error: caches failed to sync (stop channel closed before sync completed)")
	}
	log.Println("Caches synced successfully!")

	// --- Main application logic (post-sync) ---
	// Informers are running and handlers fire asynchronously; the main
	// goroutine simply waits for a termination signal.
	log.Println("Informers running. Press Ctrl+C to stop.")
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh // block until SIGINT or SIGTERM
	log.Println("Termination signal received, shutting down...")
	// Informers stop when the deferred close(stopCh) runs as main returns.
}
// --- Placeholder Event Handler Functions ---
// We'll implement the actual logic in the next section.
func onAddPod(obj interface{}) {
// Cast obj to *v1.Pod
// pod, ok := obj.(*corev1.Pod)
// if !ok { return }
// log.Printf("[AddPod] Name: %s/%s, IP: %s\n", pod.Namespace, pod.Name, pod.Status.PodIP)
log.Printf("[AddPod] Received Add event\n")
}
func onUpdatePod(oldObj, newObj interface{}) {
// Cast oldObj and newObj to *v1.Pod
// oldPod, okOld := oldObj.(*corev1.Pod)
// newPod, okNew := newObj.(*corev1.Pod)
// if !okOld || !okNew { return }
// // Only log if something relevant changed (e.g., IP, phase, readiness)
// if oldPod.ResourceVersion != newPod.ResourceVersion {
// log.Printf("[UpdatePod] Name: %s/%s, RV: %s -> %s\n", newPod.Namespace, newPod.Name, oldPod.ResourceVersion, newPod.ResourceVersion)
// }
log.Printf("[UpdatePod] Received Update event\n")
}
func onDeletePod(obj interface{}) {
// Handle cases where the object might be a DeletionFinalStateUnknown marker
// pod, ok := obj.(*corev1.Pod)
// if !ok {
// tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
// if !ok { log.Println("DeletePod: Could not get object from tombstone"); return }
// pod, ok = tombstone.Obj.(*corev1.Pod)
// if !ok { log.Println("DeletePod: Tombstone contained object that is not a Pod"); return }
// }
// log.Printf("[DeletePod] Name: %s/%s\n", pod.Namespace, pod.Name)
log.Printf("[DeletePod] Received Delete event\n")
}
func onAddNetPol(obj interface{}) {
// netpol, ok := obj.(*networkingv1.NetworkPolicy)
// if !ok { return }
// log.Printf("[AddNetPol] Name: %s/%s\n", netpol.Namespace, netpol.Name)
log.Printf("[AddNetPol] Received Add event\n")
}
func onUpdateNetPol(oldObj, newObj interface{}) {
// oldNetPol, okOld := oldObj.(*networkingv1.NetworkPolicy)
// newNetPol, okNew := newObj.(*networkingv1.NetworkPolicy)
// if !okOld || !okNew { return }
// if oldNetPol.ResourceVersion != newNetPol.ResourceVersion {
// log.Printf("[UpdateNetPol] Name: %s/%s, RV: %s -> %s\n", newNetPol.Namespace, newNetPol.Name, oldNetPol.ResourceVersion, newNetPol.ResourceVersion)
// }
log.Printf("[UpdateNetPol] Received Update event\n")
}
func onDeleteNetPol(obj interface{}) {
// Handle DeletedFinalStateUnknown similarly to onDeletePod
// netpol, ok := obj.(*networkingv1.NetworkPolicy)
// // ... tombstone logic ...
// log.Printf("[DeleteNetPol] Name: %s/%s\n", netpol.Namespace, netpol.Name)
log.Printf("[DeleteNetPol] Received Delete event\n")
}PreviousBeyond Basic watch: Using Informers for Efficient Resource Caching and Event HandlingNextReacting to Network Events
Last updated
Was this helpful?