Skip to content

Commit 8b929d7

Browse files
NETOBSERV-2508: Network policy fix for HyperShift (#2157)
* Network policy fix for HyperShift
1 parent f42e844 commit 8b929d7

File tree

10 files changed

+325
-22
lines changed

10 files changed

+325
-22
lines changed

bundle/manifests/netobserv-operator.clusterserviceversion.yaml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ metadata:
253253
categories: Monitoring, Networking, Observability
254254
console.openshift.io/plugins: '["netobserv-plugin"]'
255255
containerImage: quay.io/netobserv/network-observability-operator:1.10.0-community
256-
createdAt: "2025-11-25T09:57:07Z"
256+
createdAt: "2025-11-26T13:16:01Z"
257257
description: Network flows collector and monitoring solution
258258
operatorframework.io/initialization-resource: '{"apiVersion":"flows.netobserv.io/v1beta2",
259259
"kind":"FlowCollector","metadata":{"name":"cluster"},"spec": {}}'
@@ -901,6 +901,14 @@ spec:
901901
- patch
902902
- update
903903
- watch
904+
- apiGroups:
905+
- discovery.k8s.io
906+
resources:
907+
- endpointslices
908+
verbs:
909+
- get
910+
- list
911+
- watch
904912
- apiGroups:
905913
- flows.netobserv.io
906914
resources:

config/rbac/role.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,14 @@ rules:
127127
- patch
128128
- update
129129
- watch
130+
- apiGroups:
131+
- discovery.k8s.io
132+
resources:
133+
- endpointslices
134+
verbs:
135+
- get
136+
- list
137+
- watch
130138
- apiGroups:
131139
- flows.netobserv.io
132140
resources:

