Watching Endpoints Changes in Real Time Using Go's `watch` Interface
We've seen how the `Endpoints` object reflects the current set of ready Pods backing a Service. This list is highly dynamic – Pods scale up and down, become ready, fail readiness probes, or get rescheduled. Polling with `Get` or `List` to check the `Endpoints` state is inefficient and doesn't provide real-time updates.
For scenarios where you need to react immediately to changes in the backend IPs of a Service (e.g., custom load balancers, dynamic configuration tools, monitoring systems), Kubernetes provides a powerful `watch` mechanism.
The Kubernetes `watch` API:
The Kubernetes API allows clients to establish a long-lived HTTP request (or WebSocket connection) to "watch" a specific resource or collection of resources. When changes occur to those resources (creation, update, deletion), the API server streams event notifications back to the client over this persistent connection.
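Under the hood, this is an ordinary HTTP GET with the `watch=true` query parameter, and the response body is a stream of newline-delimited JSON events, one per change. A rough sketch of the exchange (object bodies abridged; `my-service` is a placeholder name):

```
GET /api/v1/namespaces/default/endpoints?watch=true&fieldSelector=metadata.name%3Dmy-service

{"type":"ADDED","object":{"kind":"Endpoints","apiVersion":"v1", ...}}
{"type":"MODIFIED","object":{"kind":"Endpoints","apiVersion":"v1", ...}}
```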
Using `client-go`'s `Watch` Method:

`client-go` makes it easy to use this underlying watch capability. Resource interfaces (like the one obtained from `clientset.CoreV1().Endpoints(namespace)`) provide a `Watch` method.
The `Watch` method takes:

- `context.Context`: Allows cancelling the watch request.
- `metav1.ListOptions`: Crucially, this can be used to filter the watch. You can specify:
  - `FieldSelector`: To watch a specific object by name (e.g., `fields.OneTermEqualSelector("metadata.name", "my-service-endpoints")`).
  - `LabelSelector`: To watch resources matching certain labels.
  - `ResourceVersion`: To start the watch from a specific point in the resource history (important for handling reconnections, though we won't delve deep here). For simple watches, starting without a specific `ResourceVersion` usually gets the current state plus future changes.
It returns a `watch.Interface` and an `error`.
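As a quick illustration, here is a minimal sketch of a label-filtered watch rather than a name-filtered one; the `app=web` label and the `default` namespace are assumptions for the example:

```go
package main

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
)

// watchEndpointsByLabel streams events for every Endpoints object in the
// namespace whose labels match the selector, instead of a single object
// by name. "app=web" is a hypothetical label for illustration.
func watchEndpointsByLabel(ctx context.Context, clientset *kubernetes.Clientset) (watch.Interface, error) {
	opts := metav1.ListOptions{LabelSelector: "app=web"}
	return clientset.CoreV1().Endpoints("default").Watch(ctx, opts)
}
```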
The `watch.Interface`:

This interface represents the active watch connection. Its most important method is:

- `ResultChan() <-chan watch.Event`: Returns a read-only channel. Event notifications from the API server are sent over this channel.

It also provides a `Stop()` method, which terminates the watch and closes the result channel, as in the sketch below.
The `watch.Event` Struct:

Each item received on the result channel is a `watch.Event` struct, containing:

- `Type` (`watch.EventType`): Indicates the type of event:
  - `watch.Added`: A new matching resource was created.
  - `watch.Modified`: An existing matching resource was updated.
  - `watch.Deleted`: A matching resource was deleted.
  - `watch.Bookmark`: Represents a specific resource version (less common in simple use).
  - `watch.Error`: An error occurred (e.g., the watch expired, or an internal error). The actual error is often in the `Object` field (as a `*metav1.Status`).
- `Object` (`runtime.Object`): The actual Kubernetes resource object (e.g., `*v1.Endpoints`) associated with the event. You'll need to perform a type assertion to access its fields.
Example: Watching Endpoints for a Specific Service
Let's write a Go program that watches the `Endpoints` object associated with a specific Service name and prints updates as they occur.
```go
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"path/filepath"

	// Kubernetes API imports
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields" // Required for FieldSelector
	"k8s.io/apimachinery/pkg/watch"  // Watch interface and Event types

	// client-go imports
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func main() {
	// --- Setup Kubeconfig and Flags ---
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) kubeconfig path")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "kubeconfig path")
	}
	namespace := flag.String("namespace", "default", "namespace of the service")
	serviceName := flag.String("service-name", "", "name of the service whose endpoints to watch (required)")
	flag.Parse()

	if *serviceName == "" {
		log.Fatal("Error: --service-name flag is required")
	}

	// --- Load Config and Create Clientset ---
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		log.Fatalf("Error building kubeconfig: %s", err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Error creating clientset: %s", err.Error())
	}

	// --- Setup Watch ---
	endpointsClient := clientset.CoreV1().Endpoints(*namespace)

	// Use a FieldSelector to watch only the Endpoints object for our
	// specific service. (Watch=true is set implicitly by the Watch method.)
	listOptions := metav1.ListOptions{
		FieldSelector: fields.OneTermEqualSelector("metadata.name", *serviceName).String(),
	}

	fmt.Printf("Starting watch for Endpoints '%s' in namespace '%s'...\n", *serviceName, *namespace)

	// Start the watch. context.TODO() works for a demo, but a cancellable
	// context is better for graceful shutdown.
	watcher, err := endpointsClient.Watch(context.TODO(), listOptions)
	if err != nil {
		log.Fatalf("Error starting watch for endpoints '%s': %s", *serviceName, err.Error())
	}
	defer watcher.Stop()

	// --- Process Watch Events ---
	eventChannel := watcher.ResultChan()

	// Loop indefinitely, processing events as they arrive.
	for event := range eventChannel {
		// Handle error events first: for watch.Error the Object is usually
		// a *metav1.Status describing the problem, not an Endpoints object.
		if event.Type == watch.Error {
			log.Printf("Watch error event received: %+v", event.Object)
			// Depending on the error, you might break or restart the watch.
			continue
		}

		endpoints, ok := event.Object.(*v1.Endpoints)
		if !ok {
			log.Printf("Received non-Endpoints object in watch event: %T (event type: %s)", event.Object, event.Type)
			continue
		}

		fmt.Printf("\n--- Event Received ---\n")
		fmt.Printf("Type: %s\n", event.Type) // ADDED, MODIFIED, DELETED, BOOKMARK
		fmt.Printf("Endpoints Name: %s\n", endpoints.Name)

		switch event.Type {
		case watch.Added, watch.Modified:
			fmt.Println("Current Endpoints:")
			if len(endpoints.Subsets) == 0 {
				fmt.Println("  No subsets (no ready pods?).")
			}
			for i, subset := range endpoints.Subsets {
				fmt.Printf("  Subset %d:\n", i+1)
				fmt.Println("    Ready Addresses:")
				if len(subset.Addresses) == 0 {
					fmt.Println("      <none>")
				}
				for _, addr := range subset.Addresses {
					fmt.Printf("      - %s\n", addr.IP)
				}
				fmt.Println("    Not Ready Addresses:")
				if len(subset.NotReadyAddresses) == 0 {
					fmt.Println("      <none>")
				}
				for _, addr := range subset.NotReadyAddresses {
					fmt.Printf("      - %s\n", addr.IP)
				}
			}
		case watch.Deleted:
			// The Endpoints object itself was deleted (likely the Service was deleted).
			fmt.Println("Endpoints object deleted.")
		case watch.Bookmark:
			// Bookmark events carry only a resource version; typically
			// ignored in simple watchers.
		}
		fmt.Println("--------------------")
	}

	// Reached when the channel is closed by the server or by watcher.Stop().
	fmt.Println("Watch channel closed.")
}
```
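To try it, run the program against a Service in your cluster and then change its backing Pods. The Service and Deployment name `web` below is hypothetical:

```sh
go run main.go --service-name=web --namespace=default

# In another terminal, trigger endpoint changes, e.g.:
kubectl scale deployment web --replicas=3
```

You should see a MODIFIED event each time the set of ready addresses changes.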
How it Works:

1. Field Selector: We use `fields.OneTermEqualSelector("metadata.name", *serviceName)` to tell the API server we only want events related to the `Endpoints` object with that specific name.
2. `Watch` Call: `endpointsClient.Watch(...)` initiates the watch request.
3. `ResultChan()`: We get the channel `eventChannel` over which events will be delivered.
4. Event Loop: The `for event := range eventChannel` loop blocks until an event arrives or the channel is closed.
5. Error Handling and Type Assertion: We treat `watch.Error` events separately, then use `event.Object.(*v1.Endpoints)` to safely cast the received `runtime.Object` to the expected `*v1.Endpoints` type, checking the `ok` result of the assertion.
6. Event Processing: We print the event type and then extract and display the ready/not-ready addresses from the `Endpoints` object, similar to how we did it with `Get`.
7. Loop Continuation: The loop continues, waiting for the next event.
Important Considerations:

- Watch Lifetime: Watches are not guaranteed to last forever. The API server may close the connection due to timeouts, resource constraints, or other reasons. Robust applications need to handle the closure of the `eventChannel` and potentially restart the watch, often using the `ResourceVersion` from the last known event to avoid missing updates (see the reconnect sketch after this list).
- Error Handling: The example includes basic error checking but doesn't fully handle `watch.Error` events or automatic reconnection.
- Informers (Preview): For applications that need reliable, cached, and automatically reconnecting watches (like controllers), `client-go` provides a higher-level abstraction called Informers. We will cover Informers in detail in Chapter 6, as they are the standard way to build robust watchers. The direct `Watch` interface is useful for simpler tools or for understanding the underlying mechanism.
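As a rough illustration of the reconnect pattern mentioned above, here is a minimal sketch (not production-grade: it does not handle an expired `ResourceVersion`, where the API server returns "410 Gone" and a fresh List is required before watching again, which is exactly what Informers do for you):

```go
package main

import (
	"context"
	"log"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
)

// watchWithRetry re-establishes the watch whenever the event channel
// closes, resuming from the last ResourceVersion it observed.
func watchWithRetry(ctx context.Context, clientset *kubernetes.Clientset, namespace, name string) {
	lastRV := "" // "" means "current state + future changes"
	for ctx.Err() == nil {
		opts := metav1.ListOptions{
			FieldSelector:   fields.OneTermEqualSelector("metadata.name", name).String(),
			ResourceVersion: lastRV,
		}
		watcher, err := clientset.CoreV1().Endpoints(namespace).Watch(ctx, opts)
		if err != nil {
			log.Printf("watch failed, retrying: %v", err)
			time.Sleep(2 * time.Second)
			continue
		}
		for event := range watcher.ResultChan() {
			if ep, ok := event.Object.(*v1.Endpoints); ok {
				lastRV = ep.ResourceVersion // remember our position in history
				log.Printf("%s: %s", event.Type, ep.Name)
			}
		}
		// Channel closed: loop around and re-establish the watch from lastRV.
	}
}
```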
Watching Endpoints directly provides powerful real-time insight into the dynamic mapping between Services and their backend Pods, enabling sophisticated networking tools and automation.