diff --git a/api/flowcollector/v1beta2/flowcollector_types.go b/api/flowcollector/v1beta2/flowcollector_types.go
index e3317d8676..43e77257be 100644
--- a/api/flowcollector/v1beta2/flowcollector_types.go
+++ b/api/flowcollector/v1beta2/flowcollector_types.go
@@ -1607,6 +1607,89 @@ type FlowCollectorExecution struct {
Mode ExecutionMode `json:"mode"`
}
+// `FlowCollectorComponentStatus` represents the status of a single component or integration monitored by the operator.
+type FlowCollectorComponentStatus struct {
+ // `state` reports the overall health of the component.
+ // +kubebuilder:validation:Enum:="Ready";"InProgress";"Failure";"Degraded";"Unknown";"Unused"
+ State string `json:"state"`
+
+ // `reason` is a one-word CamelCase reason for the component's current state.
+ // +optional
+ Reason string `json:"reason,omitempty"`
+
+ // `message` is a human-readable description of the component's current state.
+ // +optional
+ Message string `json:"message,omitempty"`
+
+ // `desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).
+ // +optional
+ DesiredReplicas *int32 `json:"desiredReplicas,omitempty"`
+
+ // `readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).
+ // +optional
+ ReadyReplicas *int32 `json:"readyReplicas,omitempty"`
+
+ // `unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).
+ // +optional
+ UnhealthyPodCount int32 `json:"unhealthyPodCount,omitempty"`
+
+ // `podIssues` is a summary of unhealthy pod issues (e.g., "3 pods in CrashLoopBackOff: kafka connection refused").
+ // +optional
+ PodIssues string `json:"podIssues,omitempty"`
+}
+
+// `FlowCollectorExporterStatus` represents the status of a configured exporter.
+type FlowCollectorExporterStatus struct {
+ // `name` is a generated identifier for this exporter (e.g., "kafka-export-0"), derived from its type and position in spec.exporters.
+ Name string `json:"name"`
+
+ // `type` is the exporter type (Kafka, IPFIX, OpenTelemetry).
+ // +kubebuilder:validation:Enum:="Kafka";"IPFIX";"OpenTelemetry"
+ Type string `json:"type"`
+
+ // `state` reports the health of this exporter.
+ // +kubebuilder:validation:Enum:="Ready";"InProgress";"Failure";"Degraded";"Unknown"
+ State string `json:"state"`
+
+ // `reason` is a one-word CamelCase reason for the exporter's current state.
+ // +optional
+ Reason string `json:"reason,omitempty"`
+
+ // `message` is a human-readable description of the exporter's current state.
+ // +optional
+ Message string `json:"message,omitempty"`
+}
+
+// `FlowCollectorComponentsStatus` groups the status of operator-managed components.
+type FlowCollectorComponentsStatus struct {
+ // `agent` reports the status of the eBPF agent component.
+ // +optional
+ Agent *FlowCollectorComponentStatus `json:"agent,omitempty"`
+
+ // `processor` reports the status of the flowlogs-pipeline component.
+ // +optional
+ Processor *FlowCollectorComponentStatus `json:"processor,omitempty"`
+
+ // `plugin` reports the status of the console plugin component.
+ // +optional
+ Plugin *FlowCollectorComponentStatus `json:"plugin,omitempty"`
+}
+
+// `FlowCollectorIntegrationsStatus` groups the status of external integrations.
+type FlowCollectorIntegrationsStatus struct {
+ // `loki` reports the status of the Loki integration.
+ // +optional
+ Loki *FlowCollectorComponentStatus `json:"loki,omitempty"`
+
+ // `monitoring` reports the status of monitoring (dashboards, ServiceMonitor, etc.).
+ // +optional
+ Monitoring *FlowCollectorComponentStatus `json:"monitoring,omitempty"`
+
+ // `exporters` reports the status of configured exporters.
+ // +optional
+ Exporters []FlowCollectorExporterStatus `json:"exporters,omitempty"`
+}
+
// `FlowCollectorStatus` defines the observed state of FlowCollector
type FlowCollectorStatus struct {
// Important: Run "make" to regenerate code after modifying this file
@@ -1614,6 +1697,14 @@ type FlowCollectorStatus struct {
// `conditions` represents the latest available observations of an object's state
Conditions []metav1.Condition `json:"conditions"`
+ // `components` reports the status of operator-managed components (agent, processor, plugin).
+ // +optional
+ Components *FlowCollectorComponentsStatus `json:"components,omitempty"`
+
+ // `integrations` reports the status of external integrations (Loki, monitoring, exporters).
+ // +optional
+ Integrations *FlowCollectorIntegrationsStatus `json:"integrations,omitempty"`
+
// Namespace where console plugin and flowlogs-pipeline have been deployed.
//
// Deprecated: annotations are used instead
@@ -1623,8 +1714,10 @@ type FlowCollectorStatus struct {
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:scope=Cluster
-// +kubebuilder:printcolumn:name="Agent",type="string",JSONPath=`.spec.agent.type`
-// +kubebuilder:printcolumn:name="Sampling (EBPF)",type="string",JSONPath=`.spec.agent.ebpf.sampling`
+// +kubebuilder:printcolumn:name="Agent",type="string",JSONPath=`.status.components.agent.state`
+// +kubebuilder:printcolumn:name="Processor",type="string",JSONPath=`.status.components.processor.state`
+// +kubebuilder:printcolumn:name="Plugin",type="string",JSONPath=`.status.components.plugin.state`
+// +kubebuilder:printcolumn:name="Sampling",type="string",JSONPath=`.spec.agent.ebpf.sampling`
// +kubebuilder:printcolumn:name="Deployment Model",type="string",JSONPath=`.spec.deploymentModel`
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=`.status.conditions[?(@.type=="Ready")].reason`
// +kubebuilder:printcolumn:name="Warnings",type="string",JSONPath=`.status.conditions[?(@.type=="ConfigurationIssue")].message`
diff --git a/api/flowcollector/v1beta2/zz_generated.deepcopy.go b/api/flowcollector/v1beta2/zz_generated.deepcopy.go
index 7f6c5838db..2f395815b3 100644
--- a/api/flowcollector/v1beta2/zz_generated.deepcopy.go
+++ b/api/flowcollector/v1beta2/zz_generated.deepcopy.go
@@ -581,6 +581,61 @@ func (in *FlowCollectorAgent) DeepCopy() *FlowCollectorAgent {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FlowCollectorComponentStatus) DeepCopyInto(out *FlowCollectorComponentStatus) {
+ *out = *in
+ if in.DesiredReplicas != nil {
+ in, out := &in.DesiredReplicas, &out.DesiredReplicas
+ *out = new(int32)
+ **out = **in
+ }
+ if in.ReadyReplicas != nil {
+ in, out := &in.ReadyReplicas, &out.ReadyReplicas
+ *out = new(int32)
+ **out = **in
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCollectorComponentStatus.
+func (in *FlowCollectorComponentStatus) DeepCopy() *FlowCollectorComponentStatus {
+ if in == nil {
+ return nil
+ }
+ out := new(FlowCollectorComponentStatus)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FlowCollectorComponentsStatus) DeepCopyInto(out *FlowCollectorComponentsStatus) {
+ *out = *in
+ if in.Agent != nil {
+ in, out := &in.Agent, &out.Agent
+ *out = new(FlowCollectorComponentStatus)
+ (*in).DeepCopyInto(*out)
+ }
+ if in.Processor != nil {
+ in, out := &in.Processor, &out.Processor
+ *out = new(FlowCollectorComponentStatus)
+ (*in).DeepCopyInto(*out)
+ }
+ if in.Plugin != nil {
+ in, out := &in.Plugin, &out.Plugin
+ *out = new(FlowCollectorComponentStatus)
+ (*in).DeepCopyInto(*out)
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCollectorComponentsStatus.
+func (in *FlowCollectorComponentsStatus) DeepCopy() *FlowCollectorComponentsStatus {
+ if in == nil {
+ return nil
+ }
+ out := new(FlowCollectorComponentsStatus)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *FlowCollectorConsolePlugin) DeepCopyInto(out *FlowCollectorConsolePlugin) {
*out = *in
@@ -706,6 +761,21 @@ func (in *FlowCollectorExporter) DeepCopy() *FlowCollectorExporter {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FlowCollectorExporterStatus) DeepCopyInto(out *FlowCollectorExporterStatus) {
+ *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCollectorExporterStatus.
+func (in *FlowCollectorExporterStatus) DeepCopy() *FlowCollectorExporterStatus {
+ if in == nil {
+ return nil
+ }
+ out := new(FlowCollectorExporterStatus)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *FlowCollectorFLP) DeepCopyInto(out *FlowCollectorFLP) {
*out = *in
@@ -834,6 +904,36 @@ func (in *FlowCollectorIPFIXReceiver) DeepCopy() *FlowCollectorIPFIXReceiver {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FlowCollectorIntegrationsStatus) DeepCopyInto(out *FlowCollectorIntegrationsStatus) {
+ *out = *in
+ if in.Loki != nil {
+ in, out := &in.Loki, &out.Loki
+ *out = new(FlowCollectorComponentStatus)
+ (*in).DeepCopyInto(*out)
+ }
+ if in.Monitoring != nil {
+ in, out := &in.Monitoring, &out.Monitoring
+ *out = new(FlowCollectorComponentStatus)
+ (*in).DeepCopyInto(*out)
+ }
+ if in.Exporters != nil {
+ in, out := &in.Exporters, &out.Exporters
+ *out = make([]FlowCollectorExporterStatus, len(*in))
+ copy(*out, *in)
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCollectorIntegrationsStatus.
+func (in *FlowCollectorIntegrationsStatus) DeepCopy() *FlowCollectorIntegrationsStatus {
+ if in == nil {
+ return nil
+ }
+ out := new(FlowCollectorIntegrationsStatus)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *FlowCollectorKafka) DeepCopyInto(out *FlowCollectorKafka) {
*out = *in
@@ -1066,6 +1166,16 @@ func (in *FlowCollectorStatus) DeepCopyInto(out *FlowCollectorStatus) {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
+ if in.Components != nil {
+ in, out := &in.Components, &out.Components
+ *out = new(FlowCollectorComponentsStatus)
+ (*in).DeepCopyInto(*out)
+ }
+ if in.Integrations != nil {
+ in, out := &in.Integrations, &out.Integrations
+ *out = new(FlowCollectorIntegrationsStatus)
+ (*in).DeepCopyInto(*out)
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCollectorStatus.
diff --git a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
index 4145bd5eee..147948e1e9 100644
--- a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
+++ b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
@@ -15,11 +15,17 @@ spec:
scope: Cluster
versions:
- additionalPrinterColumns:
- - jsonPath: .spec.agent.type
+ - jsonPath: .status.components.agent.state
name: Agent
type: string
+ - jsonPath: .status.components.processor.state
+ name: Processor
+ type: string
+ - jsonPath: .status.components.plugin.state
+ name: Plugin
+ type: string
- jsonPath: .spec.agent.ebpf.sampling
- name: Sampling (EBPF)
+ name: Sampling
type: string
- jsonPath: .spec.deploymentModel
name: Deployment Model
@@ -6607,6 +6613,142 @@ spec:
status:
description: '`FlowCollectorStatus` defines the observed state of FlowCollector'
properties:
+ components:
+ description: '`components` reports the status of operator-managed
+ components (agent, processor, plugin).'
+ properties:
+ agent:
+ description: '`agent` reports the status of the eBPF agent component.'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas
+ (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of
+ the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues
+ (e.g., "3 pods in CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas
+ (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for
+ the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in
+ a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ plugin:
+ description: '`plugin` reports the status of the console plugin
+ component.'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas
+ (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of
+ the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues
+ (e.g., "3 pods in CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas
+ (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for
+ the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in
+ a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ processor:
+ description: '`processor` reports the status of the flowlogs-pipeline
+ component.'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas
+ (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of
+ the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues
+ (e.g., "3 pods in CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas
+ (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for
+ the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in
+ a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ type: object
conditions:
description: '`conditions` represents the latest available observations
of an object''s state'
@@ -6665,6 +6807,140 @@ spec:
- type
type: object
type: array
+ integrations:
+ description: '`integrations` reports the status of external integrations
+ (Loki, monitoring, exporters).'
+ properties:
+ exporters:
+ description: '`exporters` reports the status of configured exporters.'
+ items:
+ description: '`FlowCollectorExporterStatus` represents the status
+ of a configured exporter.'
+ properties:
+ message:
+ description: '`message` is a human-readable description
+ of the exporter''s current state.'
+ type: string
+ name:
+ description: '`name` is a generated identifier for this
+ exporter (e.g., "kafka-export-0"), derived from its type
+ and position in spec.exporters.'
+ type: string
+ reason:
+ description: '`reason` is a one-word CamelCase reason for
+ the exporter''s current state.'
+ type: string
+ state:
+ description: '`state` reports the health of this exporter.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ type: string
+ type:
+ description: '`type` is the exporter type (Kafka, IPFIX,
+ OpenTelemetry).'
+ enum:
+ - Kafka
+ - IPFIX
+ - OpenTelemetry
+ type: string
+ required:
+ - name
+ - state
+ - type
+ type: object
+ type: array
+ loki:
+ description: '`loki` reports the status of the Loki integration.'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas
+ (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of
+ the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues
+ (e.g., "3 pods in CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas
+ (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for
+ the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in
+ a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ monitoring:
+ description: '`monitoring` reports the status of monitoring (dashboards,
+ ServiceMonitor, etc.).'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas
+ (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of
+ the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues
+ (e.g., "3 pods in CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas
+ (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for
+ the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in
+ a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ type: object
namespace:
description: |-
Namespace where console plugin and flowlogs-pipeline have been deployed.
diff --git a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml
index 3f6e483492..a9beb83255 100644
--- a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml
+++ b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml
@@ -714,6 +714,13 @@ spec:
- get
- list
- watch
+ - apiGroups:
+ - ""
+ resources:
+ - events
+ verbs:
+ - create
+ - patch
- apiGroups:
- apiextensions.k8s.io
resources:
diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
index 3872360848..a21b614646 100644
--- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
+++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
@@ -15,11 +15,17 @@ spec:
scope: Cluster
versions:
- additionalPrinterColumns:
- - jsonPath: .spec.agent.type
+ - jsonPath: .status.components.agent.state
name: Agent
type: string
+ - jsonPath: .status.components.processor.state
+ name: Processor
+ type: string
+ - jsonPath: .status.components.plugin.state
+ name: Plugin
+ type: string
- jsonPath: .spec.agent.ebpf.sampling
- name: Sampling (EBPF)
+ name: Sampling
type: string
- jsonPath: .spec.deploymentModel
name: Deployment Model
@@ -6085,6 +6091,121 @@ spec:
status:
description: '`FlowCollectorStatus` defines the observed state of FlowCollector'
properties:
+ components:
+ description: '`components` reports the status of operator-managed components (agent, processor, plugin).'
+ properties:
+ agent:
+ description: '`agent` reports the status of the eBPF agent component.'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues (e.g., "3 pods in CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ plugin:
+ description: '`plugin` reports the status of the console plugin component.'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues (e.g., "3 pods CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ processor:
+ description: '`processor` reports the status of the flowlogs-pipeline component.'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues (e.g., "3 pods in CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ type: object
conditions:
description: '`conditions` represents the latest available observations of an object''s state'
items:
@@ -6141,6 +6262,120 @@ spec:
- type
type: object
type: array
+ integrations:
+ description: '`integrations` reports the status of external integrations (Loki, monitoring, exporters).'
+ properties:
+ exporters:
+ description: '`exporters` reports the status of configured exporters.'
+ items:
+ description: '`FlowCollectorExporterStatus` represents the status of a configured exporter.'
+ properties:
+ message:
+ description: '`message` is a human-readable description of the exporter''s current state.'
+ type: string
+ name:
+ description: '`name` is a generated identifier for this exporter (e.g., "kafka-export-0"), derived from its type and position in spec.exporters.'
+ type: string
+ reason:
+ description: '`reason` is a one-word CamelCase reason for the exporter''s current state.'
+ type: string
+ state:
+ description: '`state` reports the health of this exporter.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ type: string
+ type:
+ description: '`type` is the exporter type (Kafka, IPFIX, OpenTelemetry).'
+ enum:
+ - Kafka
+ - IPFIX
+ - OpenTelemetry
+ type: string
+ required:
+ - name
+ - state
+ - type
+ type: object
+ type: array
+ loki:
+ description: '`loki` reports the status of the Loki integration.'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues (e.g., "3 pods in CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ monitoring:
+ description: '`monitoring` reports the status of monitoring (dashboards, ServiceMonitor, etc.).'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues (e.g., "3 pods in CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ type: object
namespace:
description: |-
Namespace where console plugin and flowlogs-pipeline have been deployed.
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 7445fb273a..ec3cf668d4 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -31,6 +31,13 @@ rules:
- get
- list
- watch
+- apiGroups:
+ - ""
+ resources:
+ - events
+ verbs:
+ - create
+ - patch
- apiGroups:
- apiextensions.k8s.io
resources:
diff --git a/docs/FlowCollector.md b/docs/FlowCollector.md
index f48022fe44..e67d8ab75d 100644
--- a/docs/FlowCollector.md
+++ b/docs/FlowCollector.md
@@ -13045,6 +13045,20 @@ If the namespace is different, the config map or the secret is copied so that it
`conditions` represents the latest available observations of an object's state
true |
+
+ | components |
+ object |
+
+ `components` reports the status of operator-managed components (agent, processor, plugin).
+ |
+ false |
+
+ | integrations |
+ object |
+
+ `integrations` reports the status of external integrations (Loki, monitoring, exporters).
+ |
+ false |
| namespace |
string |
@@ -13133,3 +13147,529 @@ with respect to the current state of the instance.
false |
+
+
+### FlowCollector.status.components
+[↩ Parent](#flowcollectorstatus)
+
+
+
+`components` reports the status of operator-managed components (agent, processor, plugin).
+
+
+
+
+ | Name |
+ Type |
+ Description |
+ Required |
+
+
+
+ | agent |
+ object |
+
+ `agent` reports the status of the eBPF agent component.
+ |
+ false |
+
+ | plugin |
+ object |
+
+ `plugin` reports the status of the console plugin component.
+ |
+ false |
+
+ | processor |
+ object |
+
+ `processor` reports the status of the flowlogs-pipeline component.
+ |
+ false |
+
+
+
+
+### FlowCollector.status.components.agent
+[↩ Parent](#flowcollectorstatuscomponents)
+
+
+
+`agent` reports the status of the eBPF agent component.
+
+
+
+
+ | Name |
+ Type |
+ Description |
+ Required |
+
+
+
+ | state |
+ enum |
+
+ `state` reports the overall health of the component.
+
+ Enum: Ready, InProgress, Failure, Degraded, Unknown, Unused
+ |
+ true |
+
+ | desiredReplicas |
+ integer |
+
+ `desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).
+
+ Format: int32
+ |
+ false |
+
+ | message |
+ string |
+
+ `message` is a human-readable description of the component's current state.
+ |
+ false |
+
+ | podIssues |
+ string |
+
+ `podIssues` is a summary of unhealthy pod issues (e.g., "3 pods CrashLoopBackOff: kafka connection refused").
+ |
+ false |
+
+ | readyReplicas |
+ integer |
+
+ `readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).
+
+ Format: int32
+ |
+ false |
+
+ | reason |
+ string |
+
+ `reason` is a one-word CamelCase reason for the component's current state.
+ |
+ false |
+
+ | unhealthyPodCount |
+ integer |
+
+ `unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).
+
+ Format: int32
+ |
+ false |
+
+
+
+
+### FlowCollector.status.components.plugin
+[↩ Parent](#flowcollectorstatuscomponents)
+
+
+
+`plugin` reports the status of the console plugin component.
+
+
+
+
+ | Name |
+ Type |
+ Description |
+ Required |
+
+
+
+ | state |
+ enum |
+
+ `state` reports the overall health of the component.
+
+ Enum: Ready, InProgress, Failure, Degraded, Unknown, Unused
+ |
+ true |
+
+ | desiredReplicas |
+ integer |
+
+ `desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).
+
+ Format: int32
+ |
+ false |
+
+ | message |
+ string |
+
+ `message` is a human-readable description of the component's current state.
+ |
+ false |
+
+ | podIssues |
+ string |
+
+ `podIssues` is a summary of unhealthy pod issues (e.g., "3 pods CrashLoopBackOff: kafka connection refused").
+ |
+ false |
+
+ | readyReplicas |
+ integer |
+
+ `readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).
+
+ Format: int32
+ |
+ false |
+
+ | reason |
+ string |
+
+ `reason` is a one-word CamelCase reason for the component's current state.
+ |
+ false |
+
+ | unhealthyPodCount |
+ integer |
+
+ `unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).
+
+ Format: int32
+ |
+ false |
+
+
+
+
+### FlowCollector.status.components.processor
+[↩ Parent](#flowcollectorstatuscomponents)
+
+
+
+`processor` reports the status of the flowlogs-pipeline component.
+
+
+
+
+ | Name |
+ Type |
+ Description |
+ Required |
+
+
+
+ | state |
+ enum |
+
+ `state` reports the overall health of the component.
+
+ Enum: Ready, InProgress, Failure, Degraded, Unknown, Unused
+ |
+ true |
+
+ | desiredReplicas |
+ integer |
+
+ `desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).
+
+ Format: int32
+ |
+ false |
+
+ | message |
+ string |
+
+ `message` is a human-readable description of the component's current state.
+ |
+ false |
+
+ | podIssues |
+ string |
+
+ `podIssues` is a summary of unhealthy pod issues (e.g., "3 pods CrashLoopBackOff: kafka connection refused").
+ |
+ false |
+
+ | readyReplicas |
+ integer |
+
+ `readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).
+
+ Format: int32
+ |
+ false |
+
+ | reason |
+ string |
+
+ `reason` is a one-word CamelCase reason for the component's current state.
+ |
+ false |
+
+ | unhealthyPodCount |
+ integer |
+
+ `unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).
+
+ Format: int32
+ |
+ false |
+
+
+
+
+### FlowCollector.status.integrations
+[↩ Parent](#flowcollectorstatus)
+
+
+
+`integrations` reports the status of external integrations (Loki, monitoring, exporters).
+
+
+
+
+ | Name |
+ Type |
+ Description |
+ Required |
+
+
+
+ | exporters |
+ []object |
+
+ `exporters` reports the status of configured exporters.
+ |
+ false |
+
+ | loki |
+ object |
+
+ `loki` reports the status of the Loki integration.
+ |
+ false |
+
+ | monitoring |
+ object |
+
+ `monitoring` reports the status of monitoring (dashboards, ServiceMonitor, etc.).
+ |
+ false |
+
+
+
+
+### FlowCollector.status.integrations.exporters[index]
+[↩ Parent](#flowcollectorstatusintegrations)
+
+
+
+`FlowCollectorExporterStatus` represents the status of a configured exporter.
+
+
+
+
+ | Name |
+ Type |
+ Description |
+ Required |
+
+
+
+ | name |
+ string |
+
+ `name` is a generated identifier for this exporter (e.g., "kafka-export-0"), derived from its type and position in spec.exporters.
+ |
+ true |
+
+ | state |
+ enum |
+
+ `state` reports the health of this exporter.
+
+ Enum: Ready, InProgress, Failure, Degraded, Unknown
+ |
+ true |
+
+ | type |
+ enum |
+
+ `type` is the exporter type (Kafka, IPFIX, OpenTelemetry).
+
+ Enum: Kafka, IPFIX, OpenTelemetry
+ |
+ true |
+
+ | message |
+ string |
+
+ `message` is a human-readable description of the exporter's current state.
+ |
+ false |
+
+ | reason |
+ string |
+
+ `reason` is a one-word CamelCase reason for the exporter's current state.
+ |
+ false |
+
+
+
+
+### FlowCollector.status.integrations.loki
+[↩ Parent](#flowcollectorstatusintegrations)
+
+
+
+`loki` reports the status of the Loki integration.
+
+
+
+
+ | Name |
+ Type |
+ Description |
+ Required |
+
+
+
+ | state |
+ enum |
+
+ `state` reports the overall health of the component.
+
+ Enum: Ready, InProgress, Failure, Degraded, Unknown, Unused
+ |
+ true |
+
+ | desiredReplicas |
+ integer |
+
+ `desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).
+
+ Format: int32
+ |
+ false |
+
+ | message |
+ string |
+
+ `message` is a human-readable description of the component's current state.
+ |
+ false |
+
+ | podIssues |
+ string |
+
+ `podIssues` is a summary of unhealthy pod issues (e.g., "3 pods CrashLoopBackOff: kafka connection refused").
+ |
+ false |
+
+ | readyReplicas |
+ integer |
+
+ `readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).
+
+ Format: int32
+ |
+ false |
+
+ | reason |
+ string |
+
+ `reason` is a one-word CamelCase reason for the component's current state.
+ |
+ false |
+
+ | unhealthyPodCount |
+ integer |
+
+ `unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).
+
+ Format: int32
+ |
+ false |
+
+
+
+
+### FlowCollector.status.integrations.monitoring
+[↩ Parent](#flowcollectorstatusintegrations)
+
+
+
+`monitoring` reports the status of monitoring (dashboards, ServiceMonitor, etc.).
+
+
+
+
+ | Name |
+ Type |
+ Description |
+ Required |
+
+
+
+ | state |
+ enum |
+
+ `state` reports the overall health of the component.
+
+ Enum: Ready, InProgress, Failure, Degraded, Unknown, Unused
+ |
+ true |
+
+ | desiredReplicas |
+ integer |
+
+ `desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).
+
+ Format: int32
+ |
+ false |
+
+ | message |
+ string |
+
+ `message` is a human-readable description of the component's current state.
+ |
+ false |
+
+ | podIssues |
+ string |
+
+ `podIssues` is a summary of unhealthy pod issues (e.g., "3 pods CrashLoopBackOff: kafka connection refused").
+ |
+ false |
+
+ | readyReplicas |
+ integer |
+
+ `readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).
+
+ Format: int32
+ |
+ false |
+
+ | reason |
+ string |
+
+ `reason` is a one-word CamelCase reason for the component's current state.
+ |
+ false |
+
+ | unhealthyPodCount |
+ integer |
+
+ `unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).
+
+ Format: int32
+ |
+ false |
+
+
diff --git a/helm/crds/flows.netobserv.io_flowcollectors.yaml b/helm/crds/flows.netobserv.io_flowcollectors.yaml
index ad0f7d49fb..e27b185a70 100644
--- a/helm/crds/flows.netobserv.io_flowcollectors.yaml
+++ b/helm/crds/flows.netobserv.io_flowcollectors.yaml
@@ -15,11 +15,17 @@ spec:
scope: Cluster
versions:
- additionalPrinterColumns:
- - jsonPath: .spec.agent.type
+ - jsonPath: .status.components.agent.state
name: Agent
type: string
+ - jsonPath: .status.components.processor.state
+ name: Processor
+ type: string
+ - jsonPath: .status.components.plugin.state
+ name: Plugin
+ type: string
- jsonPath: .spec.agent.ebpf.sampling
- name: Sampling (EBPF)
+ name: Sampling
type: string
- jsonPath: .spec.deploymentModel
name: Deployment Model
@@ -6089,6 +6095,121 @@ spec:
status:
description: '`FlowCollectorStatus` defines the observed state of FlowCollector'
properties:
+ components:
+ description: '`components` reports the status of operator-managed components (agent, processor, plugin).'
+ properties:
+ agent:
+ description: '`agent` reports the status of the eBPF agent component.'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues (e.g., "3 pods CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ plugin:
+ description: '`plugin` reports the status of the console plugin component.'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues (e.g., "3 pods CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ processor:
+ description: '`processor` reports the status of the flowlogs-pipeline component.'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues (e.g., "3 pods CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ type: object
conditions:
description: '`conditions` represents the latest available observations of an object''s state'
items:
@@ -6145,6 +6266,120 @@ spec:
- type
type: object
type: array
+ integrations:
+ description: '`integrations` reports the status of external integrations (Loki, monitoring, exporters).'
+ properties:
+ exporters:
+ description: '`exporters` reports the status of configured exporters.'
+ items:
+ description: '`FlowCollectorExporterStatus` represents the status of a configured exporter.'
+ properties:
+ message:
+ description: '`message` is a human-readable description of the exporter''s current state.'
+ type: string
+ name:
+ description: '`name` is a generated identifier for this exporter (e.g., "kafka-export-0"), derived from its type and position in spec.exporters.'
+ type: string
+ reason:
+ description: '`reason` is a one-word CamelCase reason for the exporter''s current state.'
+ type: string
+ state:
+ description: '`state` reports the health of this exporter.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ type: string
+ type:
+ description: '`type` is the exporter type (Kafka, IPFIX, OpenTelemetry).'
+ enum:
+ - Kafka
+ - IPFIX
+ - OpenTelemetry
+ type: string
+ required:
+ - name
+ - state
+ - type
+ type: object
+ type: array
+ loki:
+ description: '`loki` reports the status of the Loki integration.'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues (e.g., "3 pods CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ monitoring:
+ description: '`monitoring` reports the status of monitoring (dashboards, ServiceMonitor, etc.).'
+ properties:
+ desiredReplicas:
+ description: '`desiredReplicas` is the desired number of replicas (for Deployments) or nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ message:
+ description: '`message` is a human-readable description of the component''s current state.'
+ type: string
+ podIssues:
+ description: '`podIssues` is a summary of unhealthy pod issues (e.g., "3 pods CrashLoopBackOff: kafka connection refused").'
+ type: string
+ readyReplicas:
+ description: '`readyReplicas` is the number of ready replicas (for Deployments) or up-to-date nodes (for DaemonSets).'
+ format: int32
+ type: integer
+ reason:
+ description: '`reason` is a one-word CamelCase reason for the component''s current state.'
+ type: string
+ state:
+ description: '`state` reports the overall health of the component.'
+ enum:
+ - Ready
+ - InProgress
+ - Failure
+ - Degraded
+ - Unknown
+ - Unused
+ type: string
+ unhealthyPodCount:
+ description: '`unhealthyPodCount` is the number of pods in a degraded state (CrashLoopBackOff, OOMKilled, etc.).'
+ format: int32
+ type: integer
+ required:
+ - state
+ type: object
+ type: object
namespace:
description: |-
Namespace where console plugin and flowlogs-pipeline have been deployed.
diff --git a/helm/templates/clusterrole.yaml b/helm/templates/clusterrole.yaml
index 4f406e5bb8..d1c69c8e6f 100644
--- a/helm/templates/clusterrole.yaml
+++ b/helm/templates/clusterrole.yaml
@@ -30,6 +30,13 @@ rules:
- get
- list
- watch
+ - apiGroups:
+ - ""
+ resources:
+ - events
+ verbs:
+ - create
+ - patch
- apiGroups:
- apiextensions.k8s.io
resources:
diff --git a/internal/controller/consoleplugin/consoleplugin_objects.go b/internal/controller/consoleplugin/consoleplugin_objects.go
index a893e3896c..92d113b9d8 100644
--- a/internal/controller/consoleplugin/consoleplugin_objects.go
+++ b/internal/controller/consoleplugin/consoleplugin_objects.go
@@ -632,7 +632,7 @@ func (b *builder) getHealthRecordingAnnotations() map[string]map[string]string {
// returns a configmap with a digest of its configuration contents, which will be used to
// detect any configuration change. externalRecordingAnnotations is optional (e.g. nil in tests);
// when non-empty, those annotations are merged into the frontend config (from PrometheusRules).
-func (b *builder) configMap(ctx context.Context, externalRecordingAnnotations map[string]map[string]string, lokiStatus status.ComponentStatus) (*corev1.ConfigMap, string, error) {
+func (b *builder) configMap(ctx context.Context, externalRecordingAnnotations map[string]map[string]string, lokiStatus *status.ComponentStatus) (*corev1.ConfigMap, string, error) {
config := cfg.PluginConfig{
Server: cfg.ServerConfig{
Port: int(*b.advanced.Port),
@@ -648,7 +648,7 @@ func (b *builder) configMap(ctx context.Context, externalRecordingAnnotations ma
// configure loki
var err error
config.Loki, err = b.getLokiConfig()
- if lokiStatus.Status != status.StatusUnknown {
+ if lokiStatus != nil && lokiStatus.Status != status.StatusUnknown && lokiStatus.Status != status.StatusUnused {
config.Loki.StatusURL = ""
if lokiStatus.Status == status.StatusReady {
config.Loki.Status = "ready"
diff --git a/internal/controller/consoleplugin/consoleplugin_reconciler.go b/internal/controller/consoleplugin/consoleplugin_reconciler.go
index f9d7c53704..bd8cf2a0a7 100644
--- a/internal/controller/consoleplugin/consoleplugin_reconciler.go
+++ b/internal/controller/consoleplugin/consoleplugin_reconciler.go
@@ -56,7 +56,7 @@ func NewReconciler(cmn *reconcilers.Instance) CPReconciler {
}
// Reconcile is the reconciler entry point to reconcile the current plugin state with the desired configuration
-func (r *CPReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowCollector, lokiStatus status.ComponentStatus) error {
+func (r *CPReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowCollector, lokiStatus *status.ComponentStatus) error {
l := log.FromContext(ctx).WithName("web-console")
ctx = log.IntoContext(ctx, l)
@@ -75,7 +75,7 @@ func (r *CPReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowC
return nil
}
-func (r *CPReconciler) reconcile(ctx context.Context, desired *flowslatest.FlowCollector, lokiStatus status.ComponentStatus) error {
+func (r *CPReconciler) reconcile(ctx context.Context, desired *flowslatest.FlowCollector, lokiStatus *status.ComponentStatus) error {
// Retrieve current owned objects
err := r.Managed.FetchAll(ctx)
if err != nil {
@@ -148,9 +148,9 @@ func (r *CPReconciler) checkAutoPatch(ctx context.Context, desired *flowslatest.
advancedConfig := helper.GetAdvancedPluginConfig(desired.Spec.ConsolePlugin.Advanced)
reg := desired.Spec.UseWebConsole() && *advancedConfig.Register
if err := r.Client.Get(ctx, types.NamespacedName{Name: "cluster"}, &console); err != nil {
- // Console operator CR not found => warn but continue execution
if reg {
log.FromContext(ctx).Error(err, "Could not get the Console Operator resource for plugin registration. Please register manually.")
+ r.Status.SetDegraded("PluginRegistrationFailed", "Could not auto-register console plugin; manual registration needed")
}
return nil
}
@@ -204,7 +204,7 @@ func (r *CPReconciler) reconcilePlugin(ctx context.Context, builder *builder, de
return nil
}
-func (r *CPReconciler) reconcileConfigMap(ctx context.Context, builder *builder, lokiStatus status.ComponentStatus) (string, error) {
+func (r *CPReconciler) reconcileConfigMap(ctx context.Context, builder *builder, lokiStatus *status.ComponentStatus) (string, error) {
externalRecordingAnnotations, err := getExternalRecordingAnnotations(ctx, r.Client)
if err != nil {
return "", err
diff --git a/internal/controller/consoleplugin/consoleplugin_test.go b/internal/controller/consoleplugin/consoleplugin_test.go
index 71a024d6e8..8ce1442cba 100644
--- a/internal/controller/consoleplugin/consoleplugin_test.go
+++ b/internal/controller/consoleplugin/consoleplugin_test.go
@@ -115,7 +115,7 @@ func getAutoScalerSpecs() (ascv2.HorizontalPodAutoscaler, flowslatest.FlowCollec
func getBuilder(spec *flowslatest.FlowCollectorSpec, lk *helper.LokiConfig) builder {
info := reconcilers.Common{Namespace: testNamespace, Loki: lk, ClusterInfo: &cluster.Info{}}
b := newBuilder(info.NewInstance(map[reconcilers.ImageRef]string{reconcilers.MainImage: testImage}, status.Instance{}), spec, constants.PluginName)
- _, _, _ = b.configMap(context.Background(), nil, lokiStatusUnused) // build configmap to update builder's volumes
+ _, _, _ = b.configMap(context.Background(), nil, &lokiStatusUnused) // build configmap to update builder's volumes
return b
}
@@ -228,8 +228,8 @@ func TestConfigMapUpdateCheck(t *testing.T) {
}
spec := flowslatest.FlowCollectorSpec{ConsolePlugin: plugin}
builder := getBuilder(&spec, &loki)
- old, _, _ := builder.configMap(context.Background(), nil, lokiStatusUnused)
- nEw, _, _ := builder.configMap(context.Background(), nil, lokiStatusUnused)
+ old, _, _ := builder.configMap(context.Background(), nil, &lokiStatusUnused)
+ nEw, _, _ := builder.configMap(context.Background(), nil, &lokiStatusUnused)
assert.Equal(old.Data, nEw.Data)
// update loki
@@ -244,7 +244,7 @@ func TestConfigMapUpdateCheck(t *testing.T) {
}},
}
builder = getBuilder(&spec, &loki)
- nEw, _, _ = builder.configMap(context.Background(), nil, lokiStatusUnused)
+ nEw, _, _ = builder.configMap(context.Background(), nil, &lokiStatusUnused)
assert.NotEqual(old.Data, nEw.Data)
old = nEw
@@ -252,7 +252,7 @@ func TestConfigMapUpdateCheck(t *testing.T) {
loki.LokiManualParams.StatusURL = "http://loki.status:3100/"
loki.LokiManualParams.StatusTLS.Enable = true
builder = getBuilder(&spec, &loki)
- nEw, _, _ = builder.configMap(context.Background(), nil, lokiStatusUnused)
+ nEw, _, _ = builder.configMap(context.Background(), nil, &lokiStatusUnused)
assert.NotEqual(old.Data, nEw.Data)
old = nEw
@@ -263,14 +263,14 @@ func TestConfigMapUpdateCheck(t *testing.T) {
CertFile: "status-ca.crt",
}
builder = getBuilder(&spec, &loki)
- nEw, _, _ = builder.configMap(context.Background(), nil, lokiStatusUnused)
+ nEw, _, _ = builder.configMap(context.Background(), nil, &lokiStatusUnused)
assert.NotEqual(old.Data, nEw.Data)
old = nEw
// update status user cert
loki.LokiManualParams.StatusTLS.UserCert = ptr.Deref(helper.DefaultCertificateReference("sec-name", ""), flowslatest.CertificateReference{})
builder = getBuilder(&spec, &loki)
- nEw, _, _ = builder.configMap(context.Background(), nil, lokiStatusUnused)
+ nEw, _, _ = builder.configMap(context.Background(), nil, &lokiStatusUnused)
assert.NotEqual(old.Data, nEw.Data)
}
@@ -286,8 +286,8 @@ func TestConfigMapUpdateWithLokistackMode(t *testing.T) {
loki := helper.NewLokiConfig(&lokiSpec, "any")
spec := flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: lokiSpec}
builder := getBuilder(&spec, &loki)
- old, _, _ := builder.configMap(context.Background(), nil, lokiStatusUnused)
- nEw, _, _ := builder.configMap(context.Background(), nil, lokiStatusUnused)
+ old, _, _ := builder.configMap(context.Background(), nil, &lokiStatusUnused)
+ nEw, _, _ := builder.configMap(context.Background(), nil, &lokiStatusUnused)
assert.Equal(old.Data, nEw.Data)
// update lokistack name
@@ -296,7 +296,7 @@ func TestConfigMapUpdateWithLokistackMode(t *testing.T) {
spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: lokiSpec}
builder = getBuilder(&spec, &loki)
- nEw, _, _ = builder.configMap(context.Background(), nil, lokiStatusUnused)
+ nEw, _, _ = builder.configMap(context.Background(), nil, &lokiStatusUnused)
assert.NotEqual(old.Data, nEw.Data)
old = nEw
@@ -306,7 +306,7 @@ func TestConfigMapUpdateWithLokistackMode(t *testing.T) {
spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: lokiSpec}
builder = getBuilder(&spec, &loki)
- nEw, _, _ = builder.configMap(context.Background(), nil, lokiStatusUnused)
+ nEw, _, _ = builder.configMap(context.Background(), nil, &lokiStatusUnused)
assert.NotEqual(old.Data, nEw.Data)
}
@@ -331,7 +331,7 @@ func TestConfigMapContent(t *testing.T) {
Processor: flowslatest.FlowCollectorFLP{SubnetLabels: flowslatest.SubnetLabels{OpenShiftAutoDetect: ptr.To(false)}},
}
builder := getBuilder(&spec, &loki)
- cm, _, err := builder.configMap(context.Background(), nil, lokiStatusUnused)
+ cm, _, err := builder.configMap(context.Background(), nil, &lokiStatusUnused)
assert.NotNil(cm)
assert.Nil(err)
@@ -373,7 +373,7 @@ func TestConfigMapExternalRecordingAnnotations(t *testing.T) {
"netobserv_io_network_health": `{"recordingThresholds":{"info":"10"}}`,
},
}
- cm, _, err := builder.configMap(context.Background(), external, lokiStatusUnused)
+ cm, _, err := builder.configMap(context.Background(), external, &lokiStatusUnused)
assert.NotNil(cm)
assert.NoError(err)
@@ -530,7 +530,7 @@ func TestLokiStackStatusEmbedding(t *testing.T) {
Name: status.LokiStack,
Status: status.StatusReady,
}
- cm, _, err := builder.configMap(context.Background(), nil, lokiStackReady)
+ cm, _, err := builder.configMap(context.Background(), nil, &lokiStackReady)
assert.Nil(err)
assert.NotNil(cm)
@@ -546,7 +546,7 @@ func TestLokiStackStatusEmbedding(t *testing.T) {
Status: status.StatusInProgress,
Reason: "PendingComponents",
}
- cm, _, err = builder.configMap(context.Background(), nil, lokiStackPending)
+ cm, _, err = builder.configMap(context.Background(), nil, &lokiStackPending)
assert.Nil(err)
assert.NotNil(cm)
@@ -556,7 +556,7 @@ func TestLokiStackStatusEmbedding(t *testing.T) {
assert.Empty(cfg.Loki.StatusURL)
// Test 3: No LokiStack provided (nil)
- cm, _, err = builder.configMap(context.Background(), nil, lokiStatusUnused)
+ cm, _, err = builder.configMap(context.Background(), nil, &lokiStatusUnused)
assert.Nil(err)
assert.NotNil(cm)
@@ -617,7 +617,7 @@ func TestLokiStackNotFoundBehavior(t *testing.T) {
// Test behavior when LokiStack is not found (nil is passed)
// This simulates the reconciler behavior when Get() returns NotFound
- cm, digest, err := builder.configMap(context.Background(), nil, lokiStatusUnused)
+ cm, digest, err := builder.configMap(context.Background(), nil, &lokiStatusUnused)
// ConfigMap should still be created successfully
assert.Nil(err)
diff --git a/internal/controller/demoloki/demoloki_objects.go b/internal/controller/demoloki/demoloki_objects.go
index bbc1ce30cb..560df8a12e 100644
--- a/internal/controller/demoloki/demoloki_objects.go
+++ b/internal/controller/demoloki/demoloki_objects.go
@@ -42,7 +42,8 @@ func newBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSp
return builder{
info: info,
labels: map[string]string{
- "app": name,
+ "part-of": constants.OperatorName,
+ "app": name,
},
selector: map[string]string{
"app": name,
diff --git a/internal/controller/ebpf/agent_controller.go b/internal/controller/ebpf/agent_controller.go
index 3a83bc08b9..90e0102499 100644
--- a/internal/controller/ebpf/agent_controller.go
+++ b/internal/controller/ebpf/agent_controller.go
@@ -3,6 +3,7 @@ package ebpf
import (
"context"
"encoding/json"
+ stderrors "errors"
"fmt"
"strconv"
"strings"
@@ -143,9 +144,13 @@ func (c *AgentController) Reconcile(ctx context.Context, target *flowslatest.Flo
err := c.reconcile(ctx, target)
if err != nil {
rlog.Error(err, "AgentController reconcile failure")
- // Set status failure unless it was already set
if !c.Status.HasFailure() {
- c.Status.SetFailure("AgentControllerError", err.Error())
+ reason := "AgentControllerError"
+ var ke *reconcilers.KafkaError
+ if stderrors.As(err, &ke) {
+ reason = "AgentKafkaError"
+ }
+ c.Status.SetFailure(reason, err.Error())
}
return err
}
@@ -204,7 +209,7 @@ func (c *AgentController) reconcile(ctx context.Context, target *flowslatest.Flo
err = c.UpdateIfOwned(ctx, current, desired)
default:
rlog.Info("action: nothing to do")
- c.Status.CheckDaemonSetProgress(current)
+ c.Status.CheckDaemonSetHealth(ctx, c.Client, current)
}
if err != nil {
@@ -308,6 +313,7 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol
if coll.Spec.Agent.EBPF.IsAgentFeatureEnabled(flowslatest.PacketDrop) && !coll.Spec.Agent.EBPF.IsEbpfManagerEnabled() {
if !coll.Spec.Agent.EBPF.Privileged {
rlog.Error(fmt.Errorf("invalid configuration"), "To use PacketsDrop feature privileged mode needs to be enabled")
+ c.Status.SetDegraded("InvalidConfiguration", "PacketDrop feature requires privileged mode")
} else {
volume := corev1.Volume{
Name: bpfTraceMountName,
@@ -332,6 +338,7 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol
coll.Spec.Agent.EBPF.IsAgentFeatureEnabled(flowslatest.UDNMapping) {
if !coll.Spec.Agent.EBPF.Privileged {
rlog.Error(fmt.Errorf("invalid configuration"), "To use NetworkEvents or UDNMapping features, privileged mode needs to be enabled")
+ c.Status.SetDegraded("InvalidConfiguration", "NetworkEvents/UDNMapping features require privileged mode")
} else {
hostPath := advancedConfig.Env[envOVNObservHostMountPath]
if hostPath == "" {
@@ -463,7 +470,7 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC
// If user cert is provided, it will use mTLS. Else, simple TLS (the userDigest and paths will be empty)
caDigest, userDigest, err := c.Watcher.ProcessMTLSCerts(ctx, c.Client, &coll.Spec.Kafka.TLS, c.PrivilegedNamespace())
if err != nil {
- return nil, err
+ return nil, reconcilers.WrapKafkaError(err)
}
annots[watchers.Annotation("kafka-ca")] = caDigest
annots[watchers.Annotation("kafka-user")] = userDigest
@@ -482,7 +489,7 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC
// Annotate pod with secret reference so that it is reloaded if modified
d1, d2, err := c.Watcher.ProcessSASL(ctx, c.Client, sasl, c.PrivilegedNamespace())
if err != nil {
- return nil, err
+ return nil, reconcilers.WrapKafkaError(err)
}
annots[watchers.Annotation("kafka-sd1")] = d1
annots[watchers.Annotation("kafka-sd2")] = d2
diff --git a/internal/controller/flowcollector_controller.go b/internal/controller/flowcollector_controller.go
index e82dba9af1..8555fc2c2b 100644
--- a/internal/controller/flowcollector_controller.go
+++ b/internal/controller/flowcollector_controller.go
@@ -3,6 +3,7 @@ package controllers
import (
"context"
"fmt"
+ "time"
osv1 "github.com/openshift/api/console/v1"
securityv1 "github.com/openshift/api/security/v1"
@@ -135,6 +136,9 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, _ ctrl.Request)
}
r.status.SetReady()
+ if r.mgr.Status.NeedsRequeue() {
+ return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
+ }
return ctrl.Result{}, nil
}
@@ -185,7 +189,7 @@ func (r *FlowCollectorReconciler) reconcile(ctx context.Context, clh *helper.Cli
}
// Console plugin
- if err := cpReconciler.Reconcile(ctx, desired, lokiStatus); err != nil {
+ if err := cpReconciler.Reconcile(ctx, desired, &lokiStatus); err != nil {
return err
}
diff --git a/internal/controller/flp/flp_controller.go b/internal/controller/flp/flp_controller.go
index fff9bda7fa..2f435cf774 100644
--- a/internal/controller/flp/flp_controller.go
+++ b/internal/controller/flp/flp_controller.go
@@ -2,9 +2,11 @@ package flp
import (
"context"
+ "errors"
"fmt"
"slices"
"strings"
+ "time"
flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2"
sliceslatest "github.com/netobserv/netobserv-operator/api/flowcollectorslice/v1alpha1"
@@ -106,14 +108,21 @@ func (r *Reconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result
err = r.reconcile(ctx, clh, fc)
if err != nil {
l.Error(err, "FLP reconcile failure")
- // Set status failure unless it was already set
if !r.status.HasFailure() {
- r.status.SetFailure("FLPError", err.Error())
+ reason := "FLPError"
+ var ke *reconcilers.KafkaError
+ if errors.As(err, &ke) {
+ reason = "FLPKafkaError"
+ }
+ r.status.SetFailure(reason, err.Error())
}
return ctrl.Result{}, err
}
r.status.SetReady()
+ if r.mgr.Status.NeedsRequeue() {
+ return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
+ }
return ctrl.Result{}, nil
}
@@ -135,6 +144,7 @@ func (r *Reconciler) reconcile(ctx context.Context, clh *helper.Client, fc *flow
subnetLabels, err = r.getOpenShiftSubnets(ctx)
if err != nil {
log.Error(err, "error while reading subnet definitions")
+ r.status.SetDegraded("SubnetDetectionError", fmt.Sprintf("subnet auto-detect failed: %v", err))
}
}
@@ -183,9 +193,23 @@ func (r *Reconciler) reconcile(ctx context.Context, clh *helper.Client, fc *flow
}
}
+ // Track exporter status
+ r.updateExporterStatuses(fc)
+
return nil
}
+func (r *Reconciler) updateExporterStatuses(fc *flowslatest.FlowCollector) {
+ r.mgr.Status.ClearExporters()
+ for i, exp := range fc.Spec.Exporters {
+ if exp == nil {
+ continue
+ }
+ name := fmt.Sprintf("%s-export-%d", strings.ToLower(string(exp.Type)), i)
+ r.mgr.Status.SetExporterStatus(name, string(exp.Type), string(status.StatusReady), "Configured", "")
+ }
+}
+
func (r *Reconciler) newCommonInfo(clh *helper.Client, ns string, loki *helper.LokiConfig) reconcilers.Common {
return reconcilers.Common{
Client: *clh,
@@ -211,7 +235,7 @@ func annotateKafkaExporterCerts(ctx context.Context, info *reconcilers.Common, e
func annotateKafkaCerts(ctx context.Context, info *reconcilers.Common, spec *flowslatest.FlowCollectorKafka, prefix string, annotations map[string]string) error {
caDigest, userDigest, err := info.Watcher.ProcessMTLSCerts(ctx, info.Client, &spec.TLS, info.Namespace)
if err != nil {
- return err
+ return reconcilers.WrapKafkaError(err)
}
if caDigest != "" {
annotations[watchers.Annotation(prefix+"-ca")] = caDigest
@@ -222,7 +246,7 @@ func annotateKafkaCerts(ctx context.Context, info *reconcilers.Common, spec *flo
if spec.SASL.UseSASL() {
saslDigest1, saslDigest2, err := info.Watcher.ProcessSASL(ctx, info.Client, &spec.SASL, info.Namespace)
if err != nil {
- return err
+ return reconcilers.WrapKafkaError(err)
}
if saslDigest1 != "" {
annotations[watchers.Annotation(prefix+"-sd1")] = saslDigest1
diff --git a/internal/controller/lokistack/lokistack_watcher_test.go b/internal/controller/lokistack/lokistack_watcher_test.go
index 259daa6f51..f0c739d6f0 100644
--- a/internal/controller/lokistack/lokistack_watcher_test.go
+++ b/internal/controller/lokistack/lokistack_watcher_test.go
@@ -58,7 +58,7 @@ func TestCheckLoki_Disabled(t *testing.T) {
assert.Equal(t, status.ComponentStatus{
Name: status.LokiStack,
- Status: status.StatusUnknown,
+ Status: status.StatusUnused,
Reason: "ComponentUnused",
Message: "Loki is disabled",
}, st)
@@ -79,7 +79,7 @@ func TestCheckLoki_NotLokiStackMode(t *testing.T) {
assert.Equal(t, status.ComponentStatus{
Name: status.LokiStack,
- Status: status.StatusUnknown,
+ Status: status.StatusUnused,
Reason: "ComponentUnused",
Message: "Loki is not configured in LokiStack mode",
}, st)
diff --git a/internal/controller/monitoring/monitoring_controller.go b/internal/controller/monitoring/monitoring_controller.go
index 6dc363e9cf..e3a5e1a01d 100644
--- a/internal/controller/monitoring/monitoring_controller.go
+++ b/internal/controller/monitoring/monitoring_controller.go
@@ -121,51 +121,63 @@ func (r *Reconciler) reconcile(ctx context.Context, clh *helper.Client, desired
}
// Dashboards
+ if !r.mgr.ClusterInfo.IsOpenShift() {
+ log.Info("Non-OpenShift cluster: monitoring dashboards are not available")
+ } else if !r.mgr.ClusterInfo.HasSvcMonitor() {
+ log.Info("ServiceMonitor CRD not found: monitoring is limited (prometheus-operator not installed?)")
+ r.status.SetDegraded("ServiceMonitorMissing", "ServiceMonitor CRD not found; monitoring dashboards unavailable. Install prometheus-operator for full monitoring")
+ }
if r.mgr.ClusterInfo.IsOpenShift() && r.mgr.ClusterInfo.HasSvcMonitor() {
- // List custom metrics
- fm := metricslatest.FlowMetricList{}
- if err := r.Client.List(ctx, &fm, &client.ListOptions{Namespace: ns}); err != nil {
- return r.status.Error("CantListFlowMetrics", err)
+ if err := r.reconcileDashboards(ctx, clh, desired, ns); err != nil {
+ return err
}
- log.WithValues("items count", len(fm.Items)).Info("FlowMetrics loaded")
+ }
- allMetrics := metrics.MergePredefined(fm.Items, &desired.Spec)
- log.WithValues("metrics count", len(allMetrics)).Info("Merged metrics")
+ return nil
+}
- req, err := labels.NewRequirement("netobserv-managed", selection.Exists, []string{})
- if err != nil {
- return r.status.Error("CantQueryRequirement", err)
- }
- // List existing dashboards
- currentDashboards := corev1.ConfigMapList{}
- if err := r.Client.List(ctx, ¤tDashboards, &client.ListOptions{
- Namespace: dashboardCMNamespace,
- LabelSelector: labels.NewSelector().Add(*req),
- }); err != nil {
- return r.status.Error("CantListDashboards", err)
- }
+func (r *Reconciler) reconcileDashboards(ctx context.Context, clh *helper.Client, desired *flowslatest.FlowCollector, ns string) error {
+ log := log.FromContext(ctx)
- // Build desired dashboards
- cms := buildFlowMetricsDashboards(allMetrics)
- nsFlowsMetric := getNamespacedFlowsMetric(allMetrics)
- if desiredHealthDashboardCM, del, err := buildHealthDashboard(ns, nsFlowsMetric); err != nil {
- return err
- } else if !del {
- cms = append(cms, desiredHealthDashboardCM)
- }
+ fm := metricslatest.FlowMetricList{}
+ if err := r.Client.List(ctx, &fm, &client.ListOptions{Namespace: ns}); err != nil {
+ return r.status.Error("CantListFlowMetrics", err)
+ }
+ log.WithValues("items count", len(fm.Items)).Info("FlowMetrics loaded")
- for _, cm := range cms {
- current := findAndRemoveConfigMapFromList(¤tDashboards, cm.Name)
- if err := reconcilers.ReconcileConfigMap(ctx, clh, current, cm); err != nil {
- return err
- }
+ allMetrics := metrics.MergePredefined(fm.Items, &desired.Spec)
+ log.WithValues("metrics count", len(allMetrics)).Info("Merged metrics")
+
+ req, err := labels.NewRequirement("netobserv-managed", selection.Exists, []string{})
+ if err != nil {
+ return r.status.Error("CantQueryRequirement", err)
+ }
+ currentDashboards := corev1.ConfigMapList{}
+ if err := r.Client.List(ctx, ¤tDashboards, &client.ListOptions{
+ Namespace: dashboardCMNamespace,
+ LabelSelector: labels.NewSelector().Add(*req),
+ }); err != nil {
+ return r.status.Error("CantListDashboards", err)
+ }
+
+ cms := buildFlowMetricsDashboards(allMetrics)
+ nsFlowsMetric := getNamespacedFlowsMetric(allMetrics)
+ if desiredHealthDashboardCM, del, err := buildHealthDashboard(ns, nsFlowsMetric); err != nil {
+ return err
+ } else if !del {
+ cms = append(cms, desiredHealthDashboardCM)
+ }
+
+ for _, cm := range cms {
+ current := findAndRemoveConfigMapFromList(¤tDashboards, cm.Name)
+ if err := reconcilers.ReconcileConfigMap(ctx, clh, current, cm); err != nil {
+ return err
}
+ }
- // Delete any CM that remained in currentDashboards list
- for i := range currentDashboards.Items {
- if err := reconcilers.ReconcileConfigMap(ctx, clh, ¤tDashboards.Items[i], nil); err != nil {
- return err
- }
+ for i := range currentDashboards.Items {
+ if err := reconcilers.ReconcileConfigMap(ctx, clh, ¤tDashboards.Items[i], nil); err != nil {
+ return err
}
}
diff --git a/internal/controller/reconcilers/errors.go b/internal/controller/reconcilers/errors.go
new file mode 100644
index 0000000000..df486baa44
--- /dev/null
+++ b/internal/controller/reconcilers/errors.go
@@ -0,0 +1,16 @@
+package reconcilers
+
+// KafkaError wraps an error that originates from Kafka-specific configuration
+// (TLS certificates, SASL credentials). Use errors.As to detect it at the
+// top-level Reconcile and set an appropriate status reason.
+type KafkaError struct{ Err error }
+
+func (e *KafkaError) Error() string { return e.Err.Error() }
+func (e *KafkaError) Unwrap() error { return e.Err }
+
+func WrapKafkaError(err error) error {
+ if err == nil {
+ return nil
+ }
+ return &KafkaError{Err: err}
+}
diff --git a/internal/controller/reconcilers/errors_test.go b/internal/controller/reconcilers/errors_test.go
new file mode 100644
index 0000000000..12bcc905d5
--- /dev/null
+++ b/internal/controller/reconcilers/errors_test.go
@@ -0,0 +1,40 @@
+package reconcilers
+
+import (
+ "errors"
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
// TestWrapKafkaErrorNil checks that wrapping nil is a no-op returning nil,
// so call sites can wrap unconditionally.
func TestWrapKafkaErrorNil(t *testing.T) {
	assert.Nil(t, WrapKafkaError(nil))
}

// TestKafkaErrorDetection checks errors.As detection, message passthrough,
// and Unwrap on a directly wrapped error.
func TestKafkaErrorDetection(t *testing.T) {
	original := fmt.Errorf("secret not found")
	wrapped := WrapKafkaError(original)

	var ke *KafkaError
	require.True(t, errors.As(wrapped, &ke))
	assert.Equal(t, "secret not found", ke.Error())
	assert.Equal(t, original, errors.Unwrap(wrapped))
}

// TestKafkaErrorThroughWrapping checks that detection survives an additional
// fmt.Errorf("%w") layer, as added by intermediate reconcilers.
func TestKafkaErrorThroughWrapping(t *testing.T) {
	original := fmt.Errorf("TLS cert missing")
	wrapped := WrapKafkaError(original)
	rewrapped := fmt.Errorf("processing config: %w", wrapped)

	var ke *KafkaError
	assert.True(t, errors.As(rewrapped, &ke), "errors.As should find KafkaError through fmt.Errorf wrapping")
}

// TestNonKafkaErrorNotDetected checks that a plain error is not misclassified
// as a Kafka error.
func TestNonKafkaErrorNotDetected(t *testing.T) {
	plain := fmt.Errorf("permission denied")

	var ke *KafkaError
	assert.False(t, errors.As(plain, &ke))
}
diff --git a/internal/controller/reconcilers/reconcilers.go b/internal/controller/reconcilers/reconcilers.go
index 8de47ef4ab..3b77094fd7 100644
--- a/internal/controller/reconcilers/reconcilers.go
+++ b/internal/controller/reconcilers/reconcilers.go
@@ -122,25 +122,26 @@ func ReconcileConfigMap(ctx context.Context, cl *helper.Client, current, desired
return cl.UpdateIfOwned(ctx, current, desired)
}
-// returns true if ready, false if still in progress
// ReconcileDaemonSet reconciles a DaemonSet and checks pod health when not ready.
// It creates the DaemonSet when it does not yet exist; otherwise it records the
// rollout/pod health in the status manager and updates the object only when the
// pod template actually changed (as reported by helper.PodChanged).
func ReconcileDaemonSet(ctx context.Context, ci *Instance, old, n *appsv1.DaemonSet, containerName string, report *helper.ChangeReport) error {
	if !ci.Managed.Exists(old) {
		// First reconcile: mark status as "creating" then create the object.
		ci.Status.SetCreatingDaemonSet(n)
		return ci.CreateOwned(ctx, n)
	}
	// Record rollout progress and (per its name) pod health for the existing object.
	ci.Status.CheckDaemonSetHealth(ctx, ci.Client, old)
	if helper.PodChanged(&old.Spec.Template, &n.Spec.Template, containerName, report) {
		return ci.UpdateIfOwned(ctx, old, n)
	}
	return nil
}
+// ReconcileDeployment reconciles a Deployment and checks pod health when not ready.
func ReconcileDeployment(ctx context.Context, ci *Instance, old, n *appsv1.Deployment, containerName string, ignoreReplicas bool, report *helper.ChangeReport) error {
if !ci.Managed.Exists(old) {
ci.Status.SetCreatingDeployment(n)
return ci.CreateOwned(ctx, n)
}
- ci.Status.CheckDeploymentProgress(old)
+ ci.Status.CheckDeploymentHealth(ctx, ci.Client, old)
if ignoreReplicas {
n.Spec.Replicas = old.Spec.Replicas
}
diff --git a/internal/pkg/manager/manager.go b/internal/pkg/manager/manager.go
index c0efbdd850..052e0aaf67 100644
--- a/internal/pkg/manager/manager.go
+++ b/internal/pkg/manager/manager.go
@@ -5,13 +5,17 @@ import (
"fmt"
flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2"
+ "github.com/netobserv/netobserv-operator/internal/controller/constants"
"github.com/netobserv/netobserv-operator/internal/pkg/cluster"
"github.com/netobserv/netobserv-operator/internal/pkg/manager/status"
"github.com/netobserv/netobserv-operator/internal/pkg/migrator"
"github.com/netobserv/netobserv-operator/internal/pkg/narrowcache"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -19,6 +23,7 @@ import (
//+kubebuilder:rbac:groups=core,resources=namespaces;services;serviceaccounts;configmaps;persistentvolumeclaims;secrets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=pods;nodes;endpoints,verbs=get;list;watch
+//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
//+kubebuilder:rbac:groups=apps,resources=deployments;daemonsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=replicasets,verbs=get;list;watch
//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings;rolebindings,verbs=get;list;create;delete;update;watch
@@ -85,6 +90,13 @@ func NewManager(
narrowcache.EndpointSlices,
)
opts.Client = client.Options{Cache: narrowCache.ControllerRuntimeClientCacheOptions()}
+ opts.Cache = cache.Options{
+ ByObject: map[client.Object]cache.ByObject{
+ &corev1.Pod{}: {
+ Label: labels.SelectorFromSet(map[string]string{"part-of": constants.OperatorName}),
+ },
+ },
+ }
internalManager, err := ctrl.NewManager(kcfg, *opts)
if err != nil {
@@ -96,6 +108,7 @@ func NewManager(
}
statusMgr := status.NewManager()
+ statusMgr.SetEventRecorder(internalManager.GetEventRecorderFor("flowcollector-controller")) //nolint:staticcheck
log.Info("Discovering APIs")
dc, err := discovery.NewDiscoveryClientForConfig(kcfg)
diff --git a/internal/pkg/manager/status/pod_health.go b/internal/pkg/manager/status/pod_health.go
new file mode 100644
index 0000000000..825d35e75b
--- /dev/null
+++ b/internal/pkg/manager/status/pod_health.go
@@ -0,0 +1,164 @@
+package status
+
+import (
+ "context"
+ "fmt"
+ "sort"
+ "strings"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/log"
+)
+
// maxPodNamesInSummary caps how many pod names are listed per issue group in
// the human-readable summary, keeping status messages short for large workloads.
const maxPodNamesInSummary = 5

// PodHealthSummary holds aggregated pod health information for a workload.
type PodHealthSummary struct {
	// UnhealthyCount is the number of pods classified as unhealthy.
	UnhealthyCount int32
	// Issues is a human-readable, "; "-separated summary, grouped by reason.
	Issues string
}
+
// CheckPodHealth lists pods matching the given label selector in the given namespace,
// inspects container statuses, and returns a summary of unhealthy pods.
// This is intended to be called only when a workload reports not-ready replicas,
// to avoid unnecessary API calls when everything is healthy.
func CheckPodHealth(ctx context.Context, c client.Client, namespace string, matchLabels map[string]string) PodHealthSummary {
	rlog := log.FromContext(ctx)

	podList := corev1.PodList{}
	if err := c.List(ctx, &podList, &client.ListOptions{
		Namespace:     namespace,
		LabelSelector: labels.SelectorFromSet(matchLabels),
	}); err != nil {
		// Best-effort: a failed list is logged and reported as "no issues"
		// rather than failing the caller's reconciliation.
		rlog.Error(err, "Failed to list pods for health check")
		return PodHealthSummary{}
	}

	// Group unhealthy pods by issue reason so the summary reads as
	// "<count> <reason> (<pod names>): <sample message>".
	type issueGroup struct {
		reason   string
		podNames []string
		sample   string // first non-empty message seen for this reason
	}
	groups := make(map[string]*issueGroup)
	var unhealthyCount int32

	for i := range podList.Items {
		pod := &podList.Items[i]
		reason, msg := classifyPodIssue(pod)
		if reason == "" {
			continue // healthy pod
		}
		unhealthyCount++
		g, ok := groups[reason]
		if !ok {
			g = &issueGroup{reason: reason}
			groups[reason] = g
		}
		g.podNames = append(g.podNames, pod.Name)
		if g.sample == "" && msg != "" {
			g.sample = msg
		}
	}

	if unhealthyCount == 0 {
		return PodHealthSummary{}
	}

	// Sort reasons so the summary string is deterministic (map iteration
	// order is random in Go).
	sortedReasons := make([]string, 0, len(groups))
	for reason := range groups {
		sortedReasons = append(sortedReasons, reason)
	}
	sort.Strings(sortedReasons)

	var parts []string
	for _, reason := range sortedReasons {
		g := groups[reason]
		count := len(g.podNames)
		names := g.podNames
		if len(names) > maxPodNamesInSummary {
			names = names[:maxPodNamesInSummary]
		}
		part := fmt.Sprintf("%d %s (%s)", count, g.reason, strings.Join(names, ", "))
		if len(g.podNames) > maxPodNamesInSummary {
			part += fmt.Sprintf(" and %d more", count-maxPodNamesInSummary)
		}
		if g.sample != "" {
			// Sample messages come from container status and can be long; cap them.
			part += ": " + truncateMessage(g.sample, 200)
		}
		parts = append(parts, part)
	}

	return PodHealthSummary{
		UnhealthyCount: unhealthyCount,
		Issues:         strings.Join(parts, "; "),
	}
}
+
// classifyPodIssue returns a reason and message if the pod is unhealthy, or empty strings if healthy.
// It returns on the first container-level issue found, in the order the
// container statuses appear; pod-level checks (Failed/Pending) run only when
// no container issue was detected.
func classifyPodIssue(pod *corev1.Pod) (string, string) {
	for i := range pod.Status.ContainerStatuses {
		cs := &pod.Status.ContainerStatuses[i]
		if cs.State.Waiting != nil {
			switch cs.State.Waiting.Reason {
			case "CrashLoopBackOff":
				// Prefer the last termination message over the generic
				// back-off text, as it usually carries the root cause.
				msg := extractTerminationMessage(cs)
				return "CrashLoopBackOff", msg
			case "ImagePullBackOff", "ErrImagePull":
				// Both pull failures are folded into one reason.
				return "ImagePullError", cs.State.Waiting.Message
			case "ContainerCreating", "PodInitializing":
				// Normal transient states, not an issue
			case "":
				// No reason set yet
			default:
				// Any other waiting reason is surfaced as-is.
				return cs.State.Waiting.Reason, cs.State.Waiting.Message
			}
		}

		// Even a currently-running container is flagged if its last
		// termination was an OOM kill.
		if cs.LastTerminationState.Terminated != nil {
			t := cs.LastTerminationState.Terminated
			if t.Reason == "OOMKilled" {
				return "OOMKilled", t.Message
			}
		}

		// Heuristic threshold: >10 restarts on a running container is
		// treated as crash-looping even without a Waiting state.
		if cs.RestartCount > 10 && cs.State.Running != nil {
			msg := extractTerminationMessage(cs)
			return "FrequentRestarts", msg
		}
	}

	if pod.Status.Phase == corev1.PodFailed {
		return "PodFailed", pod.Status.Message
	}

	// Pending pods are only an issue when scheduling explicitly failed
	// (PodScheduled=False), e.g. insufficient resources.
	if pod.Status.Phase == corev1.PodPending {
		for i := range pod.Status.Conditions {
			c := &pod.Status.Conditions[i]
			if c.Type == corev1.PodScheduled && c.Status == corev1.ConditionFalse {
				return "PendingScheduling", c.Message
			}
		}
	}

	return "", ""
}
+
+func extractTerminationMessage(cs *corev1.ContainerStatus) string {
+ if cs.LastTerminationState.Terminated != nil {
+ msg := cs.LastTerminationState.Terminated.Message
+ if msg != "" {
+ return msg
+ }
+ return cs.LastTerminationState.Terminated.Reason
+ }
+ return ""
+}
+
+func truncateMessage(msg string, maxLen int) string {
+ if len(msg) <= maxLen {
+ return msg
+ }
+ return msg[:maxLen] + "..."
+}
diff --git a/internal/pkg/manager/status/pod_health_test.go b/internal/pkg/manager/status/pod_health_test.go
new file mode 100644
index 0000000000..35df359e78
--- /dev/null
+++ b/internal/pkg/manager/status/pod_health_test.go
@@ -0,0 +1,154 @@
+package status
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ corev1 "k8s.io/api/core/v1"
+)
+
// TestClassifyPodIssue_Healthy: a running pod with a running container yields
// no issue.
func TestClassifyPodIssue_Healthy(t *testing.T) {
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			Phase: corev1.PodRunning,
			ContainerStatuses: []corev1.ContainerStatus{{
				State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{}},
			}},
		},
	}
	reason, msg := classifyPodIssue(pod)
	assert.Empty(t, reason)
	assert.Empty(t, msg)
}

// TestClassifyPodIssue_CrashLoopBackOff: the last termination message is
// preferred over the generic back-off text.
func TestClassifyPodIssue_CrashLoopBackOff(t *testing.T) {
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			ContainerStatuses: []corev1.ContainerStatus{{
				State: corev1.ContainerState{
					Waiting: &corev1.ContainerStateWaiting{
						Reason:  "CrashLoopBackOff",
						Message: "back-off 5m0s restarting failed container",
					},
				},
				LastTerminationState: corev1.ContainerState{
					Terminated: &corev1.ContainerStateTerminated{
						Reason:  "Error",
						Message: "can't write messages into Kafka",
					},
				},
			}},
		},
	}
	reason, msg := classifyPodIssue(pod)
	assert.Equal(t, "CrashLoopBackOff", reason)
	assert.Equal(t, "can't write messages into Kafka", msg)
}

// TestClassifyPodIssue_OOMKilled: a running container whose last termination
// was OOMKilled is flagged; message is empty when none was recorded.
func TestClassifyPodIssue_OOMKilled(t *testing.T) {
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			ContainerStatuses: []corev1.ContainerStatus{{
				State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{}},
				LastTerminationState: corev1.ContainerState{
					Terminated: &corev1.ContainerStateTerminated{
						Reason: "OOMKilled",
					},
				},
			}},
		},
	}
	reason, msg := classifyPodIssue(pod)
	assert.Equal(t, "OOMKilled", reason)
	assert.Empty(t, msg)
}

// TestClassifyPodIssue_ImagePull: ImagePullBackOff is normalized to the
// "ImagePullError" reason, keeping the waiting message.
func TestClassifyPodIssue_ImagePull(t *testing.T) {
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			ContainerStatuses: []corev1.ContainerStatus{{
				State: corev1.ContainerState{
					Waiting: &corev1.ContainerStateWaiting{
						Reason:  "ImagePullBackOff",
						Message: "Back-off pulling image",
					},
				},
			}},
		},
	}
	reason, msg := classifyPodIssue(pod)
	assert.Equal(t, "ImagePullError", reason)
	assert.Equal(t, "Back-off pulling image", msg)
}

// TestClassifyPodIssue_FrequentRestarts: >10 restarts on a running container
// triggers the FrequentRestarts heuristic with the last termination message.
func TestClassifyPodIssue_FrequentRestarts(t *testing.T) {
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			ContainerStatuses: []corev1.ContainerStatus{{
				State:        corev1.ContainerState{Running: &corev1.ContainerStateRunning{}},
				RestartCount: 15,
				LastTerminationState: corev1.ContainerState{
					Terminated: &corev1.ContainerStateTerminated{
						Reason:  "Error",
						Message: "exit code 1",
					},
				},
			}},
		},
	}
	reason, msg := classifyPodIssue(pod)
	assert.Equal(t, "FrequentRestarts", reason)
	assert.Equal(t, "exit code 1", msg)
}

// TestClassifyPodIssue_PodFailed: a Failed phase pod reports PodFailed with
// the pod status message.
func TestClassifyPodIssue_PodFailed(t *testing.T) {
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			Phase:   corev1.PodFailed,
			Message: "Pod exceeded memory",
		},
	}
	reason, msg := classifyPodIssue(pod)
	assert.Equal(t, "PodFailed", reason)
	assert.Equal(t, "Pod exceeded memory", msg)
}

// TestClassifyPodIssue_PendingScheduling: Pending + PodScheduled=False is an
// issue carrying the scheduler's message.
func TestClassifyPodIssue_PendingScheduling(t *testing.T) {
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			Phase: corev1.PodPending,
			Conditions: []corev1.PodCondition{{
				Type:    corev1.PodScheduled,
				Status:  corev1.ConditionFalse,
				Reason:  "Unschedulable",
				Message: "0/3 nodes are available: insufficient memory",
			}},
		},
	}
	reason, msg := classifyPodIssue(pod)
	assert.Equal(t, "PendingScheduling", reason)
	assert.Equal(t, "0/3 nodes are available: insufficient memory", msg)
}

// TestClassifyPodIssue_PendingButScheduled: Pending with PodScheduled=True is
// a normal transient state, not an issue.
func TestClassifyPodIssue_PendingButScheduled(t *testing.T) {
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			Phase: corev1.PodPending,
			Conditions: []corev1.PodCondition{{
				Type:   corev1.PodScheduled,
				Status: corev1.ConditionTrue,
			}},
		},
	}
	reason, msg := classifyPodIssue(pod)
	assert.Empty(t, reason)
	assert.Empty(t, msg)
}

// TestTruncateMessage: short messages pass through; long ones are cut and
// suffixed with "...".
func TestTruncateMessage(t *testing.T) {
	short := "short message"
	assert.Equal(t, short, truncateMessage(short, 100))

	long := "a very long message that exceeds the limit"
	assert.Equal(t, "a very ...", truncateMessage(long, 7))
}
diff --git a/internal/pkg/manager/status/status_manager.go b/internal/pkg/manager/status/status_manager.go
index 86dd5d6f05..e0754c3b57 100644
--- a/internal/pkg/manager/status/status_manager.go
+++ b/internal/pkg/manager/status/status_manager.go
@@ -13,7 +13,9 @@ import (
kerr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
+ "k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)
@@ -36,13 +38,21 @@ const (
)
type Manager struct {
- statuses sync.Map
+ statuses sync.Map
+ exporters sync.Map
+ eventRecorder record.EventRecorder
+ prevStatuses sync.Map
}
func NewManager() *Manager {
return &Manager{}
}
+// SetEventRecorder sets the EventRecorder for emitting Kubernetes Events.
+func (s *Manager) SetEventRecorder(recorder record.EventRecorder) {
+ s.eventRecorder = recorder
+}
+
func (s *Manager) getStatus(cpnt ComponentName) *ComponentStatus {
v, _ := s.statuses.Load(cpnt)
if v != nil {
@@ -54,30 +64,36 @@ func (s *Manager) getStatus(cpnt ComponentName) *ComponentStatus {
}
// setInProgress marks the component as InProgress with the given reason and
// message, carrying over previously recorded replica counters so transient
// state changes do not wipe replica information.
func (s *Manager) setInProgress(cpnt ComponentName, reason, message string) {
	cs := s.preserveReplicas(cpnt)
	cs.Status = StatusInProgress
	cs.Reason = reason
	cs.Message = message
	s.statuses.Store(cpnt, cs)
}
// setFailure marks the component as Failure with the given reason and message,
// preserving previously recorded replica counters.
func (s *Manager) setFailure(cpnt ComponentName, reason, message string) {
	cs := s.preserveReplicas(cpnt)
	cs.Status = StatusFailure
	cs.Reason = reason
	cs.Message = message
	s.statuses.Store(cpnt, cs)
}
// setDegraded marks the component as Degraded with the given reason and
// message, preserving previously recorded replica counters.
func (s *Manager) setDegraded(cpnt ComponentName, reason, message string) {
	cs := s.preserveReplicas(cpnt)
	cs.Status = StatusDegraded
	cs.Reason = reason
	cs.Message = message
	s.statuses.Store(cpnt, cs)
}
+
// preserveReplicas returns a fresh ComponentStatus for cpnt, carrying over the
// replica counters from any previously stored status. This keeps replica
// information visible across InProgress/Failure/Degraded transitions.
func (s *Manager) preserveReplicas(cpnt ComponentName) ComponentStatus {
	cs := ComponentStatus{Name: cpnt}
	if existing := s.getStatus(cpnt); existing != nil {
		cs.DesiredReplicas = existing.DesiredReplicas
		cs.ReadyReplicas = existing.ReadyReplicas
	}
	return cs
}
func (s *Manager) hasFailure(cpnt ComponentName) bool {
@@ -86,10 +102,16 @@ func (s *Manager) hasFailure(cpnt ComponentName) bool {
}
func (s *Manager) setReady(cpnt ComponentName) {
- s.statuses.Store(cpnt, ComponentStatus{
+ existing := s.getStatus(cpnt)
+ cs := ComponentStatus{
Name: cpnt,
Status: StatusReady,
- })
+ }
+ if existing != nil {
+ cs.DesiredReplicas = existing.DesiredReplicas
+ cs.ReadyReplicas = existing.ReadyReplicas
+ }
+ s.statuses.Store(cpnt, cs)
}
func (s *Manager) setUnknown(cpnt ComponentName) {
@@ -102,7 +124,7 @@ func (s *Manager) setUnknown(cpnt ComponentName) {
func (s *Manager) setUnused(cpnt ComponentName, message string) {
s.statuses.Store(cpnt, ComponentStatus{
Name: cpnt,
- Status: StatusUnknown,
+ Status: StatusUnused,
Reason: "ComponentUnused",
Message: message,
})
@@ -135,32 +157,153 @@ func (s *Manager) getConditions() []metav1.Condition {
return append([]metav1.Condition{global}, conds...)
}
// populateComponentStatuses maps internal ComponentStatus instances to the CRD status fields.
// Always start fresh to avoid stale data from a previous API server fetch influencing the merge.
// NOTE(review): sync.Map iteration order is random; the Loki and Processor
// merge logic must therefore be order-independent — confirm that is the case.
func (s *Manager) populateComponentStatuses(fc *flowslatest.FlowCollector) {
	fc.Status.Components = &flowslatest.FlowCollectorComponentsStatus{}
	fc.Status.Integrations = &flowslatest.FlowCollectorIntegrationsStatus{}

	s.statuses.Range(func(_, v any) bool {
		cs := v.(ComponentStatus)
		switch cs.Name {
		case EBPFAgents:
			fc.Status.Components.Agent = cs.toCRDStatus()
		case FLPParent, FLPMonolith, FLPTransformer:
			// Three FLP sub-reconcilers fold into one Processor status.
			fc.Status.Components.Processor = mergeProcessorStatus(fc.Status.Components.Processor, &cs)
		case WebConsole:
			fc.Status.Components.Plugin = cs.toCRDStatus()
		case Monitoring:
			fc.Status.Integrations.Monitoring = cs.toCRDStatus()
		case LokiStack, DemoLoki:
			// Prefer a meaningful state (Ready/Degraded/Failure) over
			// Unknown/Unused when both Loki sources report.
			existingIsWeak := fc.Status.Integrations.Loki == nil ||
				fc.Status.Integrations.Loki.State == string(StatusUnknown) ||
				fc.Status.Integrations.Loki.State == string(StatusUnused)
			if existingIsWeak || cs.Status == StatusFailure || cs.Status == StatusDegraded || cs.Status == StatusReady {
				fc.Status.Integrations.Loki = cs.toCRDStatus()
			}
		case FlowCollectorController, StaticController, NetworkPolicy:
			// Reported only through conditions, not dedicated CRD status fields.
		}
		return true
	})

	var exporters []flowslatest.FlowCollectorExporterStatus
	s.exporters.Range(func(_, v any) bool {
		exp := v.(flowslatest.FlowCollectorExporterStatus)
		exporters = append(exporters, exp)
		return true
	})
	fc.Status.Integrations.Exporters = exporters
}
+
// mergeProcessorStatus handles FLP processor status aggregation from parent, monolith, and transformer.
// Active sub-reconcilers (non-Unknown/Unused) take priority over the parent status.
// `existing` is the status accumulated from components already visited;
// `cs` is the component being merged in. The result is the new aggregate.
func mergeProcessorStatus(existing *flowslatest.FlowCollectorComponentStatus, cs *ComponentStatus) *flowslatest.FlowCollectorComponentStatus {
	isInactive := cs.Status == StatusUnknown || cs.Status == StatusUnused
	existingIsWeak := existing == nil ||
		existing.State == string(StatusUnknown) ||
		existing.State == string(StatusUnused)

	// An inactive component only fills the slot when nothing else has.
	if isInactive {
		if existing == nil {
			return cs.toCRDStatus()
		}
		return existing
	}

	// The parent status never overrides an active sub-reconciler.
	if cs.Name == FLPParent {
		if existingIsWeak {
			return cs.toCRDStatus()
		}
		return existing
	}

	// Monolith/transformer: problems (Failure/InProgress/Degraded) win over
	// whatever is already recorded.
	crd := cs.toCRDStatus()
	if existingIsWeak || cs.Status == StatusFailure || cs.Status == StatusInProgress || cs.Status == StatusDegraded {
		return crd
	}
	// Both Ready: keep the existing entry but refresh replica/pod-health
	// figures from the newer component when it has them.
	if existing.State == string(StatusReady) && crd.DesiredReplicas != nil {
		existing.DesiredReplicas = crd.DesiredReplicas
		existing.ReadyReplicas = crd.ReadyReplicas
		existing.UnhealthyPodCount = crd.UnhealthyPodCount
		existing.PodIssues = crd.PodIssues
	}
	return existing
}
+
// Sync writes the currently recorded component statuses and conditions to the
// FlowCollector CR status subresource.
func (s *Manager) Sync(ctx context.Context, c client.Client) {
	s.updateStatus(ctx, c)
}
-func updateStatus(ctx context.Context, c client.Client, conditions ...metav1.Condition) {
- log := log.FromContext(ctx)
- log.Info("Updating FlowCollector status")
// emitStateTransitionEvents compares each component's current status with the
// snapshot taken at the previous sync and emits Kubernetes Events on
// meaningful transitions: entering Failure or Degraded (Warning events) and
// recovering from Failure (Normal event). No-op when no EventRecorder is set.
func (s *Manager) emitStateTransitionEvents(ctx context.Context, fc *flowslatest.FlowCollector) {
	if s.eventRecorder == nil {
		return
	}
	rlog := log.FromContext(ctx)

	s.statuses.Range(func(key, v any) bool {
		cpnt := key.(ComponentName)
		current := v.(ComponentStatus)

		// Record the new snapshot first; the very first sync has no baseline
		// and therefore never emits events.
		prev, hasPrev := s.prevStatuses.Load(cpnt)
		s.prevStatuses.Store(cpnt, current)
		if !hasPrev {
			return true
		}
		prevStatus := prev.(ComponentStatus)
		if prevStatus.Status == current.Status {
			return true
		}
		switch {
		case current.Status == StatusFailure:
			msg := fmt.Sprintf("Component %s entered failure state: %s - %s", cpnt, current.Reason, current.Message)
			s.eventRecorder.Event(fc, "Warning", "ComponentFailure", msg)
			rlog.Info("Event emitted", "type", "ComponentFailure", "component", cpnt)
		case current.Status == StatusDegraded:
			msg := fmt.Sprintf("Component %s degraded: %s - %s", cpnt, current.Reason, current.Message)
			s.eventRecorder.Event(fc, "Warning", "ComponentDegraded", msg)
		case prevStatus.Status == StatusFailure && (current.Status == StatusReady || current.Status == StatusInProgress):
			// NOTE(review): recovery is only announced from Failure; a
			// Degraded -> Ready transition is silent — confirm that is intended.
			msg := fmt.Sprintf("Component %s recovered from failure", cpnt)
			s.eventRecorder.Event(fc, "Normal", "ComponentRecovered", msg)
		}
		return true
	})
}
+
// updateStatus fetches the FlowCollector CR, merges in the current conditions
// (including validation and, when present, the Kafka condition), rebuilds the
// component/integration status sections, and writes the status subresource
// with conflict retries. Events are emitted only after a successful update.
func (s *Manager) updateStatus(ctx context.Context, c client.Client) {
	rlog := log.FromContext(ctx)
	rlog.Info("Updating FlowCollector status")

	var updatedFC *flowslatest.FlowCollector
	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		fc := flowslatest.FlowCollector{}
		if err := c.Get(ctx, constants.FlowCollectorName, &fc); err != nil {
			if kerr.IsNotFound(err) {
				// CR being deleted: nothing to update.
				return nil
			}
			return err
		}
		conditions := s.getConditions()
		conditions = append(conditions, checkValidation(ctx, &fc))
		if kafkaCond := s.GetKafkaCondition(); kafkaCond != nil {
			conditions = append(conditions, *kafkaCond)
		}
		for _, cond := range conditions {
			meta.SetStatusCondition(&fc.Status.Conditions, cond)
		}
		s.populateComponentStatuses(&fc)
		if err := c.Status().Update(ctx, &fc); err != nil {
			return err
		}
		// Keep the successfully written object for event emission below.
		updatedFC = &fc
		return nil
	})
	if err != nil {
		rlog.Error(err, "failed to update FlowCollector status")
	} else if updatedFC != nil {
		s.emitStateTransitionEvents(ctx, updatedFC)
	}
}
@@ -182,7 +325,6 @@ func checkValidation(ctx context.Context, fc *flowslatest.FlowCollector) metav1.
Message: strings.Join(warnings, "; "),
}
}
- // No issue
return metav1.Condition{
Type: ConditionConfigurationIssue,
Reason: "Valid",
@@ -190,6 +332,113 @@ func checkValidation(ctx context.Context, fc *flowslatest.FlowCollector) metav1.
}
}
+// GetKafkaCondition returns a KafkaReady condition if Kafka is being used, or nil otherwise.
+// It aggregates the health of Kafka-related components: the FLP transformer (Kafka consumer),
+// agent pods whose reported issues mention "kafka" (substring heuristic), and any Kafka exporters.
+func (s *Manager) GetKafkaCondition() *metav1.Condition {
+ hasKafkaIssue := false
+ var messages []string
+
+ // Check transformer (only used with Kafka)
+ if ts := s.getStatus(FLPTransformer); ts != nil && ts.Status != StatusUnknown && ts.Status != StatusUnused {
+ if ts.Status == StatusFailure || ts.Status == StatusDegraded {
+ hasKafkaIssue = true
+ messages = append(messages, fmt.Sprintf("Transformer: %s", ts.Message))
+ }
+ if ts.PodHealth.UnhealthyCount > 0 {
+ hasKafkaIssue = true
+ messages = append(messages, fmt.Sprintf("Transformer pods: %s", ts.PodHealth.Issues))
+ }
+ }
+
+ // Check agent for Kafka-related pod issues
+ if as := s.getStatus(EBPFAgents); as != nil {
+ if as.PodHealth.UnhealthyCount > 0 && strings.Contains(strings.ToLower(as.PodHealth.Issues), "kafka") {
+ hasKafkaIssue = true
+ messages = append(messages, fmt.Sprintf("Agent pods: %s", as.PodHealth.Issues))
+ }
+ }
+
+ // Check Kafka exporters
+ s.exporters.Range(func(_, v any) bool {
+ exp := v.(flowslatest.FlowCollectorExporterStatus)
+ if exp.Type == "Kafka" && exp.State == string(StatusFailure) {
+ hasKafkaIssue = true
+ messages = append(messages, fmt.Sprintf("Exporter %s: %s", exp.Name, exp.Message))
+ }
+ return true
+ })
+
+ if hasKafkaIssue {
+ return &metav1.Condition{
+ Type: "KafkaReady",
+ Status: metav1.ConditionFalse,
+ Reason: "KafkaIssue",
+ Message: strings.Join(messages, "; "),
+ }
+ }
+
+ // If transformer is active (Kafka mode), report its state
+ if ts := s.getStatus(FLPTransformer); ts != nil {
+ switch ts.Status {
+ case StatusReady:
+ return &metav1.Condition{
+ Type: "KafkaReady",
+ Status: metav1.ConditionTrue,
+ Reason: "Ready",
+ }
+ case StatusInProgress:
+ return &metav1.Condition{
+ Type: "KafkaReady",
+ Status: metav1.ConditionFalse,
+ Reason: "KafkaPending",
+ Message: "Kafka transformer is rolling out",
+ }
+ case StatusUnknown, StatusUnused, StatusFailure, StatusDegraded:
+ // Failure/Degraded already handled above via hasKafkaIssue;
+ // Unknown/Unused mean Kafka mode is not active.
+ }
+ }
+
+ return nil
+}
+
+// SetExporterStatus sets the status of a specific exporter by name.
+func (s *Manager) SetExporterStatus(name, exporterType, state, reason, message string) {
+ s.exporters.Store(name, flowslatest.FlowCollectorExporterStatus{
+ Name: name,
+ Type: exporterType,
+ State: state,
+ Reason: reason,
+ Message: message,
+ })
+}
+
+// NeedsRequeue returns true if any component is in a transient state that warrants
+// periodic re-checking (e.g., pods crashing into CrashLoopBackOff after a deployment rollout).
+// Controllers should use this to return RequeueAfter so the status keeps updating
+// even when DaemonSet/Deployment watches don't fire for pod-level changes.
+func (s *Manager) NeedsRequeue() bool {
+ needsRequeue := false
+ s.statuses.Range(func(_, v any) bool {
+ cs := v.(ComponentStatus)
+ if cs.Status == StatusInProgress || cs.PodHealth.UnhealthyCount > 0 {
+ needsRequeue = true
+ return false
+ }
+ return true
+ })
+ return needsRequeue
+}
+
+// ClearExporters removes all exporter statuses (call before re-populating).
+func (s *Manager) ClearExporters() {
+ s.exporters.Range(func(key, _ any) bool {
+ s.exporters.Delete(key)
+ return true
+ })
+}
+
func (s *Manager) ForComponent(cpnt ComponentName) Instance {
s.setUnknown(cpnt)
return Instance{cpnt: cpnt, s: s}
@@ -220,40 +469,107 @@ func (i *Instance) SetUnused(message string) {
i.s.setUnused(i.cpnt, message)
}
-// CheckDeploymentProgress sets the status either as In Progress, or Ready.
+// CheckDeploymentProgress sets the status either as In Progress, or Ready,
+// and populates replica counts from the Deployment status.
func (i *Instance) CheckDeploymentProgress(d *appsv1.Deployment) {
if d == nil {
i.s.setInProgress(i.cpnt, "DeploymentNotCreated", "Deployment not created")
return
}
+ defer i.setDeploymentReplicas(d)
for _, c := range d.Status.Conditions {
if c.Type == appsv1.DeploymentAvailable {
if c.Status != v1.ConditionTrue {
- i.s.setInProgress(i.cpnt, "DeploymentNotReady", fmt.Sprintf("Deployment %s not ready: %d/%d (%s)", d.Name, d.Status.UpdatedReplicas, d.Status.Replicas, c.Message))
+ i.s.setInProgress(i.cpnt, "DeploymentNotReady", fmt.Sprintf("Deployment %s not ready: %d/%d (%s)", d.Name, d.Status.ReadyReplicas, d.Status.Replicas, c.Message))
} else {
i.s.setReady(i.cpnt)
}
return
}
}
- if d.Status.UpdatedReplicas == d.Status.Replicas {
+ if d.Status.ReadyReplicas == d.Status.Replicas && d.Status.Replicas > 0 {
i.s.setReady(i.cpnt)
} else {
- i.s.setInProgress(i.cpnt, "DeploymentNotReady", fmt.Sprintf("Deployment %s not ready: %d/%d (missing condition)", d.Name, d.Status.UpdatedReplicas, d.Status.Replicas))
+ i.s.setInProgress(i.cpnt, "DeploymentNotReady", fmt.Sprintf("Deployment %s not ready: %d/%d (missing condition)", d.Name, d.Status.ReadyReplicas, d.Status.Replicas))
}
}
-// CheckDaemonSetProgress sets the status either as In Progress, or Ready.
+func (i *Instance) setDeploymentReplicas(d *appsv1.Deployment) {
+ cs := i.s.getStatus(i.cpnt)
+ if cs != nil {
+ var desired int32 = 1
+ if d.Spec.Replicas != nil {
+ desired = *d.Spec.Replicas
+ }
+ cs.DesiredReplicas = ptr.To(desired)
+ cs.ReadyReplicas = ptr.To(d.Status.ReadyReplicas)
+ i.s.statuses.Store(i.cpnt, *cs)
+ }
+}
+
+// CheckDaemonSetProgress sets the status either as In Progress, or Ready,
+// and populates replica counts from the DaemonSet status.
func (i *Instance) CheckDaemonSetProgress(ds *appsv1.DaemonSet) {
if ds == nil {
i.s.setInProgress(i.cpnt, "DaemonSetNotCreated", "DaemonSet not created")
- } else if ds.Status.UpdatedNumberScheduled < ds.Status.DesiredNumberScheduled {
- i.s.setInProgress(i.cpnt, "DaemonSetNotReady", fmt.Sprintf("DaemonSet %s not ready: %d/%d", ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled))
+ return
+ }
+ defer i.setDaemonSetReplicas(ds)
+ if ds.Status.NumberReady < ds.Status.DesiredNumberScheduled {
+ i.s.setInProgress(i.cpnt, "DaemonSetNotReady", fmt.Sprintf("DaemonSet %s not ready: %d/%d", ds.Name, ds.Status.NumberReady, ds.Status.DesiredNumberScheduled))
} else {
i.s.setReady(i.cpnt)
}
}
+func (i *Instance) setDaemonSetReplicas(ds *appsv1.DaemonSet) {
+ cs := i.s.getStatus(i.cpnt)
+ if cs != nil {
+ cs.DesiredReplicas = ptr.To(ds.Status.DesiredNumberScheduled)
+ cs.ReadyReplicas = ptr.To(ds.Status.NumberReady)
+ i.s.statuses.Store(i.cpnt, *cs)
+ }
+}
+
+// CheckDeploymentHealth combines CheckDeploymentProgress with pod health checking.
+// If the deployment has unhealthy pods, it inspects container statuses for details.
+func (i *Instance) CheckDeploymentHealth(ctx context.Context, c client.Client, d *appsv1.Deployment) {
+ i.CheckDeploymentProgress(d)
+ if d == nil || d.Spec.Selector == nil {
+ return
+ }
+ if d.Status.ReadyReplicas < d.Status.Replicas || d.Status.UnavailableReplicas > 0 {
+ health := CheckPodHealth(ctx, c, d.Namespace, d.Spec.Selector.MatchLabels)
+ i.setPodHealth(health)
+ }
+}
+
+// CheckDaemonSetHealth combines CheckDaemonSetProgress with pod health checking.
+// If the DaemonSet has unhealthy pods, it inspects container statuses for details.
+func (i *Instance) CheckDaemonSetHealth(ctx context.Context, c client.Client, ds *appsv1.DaemonSet) {
+ i.CheckDaemonSetProgress(ds)
+ if ds == nil || ds.Spec.Selector == nil {
+ return
+ }
+ if ds.Status.NumberReady < ds.Status.DesiredNumberScheduled || ds.Status.NumberUnavailable > 0 {
+ health := CheckPodHealth(ctx, c, ds.Namespace, ds.Spec.Selector.MatchLabels)
+ i.setPodHealth(health)
+ }
+}
+
+func (i *Instance) setPodHealth(health PodHealthSummary) {
+ cs := i.s.getStatus(i.cpnt)
+ if cs != nil {
+ cs.PodHealth = health
+ if health.UnhealthyCount > 0 && (cs.Status == StatusReady || cs.Status == StatusInProgress) {
+ cs.Status = StatusDegraded
+ cs.Reason = "UnhealthyPods"
+ cs.Message = health.Issues
+ }
+ i.s.statuses.Store(i.cpnt, *cs)
+ }
+}
+
func (i *Instance) SetCreatingDeployment(d *appsv1.Deployment) {
i.s.setInProgress(i.cpnt, "CreatingDeployment", fmt.Sprintf("Creating deployment %s", d.Name))
}
diff --git a/internal/pkg/manager/status/status_manager_test.go b/internal/pkg/manager/status/status_manager_test.go
index e5dd27b886..3d0e1216c9 100644
--- a/internal/pkg/manager/status/status_manager_test.go
+++ b/internal/pkg/manager/status/status_manager_test.go
@@ -1,12 +1,19 @@
package status
import (
+ "context"
+ "fmt"
"slices"
"testing"
+ flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/utils/ptr"
)
func TestStatusWorkflow(t *testing.T) {
@@ -25,7 +32,7 @@ func TestStatusWorkflow(t *testing.T) {
sl.CheckDaemonSetProgress(&appsv1.DaemonSet{ObjectMeta: metav1.ObjectMeta{Name: "test"}, Status: appsv1.DaemonSetStatus{
DesiredNumberScheduled: 3,
- UpdatedNumberScheduled: 1,
+ NumberReady: 1,
}})
sm.SetUnknown()
@@ -33,11 +40,11 @@ func TestStatusWorkflow(t *testing.T) {
assertHasConditionTypes(t, conds, []string{"Ready", "WaitingFlowCollectorController", "WaitingMonitoring"})
assertHasCondition(t, conds, "Ready", "Pending", metav1.ConditionFalse)
assertHasCondition(t, conds, "WaitingFlowCollectorController", "DaemonSetNotReady", metav1.ConditionTrue)
- assertHasCondition(t, conds, "WaitingMonitoring", "Unused", metav1.ConditionUnknown)
+ assertHasCondition(t, conds, "WaitingMonitoring", "Unknown", metav1.ConditionUnknown)
sl.CheckDaemonSetProgress(&appsv1.DaemonSet{ObjectMeta: metav1.ObjectMeta{Name: "test"}, Status: appsv1.DaemonSetStatus{
DesiredNumberScheduled: 3,
- UpdatedNumberScheduled: 3,
+ NumberReady: 3,
}})
sm.SetUnused("message")
@@ -48,8 +55,8 @@ func TestStatusWorkflow(t *testing.T) {
assertHasCondition(t, conds, "WaitingMonitoring", "ComponentUnused", metav1.ConditionUnknown)
sl.CheckDeploymentProgress(&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "test"}, Status: appsv1.DeploymentStatus{
- UpdatedReplicas: 2,
- Replicas: 2,
+ ReadyReplicas: 2,
+ Replicas: 2,
}})
sm.SetReady()
@@ -60,6 +67,785 @@ func TestStatusWorkflow(t *testing.T) {
assertHasCondition(t, conds, "WaitingMonitoring", "Ready", metav1.ConditionFalse)
}
+func TestDegradedStatus(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+ plugin := s.ForComponent(WebConsole)
+
+ agent.SetReady()
+ plugin.SetDegraded("PluginRegistrationFailed", "console operator unreachable")
+
+ conds := s.getConditions()
+ assertHasCondition(t, conds, "Ready", "Ready,Degraded", metav1.ConditionTrue)
+ assertHasCondition(t, conds, "WaitingEBPFAgents", "Ready", metav1.ConditionFalse)
+ assertHasCondition(t, conds, "WaitingWebConsole", "PluginRegistrationFailed", metav1.ConditionTrue)
+
+ cs := plugin.Get()
+ assert.Equal(t, StatusDegraded, cs.Status)
+ assert.Equal(t, "PluginRegistrationFailed", cs.Reason)
+ assert.Equal(t, "console operator unreachable", cs.Message)
+}
+
+func TestUnusedStatusInCRD(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+ agent.SetUnused("FlowCollector is on hold")
+
+ cs := agent.Get()
+ assert.Equal(t, StatusUnused, cs.Status)
+
+ fc := &flowslatest.FlowCollector{}
+ s.populateComponentStatuses(fc)
+ require.NotNil(t, fc.Status.Components)
+ require.NotNil(t, fc.Status.Components.Agent)
+ assert.Equal(t, "Unused", fc.Status.Components.Agent.State)
+ assert.Equal(t, "ComponentUnused", fc.Status.Components.Agent.Reason)
+ assert.Equal(t, "FlowCollector is on hold", fc.Status.Components.Agent.Message)
+}
+
+func TestReplicaCounts(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+
+ agent.CheckDaemonSetProgress(&appsv1.DaemonSet{
+ ObjectMeta: metav1.ObjectMeta{Name: "agent"},
+ Status: appsv1.DaemonSetStatus{
+ DesiredNumberScheduled: 5,
+ UpdatedNumberScheduled: 5,
+ NumberReady: 5,
+ },
+ })
+
+ cs := agent.Get()
+ assert.Equal(t, StatusReady, cs.Status)
+ assert.NotNil(t, cs.DesiredReplicas)
+ assert.NotNil(t, cs.ReadyReplicas)
+ assert.Equal(t, int32(5), *cs.DesiredReplicas)
+ assert.Equal(t, int32(5), *cs.ReadyReplicas)
+}
+
+func TestReplicaPreservationAcrossTransitions(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+
+ agent.CheckDaemonSetProgress(&appsv1.DaemonSet{
+ ObjectMeta: metav1.ObjectMeta{Name: "agent"},
+ Status: appsv1.DaemonSetStatus{
+ DesiredNumberScheduled: 5,
+ UpdatedNumberScheduled: 5,
+ NumberReady: 5,
+ },
+ })
+ cs := agent.Get()
+ require.Equal(t, int32(5), *cs.DesiredReplicas)
+
+ // Transition to failure: replicas should be preserved
+ agent.SetFailure("AgentKafkaError", "cannot connect to Kafka")
+ cs = agent.Get()
+ assert.Equal(t, StatusFailure, cs.Status)
+ require.NotNil(t, cs.DesiredReplicas, "DesiredReplicas should survive failure transition")
+ assert.Equal(t, int32(5), *cs.DesiredReplicas)
+
+ // Transition to degraded: replicas should still be preserved
+ agent.SetDegraded("SomeWarning", "non-critical issue")
+ cs = agent.Get()
+ assert.Equal(t, StatusDegraded, cs.Status)
+ require.NotNil(t, cs.DesiredReplicas, "DesiredReplicas should survive degraded transition")
+ assert.Equal(t, int32(5), *cs.DesiredReplicas)
+
+ // Transition to in-progress: replicas should still be preserved
+ agent.SetNotReady("Updating", "rolling update in progress")
+ cs = agent.Get()
+ assert.Equal(t, StatusInProgress, cs.Status)
+ require.NotNil(t, cs.DesiredReplicas, "DesiredReplicas should survive in-progress transition")
+ assert.Equal(t, int32(5), *cs.DesiredReplicas)
+
+ // Transition back to ready: replicas should still be preserved
+ agent.SetReady()
+ cs = agent.Get()
+ assert.Equal(t, StatusReady, cs.Status)
+ require.NotNil(t, cs.DesiredReplicas)
+ assert.Equal(t, int32(5), *cs.DesiredReplicas)
+}
+
+func TestDeploymentReplicaCounts(t *testing.T) {
+ s := NewManager()
+ plugin := s.ForComponent(WebConsole)
+
+ // Nil Spec.Replicas defaults to 1
+ plugin.CheckDeploymentProgress(&appsv1.Deployment{
+ ObjectMeta: metav1.ObjectMeta{Name: "plugin"},
+ Spec: appsv1.DeploymentSpec{},
+ Status: appsv1.DeploymentStatus{
+ ReadyReplicas: 1,
+ Replicas: 1,
+ Conditions: []appsv1.DeploymentCondition{{
+ Type: appsv1.DeploymentAvailable,
+ Status: corev1.ConditionTrue,
+ }},
+ },
+ })
+
+ cs := plugin.Get()
+ assert.Equal(t, StatusReady, cs.Status)
+ require.NotNil(t, cs.DesiredReplicas)
+ assert.Equal(t, int32(1), *cs.DesiredReplicas)
+ assert.Equal(t, int32(1), *cs.ReadyReplicas)
+}
+
+func TestDeploymentNotAvailable(t *testing.T) {
+ s := NewManager()
+ plugin := s.ForComponent(WebConsole)
+
+ plugin.CheckDeploymentProgress(&appsv1.Deployment{
+ ObjectMeta: metav1.ObjectMeta{Name: "plugin"},
+ Spec: appsv1.DeploymentSpec{Replicas: ptr.To(int32(2))},
+ Status: appsv1.DeploymentStatus{
+ ReadyReplicas: 1,
+ Replicas: 2,
+ Conditions: []appsv1.DeploymentCondition{{
+ Type: appsv1.DeploymentAvailable,
+ Status: corev1.ConditionFalse,
+ Message: "minimum availability not met",
+ }},
+ },
+ })
+
+ cs := plugin.Get()
+ assert.Equal(t, StatusInProgress, cs.Status)
+ assert.Contains(t, cs.Message, "not ready: 1/2")
+ require.NotNil(t, cs.DesiredReplicas)
+ assert.Equal(t, int32(2), *cs.DesiredReplicas)
+ assert.Equal(t, int32(1), *cs.ReadyReplicas)
+}
+
+func TestDeploymentNilSetsInProgress(t *testing.T) {
+ s := NewManager()
+ plugin := s.ForComponent(WebConsole)
+
+ plugin.CheckDeploymentProgress(nil)
+
+ cs := plugin.Get()
+ assert.Equal(t, StatusInProgress, cs.Status)
+ assert.Equal(t, "DeploymentNotCreated", cs.Reason)
+}
+
+func TestDaemonSetNilSetsInProgress(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+
+ agent.CheckDaemonSetProgress(nil)
+
+ cs := agent.Get()
+ assert.Equal(t, StatusInProgress, cs.Status)
+ assert.Equal(t, "DaemonSetNotCreated", cs.Reason)
+}
+
+func TestDeploymentMissingConditionFallback(t *testing.T) {
+ s := NewManager()
+ plugin := s.ForComponent(WebConsole)
+
+ // Deployment with no Available condition but ready replicas match
+ plugin.CheckDeploymentProgress(&appsv1.Deployment{
+ ObjectMeta: metav1.ObjectMeta{Name: "plugin"},
+ Status: appsv1.DeploymentStatus{
+ ReadyReplicas: 3,
+ Replicas: 3,
+ },
+ })
+ cs := plugin.Get()
+ assert.Equal(t, StatusReady, cs.Status)
+
+ // Now with mismatch
+ plugin2 := s.ForComponent(WebConsole)
+ plugin2.CheckDeploymentProgress(&appsv1.Deployment{
+ ObjectMeta: metav1.ObjectMeta{Name: "plugin"},
+ Status: appsv1.DeploymentStatus{
+ ReadyReplicas: 1,
+ Replicas: 3,
+ },
+ })
+ cs = plugin2.Get()
+ assert.Equal(t, StatusInProgress, cs.Status)
+ assert.Contains(t, cs.Message, "missing condition")
+}
+
+func TestSetPodHealthDegradation(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+
+ // Start with ready
+ agent.SetReady()
+ assert.Equal(t, StatusReady, agent.Get().Status)
+
+ // Inject pod health issues — should degrade to StatusDegraded
+ agent.setPodHealth(PodHealthSummary{
+ UnhealthyCount: 2,
+ Issues: "2 CrashLoopBackOff (pod-a, pod-b): can't write messages into Kafka",
+ })
+
+ cs := agent.Get()
+ assert.Equal(t, StatusDegraded, cs.Status)
+ assert.Equal(t, "UnhealthyPods", cs.Reason)
+ assert.Contains(t, cs.Message, "Kafka")
+ assert.Equal(t, int32(2), cs.PodHealth.UnhealthyCount)
+}
+
+func TestSetPodHealthFromInProgress(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+
+ agent.SetNotReady("DaemonSetNotReady", "DaemonSet not ready: 0/2")
+ assert.Equal(t, StatusInProgress, agent.Get().Status)
+
+ agent.setPodHealth(PodHealthSummary{
+ UnhealthyCount: 2,
+ Issues: "2 CrashLoopBackOff (pod-a, pod-b): Error",
+ })
+
+ cs := agent.Get()
+ assert.Equal(t, StatusDegraded, cs.Status, "InProgress + unhealthy pods should become Degraded")
+ assert.Equal(t, "UnhealthyPods", cs.Reason)
+ assert.Equal(t, int32(2), cs.PodHealth.UnhealthyCount)
+}
+
+func TestSetPodHealthNoDowngradeFromFailure(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+
+ // Already in failure — pod health should not override to degraded
+ agent.SetFailure("CriticalError", "crash")
+ agent.setPodHealth(PodHealthSummary{
+ UnhealthyCount: 1,
+ Issues: "1 CrashLoopBackOff (pod-x)",
+ })
+
+ cs := agent.Get()
+ assert.Equal(t, StatusFailure, cs.Status, "setPodHealth should not override Failure with Degraded")
+ assert.Equal(t, int32(1), cs.PodHealth.UnhealthyCount, "PodHealth should still be recorded")
+}
+
+func TestToCRDStatusWithPodHealth(t *testing.T) {
+ cs := ComponentStatus{
+ Name: EBPFAgents,
+ Status: StatusDegraded,
+ Reason: "UnhealthyPods",
+ Message: "2 CrashLoopBackOff (pod-a, pod-b)",
+ DesiredReplicas: ptr.To(int32(5)),
+ ReadyReplicas: ptr.To(int32(3)),
+ PodHealth: PodHealthSummary{
+ UnhealthyCount: 2,
+ Issues: "2 CrashLoopBackOff (pod-a, pod-b)",
+ },
+ }
+
+ crd := cs.toCRDStatus()
+ assert.Equal(t, "Degraded", crd.State)
+ assert.Equal(t, "UnhealthyPods", crd.Reason)
+ assert.Equal(t, int32(5), *crd.DesiredReplicas)
+ assert.Equal(t, int32(3), *crd.ReadyReplicas)
+ assert.Equal(t, int32(2), crd.UnhealthyPodCount)
+ assert.Equal(t, "2 CrashLoopBackOff (pod-a, pod-b)", crd.PodIssues)
+}
+
+func TestPopulateComponentStatuses(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+ plugin := s.ForComponent(WebConsole)
+ monitoring := s.ForComponent(Monitoring)
+
+ agent.CheckDaemonSetProgress(&appsv1.DaemonSet{
+ ObjectMeta: metav1.ObjectMeta{Name: "agent"},
+ Status: appsv1.DaemonSetStatus{
+ DesiredNumberScheduled: 3,
+ UpdatedNumberScheduled: 3,
+ NumberReady: 3,
+ },
+ })
+ plugin.SetReady()
+ monitoring.SetFailure("DashboardError", "dashboard CM missing")
+
+ fc := &flowslatest.FlowCollector{}
+ s.populateComponentStatuses(fc)
+
+ require.NotNil(t, fc.Status.Components)
+ require.NotNil(t, fc.Status.Components.Agent)
+ assert.Equal(t, "Ready", fc.Status.Components.Agent.State)
+ assert.Equal(t, ptr.To(int32(3)), fc.Status.Components.Agent.DesiredReplicas)
+ assert.Equal(t, ptr.To(int32(3)), fc.Status.Components.Agent.ReadyReplicas)
+
+ require.NotNil(t, fc.Status.Components.Plugin)
+ assert.Equal(t, "Ready", fc.Status.Components.Plugin.State)
+
+ require.NotNil(t, fc.Status.Integrations)
+ require.NotNil(t, fc.Status.Integrations.Monitoring)
+ assert.Equal(t, "Failure", fc.Status.Integrations.Monitoring.State)
+ assert.Equal(t, "DashboardError", fc.Status.Integrations.Monitoring.Reason)
+}
+
+func TestPopulateProcessorAggregation(t *testing.T) {
+ t.Run("monolith only", func(t *testing.T) {
+ s := NewManager()
+ parent := s.ForComponent(FLPParent)
+ mono := s.ForComponent(FLPMonolith)
+		_ = s.ForComponent(FLPTransformer) // registered but left in Unknown state (SetUnused is never called)
+
+ parent.SetReady()
+ mono.CheckDeploymentProgress(&appsv1.Deployment{
+ ObjectMeta: metav1.ObjectMeta{Name: "flp"},
+ Spec: appsv1.DeploymentSpec{Replicas: ptr.To(int32(2))},
+ Status: appsv1.DeploymentStatus{
+ ReadyReplicas: 2,
+ Replicas: 2,
+ Conditions: []appsv1.DeploymentCondition{{
+ Type: appsv1.DeploymentAvailable,
+ Status: corev1.ConditionTrue,
+ }},
+ },
+ })
+
+ fc := &flowslatest.FlowCollector{}
+ s.populateComponentStatuses(fc)
+
+ require.NotNil(t, fc.Status.Components)
+ require.NotNil(t, fc.Status.Components.Processor)
+ assert.Equal(t, "Ready", fc.Status.Components.Processor.State)
+ require.NotNil(t, fc.Status.Components.Processor.DesiredReplicas)
+ assert.Equal(t, int32(2), *fc.Status.Components.Processor.DesiredReplicas)
+ })
+
+ t.Run("transformer failure overrides parent ready", func(t *testing.T) {
+ s := NewManager()
+ parent := s.ForComponent(FLPParent)
+ _ = s.ForComponent(FLPMonolith)
+ transformer := s.ForComponent(FLPTransformer)
+
+ parent.SetReady()
+ transformer.SetFailure("KafkaConnectionError", "cannot connect to broker")
+
+ fc := &flowslatest.FlowCollector{}
+ s.populateComponentStatuses(fc)
+
+ require.NotNil(t, fc.Status.Components)
+ require.NotNil(t, fc.Status.Components.Processor)
+ assert.Equal(t, "Failure", fc.Status.Components.Processor.State)
+ assert.Equal(t, "KafkaConnectionError", fc.Status.Components.Processor.Reason)
+ })
+
+ t.Run("parent ready with unused sub-reconcilers", func(t *testing.T) {
+ s := NewManager()
+ parent := s.ForComponent(FLPParent)
+ mono := s.ForComponent(FLPMonolith)
+ trans := s.ForComponent(FLPTransformer)
+
+ parent.SetReady()
+ mono.SetUnused("direct mode")
+ trans.SetUnused("direct mode")
+
+ fc := &flowslatest.FlowCollector{}
+ s.populateComponentStatuses(fc)
+
+ require.NotNil(t, fc.Status.Components)
+ require.NotNil(t, fc.Status.Components.Processor)
+ assert.Equal(t, "Ready", fc.Status.Components.Processor.State)
+ })
+}
+
+func TestPopulateLokiStatus(t *testing.T) {
+ t.Run("lokistack ready", func(t *testing.T) {
+ s := NewManager()
+ ls := s.ForComponent(LokiStack)
+ ls.SetReady()
+
+ fc := &flowslatest.FlowCollector{}
+ s.populateComponentStatuses(fc)
+
+ require.NotNil(t, fc.Status.Integrations)
+ require.NotNil(t, fc.Status.Integrations.Loki)
+ assert.Equal(t, "Ready", fc.Status.Integrations.Loki.State)
+ })
+
+ t.Run("demo loki failure overrides ready", func(t *testing.T) {
+ s := NewManager()
+ demo := s.ForComponent(DemoLoki)
+ demo.SetFailure("DeployFailed", "cannot create PVC")
+
+ fc := &flowslatest.FlowCollector{}
+ s.populateComponentStatuses(fc)
+
+ require.NotNil(t, fc.Status.Integrations)
+ require.NotNil(t, fc.Status.Integrations.Loki)
+ assert.Equal(t, "Failure", fc.Status.Integrations.Loki.State)
+ assert.Equal(t, "DeployFailed", fc.Status.Integrations.Loki.Reason)
+ })
+
+ t.Run("loki unused", func(t *testing.T) {
+ s := NewManager()
+ ls := s.ForComponent(LokiStack)
+ ls.SetUnused("Loki is disabled")
+
+ fc := &flowslatest.FlowCollector{}
+ s.populateComponentStatuses(fc)
+
+ require.NotNil(t, fc.Status.Integrations)
+ require.NotNil(t, fc.Status.Integrations.Loki)
+ assert.Equal(t, "Unused", fc.Status.Integrations.Loki.State)
+ })
+}
+
+func TestControllerComponentsNotInCRDStatus(t *testing.T) {
+ s := NewManager()
+ ctrl := s.ForComponent(FlowCollectorController)
+ static := s.ForComponent(StaticController)
+ np := s.ForComponent(NetworkPolicy)
+
+ ctrl.SetReady()
+ static.SetReady()
+ np.SetReady()
+
+ fc := &flowslatest.FlowCollector{}
+ s.populateComponentStatuses(fc)
+
+ assert.Nil(t, fc.Status.Components.Agent)
+ assert.Nil(t, fc.Status.Components.Plugin)
+ assert.Nil(t, fc.Status.Components.Processor)
+ assert.Nil(t, fc.Status.Integrations.Monitoring)
+ assert.Nil(t, fc.Status.Integrations.Loki)
+}
+
+func TestExporterStatus(t *testing.T) {
+ s := NewManager()
+ s.SetExporterStatus("kafka-export-0", "Kafka", "Ready", "Configured", "")
+ s.SetExporterStatus("ipfix-export-0", "IPFIX", "Failure", "ConnectionRefused", "cannot connect")
+
+ fc := &flowslatest.FlowCollector{}
+ s.populateComponentStatuses(fc)
+
+ require.NotNil(t, fc.Status.Integrations)
+ assert.Len(t, fc.Status.Integrations.Exporters, 2)
+ found := map[string]bool{}
+ for _, e := range fc.Status.Integrations.Exporters {
+ found[e.Name] = true
+ if e.Name == "kafka-export-0" {
+ assert.Equal(t, "Ready", e.State)
+ assert.Equal(t, "Kafka", e.Type)
+ } else if e.Name == "ipfix-export-0" {
+ assert.Equal(t, "Failure", e.State)
+ assert.Equal(t, "cannot connect", e.Message)
+ }
+ }
+ assert.True(t, found["kafka-export-0"])
+ assert.True(t, found["ipfix-export-0"])
+
+ s.ClearExporters()
+ fc2 := &flowslatest.FlowCollector{}
+ s.populateComponentStatuses(fc2)
+ assert.Empty(t, fc2.Status.Integrations.Exporters)
+}
+
+func TestKafkaCondition(t *testing.T) {
+ t.Run("no kafka components returns nil", func(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+ mono := s.ForComponent(FLPMonolith)
+ agent.SetReady()
+ mono.SetReady()
+
+ assert.Nil(t, s.GetKafkaCondition())
+ })
+
+ t.Run("healthy transformer returns KafkaReady=True", func(t *testing.T) {
+ s := NewManager()
+ transformer := s.ForComponent(FLPTransformer)
+ transformer.SetReady()
+
+ cond := s.GetKafkaCondition()
+ require.NotNil(t, cond)
+ assert.Equal(t, "KafkaReady", cond.Type)
+ assert.Equal(t, metav1.ConditionTrue, cond.Status)
+ assert.Equal(t, "Ready", cond.Reason)
+ })
+
+ t.Run("failed transformer returns KafkaReady=False", func(t *testing.T) {
+ s := NewManager()
+ transformer := s.ForComponent(FLPTransformer)
+ transformer.SetFailure("KafkaError", "broker unreachable")
+
+ cond := s.GetKafkaCondition()
+ require.NotNil(t, cond)
+ assert.Equal(t, metav1.ConditionFalse, cond.Status)
+ assert.Equal(t, "KafkaIssue", cond.Reason)
+ assert.Contains(t, cond.Message, "Transformer: broker unreachable")
+ })
+
+ t.Run("transformer with unhealthy pods", func(t *testing.T) {
+ s := NewManager()
+ transformer := s.ForComponent(FLPTransformer)
+ transformer.SetReady()
+ transformer.setPodHealth(PodHealthSummary{
+ UnhealthyCount: 1,
+ Issues: "1 CrashLoopBackOff (flp-kafka-0)",
+ })
+
+ cond := s.GetKafkaCondition()
+ require.NotNil(t, cond)
+ assert.Equal(t, metav1.ConditionFalse, cond.Status)
+ assert.Contains(t, cond.Message, "Transformer pods:")
+ })
+
+ t.Run("agent with kafka-related pod issues", func(t *testing.T) {
+ s := NewManager()
+ transformer := s.ForComponent(FLPTransformer)
+ transformer.SetReady()
+ agent := s.ForComponent(EBPFAgents)
+ agent.SetReady()
+ agent.setPodHealth(PodHealthSummary{
+ UnhealthyCount: 3,
+ Issues: "3 CrashLoopBackOff (agent-a, agent-b, agent-c): can't write Kafka messages",
+ })
+
+ cond := s.GetKafkaCondition()
+ require.NotNil(t, cond)
+ assert.Equal(t, metav1.ConditionFalse, cond.Status)
+ assert.Contains(t, cond.Message, "Agent pods:")
+ })
+
+ t.Run("agent with non-kafka issues does not trigger KafkaReady", func(t *testing.T) {
+ s := NewManager()
+ transformer := s.ForComponent(FLPTransformer)
+ transformer.SetReady()
+ agent := s.ForComponent(EBPFAgents)
+ agent.SetReady()
+ agent.setPodHealth(PodHealthSummary{
+ UnhealthyCount: 1,
+ Issues: "1 OOMKilled (agent-x)",
+ })
+
+ cond := s.GetKafkaCondition()
+ require.NotNil(t, cond)
+ assert.Equal(t, metav1.ConditionTrue, cond.Status, "Non-kafka agent issues should not affect KafkaReady")
+ })
+
+ t.Run("kafka exporter failure", func(t *testing.T) {
+ s := NewManager()
+ s.SetExporterStatus("kafka-export-0", "Kafka", string(StatusFailure), "BrokerDown", "connection refused")
+
+ cond := s.GetKafkaCondition()
+ require.NotNil(t, cond)
+ assert.Equal(t, metav1.ConditionFalse, cond.Status)
+ assert.Contains(t, cond.Message, "Exporter kafka-export-0: connection refused")
+ })
+
+ t.Run("IPFIX exporter failure does not trigger KafkaReady", func(t *testing.T) {
+ s := NewManager()
+ s.SetExporterStatus("ipfix-export-0", "IPFIX", string(StatusFailure), "Down", "timeout")
+
+ assert.Nil(t, s.GetKafkaCondition())
+ })
+}
+
+// fakeRecorder implements record.EventRecorder for testing event emission.
+type fakeRecorder struct {
+ events []fakeEvent
+}
+
+type fakeEvent struct {
+ object runtime.Object
+ eventType string
+ reason string
+ message string
+}
+
+func (r *fakeRecorder) Event(object runtime.Object, eventtype, reason, message string) {
+ r.events = append(r.events, fakeEvent{object, eventtype, reason, message})
+}
+
+func (r *fakeRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
+ r.events = append(r.events, fakeEvent{object, eventtype, reason, fmt.Sprintf(messageFmt, args...)})
+}
+
+func (r *fakeRecorder) AnnotatedEventf(object runtime.Object, _ map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
+ r.events = append(r.events, fakeEvent{object, eventtype, reason, fmt.Sprintf(messageFmt, args...)})
+}
+
+func TestEmitStateTransitionEvents(t *testing.T) {
+ rec := &fakeRecorder{}
+ s := NewManager()
+ s.SetEventRecorder(rec)
+ fc := &flowslatest.FlowCollector{}
+
+ agent := s.ForComponent(EBPFAgents)
+
+ // First call with ready — no previous state, should not emit
+ agent.SetReady()
+ s.emitStateTransitionEvents(ctx(), fc)
+ assert.Empty(t, rec.events, "First call should not emit events (no previous state)")
+
+ // Transition to failure
+ agent.SetFailure("KafkaError", "broker down")
+ s.emitStateTransitionEvents(ctx(), fc)
+ require.Len(t, rec.events, 1)
+ assert.Equal(t, "Warning", rec.events[0].eventType)
+ assert.Equal(t, "ComponentFailure", rec.events[0].reason)
+ assert.Contains(t, rec.events[0].message, "EBPFAgents")
+ assert.Contains(t, rec.events[0].message, "broker down")
+
+ // Transition to ready (recovery)
+ rec.events = nil
+ agent.SetReady()
+ s.emitStateTransitionEvents(ctx(), fc)
+ require.Len(t, rec.events, 1)
+ assert.Equal(t, "Normal", rec.events[0].eventType)
+ assert.Equal(t, "ComponentRecovered", rec.events[0].reason)
+
+ // Same state again — no event
+ rec.events = nil
+ agent.SetReady()
+ s.emitStateTransitionEvents(ctx(), fc)
+ assert.Empty(t, rec.events, "Same state should not emit events")
+
+ // Transition to degraded
+ rec.events = nil
+ agent.SetDegraded("HighRestarts", "5 pods restarting")
+ s.emitStateTransitionEvents(ctx(), fc)
+ require.Len(t, rec.events, 1)
+ assert.Equal(t, "Warning", rec.events[0].eventType)
+ assert.Equal(t, "ComponentDegraded", rec.events[0].reason)
+}
+
+func TestEmitEventsNilRecorder(_ *testing.T) {
+ s := NewManager()
+ fc := &flowslatest.FlowCollector{}
+
+ agent := s.ForComponent(EBPFAgents)
+ agent.SetFailure("Error", "bad")
+ s.emitStateTransitionEvents(ctx(), fc)
+}
+
+func TestConditionPolarity(t *testing.T) {
+ tests := []struct {
+ status Status
+ expected metav1.ConditionStatus
+ defReason string
+ }{
+ {StatusReady, metav1.ConditionFalse, "Ready"},
+ {StatusFailure, metav1.ConditionTrue, "NotReady"},
+ {StatusInProgress, metav1.ConditionTrue, "NotReady"},
+ {StatusDegraded, metav1.ConditionTrue, "NotReady"},
+ {StatusUnknown, metav1.ConditionUnknown, "Unknown"},
+ {StatusUnused, metav1.ConditionUnknown, "Unused"},
+ }
+ for _, tc := range tests {
+ t.Run(string(tc.status), func(t *testing.T) {
+ cs := ComponentStatus{Name: EBPFAgents, Status: tc.status}
+ cond := cs.toCondition()
+ assert.Equal(t, tc.expected, cond.Status)
+ assert.Equal(t, tc.defReason, cond.Reason)
+ })
+ }
+
+ // Custom reason overrides default
+ cs := ComponentStatus{Name: EBPFAgents, Status: StatusFailure, Reason: "KafkaError"}
+ cond := cs.toCondition()
+ assert.Equal(t, metav1.ConditionTrue, cond.Status)
+ assert.Equal(t, "KafkaError", cond.Reason)
+}
+
+func TestGlobalConditionCounts(t *testing.T) {
+ s := NewManager()
+ a := s.ForComponent(EBPFAgents)
+ b := s.ForComponent(WebConsole)
+ c := s.ForComponent(Monitoring)
+
+ a.SetReady()
+ b.SetReady()
+ c.SetReady()
+
+ conds := s.getConditions()
+ assertHasCondition(t, conds, "Ready", "Ready", metav1.ConditionTrue)
+ readyCond := findCondition(conds, "Ready")
+ require.NotNil(t, readyCond)
+ assert.Contains(t, readyCond.Message, "3 ready components")
+ assert.Contains(t, readyCond.Message, "0 with failure")
+}
+
+func TestGlobalConditionUnusedNotCounted(t *testing.T) {
+ s := NewManager()
+ a := s.ForComponent(EBPFAgents)
+ b := s.ForComponent(Monitoring)
+
+ a.SetReady()
+ b.SetUnused("disabled")
+
+ conds := s.getConditions()
+ readyCond := findCondition(conds, "Ready")
+ require.NotNil(t, readyCond)
+ assert.Equal(t, metav1.ConditionTrue, readyCond.Status)
+ assert.Equal(t, "Ready", readyCond.Reason)
+ assert.Contains(t, readyCond.Message, "1 ready components")
+ assert.Contains(t, readyCond.Message, "0 with failure")
+}
+
+func TestNeedsRequeue(t *testing.T) {
+ t.Run("all ready returns false", func(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+ plugin := s.ForComponent(WebConsole)
+ agent.SetReady()
+ plugin.SetReady()
+ assert.False(t, s.NeedsRequeue())
+ })
+
+ t.Run("in-progress component returns true", func(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+ plugin := s.ForComponent(WebConsole)
+ agent.SetReady()
+ plugin.SetNotReady("Deploying", "rolling out")
+ assert.True(t, s.NeedsRequeue())
+ })
+
+ t.Run("unhealthy pods returns true", func(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+ agent.SetReady()
+ agent.setPodHealth(PodHealthSummary{
+ UnhealthyCount: 2,
+ Issues: "2 CrashLoopBackOff (pod-a, pod-b)",
+ })
+ assert.True(t, s.NeedsRequeue())
+ })
+
+ t.Run("unused and unknown do not trigger", func(t *testing.T) {
+ s := NewManager()
+ a := s.ForComponent(EBPFAgents)
+ b := s.ForComponent(WebConsole)
+ a.SetUnused("disabled")
+ b.SetUnknown()
+ assert.False(t, s.NeedsRequeue())
+ })
+
+ t.Run("failure without pod issues does not trigger", func(t *testing.T) {
+ s := NewManager()
+ agent := s.ForComponent(EBPFAgents)
+ agent.SetFailure("ConfigError", "bad config")
+ assert.False(t, s.NeedsRequeue())
+ })
+}
+
+func ctx() context.Context {
+ return context.Background()
+}
+
+func findCondition(conditions []metav1.Condition, condType string) *metav1.Condition {
+ for i := range conditions {
+ if conditions[i].Type == condType {
+ return &conditions[i]
+ }
+ }
+ return nil
+}
+
func assertHasCondition(t *testing.T, conditions []metav1.Condition, searchType, reason string, value metav1.ConditionStatus) {
for _, c := range conditions {
if c.Type == searchType {
diff --git a/internal/pkg/manager/status/statuses.go b/internal/pkg/manager/status/statuses.go
index 9be8acef3f..b9451e8311 100644
--- a/internal/pkg/manager/status/statuses.go
+++ b/internal/pkg/manager/status/statuses.go
@@ -1,13 +1,16 @@
package status
import (
+ flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/utils/ptr"
)
type Status string
const (
StatusUnknown Status = "Unknown"
+ StatusUnused Status = "Unused"
StatusInProgress Status = "InProgress"
StatusReady Status = "Ready"
StatusFailure Status = "Failure"
@@ -15,12 +18,19 @@ const (
)
type ComponentStatus struct {
- Name ComponentName
- Status Status
- Reason string
- Message string
+ Name ComponentName
+ Status Status
+ Reason string
+ Message string
+ DesiredReplicas *int32
+ ReadyReplicas *int32
+ PodHealth PodHealthSummary
}
+// toCondition returns a Kubernetes condition using "Waiting*" naming with negative polarity:
+// True means "component has an issue", False means "component is ready".
+// This matches the OpenShift console behavior which treats non-Ready conditions as
+// negative-polarity (True = problem).
func (s *ComponentStatus) toCondition() metav1.Condition {
c := metav1.Condition{
Type: "Waiting" + string(s.Name),
@@ -28,6 +38,9 @@ func (s *ComponentStatus) toCondition() metav1.Condition {
}
switch s.Status {
case StatusUnknown:
+ c.Status = metav1.ConditionUnknown
+ c.Reason = "Unknown"
+ case StatusUnused:
c.Status = metav1.ConditionUnknown
c.Reason = "Unused"
case StatusFailure, StatusInProgress, StatusDegraded:
@@ -36,9 +49,29 @@ func (s *ComponentStatus) toCondition() metav1.Condition {
case StatusReady:
c.Status = metav1.ConditionFalse
c.Reason = "Ready"
+ default:
+ c.Status = metav1.ConditionUnknown
+ c.Reason = "Unknown"
}
if s.Reason != "" {
c.Reason = s.Reason
}
return c
}
+
+// toCRDStatus converts the internal component status into its CRD representation.
+func (s *ComponentStatus) toCRDStatus() *flowslatest.FlowCollectorComponentStatus {
+	out := &flowslatest.FlowCollectorComponentStatus{
+		State:             string(s.Status),
+		Reason:            s.Reason,
+		Message:           s.Message,
+		UnhealthyPodCount: s.PodHealth.UnhealthyCount,
+		PodIssues:         s.PodHealth.Issues,
+	}
+	// Deep-copy the replica counters so the CRD status never aliases internal state.
+	if d := s.DesiredReplicas; d != nil {
+		out.DesiredReplicas = ptr.To(*d)
+	}
+	if r := s.ReadyReplicas; r != nil {
+		out.ReadyReplicas = ptr.To(*r)
+	}
+	return out
+}