Skip to content

Commit 3aa0686

Browse files
subprocess Add support for terminating processes instead of killing them (#624)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description <!-- Please add any detail or context that would be useful to a reviewer. --> Add support for terminating processes instead of killing them ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update). --------- Co-authored-by: Adrien CABARBAYE <adrien.cabarbaye@arm.com>
1 parent ea31d26 commit 3aa0686

15 files changed

+840
-192
lines changed

.secrets.baseline

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,5 +272,5 @@
272272
}
273273
]
274274
},
275-
"generated_at": "2025-06-05T07:39:32Z"
275+
"generated_at": "2025-06-04T19:30:24Z"
276276
}

changes/20250530171020.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `subprocess` Add support for terminating processes instead of killing them

changes/20250604202816.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: Introducing [diodes] module which is a copy of [cloud foundary's](https://github.com/cloudfoundry/go-diodes) library.

utils/diodes/README

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Vendoring [cloud foundary](https://github.com/cloudfoundry/go-diodes)) diode libraries to avoid importing test dependencies.
2+

utils/diodes/many_to_one.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package diodes
2+
3+
import (
4+
"log"
5+
"sync/atomic"
6+
"unsafe"
7+
)
8+
9+
// ManyToOne diode is optimal for many writers (go-routines B-n) and a single
10+
// reader (go-routine A). It is not thread safe for multiple readers.
11+
type ManyToOne struct {
12+
writeIndex uint64
13+
buffer []unsafe.Pointer
14+
readIndex uint64
15+
alerter Alerter
16+
}
17+
18+
// NewManyToOne creates a new diode (ring buffer). The ManyToOne diode
19+
// is optimzed for many writers (on go-routines B-n) and a single reader
20+
// (on go-routine A). The alerter is invoked on the read's go-routine. It is
21+
// called when it notices that the writer go-routine has passed it and wrote
22+
// over data. A nil can be used to ignore alerts.
23+
func NewManyToOne(size int, alerter Alerter) *ManyToOne {
24+
if alerter == nil {
25+
alerter = AlertFunc(func(int) {})
26+
}
27+
28+
d := &ManyToOne{
29+
buffer: make([]unsafe.Pointer, size),
30+
alerter: alerter,
31+
}
32+
33+
// Start write index at the value before 0
34+
// to allow the first write to use AddUint64
35+
// and still have a beginning index of 0
36+
d.writeIndex = ^d.writeIndex
37+
return d
38+
}
39+
40+
// Set sets the data in the next slot of the ring buffer.
41+
func (d *ManyToOne) Set(data GenericDataType) {
42+
for {
43+
writeIndex := atomic.AddUint64(&d.writeIndex, 1)
44+
idx := writeIndex % uint64(len(d.buffer))
45+
old := atomic.LoadPointer(&d.buffer[idx])
46+
47+
if old != nil &&
48+
(*bucket)(old) != nil &&
49+
(*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) {
50+
log.Println("Diode set collision: consider using a larger diode")
51+
continue
52+
}
53+
54+
newBucket := &bucket{
55+
data: data,
56+
seq: writeIndex,
57+
}
58+
59+
if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) {
60+
log.Println("Diode set collision: consider using a larger diode")
61+
continue
62+
}
63+
64+
return
65+
}
66+
}
67+
68+
// TryNext will attempt to read from the next slot of the ring buffer.
69+
// If there is not data available, it will return (nil, false).
70+
func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) {
71+
// Read a value from the ring buffer based on the readIndex.
72+
idx := d.readIndex % uint64(len(d.buffer))
73+
result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))
74+
75+
// When the result is nil that means the writer has not had the
76+
// opportunity to write a value into the diode. This value must be ignored
77+
// and the read head must not increment.
78+
if result == nil {
79+
return nil, false
80+
}
81+
82+
// When the seq value is less than the current read index that means a
83+
// value was read from idx that was previously written but has since has
84+
// been dropped. This value must be ignored and the read head must not
85+
// increment.
86+
//
87+
// The simulation for this scenario assumes the fast forward occurred as
88+
// detailed below.
89+
//
90+
// 5. The reader reads again getting seq 5. It then reads again expecting
91+
// seq 6 but gets seq 2. This is a read of a stale value that was
92+
// effectively "dropped" so the read fails and the read head stays put.
93+
// `| 4 | 5 | 2 | 3 |` r: 7, w: 6
94+
//
95+
if result.seq < d.readIndex {
96+
return nil, false
97+
}
98+
99+
// When the seq value is greater than the current read index that means a
100+
// value was read from idx that overwrote the value that was expected to
101+
// be at this idx. This happens when the writer has lapped the reader. The
102+
// reader needs to catch up to the writer so it moves its write head to
103+
// the new seq, effectively dropping the messages that were not read in
104+
// between the two values.
105+
//
106+
// Here is a simulation of this scenario:
107+
//
108+
// 1. Both the read and write heads start at 0.
109+
// `| nil | nil | nil | nil |` r: 0, w: 0
110+
// 2. The writer fills the buffer.
111+
// `| 0 | 1 | 2 | 3 |` r: 0, w: 4
112+
// 3. The writer laps the read head.
113+
// `| 4 | 5 | 2 | 3 |` r: 0, w: 6
114+
// 4. The reader reads the first value, expecting a seq of 0 but reads 4,
115+
// this forces the reader to fast forward to 5.
116+
// `| 4 | 5 | 2 | 3 |` r: 5, w: 6
117+
//
118+
if result.seq > d.readIndex {
119+
dropped := result.seq - d.readIndex
120+
d.readIndex = result.seq
121+
d.alerter.Alert(int(dropped)) // nolint:gosec
122+
}
123+
124+
// Only increment read index if a regular read occurred (where seq was
125+
// equal to readIndex) or a value was read that caused a fast forward
126+
// (where seq was greater than readIndex).
127+
//
128+
d.readIndex++
129+
return result.data, true
130+
}

