Skip to content

Commit 3df1ee6

Browse files
committed
add StreamDecoder
1 parent 2e40711 commit 3df1ee6

File tree

4 files changed

+294
-3
lines changed

4 files changed

+294
-3
lines changed

pkg/spec/decode.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
// FromBytes read Y3 buffer
1010
func FromBytes(buf []byte) (p *Packet, err error) {
11-
if len(buf) < 3 {
11+
if len(buf) < 2 {
1212
return nil, errors.New("malformed data")
1313
}
1414

pkg/spec/packet.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,17 @@ func (p *Packet) GetTag() uint64 {
1717
return tag
1818
}
1919

20-
// GetRawTag return Tag as raw bytes
21-
func (p *Packet) GetRawTag() []byte {
20+
// GetTagBuffer return Tag as raw bytes
21+
func (p *Packet) GetTagBuffer() []byte {
2222
return p.tagbuf
2323
}
24+
25+
// GetLengthBuffer return Tag as raw bytes
26+
func (p *Packet) GetLengthBuffer() []byte {
27+
return p.lenbuf
28+
}
29+
30+
// GetValueBuffer return Tag as raw bytes
31+
func (p *Packet) GetValueBuffer() []byte {
32+
return p.valbuf
33+
}

stream_api.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package y3
2+
3+
import (
4+
"io"
5+
"log"
6+
7+
"github.com/yomorun/y3-codec-golang/pkg/encoding"
8+
"github.com/yomorun/y3-codec-golang/pkg/spec"
9+
)
10+
11+
type StreamDecoder struct {
12+
errState bool
13+
tagbuf []byte
14+
lenbuf []byte
15+
valbuf []byte
16+
r io.Reader
17+
state string
18+
len int
19+
callback func(*spec.Packet)
20+
}
21+
22+
// NewStreamDecoder return a stream decoder
23+
func NewStreamDecoder(r io.Reader) *StreamDecoder {
24+
return &StreamDecoder{
25+
errState: false,
26+
r: r,
27+
state: "Nil",
28+
}
29+
}
30+
31+
// OnPacket trigger callback once Y3 packet parsed out
32+
func (sd *StreamDecoder) OnPacket(f func(*spec.Packet)) {
33+
sd.callback = f
34+
}
35+
36+
// Start the parser
37+
func (sd *StreamDecoder) Start() {
38+
// buffer
39+
tmp := make([]byte, 1)
40+
for {
41+
n, err := sd.r.Read(tmp)
42+
if err != nil {
43+
log.Printf("io err: tmp=[% X]", tmp)
44+
sd.reset(err)
45+
break
46+
}
47+
log.Printf("Recieved: n=%d, tmp=[% X]", n, tmp[:n])
48+
for _, v := range tmp[:n] {
49+
sd.fill(v)
50+
}
51+
}
52+
}
53+
54+
func (sd *StreamDecoder) fill(b byte) error {
55+
log.Printf("-> fill b=[% X], state=%s", b, sd.state)
56+
switch sd.state {
57+
case "Nil":
58+
sd.state = "TS"
59+
sd.fill(b)
60+
case "TS":
61+
sd.tagbuf = append(sd.tagbuf, b)
62+
if b&0x81 != 0x81 {
63+
// over of tag
64+
sd.state = "LS"
65+
log.Printf("Parsed Out Tag, tagbuf=[% X]", sd.tagbuf)
66+
return nil
67+
}
68+
case "LS":
69+
sd.lenbuf = append(sd.lenbuf, b)
70+
if b&0x81 != 0x81 {
71+
// over of len, start parse as PVarUInt64 value
72+
var len uint64
73+
codec := encoding.VarCodec{}
74+
err := codec.DecodePVarUInt64(sd.lenbuf, &len)
75+
if err != nil {
76+
sd.errState = true
77+
panic(err)
78+
} else {
79+
sd.len = int(len)
80+
log.Printf("Parsed Out len=%d, lenbuf=[% X]", sd.len, sd.lenbuf)
81+
}
82+
if sd.len == 0 {
83+
// reset state if zero-len packet
84+
log.Printf("[%s] Parsed Out valbuf=EMPTY", sd.state)
85+
// make a Packet object
86+
sd.fullfiled()
87+
sd.state = "Nil"
88+
}
89+
// update state
90+
sd.state = "VS"
91+
return nil
92+
}
93+
case "VS":
94+
sd.valbuf = append(sd.valbuf, b)
95+
if len(sd.valbuf) == sd.len {
96+
log.Printf("[%s] Parsed Out valbuf=[% X]", sd.state, sd.valbuf)
97+
// make a Packet object
98+
sd.fullfiled()
99+
// reset state
100+
sd.state = "Nil"
101+
}
102+
}
103+
return nil
104+
}
105+
106+
func (sd *StreamDecoder) fullfiled() {
107+
buf := append(sd.tagbuf, sd.lenbuf...)
108+
buf = append(buf, sd.valbuf...)
109+
110+
p, err := spec.FromBytes(buf)
111+
if err != nil {
112+
panic(err)
113+
}
114+
log.Printf("--> Fullfiled p=%v", p)
115+
sd.callback(p)
116+
117+
sd.reset(nil)
118+
}
119+
120+
func (sd *StreamDecoder) reset(err error) {
121+
if err != nil {
122+
log.Printf("[RESET] cause of error: %s", err.Error())
123+
}
124+
sd.errState = false
125+
sd.tagbuf = make([]byte, 0)
126+
sd.lenbuf = make([]byte, 0)
127+
sd.valbuf = make([]byte, 0)
128+
sd.state = "Nil"
129+
}

stream_api_test.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package y3
2+
3+
import (
4+
"bytes"
5+
"log"
6+
"testing"
7+
8+
"github.com/yomorun/y3-codec-golang/pkg/spec"
9+
)
10+
11+
func TestStreamDecode1(t *testing.T) {
12+
// testStreamDecode(t, []byte{0x01, 0x00}, []byte{0x01}, []byte{0x00}, []byte{})
13+
data := []byte{0x01, 0x01, 0x03} //, []byte{0x01}, []byte{0x01}, []byte{0x03}, flag)
14+
// testStreamDecode(t, []byte{0x01, 0x02, 0x03, 0x04, 0x05}, []byte{0x01}, []byte{0x02}, []byte{0x03, 0x04})
15+
// testStreamDecode(t, []byte{0x01, 0x03, 0x03, 0x04, 0x05}, []byte{0x01}, []byte{0x03}, []byte{0x03, 0x04, 0x05})
16+
// testStreamDecode(t, []byte{0x01, 0x01}, []byte{0x02}, []byte{0x01}, []byte{})
17+
// t.Errorf("---")
18+
19+
expectTagbuf := []byte{0x01}
20+
expectLenbuf := []byte{0x01}
21+
expectValbuf := []byte{0x03}
22+
23+
// as reader
24+
r := bytes.NewReader(data)
25+
// create steam decoder
26+
pr := NewStreamDecoder(r)
27+
28+
// handler
29+
pr.OnPacket(func(p *spec.Packet) {
30+
t.Logf("[CALLBACK] p=%v", p)
31+
compareBytes(t, p.GetTagBuffer(), expectTagbuf, "T")
32+
compareBytes(t, p.GetLengthBuffer(), expectLenbuf, "L")
33+
compareBytes(t, p.GetValueBuffer(), expectValbuf, "V")
34+
})
35+
pr.Start()
36+
}
37+
38+
func TestStreamDecode2(t *testing.T) {
39+
data := []byte{0x01, 0x00}
40+
41+
expectTagbuf := []byte{0x01}
42+
expectLenbuf := []byte{0x00}
43+
expectValbuf := []byte{}
44+
45+
// as reader
46+
r := bytes.NewReader(data)
47+
// create steam decoder
48+
pr := NewStreamDecoder(r)
49+
50+
// handler
51+
pr.OnPacket(func(p *spec.Packet) {
52+
t.Logf("[CALLBACK] p=%v", p)
53+
compareBytes(t, p.GetTagBuffer(), expectTagbuf, "T")
54+
compareBytes(t, p.GetLengthBuffer(), expectLenbuf, "L")
55+
compareBytes(t, p.GetValueBuffer(), expectValbuf, "V")
56+
})
57+
pr.Start()
58+
}
59+
60+
func TestStreamDecode3(t *testing.T) {
61+
data := []byte{0x01, 0x02, 0x03, 0x04}
62+
63+
expectTagbuf := []byte{0x01}
64+
expectLenbuf := []byte{0x02}
65+
expectValbuf := []byte{0x03, 0x04}
66+
67+
// as reader
68+
r := bytes.NewReader(data)
69+
// create steam decoder
70+
pr := NewStreamDecoder(r)
71+
72+
// handler
73+
pr.OnPacket(func(p *spec.Packet) {
74+
t.Logf("[CALLBACK] p=%v", p)
75+
compareBytes(t, p.GetTagBuffer(), expectTagbuf, "T")
76+
compareBytes(t, p.GetLengthBuffer(), expectLenbuf, "L")
77+
compareBytes(t, p.GetValueBuffer(), expectValbuf, "V")
78+
})
79+
pr.Start()
80+
}
81+
82+
func TestStreamDecode4(t *testing.T) {
83+
data := []byte{0x01, 0x02, 0x03}
84+
85+
// as reader
86+
r := bytes.NewReader(data)
87+
// create steam decoder
88+
pr := NewStreamDecoder(r)
89+
90+
parsed := false
91+
92+
// handler
93+
pr.OnPacket(func(p *spec.Packet) {
94+
t.Logf("[CALLBACK] p=%v", p)
95+
if p != nil {
96+
t.Error(p)
97+
}
98+
parsed = true
99+
})
100+
pr.Start()
101+
102+
if parsed == true {
103+
t.Errorf("Should not trigger callback")
104+
}
105+
}
106+
107+
func TestStreamDecode5(t *testing.T) {
108+
data := []byte{0x01, 0x01, 0x01, 0x02, 0x02, 0x01, 0x02, 0x03, 0x03, 0x01, 0x02, 0x03}
109+
110+
// as reader
111+
r := bytes.NewReader(data)
112+
// create steam decoder
113+
pr := NewStreamDecoder(r)
114+
115+
times := 0
116+
117+
// handler
118+
pr.OnPacket(func(p *spec.Packet) {
119+
log.Printf("==>OnPacket: %v", p)
120+
if times == 0 {
121+
compareBytes(t, p.GetTagBuffer(), []byte{0x01}, "T")
122+
compareBytes(t, p.GetLengthBuffer(), []byte{0x01}, "L")
123+
compareBytes(t, p.GetValueBuffer(), []byte{0x01}, "V")
124+
}
125+
if times == 1 {
126+
compareBytes(t, p.GetTagBuffer(), []byte{0x02}, "T")
127+
compareBytes(t, p.GetLengthBuffer(), []byte{0x02}, "L")
128+
compareBytes(t, p.GetValueBuffer(), []byte{0x01, 0x02}, "V")
129+
}
130+
if times == 2 {
131+
compareBytes(t, p.GetTagBuffer(), []byte{0x03}, "T")
132+
compareBytes(t, p.GetLengthBuffer(), []byte{0x03}, "L")
133+
compareBytes(t, p.GetValueBuffer(), []byte{0x01, 0x02, 0x03}, "V")
134+
}
135+
times++
136+
})
137+
138+
pr.Start()
139+
}
140+
141+
func compareBytes(t *testing.T, result []byte, expected []byte, v string) {
142+
if len(result) != len(expected) {
143+
t.Errorf("\n[%s] expected:[% X]\n actual:[% X]\n", v, expected, result)
144+
}
145+
146+
for i, p := range result {
147+
if p != expected[i] {
148+
t.Errorf("\n[%s] expected:[% X]\n actual:[% X]\n", v, expected, result)
149+
break
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)