Skip to content

Commit f0f750c

Browse files
committed
basic support for writing
1 parent 4bee69e commit f0f750c

File tree

7 files changed

+87
-55
lines changed

7 files changed

+87
-55
lines changed

recordio/README.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,4 @@ Since version 5.x the linux kernel supports a new asynchronous approach to execu
218218

219219
You can read more about io_uring at [https://kernel.dk/io_uring.pdf](https://kernel.dk/io_uring.pdf).
220220

221-
222-
223-
221+
In case you have SELinux enabled, you might hit "permission denied" errors when initializing the uring try to test with permissive mode enabled temporarily.

recordio/async_writer.go

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//go:build linux
2+
13
package recordio
24

35
import (
@@ -17,22 +19,13 @@ type AsyncWriter struct {
1719
offset uint64
1820
}
1921

20-
// TODO(thomas): not thread-safe yet
22+
// TODO(thomas): not thread-safe (yet)
2123
func (w *AsyncWriter) Write(p []byte) (int, error) {
2224
for w.submittedSQEs >= w.ringSize {
23-
// wait for at least one event to free from the queue
24-
cqe, err := w.ring.SubmitAndWaitCQEvents(1)
25+
err := w.submitAwaitOne()
2526
if err != nil {
2627
return 0, err
2728
}
28-
29-
err = cqe.Error()
30-
if err != nil {
31-
return 0, err
32-
}
33-
34-
atomic.AddInt32(&w.submittedSQEs, -1)
35-
w.ring.SeenCQE(cqe)
3629
}
3730

3831
err := w.ring.QueueSQE(uring.Write(w.file.Fd(), p, w.offset), 0, 0)
@@ -47,15 +40,30 @@ func (w *AsyncWriter) Write(p []byte) (int, error) {
4740
}
4841

4942
func (w *AsyncWriter) Flush() error {
50-
for w.submittedSQEs >= 0 {
43+
for w.submittedSQEs > 0 {
5144
// wait for at least one event to free from the queue
52-
cqe, err := w.ring.SubmitAndWaitCQEvents(1)
45+
err := w.submitAwaitOne()
5346
if err != nil {
5447
return err
5548
}
49+
}
50+
51+
return nil
52+
}
5653

57-
atomic.AddInt32(&w.submittedSQEs, -1)
58-
w.ring.SeenCQE(cqe)
54+
func (w *AsyncWriter) submitAwaitOne() error {
55+
// TODO(thomas): most likely there are more CQ events waiting, we should try to drain them optimistically to avoid overflowing memory buffers
56+
cqe, err := w.ring.SubmitAndWaitCQEvents(1)
57+
if err != nil {
58+
return err
59+
}
60+
61+
atomic.AddInt32(&w.submittedSQEs, -1)
62+
w.ring.SeenCQE(cqe)
63+
64+
err = cqe.Error()
65+
if err != nil {
66+
return err
5967
}
6068

6169
return nil

recordio/async_writer_test.go

Lines changed: 10 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
1+
//go:build linux
2+
13
package recordio
24

35
import (
4-
"fmt"
5-
"github.com/godzie44/go-uring/uring"
66
"github.com/stretchr/testify/require"
7+
"github.com/thomasjungblut/go-sstables/recordio/iouring"
78
"io/ioutil"
8-
"os"
99
"testing"
1010
)
1111

1212
func TestAsyncWriter_HappyPath(t *testing.T) {
13+
ok, err := iouring.IsIOUringAvailable()
14+
require.NoError(t, err)
15+
if !ok {
16+
t.Skip("iouring not available here")
17+
return
18+
}
19+
1320
temp, err := ioutil.TempFile("", "TestAsyncWriter_HappyPath")
1421
require.NoError(t, err)
15-
require.NoError(t, temp.Close())
1622
defer closeCleanFile(t, temp)
1723

1824
writer, err := NewAsyncWriter(temp.Name(), 4)
@@ -25,35 +31,3 @@ func TestAsyncWriter_HappyPath(t *testing.T) {
2531

2632
require.NoError(t, writer.Close())
2733
}
28-
29-
func TestExample(t *testing.T) {
30-
ring, err := uring.New(8)
31-
require.NoError(t, err)
32-
defer ring.Close()
33-
34-
// open file and init read buffers
35-
file, err := os.Open("./go.mod")
36-
require.NoError(t, err)
37-
stat, _ := file.Stat()
38-
buff := make([]byte, stat.Size())
39-
40-
// add Read operation to SQ queue
41-
err = ring.QueueSQE(uring.Read(file.Fd(), buff, 0), 0, 0)
42-
require.NoError(t, err)
43-
44-
// submit all SQ new entries
45-
_, err = ring.Submit()
46-
require.NoError(t, err)
47-
48-
// wait until data is reading into buffer
49-
cqe, err := ring.WaitCQEvents(1)
50-
require.NoError(t, err)
51-
52-
require.NoError(t, cqe.Error()) //check read error
53-
54-
fmt.Printf("read %d bytes, read result: \n%s", cqe.Res, string(buff))
55-
56-
// dequeue CQ
57-
ring.SeenCQE(cqe)
58-
59-
}

recordio/direct_io.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package recordio
22

33
import (
4+
"errors"
45
"github.com/ncw/directio"
56
"io/ioutil"
67
"os"
8+
"syscall"
79
)
810

911
type DirectIOFactory struct {
@@ -51,7 +53,11 @@ func IsDirectIOAvailable() (available bool, err error) {
5153
tmpFile, err = directio.OpenFile(tmpFile.Name(), os.O_WRONLY|os.O_CREATE, 0666)
5254
if err != nil {
5355
// this syscall specifically signals that DirectIO is not supported
54-
// if errors.Is(err, syscall.EINVAL)
56+
if errors.Is(err, syscall.EINVAL) {
57+
available = false
58+
err = nil
59+
return
60+
}
5561
return
5662
}
5763

recordio/file_writer_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,13 @@ func TestWriterCrashCreatesValidHeader(t *testing.T) {
178178
}
179179

180180
func TestWriterCrashCreatesNoValidHeaderWithDirectIO(t *testing.T) {
181+
ok, err := IsDirectIOAvailable()
182+
require.NoError(t, err)
183+
if !ok {
184+
t.Skip("directio not available here")
185+
return
186+
}
187+
181188
tmpFile, err := ioutil.TempFile("", "recordio_CrashCreatesValidHeaderDirectIO")
182189
require.Nil(t, err)
183190
defer closeCleanFile(t, tmpFile)
@@ -195,6 +202,13 @@ func TestWriterCrashCreatesNoValidHeaderWithDirectIO(t *testing.T) {
195202
}
196203

197204
func TestWriterNotAllowsSyncsWithDirectIO(t *testing.T) {
205+
ok, err := IsDirectIOAvailable()
206+
require.NoError(t, err)
207+
if !ok {
208+
t.Skip("directio not available here")
209+
return
210+
}
211+
198212
tmpFile, err := ioutil.TempFile("", "recordio_WriterNotAllowsSyncsWithDirectIO")
199213
require.Nil(t, err)
200214
defer closeCleanFile(t, tmpFile)

recordio/iouring/iouring.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
//go:build linux
2+
3+
package iouring
4+
5+
import "github.com/godzie44/go-uring/uring"
6+
7+
// IsIOUringAvailable tests whether io_uring is supported by the kernel.
8+
// It will return (true, nil) if that's the case, if it's not available it will be (false, nil).
9+
// Any other error will be indicated by the error (either true/false).
10+
func IsIOUringAvailable() (available bool, err error) {
11+
ring, err := uring.New(1)
12+
defer func() {
13+
err = ring.Close()
14+
}()
15+
16+
return err == nil, err
17+
}

recordio/iouring/iouring_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package iouring
2+
3+
import (
4+
"github.com/stretchr/testify/require"
5+
"testing"
6+
)
7+
8+
func TestIsIOUringAvailable_HappyPath(t *testing.T) {
9+
ok, err := IsIOUringAvailable()
10+
require.NoError(t, err)
11+
if !ok {
12+
t.Skip("iouring not available here")
13+
return
14+
}
15+
}

0 commit comments

Comments
 (0)