From 886161fd0dc38870e55a2b0532f1dad67ffc61c8 Mon Sep 17 00:00:00 2001 From: mowangdk Date: Sun, 5 Jan 2025 16:38:32 +0800 Subject: [PATCH] Refactor command parameters initialize --- cmd/csi-attacher/main.go | 104 +++++++------------ pkg/commandflags/attacherflags.go | 165 ++++++++++++++++++++++++++++++ pkg/commandflags/commonflags.go | 78 ++++++++++++++ 3 files changed, 281 insertions(+), 66 deletions(-) create mode 100644 pkg/commandflags/attacherflags.go create mode 100644 pkg/commandflags/commonflags.go diff --git a/cmd/csi-attacher/main.go b/cmd/csi-attacher/main.go index 4b4c4d86b..e7e638958 100644 --- a/cmd/csi-attacher/main.go +++ b/cmd/csi-attacher/main.go @@ -43,6 +43,7 @@ import ( "github.com/kubernetes-csi/csi-lib-utils/metrics" "github.com/kubernetes-csi/csi-lib-utils/rpc" "github.com/kubernetes-csi/external-attacher/pkg/attacher" + cf "github.com/kubernetes-csi/external-attacher/pkg/commandflags" "github.com/kubernetes-csi/external-attacher/pkg/controller" "google.golang.org/grpc" ) @@ -53,39 +54,6 @@ const ( csiTimeout = time.Second ) -// Command line flags -var ( - kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.") - resync = flag.Duration("resync", 10*time.Minute, "Resync interval of the controller.") - csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.") - showVersion = flag.Bool("version", false, "Show version.") - timeout = flag.Duration("timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.") - workerThreads = flag.Uint("worker-threads", 10, "Number of attacher worker threads") - maxEntries = flag.Int("max-entries", 0, "Max entries per each page in volume lister call, 0 means no limit.") - - retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed create volume or deletion. It doubles with each failure, up to retry-interval-max.") - retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed create volume or deletion.") - - enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.") - leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.") - leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.") - leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.") - leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.") - - defaultFSType = flag.String("default-fstype", "", "The default filesystem type of the volume to publish. Defaults to empty string") - - reconcileSync = flag.Duration("reconcile-sync", 1*time.Minute, "Resync interval of the VolumeAttachment reconciler.") - - metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") - - kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.") - kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.") - - maxGRPCLogLength = flag.Int("max-grpc-log-length", -1, "The maximum amount of characters logged for every grpc responses. Defaults to no limit") -) - var ( version = "unknown" ) @@ -93,42 +61,46 @@ var ( func main() { fg := featuregate.NewFeatureGate() logsapi.AddFeatureGates(fg) + cf.InitCommonFlags() + acf := cf.NewAttacherCommandFlags() c := logsapi.NewLoggingConfiguration() logsapi.AddGoFlags(c, flag.CommandLine) logs.InitLogs() flag.Parse() + acf.MergeFlags() logger := klog.Background() if err := logsapi.ValidateAndApply(c, fg); err != nil { logger.Error(err, "LoggingConfiguration is invalid") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - if *showVersion { + if cf.ShowVersion { fmt.Println(os.Args[0], version) return } logger.Info("Version", "version", version) + logger.Info("Timeout", "timeout", cf.Timeout) - if *metricsAddress != "" && *httpEndpoint != "" { + if cf.MetricsAddress != "" && cf.HttpEndpoint != "" { logger.Error(nil, "Only one of `--metrics-address` and `--http-endpoint` can be set") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - addr := *metricsAddress + addr := cf.MetricsAddress if addr == "" { - addr = *httpEndpoint + addr = cf.HttpEndpoint } // Create the client config. Use kubeconfig if given, otherwise assume in-cluster. - config, err := buildConfig(*kubeconfig) + config, err := buildConfig(cf.Kubeconfig) if err != nil { logger.Error(err, "Failed to build a Kubernetes config") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - config.QPS = (float32)(*kubeAPIQPS) - config.Burst = *kubeAPIBurst + config.QPS = (float32)(cf.KubeAPIQPS) + config.Burst = cf.KubeAPIBurst config.ContentType = runtime.ContentTypeProtobuf - if *workerThreads == 0 { + if cf.WorkerThreads == 0 { logger.Error(nil, "Option -worker-threads must be greater than zero") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } @@ -139,20 +111,20 @@ func main() { klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - factory := informers.NewSharedInformerFactory(clientset, *resync) + factory := informers.NewSharedInformerFactory(clientset, cf.Resync) var handler controller.Handler metricsManager := metrics.NewCSIMetricsManager("" /* driverName */) // Connect to CSI. - connection.SetMaxGRPCLogLength(*maxGRPCLogLength) + connection.SetMaxGRPCLogLength(cf.MaxGRPCLogLength) ctx := context.Background() - csiConn, err := connection.Connect(ctx, *csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) + csiConn, err := connection.Connect(ctx, cf.CsiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) if err != nil { - logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", *csiAddress) + logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", cf.CsiAddress) klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - err = rpc.ProbeForever(ctx, csiConn, *timeout) + err = rpc.ProbeForever(ctx, csiConn, cf.Timeout) if err != nil { logger.Error(err, "Failed to probe the CSI driver") klog.FlushAndExit(klog.ExitFlushTimeout, 1) @@ -173,15 +145,15 @@ func main() { translator := csitrans.New() if translator.IsMigratedCSIDriverByName(csiAttacher) { metricsManager = metrics.NewCSIMetricsManagerWithOptions(csiAttacher, metrics.WithMigration()) - migratedCsiClient, err := connection.Connect(ctx, *csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) + migratedCsiClient, err := connection.Connect(ctx, cf.CsiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) if err != nil { - logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", *csiAddress, "migrated", true) + logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", cf.CsiAddress, "migrated", true) klog.FlushAndExit(klog.ExitFlushTimeout, 1) } csiConn.Close() csiConn = migratedCsiClient - err = rpc.ProbeForever(ctx, csiConn, *timeout) + err = rpc.ProbeForever(ctx, csiConn, cf.Timeout) if err != nil { logger.Error(err, "Failed to probe the CSI driver", "migrated", true) klog.FlushAndExit(klog.ExitFlushTimeout, 1) @@ -191,13 +163,13 @@ func main() { // Prepare http endpoint for metrics + leader election healthz mux := http.NewServeMux() if addr != "" { - metricsManager.RegisterToServer(mux, *metricsPath) + metricsManager.RegisterToServer(mux, cf.MetricsPath) metricsManager.SetDriverName(csiAttacher) go func() { - logger.Info("ServeMux listening", "address", addr, "metricsPath", *metricsPath) + logger.Info("ServeMux listening", "address", addr, "metricsPath", cf.MetricsPath) err := http.ListenAndServe(addr, mux) if err != nil { - logger.Error(err, "Failed to start HTTP server at specified address and metrics path", "address", addr, "metricsPath", *metricsPath) + logger.Error(err, "Failed to start HTTP server at specified address and metrics path", "address", addr, "metricsPath", cf.MetricsPath) klog.FlushAndExit(klog.ExitFlushTimeout, 1) } }() @@ -233,7 +205,7 @@ func main() { vaLister := factory.Storage().V1().VolumeAttachments().Lister() csiNodeLister := factory.Storage().V1().CSINodes().Lister() volAttacher := attacher.NewAttacher(csiConn) - CSIVolumeLister := attacher.NewVolumeLister(csiConn, *maxEntries) + CSIVolumeLister := attacher.NewVolumeLister(csiConn, cf.MaxEntries) handler = controller.NewCSIHandler( clientset, csiAttacher, @@ -242,11 +214,11 @@ func main() { pvLister, csiNodeLister, vaLister, - timeout, + &cf.Timeout, supportsReadOnly, supportsSingleNodeMultiWriter, csitrans.New(), - *defaultFSType, + cf.DefaultFSType, ) logger.V(2).Info("CSI driver supports ControllerPublishUnpublish, using real CSI handler") } else { @@ -266,19 +238,19 @@ func main() { handler, factory.Storage().V1().VolumeAttachments(), factory.Core().V1().PersistentVolumes(), - workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), - workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), + workqueue.NewItemExponentialFailureRateLimiter(cf.RetryIntervalStart, cf.RetryIntervalMax), + workqueue.NewItemExponentialFailureRateLimiter(cf.RetryIntervalStart, cf.RetryIntervalMax), supportsListVolumesPublishedNodes, - *reconcileSync, + cf.ReconcileSync, ) run := func(ctx context.Context) { stopCh := ctx.Done() factory.Start(stopCh) - ctrl.Run(ctx, int(*workerThreads)) + ctrl.Run(ctx, int(cf.WorkerThreads)) } - if !*enableLeaderElection { + if !cf.EnableLeaderElection { run(klog.NewContext(context.Background(), logger)) } else { // Create a new clientset for leader election. When the attacher @@ -293,17 +265,17 @@ func main() { // Name of config map with leader election lock lockName := "external-attacher-leader-" + csiAttacher le := leaderelection.NewLeaderElection(leClientset, lockName, run) - if *httpEndpoint != "" { + if cf.HttpEndpoint != "" { le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout) } - if *leaderElectionNamespace != "" { - le.WithNamespace(*leaderElectionNamespace) + if cf.LeaderElectionNamespace != "" { + le.WithNamespace(cf.LeaderElectionNamespace) } - le.WithLeaseDuration(*leaderElectionLeaseDuration) - le.WithRenewDeadline(*leaderElectionRenewDeadline) - le.WithRetryPeriod(*leaderElectionRetryPeriod) + le.WithLeaseDuration(cf.LeaderElectionLeaseDuration) + le.WithRenewDeadline(cf.LeaderElectionRenewDeadline) + le.WithRetryPeriod(cf.LeaderElectionRetryPeriod) if err := le.Run(); err != nil { logger.Error(err, "Failed to initialize leader election") diff --git a/pkg/commandflags/attacherflags.go b/pkg/commandflags/attacherflags.go new file mode 100644 index 000000000..61b4b9e53 --- /dev/null +++ b/pkg/commandflags/attacherflags.go @@ -0,0 +1,165 @@ +package commandflags + +import ( + "flag" + "time" +) + +// attacher command line flags +var ( + Resync time.Duration + Timeout time.Duration + WorkerThreads uint64 + MaxEntries int + RetryIntervalStart time.Duration + RetryIntervalMax time.Duration + + DefaultFSType string + ReconcileSync time.Duration + + MetricsPath string + + KubeAPIQPS float64 + KubeAPIBurst int + + MaxGRPCLogLength int +) + +func init() { + flag.DurationVar(&Resync, "resync", 10*time.Minute, "Resync interval of the controller.") + flag.DurationVar(&Timeout, "timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.") + flag.Uint64Var(&WorkerThreads, "worker-threads", 10, "Number of attacher worker threads") + flag.IntVar(&MaxEntries, "max-entries", 0, "Max entries per each page in volume lister call, 0 means no limit.") + flag.DurationVar(&RetryIntervalStart, "retry-interval-start", time.Second, "Initial retry interval of failed create volume or deletion. It doubles with each failure, up to retry-interval-max.") + flag.DurationVar(&RetryIntervalMax, "retry-interval-max", 5*time.Minute, "Maximum retry interval of failed create volume or deletion.") + flag.StringVar(&DefaultFSType, "default-fstype", "", "The default filesystem type of the volume to publish. Defaults to empty string") + flag.DurationVar(&ReconcileSync, "reconcile-sync", 1*time.Minute, "Resync interval of the VolumeAttachment reconciler.") + flag.Float64Var(&KubeAPIQPS, "kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.") + flag.IntVar(&KubeAPIBurst, "kube-api-burst", 10, "Burst to use while communicating with the kubernetesapiserver. Defaults to 10.") + flag.IntVar(&MaxGRPCLogLength, "max-grpc-log-length", -1, "The maximum amount of characters logged for every grpc responses. Defaults to no limit") + flag.StringVar(&MetricsPath, "metrics-path", "/metrics", "The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). Defaults to `/metrics`.") +} + +type AttacherCommandFlags struct { + resync time.Duration + timeout time.Duration + workerThreads uint64 + maxEntries int + retryIntervalStart time.Duration + retryIntervalMax time.Duration + maxGRPCLogLength int + kubeAPIQPS float64 + kubeAPIBurst int + metricsPath string + defaultFSType string + reconcileSync time.Duration +} + +type SidecarControllerFlags interface { + MergeFlags() +} + +func NewAttacherCommandFlags() *AttacherCommandFlags { + acf := AttacherCommandFlags{} + flag.DurationVar(&acf.resync, "attacher-resync", -1*time.Minute, "Resync interval of the controller.") + flag.DurationVar(&acf.timeout, "attacher-timeout", -1*time.Second, "Timeout for waiting for attaching or detaching the volume.") + flag.Uint64Var(&acf.workerThreads, "attacher-worker-threads", 0, "Number of attacher worker threads") + flag.IntVar(&acf.maxEntries, "attacher-max-entries", -1, "Max entries per each page in volume lister call, 0 means no limit.") + flag.DurationVar(&acf.retryIntervalStart, "attacher-retry-interval-start", -1*time.Second, "Initial retry interval of failed create volume or deletion. It doubles with each failure, up to retry-interval-max.") + flag.DurationVar(&acf.retryIntervalMax, "attacher-retry-interval-max", -1*time.Minute, "Maximum retry interval of failed create volume or deletion.") + flag.StringVar(&acf.defaultFSType, "attacher-default-fstype", "", "The default filesystem type of the volume to publish. Defaults to empty string") + flag.DurationVar(&acf.reconcileSync, "attacher-reconcile-sync", -1*time.Minute, "Resync interval of the VolumeAttachment reconciler.") + flag.Float64Var(&acf.kubeAPIQPS, "attacher-kube-api-qps", -1, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.") + flag.IntVar(&acf.kubeAPIBurst, "attacher-kube-api-burst", -1, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.") + flag.IntVar(&acf.maxGRPCLogLength, "attacher-max-gprc-log-length", -1, "The maximum amount of characters logged for every grpc responses. Defaults to no limit") + flag.StringVar(&acf.metricsPath, "attacher-metrics-path", "", "The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). Defaults to `/metrics`.") + return &acf +} + +func (acf *AttacherCommandFlags) MergeFlags() { + acf.mergeResync() + acf.mergeTimeout() + acf.mergeWorkerThreads() + acf.mergeMaxEntries() + acf.mergeRetryIntervalStart() + acf.mergeRetryIntervalMax() + acf.mergeDefaultFSType() + acf.mergeReconcileSync() + acf.mergeKubeAPIQPS() + acf.mergeKubeAPIBurst() + acf.mergeMaxGPRCLogLength() + acf.mergeMetricPath() + +} + +func (acf *AttacherCommandFlags) mergeResync() { + if acf.resync != -1*time.Minute { + Resync = acf.resync + } +} + +func (acf *AttacherCommandFlags) mergeTimeout() { + if acf.timeout != -1*time.Second { + Timeout = acf.timeout + } +} + +func (acf *AttacherCommandFlags) mergeWorkerThreads() { + if acf.workerThreads != 0 { + WorkerThreads = acf.workerThreads + } +} + +func (acf *AttacherCommandFlags) mergeMaxEntries() { + if acf.maxEntries != -1 { + MaxEntries = acf.maxEntries + } +} + +func (acf *AttacherCommandFlags) mergeRetryIntervalStart() { + if acf.retryIntervalStart != -1*time.Second { + RetryIntervalStart = acf.retryIntervalStart + } +} + +func (acf *AttacherCommandFlags) mergeRetryIntervalMax() { + if acf.retryIntervalMax != -1*time.Minute { + RetryIntervalMax = acf.retryIntervalMax + } +} + +func (acf *AttacherCommandFlags) mergeDefaultFSType() { + if acf.defaultFSType != "" { + DefaultFSType = acf.defaultFSType + } +} + +func (acf *AttacherCommandFlags) mergeReconcileSync() { + if acf.reconcileSync != -1*time.Minute { + ReconcileSync = acf.reconcileSync + } +} + +func (acf *AttacherCommandFlags) mergeKubeAPIQPS() { + if acf.kubeAPIQPS != -1 { + KubeAPIQPS = acf.kubeAPIQPS + } +} + +func (acf *AttacherCommandFlags) mergeKubeAPIBurst() { + if acf.kubeAPIBurst != -1 { + KubeAPIBurst = acf.kubeAPIBurst + } +} + +func (acf *AttacherCommandFlags) mergeMaxGPRCLogLength() { + if acf.maxGRPCLogLength != -1 { + MaxGRPCLogLength = acf.maxGRPCLogLength + } +} + +func (acf *AttacherCommandFlags) mergeMetricPath() { + if acf.metricsPath != "" { + MetricsPath = acf.metricsPath + } +} diff --git a/pkg/commandflags/commonflags.go b/pkg/commandflags/commonflags.go new file mode 100644 index 000000000..264476801 --- /dev/null +++ b/pkg/commandflags/commonflags.go @@ -0,0 +1,78 @@ +package commandflags + +import ( + "flag" + "time" +) + +// common command flags +var ( + Kubeconfig string + CsiAddress string + ShowVersion bool + MetricsAddress string + HttpEndpoint string + EnableLeaderElection bool + LeaderElectionNamespace string + LeaderElectionLeaseDuration time.Duration + LeaderElectionRenewDeadline time.Duration + LeaderElectionRetryPeriod time.Duration +) + +func InitCommonFlags() { + initShowVersion() + + initKubeConfig() + + initMetricsAddress() + initHttpEndpoint() + + initCsiAddress() + + initEnableLeaderElection() + initLeaderElectionNamespace() + initLeaderElectionLeaseDuration() + initLeaderElectionRenewDeadline() + initLeaderElectionRetryPeriod() +} + +func initShowVersion() { + flag.BoolVar(&ShowVersion, "version", false, "Show version.") +} + +func initKubeConfig() { + flag.StringVar(&Kubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.") +} + +func initMetricsAddress() { + // I think we should remove deprecated flag in AIO project, we must change the non-common flags name such as worker-thread to attacher-worker-thread. so it definately be a breaking change + flag.StringVar(&MetricsAddress, "metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") +} + +func initHttpEndpoint() { + flag.StringVar(&HttpEndpoint, "http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `http-endpoint` and `metrics-address` can be set.") +} + +func initCsiAddress() { + flag.StringVar(&CsiAddress, "csi-address", "/run/csi/socket", "Address of the CSI driver socket.") +} + +func initEnableLeaderElection() { + flag.BoolVar(&EnableLeaderElection, "leader-election", false, "Enable leader election.") +} + +func initLeaderElectionNamespace() { + flag.StringVar(&LeaderElectionNamespace, "leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.") +} + +func initLeaderElectionLeaseDuration() { + flag.DurationVar(&LeaderElectionLeaseDuration, "leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.") +} + +func initLeaderElectionRenewDeadline() { + flag.DurationVar(&LeaderElectionRenewDeadline, "leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.") +} + +func initLeaderElectionRetryPeriod() { + flag.DurationVar(&LeaderElectionRetryPeriod, "leader-election-retry-period", 2*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 2 seconds.") +}