utils/diodes/one_to_one.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package diodes
2+
3+
import (
4+
"sync/atomic"
5+
"unsafe"
6+
)
7+
8+
// GenericDataType is the data type the diodes operate on.
9+
type GenericDataType unsafe.Pointer
10+
11+
// Alerter is used to report how many values were overwritten since the
12+
// last write.
13+
type Alerter interface {
14+
Alert(missed int)
15+
}
16+
17+
// AlertFunc type is an adapter to allow the use of ordinary functions as
18+
// Alert handlers.
19+
type AlertFunc func(missed int)
20+
21+
// Alert calls f(missed)
22+
func (f AlertFunc) Alert(missed int) {
23+
f(missed)
24+
}
25+
26+
type bucket struct {
27+
data GenericDataType
28+
seq uint64 // seq is the recorded write index at the time of writing
29+
}
30+
31+
// OneToOne diode is meant to be used by a single reader and a single writer.
32+
// It is not thread safe if used otherwise.
33+
type OneToOne struct {
34+
buffer []unsafe.Pointer
35+
writeIndex uint64
36+
readIndex uint64
37+
alerter Alerter
38+
}
39+
40+
// NewOneToOne creates a new diode is meant to be used by a single reader and
41+
// a single writer. The alerter is invoked on the read's go-routine. It is
42+
// called when it notices that the writer go-routine has passed it and wrote
43+
// over data. A nil can be used to ignore alerts.
44+
func NewOneToOne(size int, alerter Alerter) *OneToOne {
45+
if alerter == nil {
46+
alerter = AlertFunc(func(int) {})
47+
}
48+
49+
return &OneToOne{
50+
buffer: make([]unsafe.Pointer, size),
51+
alerter: alerter,
52+
}
53+
}
54+
55+
// Set sets the data in the next slot of the ring buffer.
56+
func (d *OneToOne) Set(data GenericDataType) {
57+
idx := d.writeIndex % uint64(len(d.buffer))
58+
59+
newBucket := &bucket{
60+
data: data,
61+
seq: d.writeIndex,
62+
}
63+
d.writeIndex++
64+
65+
atomic.StorePointer(&d.buffer[idx], unsafe.Pointer(newBucket))
66+
}
67+
68+
// TryNext will attempt to read from the next slot of the ring buffer.
69+
// If there is no data available, it will return (nil, false).
70+
func (d *OneToOne) TryNext() (data GenericDataType, ok bool) {
71+
// Read a value from the ring buffer based on the readIndex.
72+
idx := d.readIndex % uint64(len(d.buffer))
73+
result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))
74+
75+
// When the result is nil that means the writer has not had the
76+
// opportunity to write a value into the diode. This value must be ignored
77+
// and the read head must not increment.
78+
if result == nil {
79+
return nil, false
80+
}
81+
82+
// When the seq value is less than the current read index that means a
83+
// value was read from idx that was previously written but has since has
84+
// been dropped. This value must be ignored and the read head must not
85+
// increment.
86+
//
87+
// The simulation for this scenario assumes the fast forward occurred as
88+
// detailed below.
89+
//
90+
// 5. The reader reads again getting seq 5. It then reads again expecting
91+
// seq 6 but gets seq 2. This is a read of a stale value that was
92+
// effectively "dropped" so the read fails and the read head stays put.
93+
// `| 4 | 5 | 2 | 3 |` r: 7, w: 6
94+
//
95+
if result.seq < d.readIndex {
96+
return nil, false
97+
}
98+
99+
// When the seq value is greater than the current read index that means a
100+
// value was read from idx that overwrote the value that was expected to
101+
// be at this idx. This happens when the writer has lapped the reader. The
102+
// reader needs to catch up to the writer so it moves its write head to
103+
// the new seq, effectively dropping the messages that were not read in
104+
// between the two values.
105+
//
106+
// Here is a simulation of this scenario:
107+
//
108+
// 1. Both the read and write heads start at 0.
109+
// `| nil | nil | nil | nil |` r: 0, w: 0
110+
// 2. The writer fills the buffer.
111+
// `| 0 | 1 | 2 | 3 |` r: 0, w: 4
112+
// 3. The writer laps the read head.
113+
// `| 4 | 5 | 2 | 3 |` r: 0, w: 6
114+
// 4. The reader reads the first value, expecting a seq of 0 but reads 4,
115+
// this forces the reader to fast forward to 5.
116+
// `| 4 | 5 | 2 | 3 |` r: 5, w: 6
117+
//
118+
if result.seq > d.readIndex {
119+
dropped := result.seq - d.readIndex
120+
d.readIndex = result.seq
121+
d.alerter.Alert(int(dropped)) // nolint:gosec
122+
}
123+
124+
// Only increment read index if a regular read occurred (where seq was
125+
// equal to readIndex) or a value was read that caused a fast forward
126+
// (where seq was greater than readIndex).
127+
d.readIndex++
128+
return result.data, true
129+
}

