# Watch streams
This is a **contributor / architecture** doc. The audience is someone extending Periscope's real-time list updates to a new resource kind, or debugging an existing one. For the **operator** view — the `watchStreams:` helm block, when to opt out, troubleshooting — see `docs/setup/watch-streams.md`.
For the SSE plumbing on the frontend (`useResourceStream`, `StreamHealthBadge`, drop-in dispatch into React Query), see the relevant `web/` files; this page covers only the server side.
## 1. What "watch stream" means here
Periscope's resource list pages (Pods, Events, ReplicaSets, Jobs) need to update in near-real-time. The naive answer is "poll every 5s and diff in the client." We do better:
- The browser opens an `EventSource` to `/api/clusters/{cluster}/{kind}/watch`.
- The server runs a single list-then-watch loop against the apiserver under the impersonated user's identity.
- The server emits SSE frames: one `Snapshot` event with the initial list, then `Added`/`Modified`/`Deleted` deltas as the apiserver reports them.
- The frontend's React Query cache patches itself from the deltas — no full re-fetch.
- On disconnect the browser's `EventSource` reconnects automatically; the server uses `Last-Event-ID` to resume from the last `resourceVersion` without re-emitting the full snapshot.
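On the wire that looks roughly like the following (values are illustrative; the `id` field carries the `resourceVersion` the browser echoes back as `Last-Event-ID`, and the comment frame is the heartbeat described in section 5):

```
event: Snapshot
id: 784312
data: [{"namespace":"default","name":"api-6d5f7","uid":"0b1c"}]

event: Modified
id: 784319
data: {"namespace":"default","name":"api-6d5f7","uid":"0b1c"}

: heartbeat
```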
When watch is disabled (or the kind isn't supported), the SPA falls back to a polling `useResource` query. Both code paths return the same DTO shape, so feature parity is automatic.
## 2. The shipped kinds
| Kind | Path | DTO | Code |
|---|---|---|---|
| Pods | /api/clusters/{cluster}/pods/watch | Pod | internal/k8s/watch.go: WatchPods |
| Events | /api/clusters/{cluster}/events/watch | ClusterEvent | internal/k8s/watch.go: WatchEvents |
| Deployments | /api/clusters/{cluster}/deployments/watch | Deployment | internal/k8s/watch.go: WatchDeployments |
| StatefulSets | /api/clusters/{cluster}/statefulsets/watch | StatefulSet | internal/k8s/watch.go: WatchStatefulSets |
| DaemonSets | /api/clusters/{cluster}/daemonsets/watch | DaemonSet | internal/k8s/watch.go: WatchDaemonSets |
| ReplicaSets | /api/clusters/{cluster}/replicasets/watch | ReplicaSet | internal/k8s/watch.go: WatchReplicaSets |
| Jobs | /api/clusters/{cluster}/jobs/watch | Job | internal/k8s/watch.go: WatchJobs |
| CronJobs | /api/clusters/{cluster}/cronjobs/watch | CronJob | internal/k8s/watch.go: WatchCronJobs |
| HorizontalPodAutoscalers | /api/clusters/{cluster}/horizontalpodautoscalers/watch | HPA | internal/k8s/watch.go: WatchHorizontalPodAutoscalers |
| PodDisruptionBudgets | /api/clusters/{cluster}/poddisruptionbudgets/watch | PDB | internal/k8s/watch.go: WatchPodDisruptionBudgets |
| Services | /api/clusters/{cluster}/services/watch | Service | internal/k8s/watch.go: WatchServices |
| Ingresses | /api/clusters/{cluster}/ingresses/watch | Ingress | internal/k8s/watch.go: WatchIngresses |
| NetworkPolicies | /api/clusters/{cluster}/networkpolicies/watch | NetworkPolicy | internal/k8s/watch.go: WatchNetworkPolicies |
| EndpointSlices | /api/clusters/{cluster}/endpointslices/watch | EndpointSlice | internal/k8s/watch.go: WatchEndpointSlices |
| IngressClasses | /api/clusters/{cluster}/ingressclasses/watch | IngressClass | internal/k8s/watch.go: WatchIngressClasses |
| PersistentVolumes | /api/clusters/{cluster}/pvs/watch | PV | internal/k8s/watch.go: WatchPVs |
| PersistentVolumeClaims | /api/clusters/{cluster}/pvcs/watch | PVC | internal/k8s/watch.go: WatchPVCs |
| StorageClasses | /api/clusters/{cluster}/storageclasses/watch | StorageClass | internal/k8s/watch.go: WatchStorageClasses |
| Nodes | /api/clusters/{cluster}/nodes/watch | Node | internal/k8s/watch.go: WatchNodes |
| Namespaces | /api/clusters/{cluster}/namespaces/watch | Namespace | internal/k8s/watch.go: WatchNamespaces |
| PriorityClasses | /api/clusters/{cluster}/priorityclasses/watch | PriorityClass | internal/k8s/watch.go: WatchPriorityClasses |
| RuntimeClasses | /api/clusters/{cluster}/runtimeclasses/watch | RuntimeClass | internal/k8s/watch.go: WatchRuntimeClasses |
Each is a thin wrapper around the generic `watchKind[T, S]` primitive, registered in the `watchKinds` slice in `cmd/periscope/main.go`.

## 3. The `watchKind[T, S]` primitive
- `T` = the Kubernetes API type (e.g. `corev1.Pod`)
- `S` = the list-view DTO (e.g. `internal/k8s.Pod`)

A `watchSpec[T, S]` carries the per-kind plumbing:

```go
type watchSpec[T any, S any] struct {
	Kind     string
	List     func(ctx context.Context, opts metav1.ListOptions) ([]T, string, error)
	Watch    func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
	Summary  func(*T) S    // T → list-view DTO; same fn for snapshot and deltas
	PostList func([]S) []S // optional: sort/cap snapshot before sending
}
```

`watchKind` runs the canonical lifecycle:
- If `resumeFrom != ""` (the client reconnected with `Last-Event-ID`): skip List, open Watch directly at that resourceVersion. Do not emit a Snapshot — the client cache is preserved across the blip.
- Else: List → emit `Snapshot` → Watch from the list's RV.
- Handle apiserver events: `ADDED`/`MODIFIED`/`DELETED` → emit the corresponding SSE event. `BOOKMARK` → no emit; the next relist refreshes the RV.
- Status `410 Gone` → emit `Relist`, restart the loop.
- Stale `Last-Event-ID` that the apiserver rejects with `410 Gone` on Watch open → fall through to a fresh List+Watch one time. Stale resume IDs degrade gracefully; they never hard-fail the stream.
- ctx cancelled or `sink.Send` returned `false` (backpressure) → return `nil`.
The same `Summary` function projects T → S for both the snapshot and each delta, so the frontend's cache patcher always operates on shape-identical objects.
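A condensed sketch of that lifecycle follows. The `Event` envelope and `WatchSink` here are illustrative stand-ins (the real ones live in `internal/sse`), and the `410 Gone` relist plus the one-shot stale-resume fallback are elided:

```go
package k8s

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

// Illustrative stand-ins for the real SSE envelope and sink.
type Event struct {
	Type string // Snapshot | Added | Modified | Deleted | Relist
	ID   string // resourceVersion, echoed back via Last-Event-ID
	Data any    // DTO payload; the SSE writer JSON-encodes it
}

type WatchSink interface {
	Send(Event) bool // false = backpressure: stop the loop
}

var sseName = map[watch.EventType]string{
	watch.Added:    "Added",
	watch.Modified: "Modified",
	watch.Deleted:  "Deleted",
}

func watchKind[T any, S any](ctx context.Context, spec watchSpec[T, S], resumeFrom string, sink WatchSink) error {
	rv := resumeFrom
	if rv == "" {
		// Fresh client: List, project through Summary, emit one Snapshot.
		items, listRV, err := spec.List(ctx, metav1.ListOptions{})
		if err != nil {
			return err
		}
		rows := make([]S, 0, len(items))
		for i := range items {
			rows = append(rows, spec.Summary(&items[i]))
		}
		if spec.PostList != nil {
			rows = spec.PostList(rows)
		}
		if !sink.Send(Event{Type: "Snapshot", ID: listRV, Data: rows}) {
			return nil // client already fell behind
		}
		rv = listRV
	}
	// Resumed client: open the watch at the last acked RV, no Snapshot.
	w, err := spec.Watch(ctx, metav1.ListOptions{ResourceVersion: rv, AllowWatchBookmarks: true})
	if err != nil {
		return err
	}
	defer w.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case ev, ok := <-w.ResultChan():
			if !ok {
				return nil // apiserver closed the watch
			}
			if ev.Type == watch.Bookmark {
				continue // no emit; the next relist refreshes the RV
			}
			obj, isT := ev.Object.(*T)
			if !isT {
				continue // e.g. *metav1.Status; real code inspects it
			}
			if !sink.Send(Event{Type: sseName[ev.Type], Data: spec.Summary(obj)}) {
				return nil // backpressure: exit cleanly
			}
		}
	}
}
```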
## 4. Why `PostList` exists (Events-shaped resources)
For most resources, the snapshot is the apiserver's full list. Events are different: there are typically thousands of stale events, and the list page only shows the most recent ~200. The `PostList` hook lets `WatchEvents` sort newest-first and cap the snapshot to match `ListClusterEvents` (the polled path).
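As a sketch — the real DTO has more fields, and both the `LastSeen` field name and the 200 cap here are assumptions — the hook is a sort-and-truncate:

```go
import (
	"sort"
	"time"
)

// Stand-in for the real ClusterEvent DTO; only the field the hook needs.
type ClusterEvent struct {
	LastSeen time.Time `json:"lastSeen"`
}

// postListEvents sorts the snapshot newest-first and caps it to the
// window the Events page renders, mirroring ListClusterEvents. It would
// be wired in as the PostList field of the events watchSpec.
func postListEvents(rows []ClusterEvent) []ClusterEvent {
	sort.Slice(rows, func(i, j int) bool {
		return rows[i].LastSeen.After(rows[j].LastSeen)
	})
	if len(rows) > 200 {
		rows = rows[:200]
	}
	return rows
}
```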
Delta events bypass `PostList` — they're emitted raw, and the frontend's `patchRowInList` reconciler decides whether each delta fits the visible window. A `MODIFIED` for an event outside the top-N is treated as `ADDED` by the patcher (standard upsert semantics).

When you add a new kind, leave `PostList` nil unless the snapshot needs a kind-specific sort or cap.
## 5. The handler layer (`resourceWatchHandler`)
`resourceWatchHandler` in `cmd/periscope/main.go` is a generic SSE handler factored over the `Watch*` primitives. For a new kind, you don't write a fresh handler — you instantiate the generic one with:

- the route path
- a closure that calls your `Watch*` function
- the per-kind authz pre-flight (e.g. a `list pods` SAR)
The handler takes care of:

- WebSocket upgrade rejection for non-SSE clients
- `Last-Event-ID` parsing and threading into `WatchArgs.ResumeFrom`
- heartbeat frames every 15s (keeps proxies from killing the socket)
- backpressure: if the client falls behind, `sink.Send` returns `false` and the watch loop exits cleanly
- stream lifecycle: register with the per-actor limiter and the `streamTracker`; deregister on close
- closing the stream on session expiry (the auth layer broadcasts `event:server_shutdown` and per-session expiry signals)
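Compressed, the SSE side of that handler looks something like this. The buffer size and helper shape are assumptions; `Event` is the illustrative envelope from section 3, `chanSink` is sketched in section 9, and the limiter/tracker registration listed above is elided:

```go
import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

func serveWatch(w http.ResponseWriter, r *http.Request,
	run func(ctx context.Context, resumeFrom string, sink WatchSink) error) {

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusNotImplemented)
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")

	// EventSource sends Last-Event-ID automatically on reconnect;
	// it becomes WatchArgs.ResumeFrom in the watch loop.
	resumeFrom := r.Header.Get("Last-Event-ID")

	events := make(chan Event, 64) // bounded: a full buffer is backpressure
	go run(r.Context(), resumeFrom, chanSink{ch: events})

	heartbeat := time.NewTicker(15 * time.Second)
	defer heartbeat.Stop()
	for {
		select {
		case <-r.Context().Done():
			return
		case <-heartbeat.C:
			fmt.Fprint(w, ": heartbeat\n\n") // SSE comment frame
			flusher.Flush()
		case ev := <-events:
			body, _ := json.Marshal(ev.Data) // sketch: real code checks the error
			fmt.Fprintf(w, "event: %s\nid: %s\ndata: %s\n\n", ev.Type, ev.ID, body)
			flusher.Flush()
		}
	}
}
```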
## 6. Per-user concurrency cap
`PERISCOPE_WATCH_PER_USER_LIMIT` (default 60) caps concurrent watch streams per OIDC subject across all clusters and kinds. Exceeding it returns HTTP 429. The cap exists to stop a runaway SPA bug from opening hundreds of `EventSource`s and exhausting apiserver watch budget on the user's behalf.
The default is well above realistic usage: even "10 list pages × 3 kinds at peak" is only 30 streams, half the cap.
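The limiter itself can be as simple as a counted map. A sketch (the real `userStreamLimiter` lives in `cmd/periscope/main.go` and may differ):

```go
import "sync"

// userStreamLimiter caps concurrent watch streams per OIDC subject.
type userStreamLimiter struct {
	mu    sync.Mutex
	limit int            // PERISCOPE_WATCH_PER_USER_LIMIT
	open  map[string]int // OIDC subject → open stream count
}

func newUserStreamLimiter(limit int) *userStreamLimiter {
	return &userStreamLimiter{limit: limit, open: make(map[string]int)}
}

// Acquire reserves a slot; the handler answers HTTP 429 on false.
func (l *userStreamLimiter) Acquire(subject string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.open[subject] >= l.limit {
		return false
	}
	l.open[subject]++
	return true
}

// Release frees the slot when the stream closes.
func (l *userStreamLimiter) Release(subject string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.open[subject] > 0 {
		l.open[subject]--
	}
}
```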
## 7. `/debug/streams`
`GET /debug/streams` returns a JSON snapshot of every active watch stream:

```json
[
  {
    "id": 17,
    "actor": "auth0|alice",
    "cluster": "prod-eu-west-1",
    "kind": "pods",
    "namespace": "default",
    "openedAt": "2026-05-03T16:21:00Z"
  }
]
```

The endpoint is gated by the same authz that gates `/api/audit` (the admin scope). It exists for operator visibility — "is the per-user cap close to limit?", "which kinds dominate stream count?", "is a specific actor leaking streams?" — and as a debugging aid when stream counts look wrong.
## 8. Adding a new kind

The minimal recipe to add a hypothetical kind, `Foos`:

### 8.1 The DTO

In `internal/k8s/foos.go`, define the list-view DTO and a `fooSummary` function:
```go
type Foo struct {
	Namespace string `json:"namespace"`
	Name      string `json:"name"`
	UID       string `json:"uid"`
	// … only the fields the list page renders
}

func fooSummary(f *foosv1.Foo) Foo {
	return Foo{
		Namespace: f.Namespace,
		Name:      f.Name,
		UID:       string(f.UID),
		// …
	}
}
```

The DTO must be stable — the SSE delta and the polled list both emit it; changing field names is a breaking change for the SPA cache.
### 8.2 `WatchFoos`

In `internal/k8s/watch.go`, add a thin wrapper:
```go
func WatchFoos(ctx context.Context, p credentials.Provider, args WatchArgs, sink WatchSink) error {
	cs, err := newClientFn(ctx, p, args.Cluster)
	if err != nil {
		return fmt.Errorf("build clientset: %w", err)
	}
	return watchKind(ctx, watchSpec[foosv1.Foo, Foo]{
		Kind: "foos",
		List: func(ctx context.Context, opts metav1.ListOptions) ([]foosv1.Foo, string, error) {
			list, err := cs.FoosV1().Foos(args.Namespace).List(ctx, opts)
			if err != nil {
				return nil, "", err
			}
			return list.Items, list.ResourceVersion, nil
		},
		Watch: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
			return cs.FoosV1().Foos(args.Namespace).Watch(ctx, opts)
		},
		Summary: fooSummary,
	}, args.ResumeFrom, sink)
}
```

If the snapshot needs sorting or capping, add a `PostList` field. If not, leave it nil.
### 8.3 The registry entry

In `cmd/periscope/main.go`, append a `kindReg` to `watchKinds`:
```go
var watchKinds = []kindReg{
	// …existing entries…
	{Name: "foos", Group: "workloads", Watch: k8s.WatchFoos},
}
```

That single entry wires up:

- the route `GET /api/clusters/{cluster}/foos/watch`
- inclusion in `/api/features.watchStreams` (so the SPA enables the stream)
- the `PERISCOPE_WATCH_STREAMS=foos` token (and any group alias the kind belongs to, e.g. `workloads`)
- the `/debug/streams` registry, per-user limiter, heartbeat, resume, shutdown — all inherited from the generic handler
### 8.4 The helm schema

Add the new token (and its group alias if it's a new group) to the regex in `deploy/helm/periscope/values.schema.json` so operators typing the new kind into `watchStreams.kinds` pass `helm template` validation.
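Illustratively — the real schema's nesting differs and the actual alternation lists every shipped kind, abbreviated here — the change is extending the pattern:

```json
{
  "watchStreams": {
    "type": "object",
    "properties": {
      "kinds": {
        "type": "string",
        "pattern": "^(all|workloads|pods|events|foos)(,(all|workloads|pods|events|foos))*$"
      }
    }
  }
}
```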
### 8.5 The test

Add a `TestWatchFoos_*` block in `internal/k8s/watch_test.go`. The existing tests cover the generic primitive (snapshot, deltas, ctx cancellation, backpressure) — your job is to confirm the `fooSummary` projection is correct and the wrapper plumbing works.
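For the projection, something in this shape is enough (structure assumed; `foosv1` is the hypothetical client package from 8.2):

```go
import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestWatchFoos_Summary(t *testing.T) {
	in := foosv1.Foo{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default",
			Name:      "foo-1",
			UID:       "d41c0a",
		},
	}
	got := fooSummary(&in)
	want := Foo{Namespace: "default", Name: "foo-1", UID: "d41c0a"}
	if got != want {
		t.Fatalf("fooSummary() = %+v, want %+v", got, want)
	}
}
```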
## 9. Backpressure model

`WatchSink.Send` returns `bool`:

- `true` → the consumer accepted the event; the loop continues.
- `false` → backpressure detected (the SSE buffer is full because the client fell behind). The loop exits cleanly with `nil`.
The SSE handler implements `Send` with a bounded channel; if the channel is full, it returns `false` rather than block. The browser sees the connection close and reconnects with `Last-Event-ID`, which either resumes from the last RV or falls through to a fresh List.
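In sketch form, reusing the illustrative `Event` envelope from section 3 (the real writer lives in `internal/sse`):

```go
// chanSink adapts the handler's bounded channel to WatchSink.
// Send never blocks: a full buffer means the client fell behind,
// so the watch loop gets false back and exits.
type chanSink struct{ ch chan Event }

func (s chanSink) Send(ev Event) bool {
	select {
	case s.ch <- ev:
		return true
	default:
		return false // backpressure: drop the stream, let the client resume
	}
}
```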
Watch loops must not block on slow consumers. The whole point of the SSE primitive is to scale to many concurrent slow clients without pinning watcher goroutines.
## 10. Auth integration

Watch streams close cleanly on:

- session expiry (`event:auth_expired`, broadcast from the auth layer)
- server shutdown (`event:server_shutdown`, sent before the binary's graceful-shutdown deadline expires)
- ctx cancellation (the request's HTTP context)
The frontend listens for these and either prompts re-auth (`auth_expired`) or marks the stream disconnected and falls back to polling (`server_shutdown`). New kinds inherit this for free — the auth layer broadcasts to every registered stream.
## 11. What's intentionally not here

- **No write semantics.** Watch streams are read-only; the dynamic PATCH / DELETE handlers are separate.
- **No per-cluster watch budget.** Apiserver watch budget is a cluster-level concern that Periscope can't enforce — we cap per user instead, which is what the apiserver actually meters.
- **No batching of deltas.** Deltas emit as-is; the frontend's React Query cache patcher is fast enough that server-side batching doesn't help for the kinds we ship. Revisit if a future kind has pathological churn.
- **No stream multiplexing.** One `EventSource` per kind per page. We considered a single multiplexed socket; the complexity didn't pay off for v1.
## 12. Related code

- `internal/k8s/watch.go` — `watchKind`, `watchSpec`, the per-kind `Watch*` wrappers
- `internal/k8s/watch_test.go` — generic primitive tests
- `internal/sse/` — SSE writer, heartbeat, `Last-Event-ID` parsing
- `cmd/periscope/main.go` — `resourceWatchHandler`, `streamTracker`, `userStreamLimiter`, `/debug/streams`, `/api/features`
- `web/src/hooks/useResourceStream.ts` — frontend SSE consumer
- `web/src/components/StreamHealthBadge.tsx` — stream lifecycle UI