Skip to content

Commit 4bee69e

Browse files
committed
testing with examples
1 parent e3ce1da commit 4bee69e

File tree

5 files changed

+190
-0
lines changed

5 files changed

+190
-0
lines changed

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
module github.com/thomasjungblut/go-sstables
22

33
require (
4+
github.com/godzie44/go-uring v0.0.0-20220131163417-b7645a723f7b
45
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
56
github.com/libp2p/go-buffer-pool v0.0.2
67
github.com/ncw/directio v1.0.5
@@ -12,8 +13,10 @@ require (
1213

1314
require (
1415
github.com/davecgh/go-spew v1.1.1 // indirect
16+
github.com/libp2p/go-sockaddr v0.1.1 // indirect
1517
github.com/pmezard/go-difflib v1.0.0 // indirect
1618
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect
19+
golang.org/x/sys v0.0.0-20210921065528-437939a70204 // indirect
1720
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
1821
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
1922
)

go.sum

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
22
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
33
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4+
github.com/godzie44/go-uring v0.0.0-20220131163417-b7645a723f7b h1:H31NTxP3FAzF0mVbqRCJA04XKxoAQD85Ug+zZ761duo=
5+
github.com/godzie44/go-uring v0.0.0-20220131163417-b7645a723f7b/go.mod h1:ermjEDUoT/fS+3Ona5Vd6t6mZkw1eHp99ILO5jGRBkM=
46
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
57
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
68
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
79
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
810
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
911
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
1012
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
13+
github.com/libp2p/go-sockaddr v0.1.1 h1:yD80l2ZOdGksnOyHrhxDdTDFrf7Oy+v3FMVArIRgZxQ=
14+
github.com/libp2p/go-sockaddr v0.1.1/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k=
1115
github.com/ncw/directio v1.0.5 h1:JSUBhdjEvVaJvOoyPAbcW0fnd0tvRXD76wEfZ1KcQz4=
1216
github.com/ncw/directio v1.0.5/go.mod h1:rX/pKEYkOXBGOggmcyJeJGloCkleSvphPx2eV3t6ROk=
1317
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -21,6 +25,9 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
2125
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
2226
golang.org/x/exp v0.0.0-20181210123644-7d6377eee41f h1:wJ3O7VtAmBlW5LFzggOI2U6CBWIkG+/IYf4Q/VGJdaA=
2327
golang.org/x/exp v0.0.0-20181210123644-7d6377eee41f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
28+
golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
29+
golang.org/x/sys v0.0.0-20210921065528-437939a70204 h1:JJhkWtBuTQKyz2bd5WG9H8iUsJRU3En/KRfN8B2RnDs=
30+
golang.org/x/sys v0.0.0-20210921065528-437939a70204/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
2431
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
2532
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
2633
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

recordio/README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,3 +210,14 @@ available, err := recordio.IsDirectIOAvailable()
210210

211211
In this package the DirectIO support comes through a library called [ncw/directio](https://github.com/ncw/directio), which has good support across Linux, macOS and Windows under a single interface. The caveats of each platform, for example the buffer/block sizes, need to still be taken into account.
212212
Another caveat is that the block alignment causes to write a certain amount of waste. Let's imagine you have blocks of 1024 bytes and only want to write 1025 bytes, with DirectIO enabled you will end up with a file of size 2048 (2 blocks) instead of a file with only 1025 bytes with DirectIO disabled. The DirectIO file will be padded with zeroes towards the end and the in-library readers honor this format and not assume a corrupted file format.
213+
214+
215+
## io_uring (experimental)
216+
217+
Since version 5.x the linux kernel supports a new asynchronous approach to execute syscalls. In a few words, io_uring is a shared ring buffer between the kernel and user space which allows queueing syscalls and later retrieve their results.
218+
219+
You can read more about io_uring at [https://kernel.dk/io_uring.pdf](https://kernel.dk/io_uring.pdf).
220+
221+
222+
223+

recordio/async_writer.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package recordio
2+
3+
import (
4+
"github.com/godzie44/go-uring/uring"
5+
"os"
6+
"sync/atomic"
7+
)
8+
9+
// AsyncWriter takes an uring and executes all writes asynchronously. There are only two barriers: flush and close.
10+
// Those barriers will ensure all previous writes have succeeded.
11+
type AsyncWriter struct {
12+
ringSize int32
13+
submittedSQEs int32
14+
ring *uring.Ring
15+
16+
file *os.File
17+
offset uint64
18+
}
19+
20+
// TODO(thomas): not thread-safe yet
21+
func (w *AsyncWriter) Write(p []byte) (int, error) {
22+
for w.submittedSQEs >= w.ringSize {
23+
// wait for at least one event to free from the queue
24+
cqe, err := w.ring.SubmitAndWaitCQEvents(1)
25+
if err != nil {
26+
return 0, err
27+
}
28+
29+
err = cqe.Error()
30+
if err != nil {
31+
return 0, err
32+
}
33+
34+
atomic.AddInt32(&w.submittedSQEs, -1)
35+
w.ring.SeenCQE(cqe)
36+
}
37+
38+
err := w.ring.QueueSQE(uring.Write(w.file.Fd(), p, w.offset), 0, 0)
39+
if err != nil {
40+
return 0, err
41+
}
42+
43+
atomic.AddInt32(&w.submittedSQEs, 1)
44+
atomic.AddUint64(&w.offset, uint64(len(p)))
45+
46+
return len(p), nil
47+
}
48+
49+
func (w *AsyncWriter) Flush() error {
50+
for w.submittedSQEs >= 0 {
51+
// wait for at least one event to free from the queue
52+
cqe, err := w.ring.SubmitAndWaitCQEvents(1)
53+
if err != nil {
54+
return err
55+
}
56+
57+
atomic.AddInt32(&w.submittedSQEs, -1)
58+
w.ring.SeenCQE(cqe)
59+
}
60+
61+
return nil
62+
}
63+
64+
func (w *AsyncWriter) Size() int {
65+
return 0
66+
}
67+
68+
func (w *AsyncWriter) Close() error {
69+
err := w.Flush()
70+
if err != nil {
71+
return err
72+
}
73+
74+
err = w.ring.UnRegisterFiles()
75+
if err != nil {
76+
return err
77+
}
78+
79+
err = w.ring.Close()
80+
if err != nil {
81+
return err
82+
}
83+
84+
return w.file.Close()
85+
}
86+
87+
func NewAsyncWriter(filePath string, numRingEntries uint32, opts ...uring.SetupOption) (WriteCloserFlusher, error) {
88+
ring, err := uring.New(numRingEntries, opts...)
89+
if err != nil {
90+
return nil, err
91+
}
92+
93+
writeFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0666)
94+
if err != nil {
95+
return nil, err
96+
}
97+
98+
err = ring.RegisterFiles([]int{int(writeFile.Fd())})
99+
if err != nil {
100+
return nil, err
101+
}
102+
103+
writer := &AsyncWriter{
104+
ringSize: int32(numRingEntries),
105+
file: writeFile,
106+
ring: ring,
107+
}
108+
109+
return writer, nil
110+
}

recordio/async_writer_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package recordio
2+
3+
import (
4+
"fmt"
5+
"github.com/godzie44/go-uring/uring"
6+
"github.com/stretchr/testify/require"
7+
"io/ioutil"
8+
"os"
9+
"testing"
10+
)
11+
12+
func TestAsyncWriter_HappyPath(t *testing.T) {
13+
temp, err := ioutil.TempFile("", "TestAsyncWriter_HappyPath")
14+
require.NoError(t, err)
15+
require.NoError(t, temp.Close())
16+
defer closeCleanFile(t, temp)
17+
18+
writer, err := NewAsyncWriter(temp.Name(), 4)
19+
require.NoError(t, err)
20+
21+
for i := 0; i < 10000; i++ {
22+
_, err = writer.Write(randomRecordOfSize(1024))
23+
require.NoError(t, err)
24+
}
25+
26+
require.NoError(t, writer.Close())
27+
}
28+
29+
func TestExample(t *testing.T) {
30+
ring, err := uring.New(8)
31+
require.NoError(t, err)
32+
defer ring.Close()
33+
34+
// open file and init read buffers
35+
file, err := os.Open("./go.mod")
36+
require.NoError(t, err)
37+
stat, _ := file.Stat()
38+
buff := make([]byte, stat.Size())
39+
40+
// add Read operation to SQ queue
41+
err = ring.QueueSQE(uring.Read(file.Fd(), buff, 0), 0, 0)
42+
require.NoError(t, err)
43+
44+
// submit all SQ new entries
45+
_, err = ring.Submit()
46+
require.NoError(t, err)
47+
48+
// wait until data is reading into buffer
49+
cqe, err := ring.WaitCQEvents(1)
50+
require.NoError(t, err)
51+
52+
require.NoError(t, cqe.Error()) //check read error
53+
54+
fmt.Printf("read %d bytes, read result: \n%s", cqe.Res, string(buff))
55+
56+
// dequeue CQ
57+
ring.SeenCQE(cqe)
58+
59+
}

0 commit comments

Comments
 (0)