Skip to content

Commit 90a74f2

Browse files
authored
Merge pull request #465 from kzys/attach
Fix cio.NewAttach
2 parents 6db5f91 + a5d65a5 commit 90a74f2

File tree

2 files changed

+170
-0
lines changed

2 files changed

+170
-0
lines changed

runtime/service.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
// secure randomness
3333
"math/rand" // #nosec
3434

35+
"github.com/containerd/containerd/cio"
3536
"github.com/containerd/containerd/errdefs"
3637
"github.com/containerd/containerd/events/exchange"
3738
"github.com/containerd/containerd/log"
@@ -88,6 +89,10 @@ const (
8889

8990
// StopEventName is the topic published to when a VM stops
9091
StopEventName = "/firecracker-vm/stop"
92+
93+
// taskExecID is a special exec ID that is pointing its task itself.
94+
// While the constant is defined here, the convention is coming from containerd.
95+
taskExecID = ""
9196
)
9297

9398
var (
@@ -146,6 +151,10 @@ type service struct {
146151
machineConfig *firecracker.Config
147152
vsockIOPortCount uint32
148153
vsockPortMu sync.Mutex
154+
155+
// fifos have stdio FIFOs containerd passed to the shim. The key is [taskID][execID].
156+
fifos map[string]map[string]cio.Config
157+
fifosMu sync.Mutex
149158
}
150159

151160
func shimOpts(shimCtx context.Context) (*shim.Opts, error) {
@@ -210,6 +219,7 @@ func NewService(shimCtx context.Context, id string, remotePublisher shim.Publish
210219

211220
vmReady: make(chan struct{}),
212221
jailer: newNoopJailer(shimCtx, logger, shimDir),
222+
fifos: make(map[string]map[string]cio.Config),
213223
}
214224

215225
s.startEventForwarders(remotePublisher)
@@ -891,6 +901,39 @@ func (s *service) newIOProxy(logger *logrus.Entry, stdin, stdout, stderr string,
891901
return ioConnectorSet, nil
892902
}
893903

904+
func (s *service) addFIFOs(taskID, execID string, config cio.Config) error {
905+
s.fifosMu.Lock()
906+
defer s.fifosMu.Unlock()
907+
908+
_, exists := s.fifos[taskID]
909+
if !exists {
910+
s.fifos[taskID] = make(map[string]cio.Config)
911+
}
912+
913+
value, exists := s.fifos[taskID][execID]
914+
if exists {
915+
return fmt.Errorf("failed to add FIFO files for task %q (exec=%q). There was %+v already", taskID, execID, value)
916+
}
917+
s.fifos[taskID][execID] = config
918+
return nil
919+
}
920+
921+
func (s *service) deleteFIFOs(taskID, execID string) error {
922+
s.fifosMu.Lock()
923+
defer s.fifosMu.Unlock()
924+
925+
_, exists := s.fifos[taskID][execID]
926+
if !exists {
927+
return fmt.Errorf("task %q (exec=%q) doesn't have corresponding FIFOs to delete", taskID, execID)
928+
}
929+
delete(s.fifos[taskID], execID)
930+
931+
if execID == taskExecID {
932+
delete(s.fifos, taskID)
933+
}
934+
return nil
935+
}
936+
894937
func (s *service) Create(requestCtx context.Context, request *taskAPI.CreateTaskRequest) (*taskAPI.CreateTaskResponse, error) {
895938
logger := s.logger.WithField("task_id", request.ID)
896939
defer logPanicAndDie(logger)
@@ -982,6 +1025,15 @@ func (s *service) Create(requestCtx context.Context, request *taskAPI.CreateTask
9821025
return nil, err
9831026
}
9841027

1028+
err = s.addFIFOs(request.ID, taskExecID, cio.Config{
1029+
Stdin: request.Stdin,
1030+
Stdout: request.Stdout,
1031+
Stderr: request.Stderr,
1032+
})
1033+
if err != nil {
1034+
return nil, err
1035+
}
1036+
9851037
return resp, nil
9861038
}
9871039

@@ -1008,6 +1060,11 @@ func (s *service) Delete(requestCtx context.Context, req *taskAPI.DeleteRequest)
10081060
return nil, err
10091061
}
10101062

1063+
err = s.deleteFIFOs(req.ID, req.ExecID)
1064+
if err != nil {
1065+
return nil, err
1066+
}
1067+
10111068
// Only delete a process as like runc when there is ExecID
10121069
// https://github.com/containerd/containerd/blob/f3e148b1ccf268450c87427b5dbb6187db3d22f1/runtime/v2/runc/container.go#L320
10131070
if req.ExecID != "" {
@@ -1069,6 +1126,16 @@ func (s *service) Exec(requestCtx context.Context, req *taskAPI.ExecProcessReque
10691126
return nil, err
10701127
}
10711128

1129+
err = s.addFIFOs(req.ID, req.ExecID, cio.Config{
1130+
Terminal: req.Terminal,
1131+
Stdin: req.Stdin,
1132+
Stdout: req.Stdout,
1133+
Stderr: req.Stderr,
1134+
})
1135+
if err != nil {
1136+
return nil, err
1137+
}
1138+
10721139
return resp, nil
10731140
}
10741141

@@ -1095,6 +1162,14 @@ func (s *service) State(requestCtx context.Context, req *taskAPI.StateRequest) (
10951162
return nil, err
10961163
}
10971164

1165+
// These fields are pointing files inside the VM.
1166+
// Replace them with the corresponding files on the host, so clients can access.
1167+
s.fifosMu.Lock()
1168+
defer s.fifosMu.Unlock()
1169+
resp.Stdin = s.fifos[req.ID][req.ExecID].Stdin
1170+
resp.Stdout = s.fifos[req.ID][req.ExecID].Stdout
1171+
resp.Stderr = s.fifos[req.ID][req.ExecID].Stderr
1172+
10981173
return resp, nil
10991174
}
11001175

