Skip to content

Commit 4e54f4f

Browse files
committed
csi: plugin name normalization
Normalize plugin names to facilitate correct reuse of existing plugin instances. Signed-off-by: Paul Pignet <[email protected]>
1 parent 17b8d22 commit 4e54f4f

File tree

2 files changed

+103
-24
lines changed

2 files changed

+103
-24
lines changed

manager/csi/manager.go

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -463,28 +463,53 @@ func (vm *Manager) deleteVolume(ctx context.Context, v *api.Volume) error {
463463
// leak. It's acceptable for now because we expect neither exceptionally long
464464
// lived managers nor exceptionally high plugin churn.
465465
func (vm *Manager) getPlugin(name string) (Plugin, error) {
466-
// if the plugin already exists, we can just return it.
467-
if p, ok := vm.plugins[name]; ok {
468-
return p, nil
469-
}
470-
471-
// otherwise, we need to load the plugin.
472-
pc, err := vm.pg.Get(name, DockerCSIPluginCap)
473-
if err != nil {
474-
return nil, err
475-
}
476-
477-
if pc == nil {
478-
return nil, errors.New("driver \"" + name + "\" not found")
479-
}
480-
481-
pa, ok := pc.(mobyplugin.AddrPlugin)
482-
if !ok {
483-
return nil, errors.New("plugin for driver \"" + name + "\" does not implement PluginAddr")
484-
}
485-
486-
p := vm.newPlugin(pa, vm.provider)
487-
vm.plugins[name] = p
488-
489-
return p, nil
466+
// normalize driver name by stripping any tag (e.g. ":latest")
467+
canon := name
468+
if i := strings.IndexRune(name, ':'); i >= 0 {
469+
canon = name[:i]
470+
}
471+
472+
// Fast path: exact key or canonical key already loaded
473+
if p, ok := vm.plugins[name]; ok {
474+
return p, nil
475+
}
476+
if p, ok := vm.plugins[canon]; ok {
477+
// also alias the original name to it for future lookups
478+
vm.plugins[name] = p
479+
return p, nil
480+
}
481+
482+
// Try plugin getter with full name first
483+
pc, err := vm.pg.Get(name, DockerCSIPluginCap)
484+
if err != nil {
485+
// retry using canonical name if different
486+
if canon != name {
487+
pc2, err2 := vm.pg.Get(canon, DockerCSIPluginCap)
488+
if err2 == nil && pc2 != nil {
489+
pc = pc2
490+
}
491+
}
492+
if pc == nil {
493+
return nil, err
494+
}
495+
}
496+
497+
if pc == nil {
498+
return nil, errors.New("driver \"" + name + "\" not found")
499+
}
500+
501+
pa, ok := pc.(mobyplugin.AddrPlugin)
502+
if !ok {
503+
return nil, errors.New("plugin for driver \"" + name + "\" does not implement PluginAddr")
504+
}
505+
506+
// create plugin instance once
507+
p := vm.newPlugin(pa, vm.provider)
508+
509+
// store under canonical, plugin-reported, and requested names
510+
vm.plugins[canon] = p
511+
vm.plugins[pa.Name()] = p
512+
vm.plugins[name] = p
513+
514+
return p, nil
490515
}

manager/csi/manager_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,4 +755,58 @@ var _ = Describe("Manager", func() {
755755
Expect(vm.pendingVolumes.Outstanding()).To(Equal(1))
756756
})
757757
})
758+
759+
Describe("plugin name canonicalization", func() {
760+
It("should reuse the same plugin instance for tagged and untagged names", func() {
761+
pluginGetter.Plugins["plug1"] = &testutils.FakePlugin{
762+
PluginName: "plug1",
763+
PluginAddr: &net.UnixAddr{
764+
Net: "unix",
765+
Name: "unix:///whatever.sock",
766+
},
767+
}
768+
769+
node := &api.Node{
770+
ID: "nodeA",
771+
Description: &api.NodeDescription{
772+
CSIInfo: []*api.NodeCSIInfo{{
773+
PluginName: "plug1", // node reports untagged name
774+
NodeID: "plug1NodeA",
775+
}},
776+
},
777+
}
778+
779+
volume := &api.Volume{
780+
ID: "volumeA",
781+
Spec: api.VolumeSpec{
782+
Annotations: api.Annotations{Name: "volumeA"},
783+
Driver: &api.Driver{
784+
Name: "plug1:latest", // volume uses tagged name
785+
},
786+
},
787+
VolumeInfo: &api.VolumeInfo{
788+
VolumeContext: map[string]string{},
789+
VolumeID: "plug1VolA",
790+
},
791+
PublishStatus: []*api.VolumePublishStatus{{
792+
NodeID: "nodeA",
793+
State: api.VolumePublishStatus_PENDING_PUBLISH,
794+
}},
795+
}
796+
797+
err := s.Update(func(tx store.Tx) error {
798+
Expect(store.CreateNode(tx, node)).To(Succeed())
799+
Expect(store.CreateVolume(tx, volume)).To(Succeed())
800+
return nil
801+
})
802+
Expect(err).ToNot(HaveOccurred())
803+
804+
vm.init(context.Background())
805+
vm.processVolume(ctx, volume.ID, 0)
806+
807+
Expect(pluginMaker.plugins).To(HaveKey("plug1"))
808+
// verify that publish succeeded and reused same fakePlugin instance
809+
Expect(pluginMaker.plugins["plug1"].volumesPublished[volume.ID]).To(ContainElement("nodeA"))
810+
})
811+
})
758812
})

0 commit comments

Comments
 (0)