utils/diodes/poller.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package diodes
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// Diode is any implementation of a diode.
9+
type Diode interface {
10+
Set(GenericDataType)
11+
TryNext() (GenericDataType, bool)
12+
}
13+
14+
// Poller will poll a diode until a value is available.
15+
type Poller struct {
16+
Diode
17+
interval time.Duration
18+
ctx context.Context
19+
}
20+
21+
// PollerConfigOption can be used to setup the poller.
22+
type PollerConfigOption func(*Poller)
23+
24+
// WithPollingInterval sets the interval at which the diode is queried
25+
// for new data. The default is 10ms.
26+
func WithPollingInterval(interval time.Duration) PollerConfigOption {
27+
return PollerConfigOption(func(c *Poller) {
28+
c.interval = interval
29+
})
30+
}
31+
32+
// WithPollingContext sets the context to cancel any retrieval (Next()). It
33+
// will not change any results for adding data (Set()). Default is
34+
// context.Background().
35+
func WithPollingContext(ctx context.Context) PollerConfigOption {
36+
return PollerConfigOption(func(c *Poller) {
37+
c.ctx = ctx
38+
})
39+
}
40+
41+
// NewPoller returns a new Poller that wraps the given diode.
42+
func NewPoller(d Diode, opts ...PollerConfigOption) *Poller {
43+
p := &Poller{
44+
Diode: d,
45+
interval: 10 * time.Millisecond,
46+
ctx: context.Background(),
47+
}
48+
49+
for _, o := range opts {
50+
o(p)
51+
}
52+
53+
return p
54+
}
55+
56+
// Next polls the diode until data is available or until the context is done.
57+
// If the context is done, then nil will be returned.
58+
func (p *Poller) Next() GenericDataType {
59+
for {
60+
data, ok := p.Diode.TryNext() // nolint:staticcheck
61+
if !ok {
62+
if p.IsDone() {
63+
return nil
64+
}
65+
66+
time.Sleep(p.interval)
67+
continue
68+
}
69+
return data
70+
}
71+
}
72+
73+
func (p *Poller) IsDone() bool {
74+
select {
75+
case <-p.ctx.Done():
76+
return true
77+
default:
78+
return false
79+
}
80+
}

0 commit comments

Comments
 (0)