Skip to content

Commit e229ab0

Browse files
committed
buffer copies
1 parent f0f750c commit e229ab0

8 files changed

+209
-67
lines changed

recordio/async_writer.go

+15-10
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package recordio
55
import (
66
"github.com/godzie44/go-uring/uring"
77
"os"
8-
"sync/atomic"
98
)
109

1110
// AsyncWriter takes an uring and executes all writes asynchronously. There are only two barriers: flush and close.
@@ -28,13 +27,19 @@ func (w *AsyncWriter) Write(p []byte) (int, error) {
2827
}
2928
}
3029

31-
err := w.ring.QueueSQE(uring.Write(w.file.Fd(), p, w.offset), 0, 0)
30+
// TODO(thomas): we would need to make a defensive copy for p, which actually is not optimal
31+
// the reason is the buffer pooling (or the header reuse). It so happens that the original backing array was written
32+
// a couple times before the ring was submitted. That caused some funny offsets to be written and eventually fail reading.
33+
pc := make([]byte, len(p))
34+
copy(pc, p)
35+
36+
err := w.ring.QueueSQE(uring.Write(w.file.Fd(), pc, w.offset), 0, 0)
3237
if err != nil {
3338
return 0, err
3439
}
3540

36-
atomic.AddInt32(&w.submittedSQEs, 1)
37-
atomic.AddUint64(&w.offset, uint64(len(p)))
41+
w.submittedSQEs++
42+
w.offset += uint64(len(p))
3843

3944
return len(p), nil
4045
}
@@ -58,7 +63,7 @@ func (w *AsyncWriter) submitAwaitOne() error {
5863
return err
5964
}
6065

61-
atomic.AddInt32(&w.submittedSQEs, -1)
66+
w.submittedSQEs--
6267
w.ring.SeenCQE(cqe)
6368

6469
err = cqe.Error()
@@ -92,20 +97,20 @@ func (w *AsyncWriter) Close() error {
9297
return w.file.Close()
9398
}
9499

