Skip to content

Commit 879c4ea

Browse files
committed
Feat: dqlite dump for core, dpm, integrations and license databases
1 parent f89f81f commit 879c4ea

File tree

2 files changed

+41
-107
lines changed

2 files changed

+41
-107
lines changed

pkg/data_collector/data_collector.go

Lines changed: 0 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -268,86 +268,6 @@ func (c *DataCollector) AllNamespacesExist() bool {
268268
return allExist
269269
}
270270

271-
// // CopyFileFromPod copies a file from a pod's container to the local filesystem.
272-
// func (c *DataCollector) CopyFileFromPod(namespace, pod, container, srcPath, destPath string, ctx context.Context) error {
273-
// cmd := []string{"tar", "cf", "-", "-C", filepath.Dir(srcPath), filepath.Base(srcPath)}
274-
// req := c.K8sCoreClientSet.CoreV1().RESTClient().Post().
275-
// Namespace(namespace).
276-
// Resource("pods").
277-
// Name(pod).
278-
// SubResource("exec").
279-
// VersionedParams(&corev1.PodExecOptions{
280-
// Container: container,
281-
// Command: cmd,
282-
// Stdin: false,
283-
// Stdout: true,
284-
// Stderr: true,
285-
// TTY: false,
286-
// }, scheme.ParameterCodec)
287-
288-
// exec, err := remotecommand.NewSPDYExecutor(c.K8sRestConfig, "POST", req.URL())
289-
// if err != nil {
290-
// return err
291-
// }
292-
// fmt.Printf("Started remote command\n")
293-
// reader, writer := io.Pipe()
294-
// // var wg sync.WaitGroup
295-
// var streamErr error
296-
// // wg.Add(1)
297-
// go func() {
298-
// defer writer.Close()
299-
// // defer wg.Done()
300-
// streamErr = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
301-
// Stdout: writer,
302-
// Stderr: os.Stderr,
303-
// })
304-
// }()
305-
306-
// tr := tar.NewReader(reader)
307-
// fmt.Printf("New Reader started\n")
308-
// var copyErr error
309-
// for {
310-
// header, err := tr.Next()
311-
// if err == io.EOF {
312-
// fmt.Printf("Reached end of tar stream\n")
313-
// break
314-
// }
315-
// if err != nil {
316-
// copyErr = err
317-
// break
318-
// }
319-
// if header.Typeflag == tar.TypeReg {
320-
// fmt.Printf("Copying file %s to destPath %s\n", header.Name, destPath)
321-
// outFile, err := os.Create(destPath)
322-
// if err != nil {
323-
// copyErr = err
324-
// break
325-
// }
326-
// fmt.Printf("Copying file %s to outFile %s\n", header.Name, outFile.Name())
327-
// defer outFile.Close()
328-
// _, err = io.Copy(outFile, tr)
329-
330-
// if err != nil {
331-
// copyErr = err
332-
// break
333-
// }
334-
// }
335-
// }
336-
// // Wait for the goroutine to finish
337-
// fmt.Printf("Waiting for stream to finish\n")
338-
// // wg.Wait()
339-
// fmt.Printf("Stream finished\n")
340-
// if copyErr != nil {
341-
// fmt.Printf("Error copying file: %v\n", copyErr)
342-
// return copyErr
343-
// }
344-
// if streamErr != nil {
345-
// fmt.Printf("Error executing command in pod: %v\n", streamErr)
346-
// return streamErr
347-
// }
348-
// return nil
349-
// }
350-
351271
// CopyFileFromPod copies a file from a pod's container to the local filesystem.
352272
func (c *DataCollector) CopyFileFromPod(namespace, pod, container, srcPath, destPath string, ctx context.Context) error {
353273
cmd := []string{"tar", "cf", "-", "-C", filepath.Dir(srcPath), filepath.Base(srcPath)}
@@ -377,7 +297,6 @@ func (c *DataCollector) CopyFileFromPod(namespace, pod, container, srcPath, dest
377297
Stderr: &stderr,
378298
})
379299
if err != nil {
380-
// return fmt.Errorf("error in streaming: %w. Stderr: %s", err, stderr.String())
381300
return err
382301
}
383302

