@@ -35,6 +35,7 @@ import (
3535 "k8s.io/apimachinery/pkg/util/runtime"
3636 "k8s.io/client-go/informers"
3737 v2 "k8s.io/client-go/informers/core/v1"
38+ networkingv1 "k8s.io/client-go/informers/networking/v1"
3839 "k8s.io/client-go/kubernetes"
3940 kclientcmd "k8s.io/client-go/tools/clientcmd"
4041 "k8s.io/klog/v2"
@@ -303,6 +304,10 @@ func main() {
303304 return SyncServicePods (ctx , kdb , factory .Core ().V1 ().Services (), factory .Core ().V1 ().Pods ())
304305 })
305306
307+ g .Go (func () error {
308+ return SyncIngressServiceUuid (ctx , kdb , factory .Networking ().V1 ().Ingresses ())
309+ })
310+
306311 err = internal .SyncPrometheusConfig (ctx , db , & cfg .Prometheus , clusterInstance .Uuid )
307312 if err != nil {
308313 klog .Error (errors .Wrap (err , "cannot sync prometheus config" ))
@@ -702,3 +707,105 @@ func SyncServicePods(ctx context.Context, db *kdatabase.Database, serviceList v2
702707
703708 return g .Wait ()
704709}
710+
711+ func SyncIngressServiceUuid (ctx context.Context , db * kdatabase.Database , ingressList networkingv1.IngressInformer ) error {
712+ // TODO: Respect delete events. At the moment, service link entries will only be deleted if the corresponding ingress is deleted.
713+ ingressBackendServices := make (chan any )
714+ ingressRules := make (chan any )
715+
716+ g , ctx := errgroup .WithContext (ctx )
717+
718+ g .Go (func () error {
719+ return db .UpsertStreamed (ctx , ingressBackendServices )
720+ })
721+
722+ g .Go (func () error {
723+ return db .UpsertStreamed (ctx , ingressRules )
724+ })
725+
726+ g .Go (func () error {
727+ ch := cachev1 .Multiplexers ().Services ().UpsertEvents ().Out ()
728+ for {
729+ select {
730+ case service , more := <- ch :
731+ if ! more {
732+ return nil
733+ }
734+
735+ ingresses , err := ingressList .Lister ().Ingresses (service .(* schemav1.Service ).Namespace ).List (labels .Everything ())
736+ if err != nil {
737+ return err
738+ }
739+
740+ for _ , ingress := range ingresses {
741+ if ingress .Spec .DefaultBackend != nil {
742+ if ingress .Spec .DefaultBackend .Service != nil {
743+ if ingress .Spec .DefaultBackend .Service .Name == service .(* schemav1.Service ).Name {
744+ select {
745+ case ingressBackendServices <- schemav1.IngressBackendService {
746+ ServiceUuid : service .(* schemav1.Service ).Uuid ,
747+ IngressUuid : schemav1 .EnsureUUID (ingress .UID ),
748+ ServiceName : ingress .Spec .DefaultBackend .Service .Name ,
749+ ServicePortName : ingress .Spec .DefaultBackend .Service .Port .Name ,
750+ ServicePortNumber : ingress .Spec .DefaultBackend .Service .Port .Number ,
751+ }:
752+ case <- ctx .Done ():
753+ return ctx .Err ()
754+ }
755+ }
756+ }
757+ }
758+
759+ for _ , rules := range ingress .Spec .Rules {
760+ if rules .HTTP == nil {
761+ continue
762+ }
763+
764+ for _ , ruleValue := range rules .HTTP .Paths {
765+ if ruleValue .Backend .Service == nil {
766+ continue
767+ }
768+
769+ serviceName := ruleValue .Backend .Service .Name
770+ if service .(* schemav1.Service ).Name == serviceName {
771+ serviceUuid := service .(* schemav1.Service ).Uuid
772+ ingressUuid := schemav1 .EnsureUUID (ingress .UID )
773+ ingressRuleUuid := schemav1 .NewUUID (ingressUuid , rules .Host + ruleValue .Path + serviceName )
774+
775+ select {
776+ case ingressBackendServices <- schemav1.IngressBackendService {
777+ ServiceUuid : serviceUuid ,
778+ IngressUuid : ingressUuid ,
779+ IngressRuleUuid : ingressRuleUuid ,
780+ ServiceName : serviceName ,
781+ ServicePortName : ruleValue .Backend .Service .Port .Name ,
782+ ServicePortNumber : ruleValue .Backend .Service .Port .Number ,
783+ }:
784+ case <- ctx .Done ():
785+ return ctx .Err ()
786+ }
787+
788+ select {
789+ case ingressRules <- schemav1.IngressRule {
790+ Uuid : ingressRuleUuid ,
791+ BackendUuid : serviceUuid ,
792+ IngressUuid : ingressUuid ,
793+ Host : schemav1 .NewNullableString (rules .Host ),
794+ Path : schemav1 .NewNullableString (ruleValue .Path ),
795+ PathType : string (* ruleValue .PathType ),
796+ }:
797+ case <- ctx .Done ():
798+ return ctx .Err ()
799+ }
800+ }
801+ }
802+ }
803+ }
804+ case <- ctx .Done ():
805+ return ctx .Err ()
806+ }
807+ }
808+ })
809+
810+ return g .Wait ()
811+ }
0 commit comments