helm/templates/clusterrole.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ rules:
126126
- patch
127127
- update
128128
- watch
129+
- apiGroups:
130+
- discovery.k8s.io
131+
resources:
132+
- endpointslices
133+
verbs:
134+
- get
135+
- list
136+
- watch
129137
- apiGroups:
130138
- flows.netobserv.io
131139
resources:
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package networkpolicy
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/netobserv/network-observability-operator/internal/pkg/cluster"
8+
corev1 "k8s.io/api/core/v1"
9+
discoveryv1 "k8s.io/api/discovery/v1"
10+
"k8s.io/apimachinery/pkg/api/errors"
11+
"k8s.io/apimachinery/pkg/types"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
13+
"sigs.k8s.io/controller-runtime/pkg/log"
14+
)
15+
16+
const (
17+
kubernetesServiceName = "kubernetes"
18+
kubernetesServiceNamespace = "default"
19+
)
20+
21+
// GetAPIServerEndpointIPs retrieves the API server endpoint IP addresses.
22+
// It uses EndpointSlice API if available, otherwise falls back to Endpoints API.
23+
func GetAPIServerEndpointIPs(ctx context.Context, cl client.Client, clusterInfo *cluster.Info) ([]string, error) {
24+
logger := log.FromContext(ctx)
25+
26+
var ips []string
27+
var err error
28+
29+
// Use EndpointSlice if available (discovery.k8s.io/v1, available since k8s 1.21)
30+
if clusterInfo.HasEndpointSlices() {
31+
logger.V(1).Info("Using EndpointSlice API to get API server endpoint IPs")
32+
ips, err = getEndpointIPsFromEndpointSlice(ctx, cl)
33+
if err != nil {
34+
return nil, fmt.Errorf("failed to get API server endpoint IPs from EndpointSlice: %w", err)
35+
}
36+
logger.V(1).Info("Retrieved API server endpoint IPs from EndpointSlice", "ips", ips)
37+
} else {
38+
// Fallback to Endpoints API (core/v1, deprecated but widely available)
39+
logger.V(1).Info("EndpointSlice API not available, using Endpoints API")
40+
ips, err = getEndpointIPsFromEndpoints(ctx, cl)
41+
if err != nil {
42+
return nil, fmt.Errorf("failed to get API server endpoint IPs from Endpoints: %w", err)
43+
}
44+
logger.V(1).Info("Retrieved API server endpoint IPs from Endpoints", "ips", ips)
45+
}
46+
47+
if len(ips) == 0 {
48+
return nil, fmt.Errorf("no API server endpoint IPs found")
49+
}
50+
51+
return ips, nil
52+
}
53+
54+
// getEndpointIPsFromEndpointSlice retrieves endpoint IPs using the EndpointSlice API
55+
func getEndpointIPsFromEndpointSlice(ctx context.Context, cl client.Client) ([]string, error) {
56+
// Get the EndpointSlice directly by name
57+
endpointSlice := &discoveryv1.EndpointSlice{}
58+
err := cl.Get(ctx, types.NamespacedName{
59+
Name: kubernetesServiceName,
60+
Namespace: kubernetesServiceNamespace,
61+
}, endpointSlice)
62+
if err != nil {
63+
if errors.IsNotFound(err) {
64+
return nil, fmt.Errorf("EndpointSlice for kubernetes service not found")
65+
}
66+
return nil, err
67+
}
68+
69+
var ips []string
70+
for j := range endpointSlice.Endpoints {
71+
endpoint := &endpointSlice.Endpoints[j]
72+
// Only use ready endpoints
73+
if endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready {
74+
ips = append(ips, endpoint.Addresses...)
75+
}
76+
}
77+
78+
return ips, nil
79+
}
80+
81+
// getEndpointIPsFromEndpoints retrieves endpoint IPs using the legacy Endpoints API
82+
func getEndpointIPsFromEndpoints(ctx context.Context, cl client.Client) ([]string, error) {
83+
//nolint:staticcheck // SA1019: Endpoints is deprecated but used as fallback for k8s < 1.21
84+
endpoints := &corev1.Endpoints{}
85+
err := cl.Get(ctx, types.NamespacedName{
86+
Name: kubernetesServiceName,
87+
Namespace: kubernetesServiceNamespace,
88+
}, endpoints)
89+
if err != nil {
90+
if errors.IsNotFound(err) {
91+
return nil, fmt.Errorf("endpoints for kubernetes service not found")
92+
}
93+
return nil, err
94+
}
95+
96+
var ips []string
97+
for _, subset := range endpoints.Subsets {
98+
for _, address := range subset.Addresses {
99+
ips = append(ips, address.IP)
100+
}
101+
}
102+
103+
return ips, nil
104+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package networkpolicy
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestIPToCIDR(t *testing.T) {
10+
assert := assert.New(t)
11+
12+
// Test IPv4 addresses
13+
assert.Equal("192.168.1.1/32", ipToCIDR("192.168.1.1"))
14+
assert.Equal("10.0.0.1/32", ipToCIDR("10.0.0.1"))
15+
assert.Equal("172.20.0.1/32", ipToCIDR("172.20.0.1"))
16+
17+
// Test IPv6 addresses
18+
assert.Equal("2001:db8::1/128", ipToCIDR("2001:db8::1"))
19+
assert.Equal("fe80::1/128", ipToCIDR("fe80::1"))
20+
assert.Equal("::1/128", ipToCIDR("::1"))
21+
22+
// Test invalid IP
23+
assert.Equal("", ipToCIDR("invalid"))
24+
assert.Equal("", ipToCIDR(""))
25+
assert.Equal("", ipToCIDR("256.256.256.256"))
26+
}

internal/controller/networkpolicy/np_controller.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,24 @@ func (r *Reconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result
7070
}
7171

7272
func (r *Reconciler) reconcile(ctx context.Context, clh *helper.Client, desired *flowslatest.FlowCollector) error {
73+
l := log.FromContext(ctx)
74+
7375
cni, err := r.mgr.ClusterInfo.GetCNI()
7476
if err != nil {
7577
return err
7678
}
77-
npName, desiredNp := buildMainNetworkPolicy(desired, r.mgr, cni)
79+
80+
// Get API server endpoint IPs for network policy
81+
var apiServerIPs []string
82+
if r.mgr.ClusterInfo.IsOpenShift() {
83+
apiServerIPs, err = GetAPIServerEndpointIPs(ctx, r.Client, r.mgr.ClusterInfo)
84+
if err != nil {
85+
l.Error(err, "Failed to get API server endpoint IPs")
86+
return fmt.Errorf("cannot determine API server endpoint IPs: %w", err)
87+
}
88+
}
89+
90+
npName, desiredNp := buildMainNetworkPolicy(desired, r.mgr, cni, apiServerIPs)
7891
if err := reconcilers.ReconcileNetworkPolicy(ctx, clh, npName, desiredNp); err != nil {
7992
return err
8093
}

internal/controller/networkpolicy/np_objects.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package networkpolicy
22

33
import (
4+
"net"
5+
46
flowslatest "github.com/netobserv/network-observability-operator/api/flowcollector/v1beta2"
57
"github.com/netobserv/network-observability-operator/internal/controller/constants"
68
"github.com/netobserv/network-observability-operator/internal/pkg/cluster"
@@ -16,6 +18,22 @@ import (
1618

1719
const netpolName = "netobserv"
1820

21+
// ipToCIDR converts an IP address to a CIDR with proper prefix length
22+
// IPv4 addresses get /32, IPv6 addresses get /128
23+
func ipToCIDR(ipStr string) string {
24+
ip := net.ParseIP(ipStr)
25+
if ip == nil {
26+
return ""
27+
}
28+
29+
// Check if it's IPv4 (net.IP.To4() returns nil for IPv6)
30+
if ip.To4() != nil {
31+
return ipStr + "/32"
32+
}
33+
// IPv6
34+
return ipStr + "/128"
35+
}
36+
1937
func peerInNamespace(ns string) networkingv1.NetworkPolicyPeer {
2038
return networkingv1.NetworkPolicyPeer{
2139
NamespaceSelector: &metav1.LabelSelector{
@@ -50,7 +68,7 @@ func addAllowedNamespaces(np *networkingv1.NetworkPolicy, in, out []string) {
5068
}
5169
}
5270

53-
func buildMainNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manager.Manager, cni cluster.NetworkType) (types.NamespacedName, *networkingv1.NetworkPolicy) {
71+
func buildMainNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manager.Manager, cni cluster.NetworkType, apiServerIPs []string) (types.NamespacedName, *networkingv1.NetworkPolicy) {
5472
ns := desired.Spec.GetNamespace()
5573

5674
name := types.NamespacedName{Name: netpolName, Namespace: ns}
@@ -156,7 +174,8 @@ func buildMainNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manager.Man
156174
},
157175
Ports: hostNetworkPorts,
158176
})
159-
// Allow fetching from apiserver
177+
178+
// Allow fetching from in-cluster apiserver namespaces
160179
np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{
161180
To: []networkingv1.NetworkPolicyPeer{
162181
peerInNamespaces([]string{constants.OpenShiftAPIServerNamespace, constants.OpenShiftKubeAPIServerNamespace}),
@@ -166,6 +185,30 @@ func buildMainNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manager.Man
166185
Port: ptr.To(intstr.FromInt32(constants.K8sAPIServerPort)),
167186
}},
168187
})
188+
189+
// Allow fetching from external apiserver (HyperShift and other external control planes)
190+
// The kubernetes service may redirect to external endpoints on port 6443
191+
if len(apiServerIPs) > 0 {
192+
// Build a single egress rule with multiple IP peers
193+
peers := []networkingv1.NetworkPolicyPeer{}
194+
for _, ip := range apiServerIPs {
195+
cidr := ipToCIDR(ip)
196+
if cidr != "" {
197+
peers = append(peers, networkingv1.NetworkPolicyPeer{
198+
IPBlock: &networkingv1.IPBlock{
199+
CIDR: cidr,
200+
},
201+
})
202+
}
203+
}
204+
np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{
205+
To: peers,
206+
Ports: []networkingv1.NetworkPolicyPort{{
207+
Protocol: ptr.To(corev1.ProtocolTCP),
208+
Port: ptr.To(intstr.FromInt32(constants.K8sAPIServerPort)),
209+
}},
210+
})
211+
}
169212
} else {
170213
// Not OpenShift
171214
// Allow fetching from apiserver / kube-system

0 commit comments

Comments
 (0)