Unverified Commit a5351292 authored by OpenShift Merge Robot's avatar OpenShift Merge Robot Committed by GitHub
Browse files

Merge pull request #622 from Miciah/BZ1960284-set-the-local-with-fallback-service-annotation

Bug 1960284: Set the "local-with-fallback" service annotation
parents ed74bed0 fcc0f4a2
......@@ -2,6 +2,7 @@ package ingress
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
......@@ -21,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
crclient "sigs.k8s.io/controller-runtime/pkg/client"
)
......@@ -104,6 +106,12 @@ const (
// openstackInternalLBAnnotation is the annotation used on a service to specify an
// OpenStack load balancer as being internal.
openstackInternalLBAnnotation = "service.beta.kubernetes.io/openstack-internal-load-balancer"
// localWithFallbackAnnotation is the annotation used on a service that
// has "Local" external traffic policy to indicate that the service
// proxy should prefer using a local endpoint but forward traffic to any
// available endpoint if no local endpoint is available.
localWithFallbackAnnotation = "traffic-policy.network.alpha.openshift.io/local-with-fallback"
)
var (
......@@ -298,12 +306,50 @@ func desiredLoadBalancerService(ci *operatorv1.IngressController, deploymentRef
}
// Azure load balancers are not customizable and are set to (2 fail @ 5s interval, 2 healthy)
// GCP load balancers are not customizable and are set to (3 fail @ 8s interval, 1 healthy)
if v, err := shouldUseLocalWithFallback(ci, service); err != nil {
return true, service, err
} else if v {
service.Annotations[localWithFallbackAnnotation] = ""
}
}
service.SetOwnerReferences([]metav1.OwnerReference{deploymentRef})
return true, service, nil
}
// shouldUseLocalWithFallback returns a Boolean value indicating whether the
// local-with-fallback annotation should be set for the given service, and
// returns an error if the given ingresscontroller has an invalid unsupported
// config override.
func shouldUseLocalWithFallback(ic *operatorv1.IngressController, service *corev1.Service) (bool, error) {
// By default, use local-with-fallback when using the "Local" external
// traffic policy.
if service.Spec.ExternalTrafficPolicy != corev1.ServiceExternalTrafficPolicyTypeLocal {
return false, nil
}
// Allow the user to override local-with-fallback.
if len(ic.Spec.UnsupportedConfigOverrides.Raw) > 0 {
var unsupportedConfigOverrides struct {
LocalWithFallback string `json:"localWithFallback"`
}
if err := json.Unmarshal(ic.Spec.UnsupportedConfigOverrides.Raw, &unsupportedConfigOverrides); err != nil {
return false, fmt.Errorf("ingresscontroller %q has invalid spec.unsupportedConfigOverrides: %w", ic.Name, err)
}
override := unsupportedConfigOverrides.LocalWithFallback
if len(override) != 0 {
if val, err := strconv.ParseBool(override); err != nil {
return false, fmt.Errorf("ingresscontroller %q has invalid spec.unsupportedConfigOverrides.localWithFallback: %w", ic.Name, err)
} else {
return val, nil
}
}
}
return true, nil
}
// currentLoadBalancerService returns any existing LB service for the
// ingresscontroller.
func (r *reconciler) currentLoadBalancerService(ci *operatorv1.IngressController) (bool, *corev1.Service, error) {
......@@ -394,14 +440,31 @@ func (r *reconciler) updateLoadBalancerService(current, desired *corev1.Service,
return true, nil
}
// managedLoadBalancerServiceAnnotations is a set of annotation keys for
// annotations that the operator manages for LoadBalancer-type services.
var managedLoadBalancerServiceAnnotations = sets.NewString(
awsLBHealthCheckIntervalAnnotation,
GCPGlobalAccessAnnotation,
localWithFallbackAnnotation,
)
// loadBalancerServiceChanged checks if the current load balancer service
// matches the expected and if not returns an updated one.
func loadBalancerServiceChanged(current, expected *corev1.Service) (bool, *corev1.Service) {
annotationCmpOpts := []cmp.Option{
cmpopts.IgnoreMapEntries(func(k, _ string) bool {
return !managedLoadBalancerServiceAnnotations.Has(k)
}),
}
if cmp.Equal(current.Annotations, expected.Annotations, annotationCmpOpts...) {
return false, nil
}
updated := current.DeepCopy()
changed := false
// Preserve everything but the AWS LB health check interval annotation &
// GCP Global Access internal Load Balancer annotation.
// Preserve everything but the AWS LB health check interval annotation,
// GCP Global Access internal Load Balancer annotation, and
// local-with-fallback annotation
// (see <https://bugzilla.redhat.com/show_bug.cgi?id=1908758>).
// Updating annotations and spec fields cannot be done unless the
// previous release blocks upgrades when the user has modified those
......@@ -409,18 +472,15 @@ func loadBalancerServiceChanged(current, expected *corev1.Service) (bool, *corev
if updated.Annotations == nil {
updated.Annotations = map[string]string{}
}
if current.Annotations[awsLBHealthCheckIntervalAnnotation] != expected.Annotations[awsLBHealthCheckIntervalAnnotation] {
updated.Annotations[awsLBHealthCheckIntervalAnnotation] = expected.Annotations[awsLBHealthCheckIntervalAnnotation]
changed = true
}
if current.Annotations[GCPGlobalAccessAnnotation] != expected.Annotations[GCPGlobalAccessAnnotation] {
updated.Annotations[GCPGlobalAccessAnnotation] = expected.Annotations[GCPGlobalAccessAnnotation]
changed = true
}
if !changed {
return false, nil
for annotation := range managedLoadBalancerServiceAnnotations {
currentVal, have := current.Annotations[annotation]
expectedVal, want := expected.Annotations[annotation]
if want && (!have || currentVal != expectedVal) {
updated.Annotations[annotation] = expected.Annotations[annotation]
} else if have && !want {
delete(updated.Annotations, annotation)
}
}
return true, updated
......
......@@ -10,6 +10,7 @@ import (
operatorv1 "github.com/openshift/api/operator/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
)
......@@ -319,6 +320,9 @@ func TestDesiredLoadBalancerService(t *testing.T) {
if err := checkServiceHasAnnotation(svc, awsLBHealthCheckHealthyThresholdAnnotation, true, awsLBHealthCheckHealthyThresholdDefault); err != nil {
t.Errorf("annotation check for test %q failed: %v", tc.description, err)
}
if err := checkServiceHasAnnotation(svc, localWithFallbackAnnotation, true, ""); err != nil {
t.Errorf("local-with-fallback annotation check for test %q failed: %v", tc.description, err)
}
classicLB := tc.lbStrategy.ProviderParameters == nil || tc.lbStrategy.ProviderParameters.AWS.Type == operatorv1.AWSClassicLoadBalancer
switch {
case classicLB:
......@@ -388,6 +392,9 @@ func TestDesiredLoadBalancerService(t *testing.T) {
t.Errorf("annotation check for test %q failed; unexpected annotation %s", tc.description, azureInternalLBAnnotation)
}
}
if err := checkServiceHasAnnotation(svc, localWithFallbackAnnotation, true, ""); err != nil {
t.Errorf("local-with-fallback annotation check for test %q failed: %v", tc.description, err)
}
case configv1.GCPPlatformType:
if isInternal {
if err := checkServiceHasAnnotation(svc, gcpLBTypeAnnotation, true, "Internal"); err != nil {
......@@ -399,6 +406,9 @@ func TestDesiredLoadBalancerService(t *testing.T) {
t.Errorf("annotation check for test %q failed; unexpected annotation %s", tc.description, gcpLBTypeAnnotation)
}
}
if err := checkServiceHasAnnotation(svc, localWithFallbackAnnotation, true, ""); err != nil {
t.Errorf("local-with-fallback annotation check for test %q failed: %v", tc.description, err)
}
case configv1.OpenStackPlatformType:
if isInternal {
if err := checkServiceHasAnnotation(svc, openstackInternalLBAnnotation, true, "true"); err != nil {
......@@ -410,6 +420,9 @@ func TestDesiredLoadBalancerService(t *testing.T) {
t.Errorf("annotation check for test %q failed; unexpected annotation %s", tc.description, openstackInternalLBAnnotation)
}
}
if err := checkServiceHasAnnotation(svc, localWithFallbackAnnotation, true, ""); err != nil {
t.Errorf("local-with-fallback annotation check for test %q failed: %v", tc.description, err)
}
}
}
}
......@@ -439,6 +452,72 @@ func checkServiceHasAnnotation(svc *corev1.Service, name string, expectValue boo
}
}
// TestShouldUseLocalWithFallback verifies that shouldUseLocalWithFallback
// behaves as expected.
func TestShouldUseLocalWithFallback(t *testing.T) {
testCases := []struct {
description string
local bool
override string
expect bool
expectError bool
}{
{
description: "if using Cluster without an override",
local: false,
expect: false,
},
{
description: "if using Local without an override",
local: true,
expect: true,
},
{
description: "if using Local with an override",
local: true,
override: `{"localWithFallback":"false"}`,
expect: false,
},
{
description: "if using Local with a garbage override",
local: true,
override: `{"localWithFallback":"x"}`,
expectError: true,
},
}
for _, tc := range testCases {
var override []byte
if len(tc.override) != 0 {
override = []byte(tc.override)
}
ic := &operatorv1.IngressController{
Spec: operatorv1.IngressControllerSpec{
UnsupportedConfigOverrides: runtime.RawExtension{
Raw: override,
},
},
}
policy := corev1.ServiceExternalTrafficPolicyTypeCluster
if tc.local {
policy = corev1.ServiceExternalTrafficPolicyTypeLocal
}
service := corev1.Service{
Spec: corev1.ServiceSpec{
ExternalTrafficPolicy: policy,
},
}
actual, err := shouldUseLocalWithFallback(ic, &service)
switch {
case !tc.expectError && err != nil:
t.Errorf("%q: unexpected error: %w", tc.description, err)
case tc.expectError && err == nil:
t.Errorf("%q: expected error, got nil", tc.description)
case tc.expect != actual:
t.Errorf("%q: expected %t, got %t", tc.description, tc.expect, actual)
}
}
}
func TestLoadBalancerServiceChanged(t *testing.T) {
testCases := []struct {
description string
......@@ -478,6 +557,13 @@ func TestLoadBalancerServiceChanged(t *testing.T) {
},
expect: false,
},
{
description: "if the local-with-fallback annotation is added",
mutate: func(svc *corev1.Service) {
svc.Annotations[localWithFallbackAnnotation] = ""
},
expect: true,
},
{
description: "if .spec.healthCheckNodePort changes",
mutate: func(svc *corev1.Service) {
......@@ -548,6 +634,13 @@ func TestLoadBalancerServiceChanged(t *testing.T) {
},
expect: true,
},
{
description: "if the service.beta.kubernetes.io/aws-load-balancer-healthcheck-interval annotation is deleted",
mutate: func(svc *corev1.Service) {
delete(svc.Annotations, "service.beta.kubernetes.io/aws-load-balancer-healthcheck-interval")
},
expect: true,
},
}
for _, tc := range testCases {
......
......@@ -15,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/intstr"
)
......@@ -46,7 +47,10 @@ func (r *reconciler) ensureNodePortService(ic *operatorv1.IngressController, dep
}
}
wantService, desired := desiredNodePortService(ic, deploymentRef, wantMetricsPort)
wantService, desired, err := desiredNodePortService(ic, deploymentRef, wantMetricsPort)
if err != nil {
return false, nil, err
}
switch {
case !wantService && !haveService:
......@@ -79,16 +83,17 @@ func (r *reconciler) ensureNodePortService(ic *operatorv1.IngressController, dep
// desiredNodePortService returns a Boolean indicating whether a NodePort
// service is desired, as well as the NodePort service if one is desired.
func desiredNodePortService(ic *operatorv1.IngressController, deploymentRef metav1.OwnerReference, wantMetricsPort bool) (bool, *corev1.Service) {
func desiredNodePortService(ic *operatorv1.IngressController, deploymentRef metav1.OwnerReference, wantMetricsPort bool) (bool, *corev1.Service, error) {
if ic.Status.EndpointPublishingStrategy.Type != operatorv1.NodePortServiceStrategyType {
return false, nil
return false, nil, nil
}
name := controller.NodePortServiceName(ic)
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: name.Namespace,
Name: name.Name,
Annotations: map[string]string{},
Namespace: name.Namespace,
Name: name.Name,
Labels: map[string]string{
"app": "router",
"router": name.Name,
......@@ -126,7 +131,13 @@ func desiredNodePortService(ic *operatorv1.IngressController, deploymentRef meta
service.Spec.Ports = service.Spec.Ports[0:2]
}
return true, service
if v, err := shouldUseLocalWithFallback(ic, service); err != nil {
return true, service, err
} else if v {
service.Annotations[localWithFallbackAnnotation] = ""
}
return true, service, nil
}
// currentNodePortService returns a Boolean indicating whether a NodePort
......@@ -160,9 +171,17 @@ func (r *reconciler) updateNodePortService(current, desired *corev1.Service) (bo
return true, nil
}
// managedNodePortServiceAnnotations is a set of annotation keys for annotations
// that the operator manages for NodePort-type services.
var managedNodePortServiceAnnotations = sets.NewString(
localWithFallbackAnnotation,
)
// nodePortServiceChanged checks if the current NodePort service spec matches
// the expected spec and if not returns an updated one.
func nodePortServiceChanged(current, expected *corev1.Service) (bool, *corev1.Service) {
changed := false
serviceCmpOpts := []cmp.Option{
// Ignore fields that the API, other controllers, or user may
// have modified.
......@@ -171,13 +190,38 @@ func nodePortServiceChanged(current, expected *corev1.Service) (bool, *corev1.Se
cmp.Comparer(cmpServiceAffinity),
cmpopts.EquateEmpty(),
}
if cmp.Equal(current.Spec, expected.Spec, serviceCmpOpts...) {
if !cmp.Equal(current.Spec, expected.Spec, serviceCmpOpts...) {
changed = true
}
annotationCmpOpts := []cmp.Option{
cmpopts.IgnoreMapEntries(func(k, _ string) bool {
return !managedNodePortServiceAnnotations.Has(k)
}),
}
if !cmp.Equal(current.Annotations, expected.Annotations, annotationCmpOpts...) {
changed = true
}
if !changed {
return false, nil
}
updated := current.DeepCopy()
updated.Spec = expected.Spec
if updated.Annotations == nil {
updated.Annotations = map[string]string{}
}
for annotation := range managedNodePortServiceAnnotations {
currentVal, have := current.Annotations[annotation]
expectedVal, want := expected.Annotations[annotation]
if want && (!have || currentVal != expectedVal) {
updated.Annotations[annotation] = expected.Annotations[annotation]
} else if have && !want {
delete(updated.Annotations, annotation)
}
}
// Preserve fields that the API, other controllers, or user may have
// modified.
updated.Spec.ClusterIP = current.Spec.ClusterIP
......
......@@ -37,6 +37,9 @@ func TestDesiredNodePortService(t *testing.T) {
expect: true,
expectService: corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
localWithFallbackAnnotation: "",
},
Namespace: "openshift-ingress",
Name: "router-nodeport-default",
Labels: map[string]string{
......@@ -75,6 +78,9 @@ func TestDesiredNodePortService(t *testing.T) {
expect: true,
expectService: corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
localWithFallbackAnnotation: "",
},
Namespace: "openshift-ingress",
Name: "router-nodeport-default",
Labels: map[string]string{
......@@ -126,8 +132,10 @@ func TestDesiredNodePortService(t *testing.T) {
},
},
}
want, svc := desiredNodePortService(ic, deploymentRef, tc.wantMetricsPort)
if want != tc.expect {
want, svc, err := desiredNodePortService(ic, deploymentRef, tc.wantMetricsPort)
if err != nil {
t.Errorf("unexpected error from desiredNodePortService: %w", err)
} else if want != tc.expect {
t.Errorf("expected desiredNodePortService to return %t for endpoint publishing strategy type %v, got %t, with service %#v", tc.expect, tc.strategyType, want, svc)
} else if tc.expect && !reflect.DeepEqual(svc, &tc.expectService) {
t.Errorf("expected desiredNodePortService to return %#v, got %#v", &tc.expectService, svc)
......@@ -175,6 +183,20 @@ func TestNodePortServiceChanged(t *testing.T) {
},
expect: true,
},
{
description: "if the local-with-fallback annotation changes",
mutate: func(svc *corev1.Service) {
svc.Annotations["traffic-policy.network.alpha.openshift.io/local-with-fallback"] = "x"
},
expect: true,
},
{
description: "if the local-with-fallback annotation is deleted",
mutate: func(svc *corev1.Service) {
delete(svc.Annotations, "traffic-policy.network.alpha.openshift.io/local-with-fallback")
},
expect: true,
},
{
description: "if .spec.healthCheckNodePort changes",
mutate: func(svc *corev1.Service) {
......@@ -236,6 +258,9 @@ func TestNodePortServiceChanged(t *testing.T) {
for _, tc := range testCases {
original := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"traffic-policy.network.alpha.openshift.io/local-with-fallback": "",
},
Namespace: "openshift-ingress",
Name: "router-original",
UID: "1",
......
......@@ -1978,6 +1978,128 @@ func TestLoadBalancingAlgorithmUnsupportedConfigOverride(t *testing.T) {
}
}
// TestLocalWithFallbackOverrideForLoadBalancerService verifies that the
// operator does not set the local-with-fallback annotation on a LoadBalancer
// service if the the localWithFallback unsupported config override is set to
// "false".
//
// Note: This test mutates the default ingresscontroller rather than creating a
// new one to reduce the risk of failing due to cloud provider API throttling.
func TestLocalWithFallbackOverrideForLoadBalancerService(t *testing.T) {
supportedPlatforms := map[configv1.PlatformType]struct{}{
configv1.AWSPlatformType: {},
configv1.AzurePlatformType: {},
configv1.GCPPlatformType: {},
}
platform := infraConfig.Status.Platform
if _, supported := supportedPlatforms[platform]; !supported {
t.Skipf("test skipped on platform %q", platform)
}
ic := &operatorv1.IngressController{}
if err := kclient.Get(context.TODO(), defaultName, ic); err != nil {
t.Fatalf("failed to get ingresscontroller %q: %v", defaultName, err)
}
if err := waitForIngressControllerCondition(t, kclient, 5*time.Minute, defaultName, defaultAvailableConditions...); err != nil {
t.Fatalf("failed to observe expected conditions: %v", err)
}
service := &corev1.Service{}
serviceName := controller.LoadBalancerServiceName(ic)
if err := kclient.Get(context.TODO(), serviceName, service); err != nil {
t.Fatalf("failed to get service %q: %v", serviceName, err)
}
const annotation = "traffic-policy.network.alpha.openshift.io/local-with-fallback"
if _, ok := service.Annotations[annotation]; !ok {
t.Fatalf("failed to observe the %q annotation on service %q", annotation, serviceName)
}
ic.Spec.UnsupportedConfigOverrides = runtime.RawExtension{
Raw: []byte(`{"localWithFallback":"false"}`),
}
if err := kclient.Update(context.TODO(), ic); err != nil {
t.Fatalf("failed to update ingresscontroller %q with override: %v", defaultName, err)
}
defer func() {
if err := kclient.Get(context.TODO(), defaultName, ic); err != nil {
t.Fatalf("failed to get ingresscontroller %q: %v", defaultName, err)
}
ic.Spec.UnsupportedConfigOverrides = runtime.RawExtension{}
if err := kclient.Update(context.TODO(), ic); err != nil {
t.Fatalf("failed to update ingresscontroller %q to remove the override: %v", defaultName, err)
}
}()
wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) {
if err := kclient.Get(context.TODO(), serviceName, service); err != nil {
t.Logf("failed to get service %q: %v", serviceName, err)
return false, nil
}
_, ok := service.Annotations[annotation]
return !ok, nil
})
if _, ok := service.Annotations[annotation]; ok {
t.Fatalf("failed to observe removal of the %q annotation on service %q", annotation, serviceName)
}
}
// TestLocalWithFallbackOverrideForNodePortService verifies that the operator
// does not set the local-with-fallback annotation on a NodePort service if the
// the localWithFallback unsupported config override is set to "false".
func TestLocalWithFallbackOverrideForNodePortService(t *testing.T) {
icName := types.NamespacedName{
Namespace: operatorNamespace,
Name: "local-with-fallback",
}
domain := icName.Name + "." + dnsConfig.Spec.BaseDomain
ic := newNodePortController(icName, domain)
if err := kclient.Create(context.TODO(), ic); err != nil {
t.Fatalf("failed to create ingresscontroller %q: %v", icName, err)
}
defer assertIngressControllerDeleted(t, kclient, ic)
if err := waitForIngressControllerCondition(t, kclient, 5*time.Minute, icName, availableConditionsForIngressControllerWithNodePort...); err != nil {
t.Fatalf("failed to observe expected conditions: %v", err)
}
service := &corev1.Service{}
serviceName := controller.NodePortServiceName(ic)
if err := kclient.Get(context.TODO(), serviceName, service); err != nil {
t.Fatalf("failed to get service %q: %v", serviceName, err)
}
const annotation = "traffic-policy.network.alpha.openshift.io/local-with-fallback"
if _, ok := service.Annotations[annotation]; !ok {
t.Fatalf("failed to observe the %q annotation on ingresscontroller %q", annotation, icName)
}
if err := kclient.Get(context.TODO(), icName, ic); err != nil {
t.Fatalf("failed to get ingresscontroller %q: %v", icName, err)
}
ic.Spec.UnsupportedConfigOverrides = runtime.RawExtension{
Raw: []byte(`{"localWithFallback":"false"}`),
}
if err := kclient.Update(context.TODO(), ic); err != nil {
t.Fatalf("failed to update ingresscontroller %q with override: %v", icName, err)
}
wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) {
if err := kclient.Get(context.TODO(), serviceName, service); err != nil {
t.Logf("failed to get service %q: %v", serviceName, err)
return false, nil
}
_, ok := service.Annotations[annotation]
return !ok, nil
})
if _, ok := service.Annotations[annotation]; ok {
t.Fatalf("failed to observe removal of the %q annotation on service %q", annotation, serviceName)
}
}
func newLoadBalancerController(name types.NamespacedName, domain string) *operatorv1.IngressController {