runtime/service_integ_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2031,3 +2031,98 @@ func TestCreateVM_Isolated(t *testing.T) {
20312031
})
20322032
}
20332033
}
2034+
2035+
func TestAttach_Isolated(t *testing.T) {
2036+
prepareIntegTest(t)
2037+
2038+
client, err := containerd.New(containerdSockPath, containerd.WithDefaultRuntime(firecrackerRuntime))
2039+
require.NoError(t, err, "unable to create client to containerd service at %s, is containerd running?", containerdSockPath)
2040+
defer client.Close()
2041+
2042+
ctx := namespaces.WithNamespace(context.Background(), "default")
2043+
2044+
image, err := alpineImage(ctx, client, defaultSnapshotterName)
2045+
require.NoError(t, err, "failed to get alpine image")
2046+
2047+
testcases := []struct {
2048+
name string
2049+
newIO func(context.Context, string) (cio.IO, error)
2050+
expected string
2051+
}{
2052+
{
2053+
name: "attach",
2054+
newIO: func(ctx context.Context, id string) (cio.IO, error) {
2055+
set, err := cio.NewFIFOSetInDir("", id, false)
2056+
if err != nil {
2057+
return nil, err
2058+
}
2059+
2060+
return cio.NewDirectIO(ctx, set)
2061+
},
2062+
expected: "hello\n",
2063+
},
2064+
{
2065+
name: "null io",
2066+
2067+
// firecracker-containerd doesn't create IO Proxy objects in this case.
2068+
newIO: func(ctx context.Context, id string) (cio.IO, error) {
2069+
return cio.NullIO(id)
2070+
},
2071+
2072+
// So, attaching new IOs doesn't work.
2073+
// While it looks odd, containerd's v2 shim has the same behavior.
2074+
expected: "",
2075+
},
2076+
}
2077+
2078+
for _, tc := range testcases {
2079+
tc := tc
2080+
t.Run(tc.name, func(t *testing.T) {
2081+
name := testNameToVMID(t.Name())
2082+
2083+
c, err := client.NewContainer(ctx,
2084+
"container-"+name,
2085+
containerd.WithSnapshotter(defaultSnapshotterName),
2086+
containerd.WithNewSnapshot("snapshot-"+name, image),
2087+
containerd.WithNewSpec(oci.WithProcessArgs("/bin/cat")),
2088+
)
2089+
require.NoError(t, err)
2090+
2091+
io, err := tc.newIO(ctx, name)
2092+
require.NoError(t, err)
2093+
2094+
t1, err := c.NewTask(ctx, func(id string) (cio.IO, error) {
2095+
return io, nil
2096+
})
2097+
require.NoError(t, err)
2098+
2099+
ch, err := t1.Wait(ctx)
2100+
require.NoError(t, err)
2101+
2102+
err = t1.Start(ctx)
2103+
require.NoError(t, err)
2104+
2105+
stdin := bytes.NewBufferString("hello\n")
2106+
var stdout bytes.Buffer
2107+
t2, err := c.Task(
2108+
ctx,
2109+
cio.NewAttach(cio.WithStreams(stdin, &stdout, nil)),
2110+
)
2111+
require.NoError(t, err)
2112+
assert.Equal(t, t1.ID(), t2.ID())
2113+
2114+
err = io.Close()
2115+
assert.NoError(t, err)
2116+
2117+
err = t2.CloseIO(ctx, containerd.WithStdinCloser)
2118+
assert.NoError(t, err)
2119+
2120+
<-ch
2121+
2122+
_, err = t2.Delete(ctx)
2123+
require.NoError(t, err)
2124+
2125+
assert.Equal(t, tc.expected, stdout.String())
2126+
})
2127+
}
2128+
}

0 commit comments

Comments
 (0)