Skip to content
This repository was archived by the owner on Dec 28, 2024. It is now read-only.

Commit 497eb98

Browse files
jaymellsvix-james
authored andcommitted
Relay: Improved Error Channel Usage
Use `stopRead` and `stopWrite` channels to communicate with sender / receive loops about . If channel close is initiated, as indicated by a message to `stopRead` or `stopWrite`, don't send errors on the error channel, which won't be read at that point anyway. Furthermore, add a small amount of buffering to the channels just to avoid possibility that a single message could inadvertently cause blocking.
1 parent 2449e63 commit 497eb98

File tree

1 file changed

+30
-26
lines changed

1 file changed

+30
-26
lines changed

relay/relay.go

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,17 @@ func NewClient(token string, localURL *url.URL, opts *ClientOptions) *Client {
8787
},
8888
Timeout: defaultTimeout,
8989
},
90-
stopRead: make(chan struct{}),
91-
stopWrite: make(chan struct{}),
90+
stopRead: make(chan struct{}, 10),
91+
stopWrite: make(chan struct{}, 10),
9292

93-
errChan: make(chan error),
94-
95-
// TODO should these be buffered?
96-
sendChan: make(chan *OutgoingMessageEvent),
97-
recChan: make(chan *IncomingMessage),
93+
errChan: make(chan error, 10),
94+
sendChan: make(chan *OutgoingMessageEvent, 10),
95+
recChan: make(chan *IncomingMessage, 10),
9896
}
9997
}
10098

99+
type Stop = struct {}
100+
101101
func (c *Client) Listen(ctx context.Context) {
102102
if c.conn != nil {
103103
fmt.Printf("relay already listening\n")
@@ -114,15 +114,13 @@ func (c *Client) Listen(ctx context.Context) {
114114

115115
select {
116116
case <-ctx.Done():
117-
close(c.sendChan)
118-
close(c.recChan)
119-
120-
close(c.stopRead)
121-
close(c.stopWrite)
117+
c.stopRead <- Stop{}
118+
c.stopWrite <- Stop{}
122119
return
123120
case <-c.errChan:
124-
close(c.stopRead)
125-
close(c.stopWrite)
121+
c.stopRead <- Stop{}
122+
c.stopWrite <- Stop{}
123+
c.close();
126124
c.wg.Wait()
127125
}
128126
}
@@ -136,9 +134,9 @@ func (c *Client) close() {
136134

137135
func (c *Client) changeConnection(conn *websocket.Conn) {
138136
c.conn = conn
139-
c.errChan = make(chan error)
140-
c.stopRead = make(chan struct{})
141-
c.stopWrite = make(chan struct{})
137+
c.errChan = make(chan error, 10)
138+
c.stopRead = make(chan struct{}, 10)
139+
c.stopWrite = make(chan struct{}, 10)
142140
}
143141

144142
func (c *Client) connect(ctx context.Context) error {
@@ -218,13 +216,7 @@ func (c *Client) recLoop() {
218216
for {
219217
_, packet, err := c.conn.ReadMessage()
220218
if err != nil {
221-
select {
222-
case <-c.stopRead:
223-
// dont send error if
224-
// stop was requested
225-
default:
226-
c.errChan <- err
227-
}
219+
c.sendErrorMaybe(err, c.stopRead)
228220
return
229221
}
230222
go c.handleIncommingMessage(packet)
@@ -250,16 +242,18 @@ func (c *Client) sendLoop() {
250242
if err != nil {
251243
// resend message when we reconnect
252244
c.SendMessage(msg)
253-
c.errChan <- err
245+
c.sendErrorMaybe(err, c.stopWrite)
254246
return
255247
}
248+
256249
case <-ticker.C:
257250
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
258251

259252
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
260-
c.errChan <- err
253+
c.sendErrorMaybe(err, c.stopWrite)
261254
return
262255
}
256+
263257
case <-c.stopWrite:
264258
// ignore error
265259
return
@@ -347,3 +341,13 @@ func (c *Client) processResponse(id string, res *http.Response) {
347341
color.Green("-> Recieved \"%s\" response, forwarding to webhook sender\n", res.Status)
348342
c.SendMessage(msg)
349343
}
344+
345+
func (c *Client) sendErrorMaybe(err error, stopChan chan(struct{})) {
346+
select {
347+
case <-stopChan:
348+
// dont send error if
349+
// stop was requested
350+
default:
351+
c.errChan <- err
352+
}
353+
}

0 commit comments

Comments
 (0)