Skip to content

Commit 8fb42eb

Browse files
committed
Add DirectSourceFetch feature gate to bypass cache for source objects
This feature gate enables fetching source objects (GitRepository, OCIRepository, Bucket) directly from the API server using APIReader, bypassing the controller's cache. This can be useful when immediate consistency is required for source object reads. When enabled via --feature-gates=DirectSourceFetch=true: Source objects are fetched using r.APIReader instead of r.Client A log message is emitted at startup indicating the feature is active Changes: Add DirectSourceFetch field to ArtifactGeneratorReconciler struct Update getSource() to use APIReader when feature is enabled Register feature gate with default value false (opt-in) Add unit tests for sources Update pkg/runtime dependency to v0.100.1 Signed-off-by: Dipti Pai <diptipai89@outlook.com>
1 parent 22df788 commit 8fb42eb

5 files changed

Lines changed: 158 additions & 10 deletions

File tree

cmd/main.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,15 @@ func main() {
182182
os.Exit(1)
183183
}
184184

185+
directSourceFetch, err := gotkfeatures.Enabled(gotkctrl.FeatureGateDirectSourceFetch)
186+
if err != nil {
187+
setupLog.Error(err, "unable to check feature gate "+gotkctrl.FeatureGateDirectSourceFetch)
188+
os.Exit(1)
189+
}
190+
if directSourceFetch {
191+
setupLog.Info("DirectSourceFetch feature gate is enabled, sources will be fetched directly from the API server bypassing the cache")
192+
}
193+
185194
// Note that the liveness check will pass beyond this point, but the readiness
186195
// check will continue to fail until this controller instance is elected leader.
187196
gotkprobes.SetupChecks(mgr, setupLog)
@@ -198,6 +207,7 @@ func main() {
198207
Storage: artifactStorage,
199208
ArtifactFetchRetries: httpRetry,
200209
DependencyRequeueInterval: requeueDependency,
210+
DirectSourceFetch: directSourceFetch,
201211
NoCrossNamespaceRefs: aclOptions.NoCrossNamespaceRefs,
202212
}).SetupWithManager(ctx, mgr, controller.ArtifactGeneratorReconcilerOptions{
203213
RateLimiter: gotkctrl.GetRateLimiter(rateLimiterOptions),

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ require (
1414
github.com/fluxcd/pkg/apis/meta v1.25.0
1515
github.com/fluxcd/pkg/artifact v0.8.0
1616
github.com/fluxcd/pkg/http/fetch v0.22.0
17-
github.com/fluxcd/pkg/runtime v0.100.0
17+
github.com/fluxcd/pkg/runtime v0.100.1
1818
github.com/fluxcd/pkg/tar v0.17.0
1919
github.com/fluxcd/pkg/testserver v0.13.0
2020
github.com/fluxcd/source-controller/api v1.7.2

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ github.com/fluxcd/pkg/lockedfile v0.7.0 h1:tmzW2GeMGuJMiCcVloXVd1vKZ92anm9WGkRgO
7777
github.com/fluxcd/pkg/lockedfile v0.7.0/go.mod h1:AzCV/h1N3hi/KtUDUCUgS8hl1+a1y+I6pmRo25dxdK0=
7878
github.com/fluxcd/pkg/oci v0.60.0 h1:uyAoYoj0i9rxFYQchThwfe4i/X0eb5l9wJuDbSAbqGs=
7979
github.com/fluxcd/pkg/oci v0.60.0/go.mod h1:5NT4IaYZocOsXLV3IGgj4FRQtSae46DL8Lq3EcDUqME=
80-
github.com/fluxcd/pkg/runtime v0.100.0 h1:7k2T/zlOLZ+knVr5fGB6cqq3Dr9D1k2jEe6AJo91JlI=
81-
github.com/fluxcd/pkg/runtime v0.100.0/go.mod h1:SctSsHvFwUfiOVP1zirP6mo7I8wQtXeWVl2lNQWal88=
80+
github.com/fluxcd/pkg/runtime v0.100.1 h1:UiPmgY8Yv7UF06MT5T8AG9uDGNszm75/DQtK6JEhnrM=
81+
github.com/fluxcd/pkg/runtime v0.100.1/go.mod h1:SctSsHvFwUfiOVP1zirP6mo7I8wQtXeWVl2lNQWal88=
8282
github.com/fluxcd/pkg/sourceignore v0.17.0 h1:Z72nruRMhC15zIEpWoDrAcJcJ1El6QDnP/aRDfE4WOA=
8383
github.com/fluxcd/pkg/sourceignore v0.17.0/go.mod h1:3e/VmYLId0pI/H5sK7W9Ibif+j0Ahns9RxNjDMtTTfY=
8484
github.com/fluxcd/pkg/tar v0.17.0 h1:uNxbFXy8ly8C7fJ8D7w3rjTNJFrb4Hp1aY/30XkfvxY=

internal/controller/artifactgenerator_controller.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type ArtifactGeneratorReconciler struct {
6161
ArtifactFetchRetries int
6262
DependencyRequeueInterval time.Duration
6363
NoCrossNamespaceRefs bool
64+
DirectSourceFetch bool
6465
}
6566

6667
// +kubebuilder:rbac:groups=source.extensions.fluxcd.io,resources=artifactgenerators,verbs=get;list;watch;create;update;patchStatus;delete
@@ -286,6 +287,12 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context,
286287
// Map of source alias to observed state.
287288
observedSources := make(map[string]swapi.ObservedSource)
288289

290+
// Use APIReader to bypass the cache when DirectSourceFetch is enabled.
291+
var reader client.Reader = r.Client
292+
if r.DirectSourceFetch {
293+
reader = r.APIReader
294+
}
295+
289296
// Get the source objects referenced in the ArtifactGenerator spec.
290297
for _, src := range obj.Spec.Sources {
291298
namespacedName := client.ObjectKey{
@@ -301,7 +308,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context,
301308
switch src.Kind {
302309
case sourcev1.OCIRepositoryKind:
303310
var repository sourcev1.OCIRepository
304-
err := r.Get(ctx, namespacedName, &repository)
311+
err := reader.Get(ctx, namespacedName, &repository)
305312
if err != nil {
306313
if apierrors.IsNotFound(err) {
307314
return nil, err
@@ -311,7 +318,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context,
311318
source = &repository
312319
case sourcev1.GitRepositoryKind:
313320
var repository sourcev1.GitRepository
314-
err := r.Get(ctx, namespacedName, &repository)
321+
err := reader.Get(ctx, namespacedName, &repository)
315322
if err != nil {
316323
if apierrors.IsNotFound(err) {
317324
return nil, err
@@ -321,7 +328,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context,
321328
source = &repository
322329
case sourcev1.BucketKind:
323330
var bucket sourcev1.Bucket
324-
err := r.Get(ctx, namespacedName, &bucket)
331+
err := reader.Get(ctx, namespacedName, &bucket)
325332
if err != nil {
326333
if apierrors.IsNotFound(err) {
327334
return nil, err
@@ -331,7 +338,7 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context,
331338
source = &bucket
332339
case sourcev1.HelmChartKind:
333340
var chart sourcev1.HelmChart
334-
err := r.Get(ctx, namespacedName, &chart)
341+
err := reader.Get(ctx, namespacedName, &chart)
335342
if err != nil {
336343
if apierrors.IsNotFound(err) {
337344
return nil, err
@@ -340,15 +347,15 @@ func (r *ArtifactGeneratorReconciler) observeSources(ctx context.Context,
340347
}
341348
source = &chart
342349
case sourcev1.ExternalArtifactKind:
343-
var chart sourcev1.ExternalArtifact
344-
err := r.Get(ctx, namespacedName, &chart)
350+
var ea sourcev1.ExternalArtifact
351+
err := reader.Get(ctx, namespacedName, &ea)
345352
if err != nil {
346353
if apierrors.IsNotFound(err) {
347354
return nil, err
348355
}
349356
return nil, fmt.Errorf("unable to get source '%s': %w", namespacedName, err)
350357
}
351-
source = &chart
358+
source = &ea
352359
default:
353360
return nil, fmt.Errorf("source `%s` kind '%s' not supported",
354361
src.Name, src.Kind)
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
Copyright 2026 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"testing"
23+
"time"
24+
25+
. "github.com/onsi/gomega"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"sigs.k8s.io/controller-runtime/pkg/client"
28+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
29+
30+
gotkmeta "github.com/fluxcd/pkg/apis/meta"
31+
gotkconditions "github.com/fluxcd/pkg/runtime/conditions"
32+
gotktestsrv "github.com/fluxcd/pkg/testserver"
33+
sourcev1 "github.com/fluxcd/source-controller/api/v1"
34+
35+
swapi "github.com/fluxcd/source-watcher/api/v2/v1beta1"
36+
)
37+
38+
func TestArtifactGeneratorReconciler_DirectSourceFetch(t *testing.T) {
39+
g := NewWithT(t)
40+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
41+
defer cancel()
42+
43+
// Create a namespace
44+
ns, err := testEnv.CreateNamespace(ctx, "direct-fetch-test")
45+
g.Expect(err).ToNot(HaveOccurred())
46+
47+
t.Run("reconciles with DirectSourceFetch enabled (uses APIReader)", func(t *testing.T) {
48+
g := NewWithT(t)
49+
50+
// Create reconciler with DirectSourceFetch enabled
51+
reconciler := &ArtifactGeneratorReconciler{
52+
ControllerName: controllerName,
53+
Client: testClient,
54+
APIReader: testClient,
55+
Scheme: testEnv.Scheme(),
56+
EventRecorder: testEnv.GetEventRecorderFor(controllerName),
57+
Storage: testStorage,
58+
ArtifactFetchRetries: 1,
59+
DependencyRequeueInterval: 5 * time.Second,
60+
NoCrossNamespaceRefs: true,
61+
DirectSourceFetch: true, // Enable DirectSourceFetch
62+
}
63+
64+
// Create the ArtifactGenerator object
65+
objKey := client.ObjectKey{
66+
Name: "direct-fetch-enabled",
67+
Namespace: ns.Name,
68+
}
69+
obj := &swapi.ArtifactGenerator{
70+
TypeMeta: metav1.TypeMeta{
71+
Kind: swapi.ArtifactGeneratorKind,
72+
APIVersion: swapi.GroupVersion.String(),
73+
},
74+
ObjectMeta: metav1.ObjectMeta{
75+
Name: objKey.Name,
76+
Namespace: objKey.Namespace,
77+
},
78+
Spec: swapi.ArtifactGeneratorSpec{
79+
Sources: []swapi.SourceReference{
80+
{
81+
Alias: fmt.Sprintf("%s-git", objKey.Name),
82+
Kind: sourcev1.GitRepositoryKind,
83+
Name: objKey.Name,
84+
},
85+
},
86+
OutputArtifacts: []swapi.OutputArtifact{
87+
{
88+
Name: fmt.Sprintf("%s-git", objKey.Name),
89+
Copy: []swapi.CopyOperation{
90+
{
91+
From: fmt.Sprintf("@%s-git/**", objKey.Name),
92+
To: "@artifact/",
93+
},
94+
},
95+
},
96+
},
97+
},
98+
}
99+
err := testClient.Create(ctx, obj)
100+
g.Expect(err).ToNot(HaveOccurred())
101+
102+
// Create the GitRepository source
103+
gitFiles := []gotktestsrv.File{
104+
{Name: "app.yaml", Body: "apiVersion: apps/v1\nkind: Deployment\nmetadata:\n name: direct-fetch-test"},
105+
}
106+
err = applyGitRepository(objKey, "main@sha256:directfetch123", gitFiles)
107+
g.Expect(err).ToNot(HaveOccurred())
108+
109+
// Initialize the ArtifactGenerator with the finalizer
110+
r, err := reconciler.Reconcile(ctx, reconcile.Request{
111+
NamespacedName: objKey,
112+
})
113+
g.Expect(err).ToNot(HaveOccurred())
114+
g.Expect(r.RequeueAfter).To(BeEquivalentTo(time.Millisecond))
115+
116+
// Reconcile to process the sources and build artifacts
117+
r, err = reconciler.Reconcile(ctx, reconcile.Request{
118+
NamespacedName: objKey,
119+
})
120+
g.Expect(err).ToNot(HaveOccurred())
121+
g.Expect(r.RequeueAfter).To(Equal(obj.GetRequeueAfter()))
122+
123+
// Verify the ArtifactGenerator status
124+
err = testClient.Get(ctx, objKey, obj)
125+
g.Expect(err).ToNot(HaveOccurred())
126+
g.Expect(gotkconditions.IsReady(obj)).To(BeTrue())
127+
g.Expect(gotkconditions.GetReason(obj, gotkmeta.ReadyCondition)).To(Equal(gotkmeta.SucceededReason))
128+
129+
t.Log(objToYaml(obj))
130+
})
131+
}

0 commit comments

Comments
 (0)