@@ -392,23 +311,17 @@ func (c *DataCollector) CopyFileFromPod(namespace, pod, container, srcPath, dest
392311
// Untar the stream and write the content to the local file
393312
tarReader := tar.NewReader(&stdout)
394313
for {
395-
// fmt.Printf("Reading tar stream\n")
396-
// fmt.Println("Tar output length:", stdout.Len())
397314
header, err := tarReader.Next()
398315

399316
if err == io.EOF {
400-
// fmt.Printf("Reached end of tar stream\n")
401317
break // End of tar archive
402318
}
403319
if err != nil {
404-
// return fmt.Errorf("error reading tar stream: %w", err)
405320
return err
406321
}
407322

408323
// Ensure the tar file matches the expected file path
409-
// fmt.Printf("Header Name: %s\n", header.Name)
410324
if header.Name == filepath.Base(srcPath) {
411-
// fmt.Printf("Copying file %s to destPath %s\n", header.Name, destPath)
412325
_, err = io.Copy(localFile, tarReader)
413326
if err != nil {
414327
return fmt.Errorf("failed to write to local file: %w", err)

pkg/jobs/nim_job_list.go

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -186,38 +186,59 @@ func NIMJobList() []Job {
186186
},
187187
},
188188
{
189-
Name: "exec-dqlite-dump-core",
189+
Name: "exec-dqlite-dump",
190190
Timeout: time.Second * 30,
191191
Execute: func(dc *data_collector.DataCollector, ctx context.Context, ch chan JobResult) {
192192
jobResult := JobResult{Files: make(map[string][]byte), Error: nil}
193-
containerName := "core"
194-
dbName := "core"
195-
outputFile := "/tmp/core.sql"
196-
dbAddr := "0.0.0.0:7891"
193+
194+
dbConfigs := []struct {
195+
dbName string
196+
containerName string
197+
outputFile string
198+
dbAddr string
199+
}{
200+
{"core", "core", "/tmp/core.sql", "0.0.0.0:7891"},
201+
{"dpm", "dpm", "/tmp/dpm.sql", "0.0.0.0:7890"},
202+
{"integrations", "integrations", "/tmp/integrations.sql", "0.0.0.0:7892"},
203+
{"license", "integrations", "/tmp/license.sql", "0.0.0.0:7893"},
204+
// Add more containers as needed
205+
}
197206

198207
// /etc/nms/scripts/dqlite-backup -n core -c /etc/nms/nms.conf -a 0.0.0.0:7891 -o /tmp/core.sql -k
199-
command := []string{dqliteBackupPath, "-n", dbName, "-c", nmsConfigPath, "-a", dbAddr, "-o", outputFile, "-k"}
208+
200209
for _, namespace := range dc.Namespaces {
201210
pods, err := dc.K8sCoreClientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
202211
if err != nil {
203212
dc.Logger.Printf("\tCould not retrieve pod list for namespace %s: %v\n", namespace, err)
204213
} else {
205-
for _, pod := range pods.Items {
206-
if strings.Contains(pod.Name, containerName) {
207-
res, err := dc.PodExecutor(namespace, pod.Name, containerName, command, ctx)
208-
if err != nil {
209-
jobResult.Error = err
210-
dc.Logger.Printf("\tCommand execution %s failed for pod %s in namespace %s: %v\n", command, pod.Name, namespace, err)
211-
} else {
212-
jobResult.Files[filepath.Join(dc.BaseDir, "exec", namespace, pod.Name+"__dqlite-dump-"+containerName+".txt")] = res
213-
214-
// Move the dumped file to the base directory
215-
destPathFilename := filepath.Join(dc.BaseDir, "exec", namespace, pod.Name+"__dqlite-dump-"+filepath.Base(outputFile))
216-
if err := dc.CopyFileFromPod(namespace, pod.Name, containerName, outputFile, destPathFilename, ctx); err != nil {
214+
for _, config := range dbConfigs {
215+
command := []string{dqliteBackupPath, "-n", config.dbName, "-c", nmsConfigPath, "-a", config.dbAddr, "-o", config.outputFile, "-k"}
216+
for _, pod := range pods.Items {
217+
if strings.Contains(pod.Name, config.containerName) {
218+
res, err := dc.PodExecutor(namespace, pod.Name, config.containerName, command, ctx)
219+
if err != nil {
217220
jobResult.Error = err
218-
dc.Logger.Printf("\tFailed to copy dumped file for pod %s in namespace %s: %v\n", pod.Name, namespace, err)
221+
dc.Logger.Printf("\tCommand execution %s failed for pod %s in namespace %s: %v\n", command, pod.Name, namespace, err)
219222
} else {
220-
dc.Logger.Printf("\tSuccessfully copied dumped file for pod %s in namespace %s\n", pod.Name, namespace)
223+
jobResult.Files[filepath.Join(dc.BaseDir, "exec", namespace, pod.Name+"__dqlite-dump-"+config.dbName+".txt")] = res
224+
225+
// Copy the dumped file from the pod to the host
226+
destPathFilename := filepath.Join(dc.BaseDir, "exec", namespace, pod.Name+"__dqlite-dump-"+filepath.Base(config.outputFile))
227+
if err := dc.CopyFileFromPod(namespace, pod.Name, config.containerName, config.outputFile, destPathFilename, ctx); err != nil {
228+
jobResult.Error = err
229+
dc.Logger.Printf("\tFailed to copy dumped file %s from pod %s in namespace %s to %s: %v\n", config.outputFile, pod.Name, namespace, destPathFilename, err)
230+
} else {
231+
dc.Logger.Printf("\tSuccessfully copied dumped file %s from pod %s in namespace %s to %s\n", config.outputFile, pod.Name, namespace, destPathFilename)
232+
}
233+
234+
// Remove/delete the dumped file from the pod
235+
_, err := dc.PodExecutor(namespace, pod.Name, config.containerName, []string{"rm", "-f", config.outputFile}, ctx)
236+
if err != nil {
237+
jobResult.Error = err
238+
dc.Logger.Printf("\tFailed to remove dumped file %s from pod %s in namespace %s: %v\n", config.outputFile, pod.Name, namespace, err)
239+
} else {
240+
dc.Logger.Printf("\tSuccessfully removed dumped file %s from pod %s in namespace %s\n", config.outputFile, pod.Name, namespace)
241+
}
221242
}
222243
}
223244
}

0 commit comments

Comments
 (0)