Skip to content

Commit a5d65a5

Browse files
committed
Fix cio.NewAttach
cio.NewAttach didn't work with firecracker-containerd. The operation does the following; - Calls State() to find FIFO files that are connected to the task - Passes the FIFO files to the client. - The client can open the FIFO files to interact with the task's stdio streams. However firecracker-containerd was proxying the request to the in-VM agent as is. As a result, the FIFO files on the response were pointing to files inside the VM, which the client couldn't access. Signed-off-by: Kazuyoshi Kato <katokazu@amazon.com>
1 parent b823f89 commit a5d65a5

File tree

2 files changed

+142
-46
lines changed

2 files changed

+142
-46
lines changed

runtime/service.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
// secure randomness
3333
"math/rand" // #nosec
3434

35+
"github.com/containerd/containerd/cio"
3536
"github.com/containerd/containerd/errdefs"
3637
"github.com/containerd/containerd/events/exchange"
3738
"github.com/containerd/containerd/log"
@@ -87,6 +88,10 @@ const (
8788

8889
// StopEventName is the topic published to when a VM stops
8990
StopEventName = "/firecracker-vm/stop"
91+
92+
// taskExecID is a special exec ID that refers to the task itself.
93+
// While the constant is defined here, the convention comes from containerd.
94+
taskExecID = ""
9095
)
9196

9297
var (
@@ -145,6 +150,10 @@ type service struct {
145150
machineConfig *firecracker.Config
146151
vsockIOPortCount uint32
147152
vsockPortMu sync.Mutex
153+
154+
// fifos holds the stdio FIFOs containerd passed to the shim, keyed as [taskID][execID].
155+
fifos map[string]map[string]cio.Config
156+
fifosMu sync.Mutex
148157
}
149158

150159
func shimOpts(shimCtx context.Context) (*shim.Opts, error) {
@@ -209,6 +218,7 @@ func NewService(shimCtx context.Context, id string, remotePublisher shim.Publish
209218

210219
vmReady: make(chan struct{}),
211220
jailer: newNoopJailer(shimCtx, logger, shimDir),
221+
fifos: make(map[string]map[string]cio.Config),
212222
}
213223

214224
s.startEventForwarders(remotePublisher)
@@ -884,6 +894,39 @@ func (s *service) newIOProxy(logger *logrus.Entry, stdin, stdout, stderr string,
884894
return ioConnectorSet, nil
885895
}
886896

897+
func (s *service) addFIFOs(taskID, execID string, config cio.Config) error {
898+
s.fifosMu.Lock()
899+
defer s.fifosMu.Unlock()
900+
901+
_, exists := s.fifos[taskID]
902+
if !exists {
903+
s.fifos[taskID] = make(map[string]cio.Config)
904+
}
905+
906+
value, exists := s.fifos[taskID][execID]
907+
if exists {
908+
return fmt.Errorf("failed to add FIFO files for task %q (exec=%q). There was %+v already", taskID, execID, value)
909+
}
910+
s.fifos[taskID][execID] = config
911+
return nil
912+
}
913+
914+
func (s *service) deleteFIFOs(taskID, execID string) error {
915+
s.fifosMu.Lock()
916+
defer s.fifosMu.Unlock()
917+
918+
_, exists := s.fifos[taskID][execID]
919+
if !exists {
920+
return fmt.Errorf("task %q (exec=%q) doesn't have corresponding FIFOs to delete", taskID, execID)
921+
}
922+
delete(s.fifos[taskID], execID)
923+
924+
if execID == taskExecID {
925+
delete(s.fifos, taskID)
926+
}
927+
return nil
928+
}
929+
887930
func (s *service) Create(requestCtx context.Context, request *taskAPI.CreateTaskRequest) (*taskAPI.CreateTaskResponse, error) {
888931
logger := s.logger.WithField("task_id", request.ID)
889932
defer logPanicAndDie(logger)
@@ -975,6 +1018,15 @@ func (s *service) Create(requestCtx context.Context, request *taskAPI.CreateTask
9751018
return nil, err
9761019
}
9771020

1021+
err = s.addFIFOs(request.ID, taskExecID, cio.Config{
1022+
Stdin: request.Stdin,
1023+
Stdout: request.Stdout,
1024+
Stderr: request.Stderr,
1025+
})
1026+
if err != nil {
1027+
return nil, err
1028+
}
1029+
9781030
return resp, nil
9791031
}
9801032

@@ -1001,6 +1053,11 @@ func (s *service) Delete(requestCtx context.Context, req *taskAPI.DeleteRequest)
10011053
return nil, err
10021054
}
10031055

1056+
err = s.deleteFIFOs(req.ID, req.ExecID)
1057+
if err != nil {
1058+
return nil, err
1059+
}
1060+
10041061
// Only delete a process as like runc when there is ExecID
10051062
// https://github.com/containerd/containerd/blob/f3e148b1ccf268450c87427b5dbb6187db3d22f1/runtime/v2/runc/container.go#L320
10061063
if req.ExecID != "" {
@@ -1062,6 +1119,16 @@ func (s *service) Exec(requestCtx context.Context, req *taskAPI.ExecProcessReque
10621119
return nil, err
10631120
}
10641121

1122+
err = s.addFIFOs(req.ID, req.ExecID, cio.Config{
1123+
Terminal: req.Terminal,
1124+
Stdin: req.Stdin,
1125+
Stdout: req.Stdout,
1126+
Stderr: req.Stderr,
1127+
})
1128+
if err != nil {
1129+
return nil, err
1130+
}
1131+
10651132
return resp, nil
10661133
}
10671134

@@ -1088,6 +1155,14 @@ func (s *service) State(requestCtx context.Context, req *taskAPI.StateRequest) (
10881155
return nil, err
10891156
}
10901157

1158+
// These fields point to files inside the VM.
1159+
// Replace them with the corresponding files on the host, so clients can access them.
1160+
s.fifosMu.Lock()
1161+
defer s.fifosMu.Unlock()
1162+
resp.Stdin = s.fifos[req.ID][req.ExecID].Stdin
1163+
resp.Stdout = s.fifos[req.ID][req.ExecID].Stdout
1164+
resp.Stderr = s.fifos[req.ID][req.ExecID].Stderr
1165+
10911166
return resp, nil
10921167
}
10931168

