Watching Endpoints Changes in Real Time Using the Go watch Interface

We've seen how the Endpoints object reflects the current set of ready Pods backing a Service. This list is highly dynamic – Pods scale up and down, become ready, fail readiness probes, or get rescheduled. Simply using Get or List periodically to check the Endpoints state is inefficient and doesn't provide real-time updates.

For scenarios where you need to react immediately to changes in the backend IPs of a Service (e.g., custom load balancers, dynamic configuration tools, monitoring systems), Kubernetes provides a powerful watch mechanism.

The Kubernetes watch API:

The Kubernetes API allows clients to establish a long-lived HTTP request (or WebSocket connection) to "watch" a specific resource or collection of resources. When changes occur to those resources (creation, update, deletion), the API server streams event notifications back to the client over this persistent connection.

Using client-go's Watch Method:

client-go makes it easy to use this underlying watch capability. Resource interfaces (like the one obtained from clientset.CoreV1().Endpoints(namespace)) provide a Watch method.

The Watch method takes:

  1. context.Context: Allows cancelling the watch request.

  2. metav1.ListOptions: Crucially, this can be used to filter the watch. You can specify:

    • FieldSelector: To watch a specific object by name (e.g., fields.OneTermEqualSelector("metadata.name", "my-service-endpoints")).

    • LabelSelector: To watch resources matching certain labels.

    • ResourceVersion: To start the watch from a specific point in the resource history (important for handling reconnections, though we won't delve deep here). For simple watches, starting without a specific ResourceVersion usually gets current state + future changes.

It returns a watch.Interface and an error.

The watch.Interface:

This interface represents the active watch connection. Its most important method is:

  • ResultChan() <-chan watch.Event: Returns a read-only channel. Event notifications from the API server are sent over this channel.

The watch.Event Struct:

Each item received on the result channel is a watch.Event struct, containing:

  • Type (watch.EventType): Indicates the type of event:

    • watch.Added: A new matching resource was created.

    • watch.Modified: An existing matching resource was updated.

    • watch.Deleted: A matching resource was deleted.

    • watch.Bookmark: A checkpoint event carrying only a resource version, used to resume watches efficiently (less common in simple use).

    • watch.Error: An error occurred (e.g., watch expired, internal error). The actual error is often in the Object field (as *metav1.Status).

  • Object (runtime.Object): The actual Kubernetes resource object (e.g., *v1.Endpoints) associated with the event. You'll need to perform a type assertion to access its fields.
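Before the full example, the assertion pattern itself can be sketched with local stand-in types (Endpoints and Status below are illustrative substitutes for the real *v1.Endpoints and *metav1.Status):

```go
package main

import "fmt"

// Stand-ins for the real API types, defined locally for illustration only.
type Endpoints struct{ Name string }
type Status struct{ Message string }

// Event mirrors the shape of watch.Event: a type plus an opaque object
// that must be type-asserted before use.
type Event struct {
	Type   string      // "ADDED", "MODIFIED", "DELETED", "ERROR"
	Object interface{} // runtime.Object in the real API
}

// describe shows the comma-ok / type-switch pattern used to recover
// the concrete object from a watch event.
func describe(ev Event) string {
	switch obj := ev.Object.(type) {
	case *Endpoints:
		return fmt.Sprintf("%s endpoints/%s", ev.Type, obj.Name)
	case *Status:
		return fmt.Sprintf("%s error: %s", ev.Type, obj.Message)
	default:
		return fmt.Sprintf("%s unexpected object type %T", ev.Type, ev.Object)
	}
}

func main() {
	fmt.Println(describe(Event{Type: "ADDED", Object: &Endpoints{Name: "web"}}))
	fmt.Println(describe(Event{Type: "ERROR", Object: &Status{Message: "watch expired"}}))
	// Prints:
	// ADDED endpoints/web
	// ERROR error: watch expired
}
```

The same comma-ok assertion appears in the real example below, where the object is asserted to *v1.Endpoints.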

Example: Watching Endpoints for a Specific Service

Let's write a Go program that watches the Endpoints object associated with a specific Service name and prints updates as they occur.

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"path/filepath"

	// Kubernetes API imports
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields" // Required for FieldSelector
	"k8s.io/apimachinery/pkg/watch"  // Watch interface and Event types

	// client-go imports
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func main() {
	// --- Setup Kubeconfig and Flags ---
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) kubeconfig path")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "kubeconfig path")
	}
	namespace := flag.String("namespace", "default", "namespace of the service")
	serviceName := flag.String("service-name", "", "name of the service whose endpoints to watch") // Make service name required

	flag.Parse()

	if *serviceName == "" {
		log.Fatal("Error: --service-name flag is required")
	}

	// --- Load Config and Create Clientset ---
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		log.Fatalf("Error building kubeconfig: %s", err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Error creating clientset: %s", err.Error())
	}

	// --- Setup Watch ---
	endpointsClient := clientset.CoreV1().Endpoints(*namespace)

	// Use a FieldSelector to watch only the Endpoints object for our specific service
	listOptions := metav1.ListOptions{
		FieldSelector: fields.OneTermEqualSelector("metadata.name", *serviceName).String(),
		// Watch=true is implicitly handled by the Watch method, but setting it here is harmless
		// Watch: true,
	}

	fmt.Printf("Starting watch for Endpoints '%s' in namespace '%s'...\n", *serviceName, *namespace)

	// Start the watch - set a timeout if desired, otherwise it can run indefinitely
	// Using context.TODO() here, but a cancellable context is better for graceful shutdown
	watcher, err := endpointsClient.Watch(context.TODO(), listOptions)
	if err != nil {
		log.Fatalf("Error starting watch for endpoints '%s': %s\n", *serviceName, err.Error())
	}

	// --- Process Watch Events ---
	// Use a channel returned by watcher.ResultChan()
	eventChannel := watcher.ResultChan()

	// Loop indefinitely processing events
	for event := range eventChannel {
		// Handle error events before the type assertion: their Object is
		// typically a *metav1.Status, not an *v1.Endpoints.
		if event.Type == watch.Error {
			log.Printf("Watch error event received: %+v\n", event.Object)
			// Potentially break or restart the watch depending on the error
			continue
		}

		// Get the Endpoints object from the event
		endpoints, ok := event.Object.(*v1.Endpoints)
		if !ok {
			log.Printf("Received non-Endpoints object in watch event: %T (event type: %s)\n", event.Object, event.Type)
			continue // Skip objects we can't process
		}

		fmt.Printf("\n--- Event Received ---\n")
		fmt.Printf("Type: %s\n", event.Type) // ADDED, MODIFIED, DELETED
		fmt.Printf("Endpoints Name: %s\n", endpoints.Name)

		// Process based on event type
		switch event.Type {
		case watch.Added, watch.Modified:
			fmt.Println("Current Endpoints:")
			if len(endpoints.Subsets) == 0 {
				fmt.Println("  No subsets (no ready pods?).")
			}
			for i, subset := range endpoints.Subsets {
				fmt.Printf("  Subset %d:\n", i+1)
				fmt.Println("    Ready Addresses:")
				if len(subset.Addresses) == 0 {
					fmt.Println("      <none>")
				}
				for _, addr := range subset.Addresses {
					fmt.Printf("      - %s\n", addr.IP)
				}

				fmt.Println("    Not Ready Addresses:")
				if len(subset.NotReadyAddresses) == 0 {
					fmt.Println("      <none>")
				}
				for _, addr := range subset.NotReadyAddresses {
					fmt.Printf("      - %s\n", addr.IP)
				}
			}
		case watch.Deleted:
			// The Endpoints object itself was deleted (likely the Service was deleted)
			fmt.Println("Endpoints object deleted.")
		case watch.Bookmark:
			// Bookmark events carry only a resource version and are
			// typically ignored in simple watchers.
			// log.Printf("Bookmark at resource version %s\n", endpoints.ResourceVersion)
		}
		fmt.Println("--------------------")
	}

	// This part is reached if the eventChannel is closed by the server or watcher.Stop()
	fmt.Println("Watch channel closed.")
}

How it Works:

  1. Field Selector: We use fields.OneTermEqualSelector("metadata.name", *serviceName) to tell the API server we only want events related to the Endpoints object with that specific name.

  2. Watch Call: endpointsClient.Watch(...) initiates the watch request.

  3. ResultChan(): We get the channel eventChannel to receive events.

  4. Event Loop: The for event := range eventChannel loop blocks until an event arrives or the channel is closed.

  5. Type Assertion: We use event.Object.(*v1.Endpoints) to safely cast the received runtime.Object to the expected *v1.Endpoints type, checking the ok value to confirm the assertion succeeded.

  6. Event Processing: We print the event type and then extract and display the ready/not ready addresses from the Endpoints object, similar to how we did it with Get.

  7. Loop Continuation: The loop continues, waiting for the next event.

Important Considerations:

  • Watch Lifetime: Watches are not guaranteed to last forever. The API server might close the connection due to timeouts, resource constraints, or other reasons. Robust applications need to handle the closure of the eventChannel and potentially restart the watch, often using the ResourceVersion from the last known event to avoid missing updates.

  • Error Handling: The example includes basic error checking but doesn't fully handle watch.Error events or automatic reconnection.

  • Informers (Preview): For applications that need reliable, cached, and automatically reconnecting watches (like controllers), client-go provides a higher-level abstraction called Informers. We will cover Informers in detail in Chapter 6 as they are the standard way to build robust watchers. The direct Watch interface is useful for simpler tools or understanding the underlying mechanism.

Watching Endpoints directly provides powerful real-time insight into the dynamic mapping between Services and their backend Pods, enabling sophisticated networking tools and automation.