95-
func NewAsyncWriter(filePath string, numRingEntries uint32, opts ...uring.SetupOption) (WriteCloserFlusher, error) {
100+
func NewAsyncWriter(filePath string, numRingEntries uint32, opts ...uring.SetupOption) (WriteCloserFlusher, *os.File, error) {
96101
ring, err := uring.New(numRingEntries, opts...)
97102
if err != nil {
98-
return nil, err
103+
return nil, nil, err
99104
}
100105

101106
writeFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0666)
102107
if err != nil {
103-
return nil, err
108+
return nil, nil, err
104109
}
105110

106111
err = ring.RegisterFiles([]int{int(writeFile.Fd())})
107112
if err != nil {
108-
return nil, err
113+
return nil, nil, err
109114
}
110115

111116
writer := &AsyncWriter{
@@ -114,5 +119,5 @@ func NewAsyncWriter(filePath string, numRingEntries uint32, opts ...uring.SetupO
114119
ring: ring,
115120
}
116121

117-
return writer, nil
122+
return writer, writeFile, nil
118123
}

recordio/async_writer_test.go

+61-5
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
package recordio
44

55
import (
6+
"github.com/stretchr/testify/assert"
67
"github.com/stretchr/testify/require"
7-
"github.com/thomasjungblut/go-sstables/recordio/iouring"
88
"io/ioutil"
9+
"os"
910
"testing"
1011
)
1112

1213
func TestAsyncWriter_HappyPath(t *testing.T) {
13-
ok, err := iouring.IsIOUringAvailable()
14+
ok, err := IsIOUringAvailable()
1415
require.NoError(t, err)
1516
if !ok {
1617
t.Skip("iouring not available here")
@@ -21,13 +22,68 @@ func TestAsyncWriter_HappyPath(t *testing.T) {
2122
require.NoError(t, err)
2223
defer closeCleanFile(t, temp)
2324

24-
writer, err := NewAsyncWriter(temp.Name(), 4)
25+
writer, file, err := NewAsyncWriter(temp.Name(), 4)
2526
require.NoError(t, err)
27+
require.NotNil(t, file)
2628

27-
for i := 0; i < 10000; i++ {
28-
_, err = writer.Write(randomRecordOfSize(1024))
29+
var expected []byte
30+
for i := 0; i < 100; i++ {
31+
s := randomRecordOfSize(10)
32+
_, err = writer.Write(s)
2933
require.NoError(t, err)
34+
expected = append(expected, s...)
3035
}
3136

3237
require.NoError(t, writer.Close())
38+
fileContentEquals(t, file, expected)
39+
}
40+
41+
func TestAsyncWriter_GuardAgainstBufferReuse(t *testing.T) {
42+
ok, err := IsIOUringAvailable()
43+
require.NoError(t, err)
44+
if !ok {
45+
t.Skip("iouring not available here")
46+
return
47+
}
48+
49+
temp, err := ioutil.TempFile("", "TestAsyncWriter_GuardAgainstBufferReuse")
50+
require.NoError(t, err)
51+
defer closeCleanFile(t, temp)
52+
53+
writer, file, err := NewAsyncWriter(temp.Name(), 4)
54+
require.NoError(t, err)
55+
require.NotNil(t, file)
56+
57+
reusedSlice := []byte{13, 06, 91}
58+
// we are writing the same slice, three times before a forced flush due to capacity
59+
writeBuf(t, writer, reusedSlice)
60+
writeBuf(t, writer, reusedSlice)
61+
writeBuf(t, writer, reusedSlice)
62+
// fourth time we change the slice in-place
63+
reusedSlice[0] = 29
64+
writeBuf(t, writer, reusedSlice)
65+
writeBuf(t, writer, reusedSlice)
66+
require.NoError(t, writer.Close())
67+
68+
fileContentEquals(t, file, []byte{
69+
13, 06, 91,
70+
13, 06, 91,
71+
13, 06, 91,
72+
29, 06, 91,
73+
29, 06, 91,
74+
})
75+
}
76+
77+
func fileContentEquals(t *testing.T, file *os.File, expectedContent []byte) {
78+
f, err := os.Open(file.Name())
79+
require.NoError(t, err)
80+
all, err := ioutil.ReadAll(f)
81+
require.NoError(t, err)
82+
assert.Equal(t, expectedContent, all)
83+
}
84+
85+
func writeBuf(t *testing.T, writer WriteCloserFlusher, buf []byte) {
86+
o, err := writer.Write(buf)
87+
require.NoError(t, err)
88+
assert.Equal(t, len(buf), o)
3389
}

recordio/file_writer.go

+58-33
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/binary"
55
"errors"
66
"fmt"
7+
"github.com/godzie44/go-uring/uring"
78
"os"
89

910
pool "github.com/libp2p/go-buffer-pool"
@@ -21,14 +22,14 @@ type FileWriter struct {
2122
open bool
2223
closed bool
2324

24-
file *os.File
25-
bufWriter WriteCloserFlusher
26-
currentOffset uint64
27-
compressionType int
28-
compressor compressor.CompressionI
29-
recordHeaderCache []byte
30-
bufferPool *pool.BufferPool
31-
directIOEnabled bool
25+
file *os.File
26+
bufWriter WriteCloserFlusher
27+
currentOffset uint64
28+
compressionType int
29+
compressor compressor.CompressionI
30+
recordHeaderCache []byte
31+
bufferPool *pool.BufferPool
32+
alignedBlockWrites bool
3233
}
3334

3435
var DirectIOSyncWriteErr = errors.New("currently not supporting directIO with sync writing")
@@ -59,7 +60,7 @@ func (w *FileWriter) Open() error {
5960

6061
// we flush early to get a valid file with header written, this is important in crash scenarios
6162
// when directIO is enabled however, we can't write misaligned blocks - thus this is not executed
62-
if !w.directIOEnabled {
63+
if !w.alignedBlockWrites {
6364
err = w.bufWriter.Flush()
6465
if err != nil {
6566
return fmt.Errorf("flushing header in file at '%s' failed with %w", w.file.Name(), err)
@@ -160,9 +161,10 @@ func (w *FileWriter) Write(record []byte) (uint64, error) {
160161
return prevOffset, nil
161162
}
162163

163-
// WriteSync appends a record of bytes and forces a disk sync, returns the current offset this item was written to
164+
// WriteSync appends a record of bytes and forces a disk sync, returns the current offset this item was written to.
165+
// When directIO is enabled however, we can't write misaligned blocks and immediately returns DirectIOSyncWriteErr
164166
func (w *FileWriter) WriteSync(record []byte) (uint64, error) {
165-
if w.directIOEnabled {
167+
if w.alignedBlockWrites {
166168
return 0, DirectIOSyncWriteErr
167169
}
168170

@@ -205,11 +207,14 @@ func (w *FileWriter) Size() uint64 {
205207
// options
206208

207209
type FileWriterOptions struct {
208-
path string
209-
file *os.File
210-
compressionType int
211-
bufferSizeBytes int
212-
useDirectIO bool
210+
path string
211+
file *os.File
212+
compressionType int
213+
bufferSizeBytes int
214+
enableDirectIO bool
215+
enableIOUring bool
216+
ioUringNumRingEntries uint32
217+
ioUringOpts []uring.SetupOption
213218
}
214219

215220
type FileWriterOption func(*FileWriterOptions)
@@ -246,21 +251,35 @@ func BufferSizeBytes(p int) FileWriterOption {
246251
}
247252
}
248253

249-
// DirectIO is experimental: this flag enables DirectIO while writing, this currently might not work due to the misaligned allocations
254+
// DirectIO is experimental: this flag enables DirectIO while writing. This has some limitation when writing headers and
255+
// disables the ability to use WriteSync.
250256
func DirectIO() FileWriterOption {
251257
return func(args *FileWriterOptions) {
252-
args.useDirectIO = true
258+
args.enableDirectIO = true
259+
}
260+
}
261+
262+
// IOUring is experimental: this flag enables async writes using io_uring. This has some limitation around platform, it
263+
// needs Linux and recent 5.x kernel to work. This currently also does not work together with DirectIO.
264+
func IOUring(numRingEntries uint32, opts ...uring.SetupOption) FileWriterOption {
265+
return func(args *FileWriterOptions) {
266+
args.enableIOUring = true
267+
args.ioUringNumRingEntries = numRingEntries
268+
args.ioUringOpts = opts
253269
}
254270
}
255271

256272
// NewFileWriter creates a new writer with the given options, either Path or File must be supplied, compression is optional.
257273
func NewFileWriter(writerOptions ...FileWriterOption) (WriterI, error) {
258274
opts := &FileWriterOptions{
259-
path: "",
260-
file: nil,
261-
compressionType: CompressionTypeNone,
262-
bufferSizeBytes: DefaultBufferSize,
263-
useDirectIO: false,
275+
path: "",
276+
file: nil,
277+
compressionType: CompressionTypeNone,
278+
bufferSizeBytes: DefaultBufferSize,
279+
enableDirectIO: false,
280+
enableIOUring: false,
281+
ioUringNumRingEntries: 4,
282+
ioUringOpts: nil,
264283
}
265284

266285
for _, writeOption := range writerOptions {
@@ -271,13 +290,19 @@ func NewFileWriter(writerOptions ...FileWriterOption) (WriterI, error) {
271290
return nil, errors.New("NewFileWriter: either os.File or string path must be supplied, never both")
272291
}
273292

293+
if opts.enableIOUring && opts.enableDirectIO {
294+
return nil, errors.New("NewFileWriter: either directIO or io_uring must be enabled, never both")
295+
}
296+
274297
if opts.path == "" {
275298
opts.path = opts.file.Name()
276299
}
277300

278301
var factory ReaderWriterCloserFactory
279-
if opts.useDirectIO {
302+
if opts.enableDirectIO {
280303
factory = DirectIOFactory{}
304+
} else if opts.enableIOUring {
305+
factory = NewIOUringFactory(opts.ioUringNumRingEntries, opts.ioUringOpts...)
281306
} else {
282307
factory = BufferedIOFactory{}
283308
}
@@ -294,18 +319,18 @@ func NewFileWriter(writerOptions ...FileWriterOption) (WriterI, error) {
294319
if err != nil {
295320
return nil, fmt.Errorf("failed to create new Writer at '%s' failed with %w", opts.path, err)
296321
}
297-
return newCompressedFileWriterWithFile(file, writer, opts.compressionType, opts.useDirectIO)
322+
return newCompressedFileWriterWithFile(file, writer, opts.compressionType, opts.enableDirectIO)
298323
}
299324

300325
// creates a new writer with the given os.File, with the desired compression
301-
func newCompressedFileWriterWithFile(file *os.File, bufWriter WriteCloserFlusher, compType int, directIOEnabled bool) (WriterI, error) {
326+
func newCompressedFileWriterWithFile(file *os.File, bufWriter WriteCloserFlusher, compType int, alignedBlockWrites bool) (WriterI, error) {
302327
return &FileWriter{
303-
file: file,
304-
bufWriter: bufWriter,
305-
directIOEnabled: directIOEnabled,
306-
open: false,
307-
closed: false,
308-
compressionType: compType,
309-
currentOffset: 0,
328+
file: file,
329+
bufWriter: bufWriter,
330+
alignedBlockWrites: alignedBlockWrites,
331+
open: false,
332+
closed: false,
333+
compressionType: compType,
334+
currentOffset: 0,
310335
}, nil
311336
}

recordio/file_writer_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ func TestWriterInitNoPath(t *testing.T) {
158158
assert.Equal(t, errors.New("NewFileWriter: either os.File or string path must be supplied, never both"), err)
159159
}
160160

161+
func TestWriterDirectIOAndIOUringDisabled(t *testing.T) {
162+
_, err := NewFileWriter(Path("/tmp/abc"), DirectIO(), IOUring(4))
163+
assert.Equal(t, errors.New("NewFileWriter: either directIO or io_uring must be enabled, never both"), err)
164+
}
165+
161166
func TestWriterCrashCreatesValidHeader(t *testing.T) {
162167
tmpFile, err := ioutil.TempFile("", "recordio_CrashCreatesValidHeader")
163168
require.Nil(t, err)

recordio/io_uring.go

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package recordio
2+
3+
import (
4+
"github.com/godzie44/go-uring/uring"
5+
"os"
6+
)
7+
8+
type IOUringFactory struct {
9+
numRingEntries uint32
10+
opts []uring.SetupOption
11+
}
12+
13+
func (f *IOUringFactory) CreateNewReader(filePath string, bufSize int) (*os.File, ByteReaderResetCount, error) {
14+
//TODO implement me
15+
panic("implement me")
16+
}
17+
18+
func (f *IOUringFactory) CreateNewWriter(filePath string, _ int) (*os.File, WriteCloserFlusher, error) {
19+
writer, file, err := NewAsyncWriter(filePath, f.numRingEntries, f.opts...)
20+
if err != nil {
21+
return nil, nil, err
22+
}
23+
24+
return file, writer, nil
25+
}
26+
27+
func NewIOUringFactory(numRingEntries uint32, opts ...uring.SetupOption) *IOUringFactory {
28+
return &IOUringFactory{
29+
numRingEntries: numRingEntries,
30+
opts: opts,
31+
}
32+
}
33+
34+
// IsIOUringAvailable tests whether io_uring is supported by the kernel.
35+
// It will return (true, nil) if that's the case, if it's not available it will be (false, nil).
36+
// Any other error will be indicated by the error (either true/false).
37+
func IsIOUringAvailable() (available bool, err error) {
38+
ring, err := uring.New(1)
39+
defer func() {
40+
err = ring.Close()
41+
}()
42+
43+
return err == nil, err
44+
}

recordio/iouring/iouring_test.go renamed to recordio/io_uring_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package iouring
1+
package recordio
22

33
import (
44
"github.com/stretchr/testify/require"

0 commit comments

Comments
 (0)