runtime/service_integ_test.go

Lines changed: 67 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2025,64 +2025,85 @@ func TestAttach_Isolated(t *testing.T) {
20252025
image, err := alpineImage(ctx, client, defaultSnapshotterName)
20262026
require.NoError(t, err, "failed to get alpine image")
20272027

2028-
vmID := testNameToVMID(t.Name())
2028+
testcases := []struct {
2029+
name string
2030+
newIO func(context.Context, string) (cio.IO, error)
2031+
expected string
2032+
}{
2033+
{
2034+
name: "attach",
2035+
newIO: func(ctx context.Context, id string) (cio.IO, error) {
2036+
set, err := cio.NewFIFOSetInDir("", id, false)
2037+
if err != nil {
2038+
return nil, err
2039+
}
20292040

2030-
c, err := client.NewContainer(ctx,
2031-
"container-"+vmID,
2032-
containerd.WithSnapshotter(defaultSnapshotterName),
2033-
containerd.WithNewSnapshot("snapshot-"+vmID, image),
2034-
containerd.WithNewSpec(oci.WithProcessArgs(
2035-
"/bin/cat",
2036-
)),
2037-
)
2038-
require.NoError(t, err)
2041+
return cio.NewDirectIO(ctx, set)
2042+
},
2043+
expected: "hello\n",
2044+
},
2045+
{
2046+
name: "null io",
20392047

2040-
// cio.NewCreator creates FIFOs and *the readers* implicitly. Use cio.NewDirectIO() to
2041-
// only create FIFOs.
2042-
fifos, err := cio.NewFIFOSetInDir("", t.Name(), false)
2043-
require.NoError(t, err)
2048+
// firecracker-containerd doesn't create IO Proxy objects in this case.
2049+
newIO: func(ctx context.Context, id string) (cio.IO, error) {
2050+
return cio.NullIO(id)
2051+
},
20442052

2045-
io, err := cio.NewDirectIO(ctx, fifos)
2046-
require.NoError(t, err)
2053+
// So, attaching new IOs doesn't work.
2054+
// While it looks odd, containerd's v2 shim has the same behavior.
2055+
expected: "",
2056+
},
2057+
}
20472058

2048-
task1, err := c.NewTask(ctx, func(id string) (cio.IO, error) {
2049-
// Pass FIFO files, but don't create the readers.
2050-
return io, nil
2051-
})
2052-
require.NoError(t, err, "failed to create task for container %s", c.ID())
2053-
defer task1.Delete(ctx)
2059+
for _, tc := range testcases {
2060+
tc := tc
2061+
t.Run(tc.name, func(t *testing.T) {
2062+
name := testNameToVMID(t.Name())
20542063

2055-
err = task1.Start(ctx)
2056-
require.NoError(t, err, "failed to start task for container %s", c.ID())
2064+
c, err := client.NewContainer(ctx,
2065+
"container-"+name,
2066+
containerd.WithSnapshotter(defaultSnapshotterName),
2067+
containerd.WithNewSnapshot("snapshot-"+name, image),
2068+
containerd.WithNewSpec(oci.WithProcessArgs("/bin/cat")),
2069+
)
2070+
require.NoError(t, err)
20572071

2058-
// Directly reading/writing bytes to make sure "cat" is working.
2059-
input := "line1\n"
2060-
io.Stdin.Write([]byte(input))
2072+
io, err := tc.newIO(ctx, name)
2073+
require.NoError(t, err)
20612074

2062-
output := make([]byte, len(input))
2063-
io.Stdout.Read(output)
2064-
assert.Equal(t, input, string(output))
2075+
t1, err := c.NewTask(ctx, func(id string) (cio.IO, error) {
2076+
return io, nil
2077+
})
2078+
require.NoError(t, err)
20652079

2066-
c, err = client.LoadContainer(ctx, "container-"+vmID)
2067-
require.NoError(t, err)
2080+
ch, err := t1.Wait(ctx)
2081+
require.NoError(t, err)
20682082

2069-
var stderr, stdout bytes.Buffer
2070-
task2, err := c.Task(
2071-
ctx,
2072-
cio.NewAttach(cio.WithStreams(bytes.NewBufferString("line2\n"), &stdout, &stderr)),
2073-
)
2074-
require.NoError(t, err, "failed to load the task")
2083+
err = t1.Start(ctx)
2084+
require.NoError(t, err)
20752085

2076-
assert.Equal(t, task1.ID(), task2.ID(), "task1 and task2 are pointing the same task")
2086+
stdin := bytes.NewBufferString("hello\n")
2087+
var stdout bytes.Buffer
2088+
t2, err := c.Task(
2089+
ctx,
2090+
cio.NewAttach(cio.WithStreams(stdin, &stdout, nil)),
2091+
)
2092+
require.NoError(t, err)
2093+
assert.Equal(t, t1.ID(), t2.ID())
20772094

2078-
ch, err := task2.Wait(ctx)
2079-
require.NoError(t, err)
2095+
err = io.Close()
2096+
assert.NoError(t, err)
20802097

2081-
err = task2.CloseIO(ctx, containerd.WithStdinCloser)
2082-
require.NoError(t, err)
2098+
err = t2.CloseIO(ctx, containerd.WithStdinCloser)
2099+
assert.NoError(t, err)
2100+
2101+
<-ch
20832102

2084-
<-ch
2103+
_, err = t2.Delete(ctx)
2104+
require.NoError(t, err)
20852105

2086-
assert.Equal(t, "", stderr.String(), "stderr")
2087-
assert.Equal(t, "line2\n", stdout.String(), "stdout")
2106+
assert.Equal(t, tc.expected, stdout.String())
2107+
})
2108+
}
20882109
}

0 commit comments

Comments
 (0)