Skip to content

Commit 0781495

Browse files
committed
Automatically re-init Kick chat listener
1 parent 1004082 commit 0781495

File tree

13 files changed

+320
-199
lines changed

13 files changed

+320
-199
lines changed

go.mod

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
module github.com/xaionaro-go/streamctl
22

3-
go 1.24.1
3+
go 1.24.4
4+
5+
toolchain go1.24.5
46

57
// The original go-yaml is very slow, using the improved version instead
68
replace github.com/goccy/go-yaml v1.11.3 => github.com/yoelsusanto/go-yaml v0.0.0-20240324162521-2018c1ab915b
@@ -281,7 +283,7 @@ require (
281283
github.com/pojntfx/weron v0.2.7
282284
github.com/prometheus/client_golang v1.20.5
283285
github.com/rs/zerolog v1.33.0
284-
github.com/scorfly/gokick v1.1.1
286+
github.com/scorfly/gokick v1.11.0
285287
github.com/sethvargo/go-password v0.3.1
286288
github.com/shirou/gopsutil v3.21.11+incompatible
287289
github.com/sirupsen/logrus v1.9.3
@@ -312,7 +314,7 @@ require (
312314
github.com/xaionaro-go/xfyne v0.0.0-20250615190411-4c96281f6e25
313315
github.com/xaionaro-go/xlogrus v0.0.0-20250111150201-60557109545a
314316
github.com/xaionaro-go/xpath v0.0.0-20250111145115-55f5728f643f
315-
github.com/xaionaro-go/xsync v0.0.0-20250614210231-b74f647f859f
317+
github.com/xaionaro-go/xsync v0.0.0-20250713122735-6e002588c3d0
316318
github.com/yutopp/go-flv v0.3.1
317319
golang.org/x/crypto v0.39.0
318320
google.golang.org/grpc v1.73.0

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -948,8 +948,8 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb
948948
github.com/rymdport/portal v0.4.1 h1:2dnZhjf5uEaeDjeF/yBIeeRo6pNI2QAKm7kq1w/kbnA=
949949
github.com/rymdport/portal v0.4.1/go.mod h1:kFF4jslnJ8pD5uCi17brj/ODlfIidOxlgUDTO5ncnC4=
950950
github.com/sagikazarmark/crypt v0.6.0/go.mod h1:U8+INwJo3nBv1m6A/8OBXAq7Jnpspk5AxSgDyEQcea8=
951-
github.com/scorfly/gokick v1.1.1 h1:xkV/49ZXBHbq52UayDTiK3ne4Bb8hCBoKUZfPaK8Q+w=
952-
github.com/scorfly/gokick v1.1.1/go.mod h1:uJ1L4RHLc+g3RvqxhKHiB2VvEvQelJLbtgNSlqp8WB8=
951+
github.com/scorfly/gokick v1.11.0 h1:4VMpyO8hnPUW4dYgCfOi/YzMgVgj/Rh/OJW7oi2rn+0=
952+
github.com/scorfly/gokick v1.11.0/go.mod h1:9Tc5+/nwcCLjgDVUeW2K/vhEbkMkN+OE8UQVx4fL1WA=
953953
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
954954
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
955955
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 h1:n661drycOFuPLCN3Uc8sB6B/s6Z4t2xvBgU1htSHuq8=
@@ -1157,8 +1157,8 @@ github.com/xaionaro-go/xlogrus v0.0.0-20250111150201-60557109545a h1:EoNRdOtBMnZ
11571157
github.com/xaionaro-go/xlogrus v0.0.0-20250111150201-60557109545a/go.mod h1:RPfWNuwqJykkA2TEvisXqHgy1ypA/1H2HBdIRSeVJ9o=
11581158
github.com/xaionaro-go/xpath v0.0.0-20250111145115-55f5728f643f h1:ofxY1akRlVdJ/AEDj0EakK4Aez8fzeWTTe2mCAZiJ0A=
11591159
github.com/xaionaro-go/xpath v0.0.0-20250111145115-55f5728f643f/go.mod h1:f0DVcqddOy1RALOgXJHwpQnkp1u1yeBX/+A2+Bf4EGc=
1160-
github.com/xaionaro-go/xsync v0.0.0-20250614210231-b74f647f859f h1:Yn/hgHYgZ05zaTo6cK+Fmx46ah0O8ey0JMsWQ4TsgnY=
1161-
github.com/xaionaro-go/xsync v0.0.0-20250614210231-b74f647f859f/go.mod h1:FCpywNAl4a4hgzE8j7Z+TpdhBQi5WHxnI35jOrFpoQw=
1160+
github.com/xaionaro-go/xsync v0.0.0-20250713122735-6e002588c3d0 h1:JDl35mgD3RsmEx7xbBCSONLceBCTFUWQvPUG5YsWrgU=
1161+
github.com/xaionaro-go/xsync v0.0.0-20250713122735-6e002588c3d0/go.mod h1:FCpywNAl4a4hgzE8j7Z+TpdhBQi5WHxnI35jOrFpoQw=
11621162
github.com/xaionaro-go/zerolog2belt v0.0.0-20241103164018-a3bc1ea487e5 h1:jAy7VLg8y8XE1R8jBte4PRDJzOaAE+sUfmttfB9ZcAY=
11631163
github.com/xaionaro-go/zerolog2belt v0.0.0-20241103164018-a3bc1ea487e5/go.mod h1:KJuX7yl27vU+KV6CpsWOe5ZMY4zSg70hvEhwoYdr17w=
11641164
github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM=

pkg/streamcontrol/kick/cache.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ package kick
33
import (
44
"context"
55

6-
"github.com/xaionaro-go/kickcom"
6+
"github.com/scorfly/gokick"
77
)
88

99
type Cache struct {
10-
ChanInfo *kickcom.ChannelV1
11-
Categories *[]kickcom.CategoryV1Short
10+
ChanInfo *gokick.ChannelResponse
11+
Categories *[]gokick.CategoryResponse
1212
}
1313

1414
func (c *Cache) Clone() *Cache {

pkg/streamcontrol/kick/cache_unsafe.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,22 @@ package kick
22

33
import (
44
"github.com/go-ng/xatomic"
5-
"github.com/xaionaro-go/kickcom"
5+
"github.com/scorfly/gokick"
66
)
77

8-
func (cache *Cache) GetChanInfo() *kickcom.ChannelV1 {
8+
func (cache *Cache) GetChanInfo() *gokick.ChannelResponse {
99
if cache == nil {
1010
return nil
1111
}
1212

1313
return xatomic.LoadPointer(&cache.ChanInfo)
1414
}
1515

16-
func (cache *Cache) SetChanInfo(chanInfo *kickcom.ChannelV1) {
16+
func (cache *Cache) SetChanInfo(chanInfo *gokick.ChannelResponse) {
1717
xatomic.StorePointer(&cache.ChanInfo, chanInfo)
1818
}
1919

20-
func (cache *Cache) GetCategories() []kickcom.CategoryV1Short {
20+
func (cache *Cache) GetCategories() []gokick.CategoryResponse {
2121
if cache == nil {
2222
return nil
2323
}
@@ -29,6 +29,6 @@ func (cache *Cache) GetCategories() []kickcom.CategoryV1Short {
2929
return *ptr
3030
}
3131

32-
func (cache *Cache) SetCategories(categories []kickcom.CategoryV1Short) {
32+
func (cache *Cache) SetCategories(categories []gokick.CategoryResponse) {
3333
xatomic.StorePointer(&cache.Categories, &categories)
3434
}

pkg/streamcontrol/kick/chat_handler.go renamed to pkg/streamcontrol/kick/chat_handler_obsolete.go

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,46 +13,67 @@ import (
1313
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
1414
)
1515

16-
type ChatClient interface {
16+
type ChatClientOBSOLETE interface {
1717
GetChatMessagesV2(
1818
ctx context.Context,
1919
channelID uint64,
2020
cursor uint64,
2121
) (*kickcom.ChatMessagesV2Reply, error)
2222
}
2323

24-
type ChatHandler struct {
24+
type ChatHandlerOBSOLETE struct {
2525
currentCursor uint64
2626
channelID uint64
2727
lastMessageID string
28-
client ReverseEngClient
28+
client ChatClientOBSOLETE
2929
cancelFunc context.CancelFunc
3030
messagesOutChan chan streamcontrol.ChatMessage
31+
onClose func(context.Context)
3132
}
3233

3334
func (k *Kick) newChatHandler(
3435
ctx context.Context,
35-
channelID uint64,
36-
) (*ChatHandler, error) {
37-
return NewChatHandler(ctx, k.ReverseEngClient, channelID)
36+
channelSlug string,
37+
onClose func(context.Context),
38+
) (*ChatHandlerOBSOLETE, error) {
39+
reverseEngClient, err := kickcom.New()
40+
if err != nil {
41+
return nil, fmt.Errorf("unable to initialize a client to Kick: %w", err)
42+
}
43+
resp, err := reverseEngClient.GetChannelV1(ctx, channelSlug)
44+
if err != nil {
45+
return nil, fmt.Errorf("unable to get channel '%s' info: %w", channelSlug, err)
46+
}
47+
return NewChatHandlerOBSOLETE(ctx, reverseEngClient, resp.ID, onClose)
3848
}
3949

40-
func NewChatHandler(
50+
func NewChatHandlerOBSOLETE(
4151
ctx context.Context,
42-
chatClient ReverseEngClient,
52+
chatClient ChatClientOBSOLETE,
4353
channelID uint64,
44-
) (*ChatHandler, error) {
54+
onClose func(context.Context),
55+
) (_ret *ChatHandlerOBSOLETE, _err error) {
56+
logger.Debugf(ctx, "NewChatHandlerOBSOLETE(ctx, client, %d, %p)", channelID, onClose)
57+
defer func() {
58+
logger.Debugf(ctx, "/NewChatHandlerOBSOLETE(ctx, client, %d, %p): %#+v %v", channelID, onClose, _ret, _err)
59+
}()
4560

4661
ctx, cancelFn := context.WithCancel(ctx)
47-
h := &ChatHandler{
62+
h := &ChatHandlerOBSOLETE{
4863
currentCursor: 0,
4964
client: chatClient,
5065
channelID: channelID,
5166
cancelFunc: cancelFn,
5267
messagesOutChan: make(chan streamcontrol.ChatMessage, 100),
68+
onClose: onClose,
5369
}
5470

5571
observability.Go(ctx, func(ctx context.Context) {
72+
if onClose != nil {
73+
defer func() {
74+
onClose(ctx)
75+
}()
76+
}
5677
defer func() {
5778
close(h.messagesOutChan)
5879
}()
@@ -81,14 +102,14 @@ func NewChatHandler(
81102
return h, nil
82103
}
83104

84-
func (h *ChatHandler) iterate(ctx context.Context) error {
105+
func (h *ChatHandlerOBSOLETE) iterate(ctx context.Context) error {
85106
startTS := time.Now()
86-
reply, err := h.client.GetChatMessagesV2(ctx, h.channelID, 0)
107+
reply, err := h.client.GetChatMessagesV2(ctx, uint64(h.channelID), 0)
87108
if err != nil {
88109
return fmt.Errorf("unable to get the chat messages of channel with ID %d: %w", h.channelID, err)
89110
}
90111
rtt := time.Since(startTS)
91-
logger.Tracef(ctx, "round trip time == %v", rtt)
112+
logger.Tracef(ctx, "round trip time == %v (messages count: %d)", rtt, len(reply.Data.Messages))
92113

93114
// TODO: use the cursor instead of message ID to avoid duplicates
94115
if reply.Data.Cursor != "" {
@@ -120,7 +141,7 @@ func (h *ChatHandler) iterate(ctx context.Context) error {
120141
return nil
121142
}
122143

123-
func (h *ChatHandler) sendMessage(
144+
func (h *ChatHandlerOBSOLETE) sendMessage(
124145
msg kickcom.ChatMessageV2,
125146
) {
126147
h.lastMessageID = msg.ID
@@ -135,6 +156,6 @@ func (h *ChatHandler) sendMessage(
135156
default:
136157
}
137158
}
138-
func (h *ChatHandler) MessagesChan() <-chan streamcontrol.ChatMessage {
159+
func (h *ChatHandlerOBSOLETE) MessagesChan() <-chan streamcontrol.ChatMessage {
139160
return h.messagesOutChan
140161
}

pkg/streamcontrol/kick/cmd/chatlistener/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func main() {
4545
channel, err := client.GetChannelV1(ctx, channelSlug)
4646
assertNoError(err)
4747

48-
h, err := kick.NewChatHandler(ctx, client, channel.ID)
48+
h, err := kick.NewChatHandlerOBSOLETE(ctx, client, channel.ID, nil)
4949
assertNoError(err)
5050

5151
fmt.Println("started")

0 commit comments

Comments
 (0)