Skip to content

Commit 8c04d5c

Browse files
committed
1 parent cba10b3 commit 8c04d5c

File tree

18 files changed

+2384
-2034
lines changed

18 files changed

+2384
-2034
lines changed

cmd/streampanel/FyneApp.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ Website = "https://github.com/xaionaro/streamctl"
55
Name = "streampanel"
66
ID = "center.dx.streampanel"
77
Version = "0.1.0"
8-
Build = 428
8+
Build = 432

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ replace github.com/asticode/go-astiav v0.36.0 => github.com/xaionaro-go/astiav v
2727

2828
replace github.com/bluenviron/mediacommon/v2 v2.0.1-0.20250324151931-b8ce69d15d3d => github.com/xaionaro-go/mediacommon/v2 v2.0.0-20250420012906-03d6d69ac3b7
2929

30+
replace github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef => github.com/mysteriumnetwork/EventBus v0.0.0-20220414214953-84469ec2b111
31+
3032
require (
3133
github.com/facebookincubator/go-belt v0.0.0-20250308011339-62fb7027b11f
3234
github.com/go-git/go-billy/v5 v5.6.2

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,6 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj
141141
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
142142
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
143143
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
144-
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM=
145-
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII=
146144
github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
147145
github.com/asticode/go-astikit v0.54.0 h1:uq9eurgisdkYwJU9vSWIQaPH4MH0cac82sQH00kmSNQ=
148146
github.com/asticode/go-astikit v0.54.0/go.mod h1:fV43j20UZYfXzP9oBn33udkvCvDvCDhzjVqoLFuuYZE=
@@ -743,6 +741,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
743741
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
744742
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
745743
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
744+
github.com/mysteriumnetwork/EventBus v0.0.0-20220414214953-84469ec2b111 h1:7s+VqlctjdVjy1z0slV2giUawTnv1A6vWj9oKKfgPhI=
745+
github.com/mysteriumnetwork/EventBus v0.0.0-20220414214953-84469ec2b111/go.mod h1:ef8wV5ITJhXSTG1sUkcHPAQF7lh83c7l875IvrYU7H0=
746746
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
747747
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
748748
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=

pkg/chatmessagesstorage/add_message.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ func (s *ChatMessagesStorage) addMessageLocked(
2626
if len(s.Messages) > 0 && !msg.CreatedAt.After(s.Messages[len(s.Messages)-1].CreatedAt) {
2727
s.IsSorted = false
2828
}
29+
msg.IsLive = false
2930
s.Messages = append(s.Messages, msg)
3031
s.IsChanged = true
3132
if len(s.Messages) <= MaxMessages {

pkg/chatmessagesstorage/get_messages_since.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ func (s *ChatMessagesStorage) getMessagesSinceLocked(
4141

4242
idx := sort.Search(len(s.Messages), func(i int) bool {
4343
m := &s.Messages[i]
44-
return !m.CreatedAt.After(since)
44+
return !m.CreatedAt.Before(since)
4545
})
46+
logger.Tracef(ctx, "search result index: %d", idx)
4647

4748
if idx >= len(s.Messages) {
4849
lastMessage := s.Messages[len(s.Messages)-1]
@@ -54,7 +55,15 @@ func (s *ChatMessagesStorage) getMessagesSinceLocked(
5455
}
5556

5657
if limit > 0 && len(s.Messages)-idx > int(limit) {
58+
oldIdx := idx
5759
idx = len(s.Messages) - int(limit)
60+
logger.Tracef(ctx, "corrected the idx from %d to %d as per the count limit", oldIdx, idx)
61+
}
62+
63+
if idx < len(s.Messages) {
64+
if s.Messages[idx].CreatedAt.Before(since) {
65+
logger.Errorf(ctx, "internal error, for some reason we loaded messages older than %v, for example %v", since, s.Messages[idx])
66+
}
5867
}
5968

6069
logger.Tracef(ctx, "s.Messages[%d:%d]", idx, len(s.Messages))

pkg/streamd/api/streamd.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,14 @@ type StreamD interface {
7878
ctx context.Context,
7979
platID streamcontrol.PlatformName,
8080
) (*streamcontrol.StreamStatus, error)
81-
GetVariable(ctx context.Context, key consts.VarKey) ([]byte, error)
81+
GetVariable(ctx context.Context, key consts.VarKey) (VariableValue, error)
8282
GetVariableHash(
8383
ctx context.Context,
8484
key consts.VarKey,
8585
hashType crypto.Hash,
8686
) ([]byte, error)
87-
SetVariable(ctx context.Context, key consts.VarKey, value []byte) error
87+
SetVariable(ctx context.Context, key consts.VarKey, value VariableValue) error
88+
SubscribeToVariable(ctx context.Context, key consts.VarKey) (<-chan VariableValue, error)
8889

8990
OBS(ctx context.Context) (obs_grpc.OBSServer, context.CancelFunc, error)
9091

@@ -396,4 +397,7 @@ type TriggerRules = config.TriggerRules
396397
type ChatMessage struct {
397398
streamcontrol.ChatMessage
398399
Platform streamcontrol.PlatformName
400+
IsLive bool
399401
}
402+
403+
type VariableValue []byte

pkg/streamd/chat.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ import (
1111
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
1212
)
1313

14+
const (
15+
debugSendArchiveMessagesAsLive = false
16+
)
17+
1418
type ChatMessageStorage interface {
1519
AddMessage(context.Context, api.ChatMessage) error
1620
RemoveMessage(context.Context, streamcontrol.ChatMessageID) error
@@ -45,6 +49,7 @@ func (d *StreamD) startListeningForChatMessages(
4549
}
4650
msg := api.ChatMessage{
4751
ChatMessage: ev,
52+
IsLive: true,
4853
Platform: platName,
4954
}
5055
if err := d.ChatMessagesStorage.AddMessage(ctx, msg); err != nil {
@@ -103,17 +108,40 @@ func (d *StreamD) SubscribeToChatMessages(
103108
ctx context.Context,
104109
since time.Time,
105110
limit uint64,
106-
) (<-chan api.ChatMessage, error) {
111+
) (_ret <-chan api.ChatMessage, _err error) {
112+
logger.Tracef(ctx, "SubscribeToChatMessages(ctx, %v, %v)", since, limit)
113+
defer func() { logger.Tracef(ctx, "/SubscribeToChatMessages(ctx, %v, %v): %p %v", since, limit, _ret, _err) }()
114+
107115
return eventSubToChan(
108116
ctx, d,
109117
func(ctx context.Context, outCh chan api.ChatMessage) {
118+
logger.Tracef(ctx, "backfilling the channel")
119+
defer func() { logger.Tracef(ctx, "/backfilling the channel") }()
110120
msgs, err := d.ChatMessagesStorage.GetMessagesSince(ctx, since, uint(limit))
111121
if err != nil {
112122
logger.Errorf(ctx, "unable to get the messages from the storage: %v", err)
113123
return
114124
}
115125
for _, msg := range msgs {
116-
outCh <- msg
126+
msg.IsLive = false
127+
if debugSendArchiveMessagesAsLive {
128+
msg.IsLive = true
129+
}
130+
if !func() (_ret bool) {
131+
defer func() {
132+
if recover() != nil {
133+
logger.Debugf(ctx, "the channel is closed")
134+
_ret = false
135+
}
136+
}()
137+
outCh <- msg
138+
return true
139+
}() {
140+
break
141+
}
142+
if debugSendArchiveMessagesAsLive {
143+
time.Sleep(5 * time.Second)
144+
}
117145
}
118146
},
119147
)

pkg/streamd/client/client.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,7 +1014,7 @@ func (c *Client) SubscribeToOAuthURLs(
10141014
func (c *Client) GetVariable(
10151015
ctx context.Context,
10161016
key consts.VarKey,
1017-
) ([]byte, error) {
1017+
) (api.VariableValue, error) {
10181018
reply, err := withStreamDClient(ctx, c, func(
10191019
ctx context.Context,
10201020
client streamd_grpc.StreamDClient,
@@ -1099,7 +1099,7 @@ func (c *Client) GetVariableHash(
10991099
func (c *Client) SetVariable(
11001100
ctx context.Context,
11011101
key consts.VarKey,
1102-
value []byte,
1102+
value api.VariableValue,
11031103
) error {
11041104
_, err := withStreamDClient(ctx, c, func(
11051105
ctx context.Context,
@@ -1119,6 +1119,35 @@ func (c *Client) SetVariable(
11191119
return err
11201120
}
11211121

1122+
func (c *Client) SubscribeToVariable(
1123+
ctx context.Context,
1124+
varKey consts.VarKey,
1125+
) (<-chan api.VariableValue, error) {
1126+
return unwrapStreamDChan(
1127+
ctx,
1128+
c,
1129+
func(
1130+
ctx context.Context,
1131+
client streamd_grpc.StreamDClient,
1132+
) (streamd_grpc.StreamD_SubscribeToVariableClient, error) {
1133+
return callWrapper(
1134+
ctx,
1135+
c,
1136+
client.SubscribeToVariable,
1137+
&streamd_grpc.SubscribeToVariableRequest{
1138+
Key: string(varKey),
1139+
},
1140+
)
1141+
},
1142+
func(
1143+
ctx context.Context,
1144+
event *streamd_grpc.VariableChange,
1145+
) api.VariableValue {
1146+
return event.GetValue()
1147+
},
1148+
)
1149+
}
1150+
11221151
func (c *Client) OBS(
11231152
ctx context.Context,
11241153
) (obs_grpc.OBSServer, context.CancelFunc, error) {

pkg/streamd/events.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,21 @@ func eventSubToChan[T any](
100100
defer func() { logger.Debugf(ctx, "/eventSubToChan[%T]", sample) }()
101101

102102
topic := eventTopic(sample)
103+
return eventSubToChanUsingTopic(ctx, d, onReady, topic)
104+
}
103105

106+
func eventSubToChanUsingTopic[T any](
107+
ctx context.Context,
108+
d *StreamD,
109+
onReady func(ctx context.Context, outCh chan T),
110+
topic string,
111+
) (<-chan T, error) {
104112
var mutex sync.Mutex
105113
r := make(chan T)
106114
callback := func(in T) {
107115
mutex.Lock()
108116
defer mutex.Unlock()
109-
logger.Debugf(ctx, "eventSubToChan[%T]: received %#+v", sample, in)
117+
logger.Debugf(ctx, "eventSubToChanUsingTopic(%T): received %#+v", topic, in)
110118

111119
select {
112120
case <-ctx.Done():
@@ -121,28 +129,27 @@ func eventSubToChan[T any](
121129
}
122130
}
123131

124-
if onReady != nil {
125-
mutex.Lock()
126-
}
127-
128-
err := d.EventBus.SubscribeAsync(topic, callback, true)
129-
if err != nil {
130-
return nil, fmt.Errorf("unable to subscribe: %w", err)
131-
}
132-
133132
if onReady != nil {
134133
observability.Go(ctx, func(ctx context.Context) {
134+
mutex.Lock()
135135
defer mutex.Unlock()
136+
err := d.EventBus.SubscribeAsync(topic, callback, true)
137+
if err != nil {
138+
logger.Errorf(ctx, "unable to subscribe: %v", err)
139+
return
140+
}
136141
onReady(ctx, r)
137142
})
143+
} else {
144+
err := d.EventBus.SubscribeAsync(topic, callback, true)
145+
if err != nil {
146+
return nil, fmt.Errorf("unable to subscribe: %w", err)
147+
}
138148
}
139149

140150
observability.Go(ctx, func(ctx context.Context) {
141151
<-ctx.Done()
142152

143-
mutex.Lock()
144-
defer mutex.Unlock()
145-
146153
d.EventBus.Unsubscribe(topic, callback)
147154
d.EventBus.WaitAsync()
148155
close(r)

pkg/streamd/events_reflect.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package streamd
22

33
import (
44
"context"
5+
"fmt"
56
"reflect"
67

78
"github.com/facebookincubator/go-belt/tool/logger"
@@ -16,7 +17,7 @@ func eventTopic(
1617
if t.Kind() == reflect.Pointer {
1718
t = t.Elem()
1819
}
19-
return t.Name()
20+
return fmt.Sprintf("type:", t.Name())
2021
}
2122

2223
func (d *StreamD) publishEvent(

0 commit comments

Comments
 (0)