Skip to content

Commit bbf03b7

Browse files
committed
Limit the amount of chat messages
1 parent 8636e92 commit bbf03b7

File tree

12 files changed

+542
-510
lines changed

12 files changed

+542
-510
lines changed

cmd/streamcli/commands/commands.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func chatListen(cmd *cobra.Command, args []string) {
319319
assertNoError(ctx, err)
320320

321321
fmt.Println("subscribing...")
322-
ch, err := streamD.SubscribeToChatMessages(ctx, time.Now().Add(-time.Minute))
322+
ch, err := streamD.SubscribeToChatMessages(ctx, time.Now().Add(-time.Minute), 1000)
323323
assertNoError(ctx, err)
324324

325325
fmt.Println("started listening...")

cmd/streampanel/FyneApp.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ Website = "https://github.com/xaionaro/streamctl"
55
Name = "streampanel"
66
ID = "center.dx.streampanel"
77
Version = "0.1.0"
8-
Build = 411
8+
Build = 412

pkg/chatmessagesstorage/get_messages_since.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,20 @@ import (
1414
func (s *ChatMessagesStorage) GetMessagesSince(
1515
ctx context.Context,
1616
since time.Time,
17+
limit uint,
1718
) ([]api.ChatMessage, error) {
18-
return xsync.DoA2R2(ctx, &s.Mutex, s.getMessagesSinceLocked, ctx, since)
19+
return xsync.DoA3R2(ctx, &s.Mutex, s.getMessagesSinceLocked, ctx, since, limit)
1920
}
2021

2122
func (s *ChatMessagesStorage) getMessagesSinceLocked(
2223
ctx context.Context,
2324
since time.Time,
25+
limit uint,
2426
) (_ret []api.ChatMessage, _err error) {
25-
logger.Tracef(ctx, "getMessagesSinceLocked(ctx, %v)", since)
26-
defer func() { logger.Tracef(ctx, "/getMessagesSinceLocked(ctx, %v): len:%d, %v", since, len(_ret), _err) }()
27+
logger.Tracef(ctx, "getMessagesSinceLocked(ctx, %v, %d)", since, limit)
28+
defer func() {
29+
logger.Tracef(ctx, "/getMessagesSinceLocked(ctx, %v, %d): len:%d, %v", since, limit, len(_ret), _err)
30+
}()
2731

2832
if len(s.Messages) == 0 {
2933
return nil, nil
@@ -46,5 +50,9 @@ func (s *ChatMessagesStorage) getMessagesSinceLocked(
4650
idx = 0
4751
}
4852

53+
if limit > 0 && len(s.Messages)-idx > int(limit) {
54+
idx = len(s.Messages) - int(limit)
55+
}
56+
4957
return slices.Clone(s.Messages[idx:]), nil
5058
}

pkg/streamd/api/streamd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ type StreamD interface {
279279
SubscribeToChatMessages(
280280
ctx context.Context,
281281
since time.Time,
282+
limit uint64,
282283
) (<-chan ChatMessage, error)
283284
SendChatMessage(
284285
ctx context.Context,

pkg/streamd/chat.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type ChatMessageStorage interface {
1616
RemoveMessage(context.Context, streamcontrol.ChatMessageID) error
1717
Load(ctx context.Context) error
1818
Store(ctx context.Context) error
19-
GetMessagesSince(context.Context, time.Time) ([]api.ChatMessage, error)
19+
GetMessagesSince(context.Context, time.Time, uint) ([]api.ChatMessage, error)
2020
}
2121

2222
func (d *StreamD) startListeningForChatMessages(
@@ -102,11 +102,12 @@ func (d *StreamD) BanUser(
102102
func (d *StreamD) SubscribeToChatMessages(
103103
ctx context.Context,
104104
since time.Time,
105+
limit uint64,
105106
) (<-chan api.ChatMessage, error) {
106-
return eventSubToChan[api.ChatMessage](
107+
return eventSubToChan(
107108
ctx, d,
108109
func(ctx context.Context, outCh chan api.ChatMessage) {
109-
msgs, err := d.ChatMessagesStorage.GetMessagesSince(ctx, since)
110+
msgs, err := d.ChatMessagesStorage.GetMessagesSince(ctx, since, uint(limit))
110111
if err != nil {
111112
logger.Errorf(ctx, "unable to get the messages from the storage: %v", err)
112113
return

pkg/streamd/client/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2631,6 +2631,7 @@ func (c *Client) SubmitEvent(
26312631
func (c *Client) SubscribeToChatMessages(
26322632
ctx context.Context,
26332633
since time.Time,
2634+
limit uint64,
26342635
) (<-chan api.ChatMessage, error) {
26352636
return unwrapStreamDChan(
26362637
ctx,
@@ -2645,6 +2646,7 @@ func (c *Client) SubscribeToChatMessages(
26452646
client.SubscribeToChatMessages,
26462647
&streamd_grpc.SubscribeToChatMessagesRequest{
26472648
SinceUNIXNano: uint64(since.UnixNano()),
2649+
Limit: limit,
26482650
},
26492651
)
26502652
},

pkg/streamd/grpc/go/streamd_grpc/streamd.pb.go

Lines changed: 505 additions & 495 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/streamd/grpc/streamd.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,7 @@ message SubmitEventReply {}
747747

748748
message SubscribeToChatMessagesRequest {
749749
uint64 sinceUNIXNano = 1;
750+
uint64 limit = 2;
750751
}
751752
message ChatMessage {
752753
uint64 createdAtUNIXNano = 1;

pkg/streamd/server/grpc.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1913,13 +1913,14 @@ func (grpc *GRPCServer) SubscribeToChatMessages(
19131913
srv streamd_grpc.StreamD_SubscribeToChatMessagesServer,
19141914
) error {
19151915
ts := req.GetSinceUNIXNano()
1916+
limit := req.GetLimit()
19161917
since := time.Unix(
19171918
int64(ts)/int64(time.Second.Nanoseconds()),
19181919
int64(ts)%int64(time.Second.Nanoseconds()),
19191920
)
19201921
return wrapChan(
19211922
func(ctx context.Context) (<-chan api.ChatMessage, error) {
1922-
return grpc.StreamD.SubscribeToChatMessages(ctx, since)
1923+
return grpc.StreamD.SubscribeToChatMessages(ctx, since, limit)
19231924
},
19241925
srv,
19251926
func(input api.ChatMessage) streamd_grpc.ChatMessage {

pkg/streampanel/chat.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
)
1818

1919
const (
20-
ChatLogSize = 100
20+
ChatLogSize = 50
2121
)
2222

2323
type chatUIInterface interface {
@@ -66,7 +66,7 @@ func (p *Panel) getChatUIs(ctx context.Context) []chatUIInterface {
6666
}
6767

6868
func (p *Panel) initChatMessagesHandler(ctx context.Context) error {
69-
msgCh, err := p.StreamD.SubscribeToChatMessages(ctx, time.Now().Add(-7*24*time.Hour))
69+
msgCh, err := p.StreamD.SubscribeToChatMessages(ctx, time.Now().Add(-7*24*time.Hour), ChatLogSize)
7070
if err != nil {
7171
return fmt.Errorf("unable to subscribe to chat messages: %w", err)
7272
}

0 commit comments

Comments
 (0)