From d3d1ba8586744f4e303908988900218c8c64c397 Mon Sep 17 00:00:00 2001 From: Thomas Jungblut Date: Fri, 11 Feb 2022 15:26:03 +0100 Subject: [PATCH 1/4] testing with examples --- go.mod | 3 + go.sum | 9 +++ recordio/README.md | 11 ++++ recordio/async_writer.go | 110 ++++++++++++++++++++++++++++++++++ recordio/async_writer_test.go | 59 ++++++++++++++++++ 5 files changed, 192 insertions(+) create mode 100644 recordio/async_writer.go create mode 100644 recordio/async_writer_test.go diff --git a/go.mod b/go.mod index e8b6959..6e47a72 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,7 @@ module github.com/thomasjungblut/go-sstables require ( github.com/anishathalye/porcupine v0.1.2 + github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db github.com/libp2p/go-buffer-pool v0.0.2 github.com/ncw/directio v1.0.5 @@ -13,8 +14,10 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/libp2p/go-sockaddr v0.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect + golang.org/x/sys v0.0.0-20210921065528-437939a70204 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) diff --git a/go.sum b/go.sum index 4550a06..89a42c9 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,10 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/godzie44/go-uring v0.0.0-20220131163417-b7645a723f7b h1:H31NTxP3FAzF0mVbqRCJA04XKxoAQD85Ug+zZ761duo= +github.com/godzie44/go-uring v0.0.0-20220131163417-b7645a723f7b/go.mod h1:ermjEDUoT/fS+3Ona5Vd6t6mZkw1eHp99ILO5jGRBkM= +github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5 h1:5zELAgnSz0gqmr4Q5DWCoOzNHoeBAxVUXB7LS1eG+sw= +github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5/go.mod h1:ermjEDUoT/fS+3Ona5Vd6t6mZkw1eHp99ILO5jGRBkM= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -8,6 +12,8 @@ github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= +github.com/libp2p/go-sockaddr v0.1.1 h1:yD80l2ZOdGksnOyHrhxDdTDFrf7Oy+v3FMVArIRgZxQ= +github.com/libp2p/go-sockaddr v0.1.1/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k= github.com/ncw/directio v1.0.5 h1:JSUBhdjEvVaJvOoyPAbcW0fnd0tvRXD76wEfZ1KcQz4= github.com/ncw/directio v1.0.5/go.mod h1:rX/pKEYkOXBGOggmcyJeJGloCkleSvphPx2eV3t6ROk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -23,6 +29,9 @@ github.com/tjungblu/porcupine v0.0.0-20221116095144-377185aa0569 h1:acDBvgSBtnyB github.com/tjungblu/porcupine v0.0.0-20221116095144-377185aa0569/go.mod h1:+z336r1WR0gcwl1ALfoNBpDTCW06vO5DzBwunEcSvcs= golang.org/x/exp v0.0.0-20181210123644-7d6377eee41f h1:wJ3O7VtAmBlW5LFzggOI2U6CBWIkG+/IYf4Q/VGJdaA= golang.org/x/exp v0.0.0-20181210123644-7d6377eee41f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20210921065528-437939a70204 h1:JJhkWtBuTQKyz2bd5WG9H8iUsJRU3En/KRfN8B2RnDs= +golang.org/x/sys v0.0.0-20210921065528-437939a70204/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/recordio/README.md b/recordio/README.md index 6bb26f1..eee29e0 100644 --- a/recordio/README.md +++ b/recordio/README.md @@ -210,3 +210,14 @@ available, err := recordio.IsDirectIOAvailable() In this package the DirectIO support comes through a library called [ncw/directio](https://github.com/ncw/directio), which has good support across Linux, macOS and Windows under a single interface. The caveats of each platform, for example the buffer/block sizes, need to still be taken into account. Another caveat is that the block alignment causes to write a certain amount of waste. Let's imagine you have blocks of 1024 bytes and only want to write 1025 bytes, with DirectIO enabled you will end up with a file of size 2048 (2 blocks) instead of a file with only 1025 bytes with DirectIO disabled. The DirectIO file will be padded with zeroes towards the end and the in-library readers honor this format and not assume a corrupted file format. + + +## io_uring (experimental) + +Since version 5.x the linux kernel supports a new asynchronous approach to execute syscalls. In a few words, io_uring is a shared ring buffer between the kernel and user space which allows queueing syscalls and later retrieve their results. + +You can read more about io_uring at [https://kernel.dk/io_uring.pdf](https://kernel.dk/io_uring.pdf). + + + + diff --git a/recordio/async_writer.go b/recordio/async_writer.go new file mode 100644 index 0000000..72b179d --- /dev/null +++ b/recordio/async_writer.go @@ -0,0 +1,110 @@ +package recordio + +import ( + "github.com/godzie44/go-uring/uring" + "os" + "sync/atomic" +) + +// AsyncWriter takes an uring and executes all writes asynchronously. There are only two barriers: flush and close. +// Those barriers will ensure all previous writes have succeeded. +type AsyncWriter struct { + ringSize int32 + submittedSQEs int32 + ring *uring.Ring + + file *os.File + offset uint64 +} + +// TODO(thomas): not thread-safe yet +func (w *AsyncWriter) Write(p []byte) (int, error) { + for w.submittedSQEs >= w.ringSize { + // wait for at least one event to free from the queue + cqe, err := w.ring.SubmitAndWaitCQEvents(1) + if err != nil { + return 0, err + } + + err = cqe.Error() + if err != nil { + return 0, err + } + + atomic.AddInt32(&w.submittedSQEs, -1) + w.ring.SeenCQE(cqe) + } + + err := w.ring.QueueSQE(uring.Write(w.file.Fd(), p, w.offset), 0, 0) + if err != nil { + return 0, err + } + + atomic.AddInt32(&w.submittedSQEs, 1) + atomic.AddUint64(&w.offset, uint64(len(p))) + + return len(p), nil +} + +func (w *AsyncWriter) Flush() error { + for w.submittedSQEs >= 0 { + // wait for at least one event to free from the queue + cqe, err := w.ring.SubmitAndWaitCQEvents(1) + if err != nil { + return err + } + + atomic.AddInt32(&w.submittedSQEs, -1) + w.ring.SeenCQE(cqe) + } + + return nil +} + +func (w *AsyncWriter) Size() int { + return 0 +} + +func (w *AsyncWriter) Close() error { + err := w.Flush() + if err != nil { + return err + } + + err = w.ring.UnRegisterFiles() + if err != nil { + return err + } + + err = w.ring.Close() + if err != nil { + return err + } + + return w.file.Close() +} + +func NewAsyncWriter(filePath string, numRingEntries uint32, opts ...uring.SetupOption) (WriteCloserFlusher, error) { + ring, err := uring.New(numRingEntries, opts...) + if err != nil { + return nil, err + } + + writeFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + return nil, err + } + + err = ring.RegisterFiles([]int{int(writeFile.Fd())}) + if err != nil { + return nil, err + } + + writer := &AsyncWriter{ + ringSize: int32(numRingEntries), + file: writeFile, + ring: ring, + } + + return writer, nil +} diff --git a/recordio/async_writer_test.go b/recordio/async_writer_test.go new file mode 100644 index 0000000..d214ba4 --- /dev/null +++ b/recordio/async_writer_test.go @@ -0,0 +1,59 @@ +package recordio + +import ( + "fmt" + "github.com/godzie44/go-uring/uring" + "github.com/stretchr/testify/require" + "io/ioutil" + "os" + "testing" +) + +func TestAsyncWriter_HappyPath(t *testing.T) { + temp, err := ioutil.TempFile("", "TestAsyncWriter_HappyPath") + require.NoError(t, err) + require.NoError(t, temp.Close()) + defer closeCleanFile(t, temp) + + writer, err := NewAsyncWriter(temp.Name(), 4) + require.NoError(t, err) + + for i := 0; i < 10000; i++ { + _, err = writer.Write(randomRecordOfSize(1024)) + require.NoError(t, err) + } + + require.NoError(t, writer.Close()) +} + +func TestExample(t *testing.T) { + ring, err := uring.New(8) + require.NoError(t, err) + defer ring.Close() + + // open file and init read buffers + file, err := os.Open("./go.mod") + require.NoError(t, err) + stat, _ := file.Stat() + buff := make([]byte, stat.Size()) + + // add Read operation to SQ queue + err = ring.QueueSQE(uring.Read(file.Fd(), buff, 0), 0, 0) + require.NoError(t, err) + + // submit all SQ new entries + _, err = ring.Submit() + require.NoError(t, err) + + // wait until data is reading into buffer + cqe, err := ring.WaitCQEvents(1) + require.NoError(t, err) + + require.NoError(t, cqe.Error()) //check read error + + fmt.Printf("read %d bytes, read result: \n%s", cqe.Res, string(buff)) + + // dequeue CQ + ring.SeenCQE(cqe) + +} From 216a620c2fa3a8834ee5fa1fd4901e762c26764f Mon Sep 17 00:00:00 2001 From: Thomas Jungblut Date: Mon, 14 Feb 2022 11:14:26 +0100 Subject: [PATCH 2/4] basic support for writing --- recordio/README.md | 4 +-- recordio/async_writer.go | 38 +++++++++++++++----------- recordio/async_writer_test.go | 46 +++++++------------------------- recordio/iouring/iouring.go | 17 ++++++++++++ recordio/iouring/iouring_test.go | 15 +++++++++++ 5 files changed, 66 insertions(+), 54 deletions(-) create mode 100644 recordio/iouring/iouring.go create mode 100644 recordio/iouring/iouring_test.go diff --git a/recordio/README.md b/recordio/README.md index eee29e0..04a87c4 100644 --- a/recordio/README.md +++ b/recordio/README.md @@ -218,6 +218,4 @@ Since version 5.x the linux kernel supports a new asynchronous approach to execu You can read more about io_uring at [https://kernel.dk/io_uring.pdf](https://kernel.dk/io_uring.pdf). - - - +In case you have SELinux enabled, you might hit "permission denied" errors when initializing the uring try to test with permissive mode enabled temporarily. diff --git a/recordio/async_writer.go b/recordio/async_writer.go index 72b179d..147ce0b 100644 --- a/recordio/async_writer.go +++ b/recordio/async_writer.go @@ -1,3 +1,5 @@ +//go:build linux + package recordio import ( @@ -17,22 +19,13 @@ type AsyncWriter struct { offset uint64 } -// TODO(thomas): not thread-safe yet +// TODO(thomas): not thread-safe (yet) func (w *AsyncWriter) Write(p []byte) (int, error) { for w.submittedSQEs >= w.ringSize { - // wait for at least one event to free from the queue - cqe, err := w.ring.SubmitAndWaitCQEvents(1) + err := w.submitAwaitOne() if err != nil { return 0, err } - - err = cqe.Error() - if err != nil { - return 0, err - } - - atomic.AddInt32(&w.submittedSQEs, -1) - w.ring.SeenCQE(cqe) } err := w.ring.QueueSQE(uring.Write(w.file.Fd(), p, w.offset), 0, 0) @@ -47,15 +40,30 @@ func (w *AsyncWriter) Write(p []byte) (int, error) { } func (w *AsyncWriter) Flush() error { - for w.submittedSQEs >= 0 { + for w.submittedSQEs > 0 { // wait for at least one event to free from the queue - cqe, err := w.ring.SubmitAndWaitCQEvents(1) + err := w.submitAwaitOne() if err != nil { return err } + } + + return nil +} - atomic.AddInt32(&w.submittedSQEs, -1) - w.ring.SeenCQE(cqe) +func (w *AsyncWriter) submitAwaitOne() error { + // TODO(thomas): most likely there are more CQ events waiting, we should try to drain them optimistically to avoid overflowing memory buffers + cqe, err := w.ring.SubmitAndWaitCQEvents(1) + if err != nil { + return err + } + + atomic.AddInt32(&w.submittedSQEs, -1) + w.ring.SeenCQE(cqe) + + err = cqe.Error() + if err != nil { + return err } return nil diff --git a/recordio/async_writer_test.go b/recordio/async_writer_test.go index d214ba4..dca5e04 100644 --- a/recordio/async_writer_test.go +++ b/recordio/async_writer_test.go @@ -1,18 +1,24 @@ +//go:build linux + package recordio import ( - "fmt" - "github.com/godzie44/go-uring/uring" "github.com/stretchr/testify/require" + "github.com/thomasjungblut/go-sstables/recordio/iouring" "io/ioutil" - "os" "testing" ) func TestAsyncWriter_HappyPath(t *testing.T) { + ok, err := iouring.IsIOUringAvailable() + require.NoError(t, err) + if !ok { + t.Skip("iouring not available here") + return + } + temp, err := ioutil.TempFile("", "TestAsyncWriter_HappyPath") require.NoError(t, err) - require.NoError(t, temp.Close()) defer closeCleanFile(t, temp) writer, err := NewAsyncWriter(temp.Name(), 4) @@ -25,35 +31,3 @@ func TestAsyncWriter_HappyPath(t *testing.T) { require.NoError(t, writer.Close()) } - -func TestExample(t *testing.T) { - ring, err := uring.New(8) - require.NoError(t, err) - defer ring.Close() - - // open file and init read buffers - file, err := os.Open("./go.mod") - require.NoError(t, err) - stat, _ := file.Stat() - buff := make([]byte, stat.Size()) - - // add Read operation to SQ queue - err = ring.QueueSQE(uring.Read(file.Fd(), buff, 0), 0, 0) - require.NoError(t, err) - - // submit all SQ new entries - _, err = ring.Submit() - require.NoError(t, err) - - // wait until data is reading into buffer - cqe, err := ring.WaitCQEvents(1) - require.NoError(t, err) - - require.NoError(t, cqe.Error()) //check read error - - fmt.Printf("read %d bytes, read result: \n%s", cqe.Res, string(buff)) - - // dequeue CQ - ring.SeenCQE(cqe) - -} diff --git a/recordio/iouring/iouring.go b/recordio/iouring/iouring.go new file mode 100644 index 0000000..bef5987 --- /dev/null +++ b/recordio/iouring/iouring.go @@ -0,0 +1,17 @@ +//go:build linux + +package iouring + +import "github.com/godzie44/go-uring/uring" + +// IsIOUringAvailable tests whether io_uring is supported by the kernel. +// It will return (true, nil) if that's the case, if it's not available it will be (false, nil). +// Any other error will be indicated by the error (either true/false). +func IsIOUringAvailable() (available bool, err error) { + ring, err := uring.New(1) + defer func() { + err = ring.Close() + }() + + return err == nil, err +} diff --git a/recordio/iouring/iouring_test.go b/recordio/iouring/iouring_test.go new file mode 100644 index 0000000..b972e48 --- /dev/null +++ b/recordio/iouring/iouring_test.go @@ -0,0 +1,15 @@ +package iouring + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestIsIOUringAvailable_HappyPath(t *testing.T) { + ok, err := IsIOUringAvailable() + require.NoError(t, err) + if !ok { + t.Skip("iouring not available here") + return + } +} From 349cf9690936cf5f4cb8cd45784722d5d502ede3 Mon Sep 17 00:00:00 2001 From: Thomas Jungblut Date: Mon, 14 Feb 2022 17:27:38 +0100 Subject: [PATCH 3/4] buffer copies --- recordio/async_writer.go | 25 ++++--- recordio/async_writer_test.go | 66 +++++++++++++++++-- recordio/file_writer.go | 2 +- recordio/file_writer_test.go | 5 ++ recordio/io_uring.go | 44 +++++++++++++ .../iouring_test.go => io_uring_test.go} | 2 +- recordio/iouring/iouring.go | 17 ----- recordio/recordio_test.go | 2 +- 8 files changed, 128 insertions(+), 35 deletions(-) create mode 100644 recordio/io_uring.go rename recordio/{iouring/iouring_test.go => io_uring_test.go} (93%) delete mode 100644 recordio/iouring/iouring.go diff --git a/recordio/async_writer.go b/recordio/async_writer.go index 147ce0b..fc576ef 100644 --- a/recordio/async_writer.go +++ b/recordio/async_writer.go @@ -5,7 +5,6 @@ package recordio import ( "github.com/godzie44/go-uring/uring" "os" - "sync/atomic" ) // AsyncWriter takes an uring and executes all writes asynchronously. There are only two barriers: flush and close. @@ -28,13 +27,19 @@ func (w *AsyncWriter) Write(p []byte) (int, error) { } } - err := w.ring.QueueSQE(uring.Write(w.file.Fd(), p, w.offset), 0, 0) + // TODO(thomas): we would need to make a defensive copy for p, which actually is not optimal + // the reason is the buffer pooling (or the header reuse). It so happens that the original backing array was written + // a couple times before the ring was submitted. That caused some funny offsets to be written and eventually fail reading. + pc := make([]byte, len(p)) + copy(pc, p) + + err := w.ring.QueueSQE(uring.Write(w.file.Fd(), pc, w.offset), 0, 0) if err != nil { return 0, err } - atomic.AddInt32(&w.submittedSQEs, 1) - atomic.AddUint64(&w.offset, uint64(len(p))) + w.submittedSQEs++ + w.offset += uint64(len(p)) return len(p), nil } @@ -58,7 +63,7 @@ func (w *AsyncWriter) submitAwaitOne() error { return err } - atomic.AddInt32(&w.submittedSQEs, -1) + w.submittedSQEs-- w.ring.SeenCQE(cqe) err = cqe.Error() @@ -92,20 +97,20 @@ func (w *AsyncWriter) Close() error { return w.file.Close() } -func NewAsyncWriter(filePath string, numRingEntries uint32, opts ...uring.SetupOption) (WriteCloserFlusher, error) { +func NewAsyncWriter(filePath string, numRingEntries uint32, opts ...uring.SetupOption) (WriteCloserFlusher, *os.File, error) { ring, err := uring.New(numRingEntries, opts...) if err != nil { - return nil, err + return nil, nil, err } writeFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0666) if err != nil { - return nil, err + return nil, nil, err } err = ring.RegisterFiles([]int{int(writeFile.Fd())}) if err != nil { - return nil, err + return nil, nil, err } writer := &AsyncWriter{ @@ -114,5 +119,5 @@ func NewAsyncWriter(filePath string, numRingEntries uint32, opts ...uring.SetupO ring: ring, } - return writer, nil + return writer, writeFile, nil } diff --git a/recordio/async_writer_test.go b/recordio/async_writer_test.go index dca5e04..f42f79b 100644 --- a/recordio/async_writer_test.go +++ b/recordio/async_writer_test.go @@ -3,14 +3,15 @@ package recordio import ( + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/thomasjungblut/go-sstables/recordio/iouring" "io/ioutil" + "os" "testing" ) func TestAsyncWriter_HappyPath(t *testing.T) { - ok, err := iouring.IsIOUringAvailable() + ok, err := IsIOUringAvailable() require.NoError(t, err) if !ok { t.Skip("iouring not available here") @@ -21,13 +22,68 @@ func TestAsyncWriter_HappyPath(t *testing.T) { require.NoError(t, err) defer closeCleanFile(t, temp) - writer, err := NewAsyncWriter(temp.Name(), 4) + writer, file, err := NewAsyncWriter(temp.Name(), 4) require.NoError(t, err) + require.NotNil(t, file) - for i := 0; i < 10000; i++ { - _, err = writer.Write(randomRecordOfSize(1024)) + var expected []byte + for i := 0; i < 100; i++ { + s := randomRecordOfSize(10) + _, err = writer.Write(s) require.NoError(t, err) + expected = append(expected, s...) } require.NoError(t, writer.Close()) + fileContentEquals(t, file, expected) +} + +func TestAsyncWriter_GuardAgainstBufferReuse(t *testing.T) { + ok, err := IsIOUringAvailable() + require.NoError(t, err) + if !ok { + t.Skip("iouring not available here") + return + } + + temp, err := ioutil.TempFile("", "TestAsyncWriter_GuardAgainstBufferReuse") + require.NoError(t, err) + defer closeCleanFile(t, temp) + + writer, file, err := NewAsyncWriter(temp.Name(), 4) + require.NoError(t, err) + require.NotNil(t, file) + + reusedSlice := []byte{13, 06, 91} + // we are writing the same slice, three times before a forced flush due to capacity + writeBuf(t, writer, reusedSlice) + writeBuf(t, writer, reusedSlice) + writeBuf(t, writer, reusedSlice) + // fourth time we change the slice in-place + reusedSlice[0] = 29 + writeBuf(t, writer, reusedSlice) + writeBuf(t, writer, reusedSlice) + require.NoError(t, writer.Close()) + + fileContentEquals(t, file, []byte{ + 13, 06, 91, + 13, 06, 91, + 13, 06, 91, + 29, 06, 91, + 29, 06, 91, + }) +} + +func fileContentEquals(t *testing.T, file *os.File, expectedContent []byte) { + f, err := os.Open(file.Name()) + require.NoError(t, err) + all, err := ioutil.ReadAll(f) + require.NoError(t, err) + assert.Equal(t, expectedContent, all) +} + +func writeBuf(t *testing.T, writer WriteCloserFlusher, buf []byte) { + o, err := writer.Write(buf) + require.NoError(t, err) + assert.Equal(t, len(buf), o) } diff --git a/recordio/file_writer.go b/recordio/file_writer.go index 0cba66b..f060ef6 100644 --- a/recordio/file_writer.go +++ b/recordio/file_writer.go @@ -88,7 +88,7 @@ func fileHeaderAsByteSlice(compressionType uint32) []byte { } // for legacy reference still around, main paths unused - mostly for tests writing old versions -//noinspection GoUnusedFunction +// noinspection GoUnusedFunction func writeRecordHeaderV1(writer *FileWriter, payloadSizeUncompressed uint64, payloadSizeCompressed uint64) (int, error) { // 4 byte magic number, 8 byte uncompressed size, 8 bytes for compressed size = 20 bytes bytes := make([]byte, RecordHeaderSizeBytes) diff --git a/recordio/file_writer_test.go b/recordio/file_writer_test.go index c9778ef..7bc0941 100644 --- a/recordio/file_writer_test.go +++ b/recordio/file_writer_test.go @@ -158,6 +158,11 @@ func TestWriterInitNoPath(t *testing.T) { assert.Equal(t, errors.New("NewFileWriter: either os.File or string path must be supplied, never both"), err) } +func TestWriterDirectIOAndIOUringDisabled(t *testing.T) { + _, err := NewFileWriter(Path("/tmp/abc"), DirectIO(), IOUring(4)) + assert.Equal(t, errors.New("NewFileWriter: either directIO or io_uring must be enabled, never both"), err) +} + func TestWriterCrashCreatesValidHeader(t *testing.T) { tmpFile, err := ioutil.TempFile("", "recordio_CrashCreatesValidHeader") require.Nil(t, err) diff --git a/recordio/io_uring.go b/recordio/io_uring.go new file mode 100644 index 0000000..77ea4db --- /dev/null +++ b/recordio/io_uring.go @@ -0,0 +1,44 @@ +package recordio + +import ( + "github.com/godzie44/go-uring/uring" + "os" +) + +type IOUringFactory struct { + numRingEntries uint32 + opts []uring.SetupOption +} + +func (f *IOUringFactory) CreateNewReader(filePath string, bufSize int) (*os.File, ByteReaderResetCount, error) { + //TODO implement me + panic("implement me") +} + +func (f *IOUringFactory) CreateNewWriter(filePath string, _ int) (*os.File, WriteCloserFlusher, error) { + writer, file, err := NewAsyncWriter(filePath, f.numRingEntries, f.opts...) + if err != nil { + return nil, nil, err + } + + return file, writer, nil +} + +func NewIOUringFactory(numRingEntries uint32, opts ...uring.SetupOption) *IOUringFactory { + return &IOUringFactory{ + numRingEntries: numRingEntries, + opts: opts, + } +} + +// IsIOUringAvailable tests whether io_uring is supported by the kernel. +// It will return (true, nil) if that's the case, if it's not available it will be (false, nil). +// Any other error will be indicated by the error (either true/false). +func IsIOUringAvailable() (available bool, err error) { + ring, err := uring.New(1) + defer func() { + err = ring.Close() + }() + + return err == nil, err +} diff --git a/recordio/iouring/iouring_test.go b/recordio/io_uring_test.go similarity index 93% rename from recordio/iouring/iouring_test.go rename to recordio/io_uring_test.go index b972e48..8eb40ab 100644 --- a/recordio/iouring/iouring_test.go +++ b/recordio/io_uring_test.go @@ -1,4 +1,4 @@ -package iouring +package recordio import ( "github.com/stretchr/testify/require" diff --git a/recordio/iouring/iouring.go b/recordio/iouring/iouring.go deleted file mode 100644 index bef5987..0000000 --- a/recordio/iouring/iouring.go +++ /dev/null @@ -1,17 +0,0 @@ -//go:build linux - -package iouring - -import "github.com/godzie44/go-uring/uring" - -// IsIOUringAvailable tests whether io_uring is supported by the kernel. -// It will return (true, nil) if that's the case, if it's not available it will be (false, nil). -// Any other error will be indicated by the error (either true/false). -func IsIOUringAvailable() (available bool, err error) { - ring, err := uring.New(1) - defer func() { - err = ring.Close() - }() - - return err == nil, err -} diff --git a/recordio/recordio_test.go b/recordio/recordio_test.go index 6d51257..9e1c823 100644 --- a/recordio/recordio_test.go +++ b/recordio/recordio_test.go @@ -61,7 +61,7 @@ func TestReadWriteEndToEndDirectIO(t *testing.T) { return } - tmpFile, err := ioutil.TempFile("", "recordio_EndToEnd") + tmpFile, err := ioutil.TempFile("", "recordio_EndToEndDirectIO") require.NoError(t, err) defer func() { require.NoError(t, os.Remove(tmpFile.Name())) }() writer, err := NewFileWriter(File(tmpFile), DirectIO()) From d647670e7e4d39fbff03ec579a346c1a3adc47df Mon Sep 17 00:00:00 2001 From: Thomas Jungblut Date: Fri, 2 Dec 2022 12:52:09 +0100 Subject: [PATCH 4/4] remove e2e test --- recordio/file_writer_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/recordio/file_writer_test.go b/recordio/file_writer_test.go index 7bc0941..c9778ef 100644 --- a/recordio/file_writer_test.go +++ b/recordio/file_writer_test.go @@ -158,11 +158,6 @@ func TestWriterInitNoPath(t *testing.T) { assert.Equal(t, errors.New("NewFileWriter: either os.File or string path must be supplied, never both"), err) } -func TestWriterDirectIOAndIOUringDisabled(t *testing.T) { - _, err := NewFileWriter(Path("/tmp/abc"), DirectIO(), IOUring(4)) - assert.Equal(t, errors.New("NewFileWriter: either directIO or io_uring must be enabled, never both"), err) -} - func TestWriterCrashCreatesValidHeader(t *testing.T) { tmpFile, err := ioutil.TempFile("", "recordio_CrashCreatesValidHeader") require.Nil(t, err)