diff --git a/Makefile b/Makefile index 75af185..ef1393c 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,11 @@ compile-proto: protoc --go_out=. --go_opt=paths=source_relative _examples/proto/mutation.proto protoc --go_out=. --go_opt=paths=source_relative benchmark/proto/bench.proto protoc --go_out=. --go_opt=paths=source_relative sstables/proto/sstable.proto + @echo + @echo "==> Compiling capnproto files <==" + capnp compile -ogo -I ../go-capnp/std/ benchmark/capnproto/bench.capnp + capnp compile -ogo -I ../go-capnp/std/ recordio/test_files/text_line.capnp + .PHONY: release release: diff --git a/benchmark/capnproto/bench.capnp b/benchmark/capnproto/bench.capnp new file mode 100644 index 0000000..9d83d29 --- /dev/null +++ b/benchmark/capnproto/bench.capnp @@ -0,0 +1,8 @@ +@0xbf8381dfd6a0d017; +using Go = import "/go.capnp"; +$Go.package("capnproto"); +$Go.import("benchmark/capnproto"); + +struct BytesMsg { + key @0 :Data; +} diff --git a/benchmark/capnproto/bench.capnp.go b/benchmark/capnproto/bench.capnp.go new file mode 100644 index 0000000..c09816c --- /dev/null +++ b/benchmark/capnproto/bench.capnp.go @@ -0,0 +1,107 @@ +// Code generated by capnpc-go. DO NOT EDIT. + +package capnproto + +import ( + capnp "capnproto.org/go/capnp/v3" + text "capnproto.org/go/capnp/v3/encoding/text" + schemas "capnproto.org/go/capnp/v3/schemas" +) + +type BytesMsg capnp.Struct + +// BytesMsg_TypeID is the unique identifier for the type BytesMsg. +const BytesMsg_TypeID = 0xc40df8c769538d0a + +func NewBytesMsg(s *capnp.Segment) (BytesMsg, error) { + st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 1}) + return BytesMsg(st), err +} + +func NewRootBytesMsg(s *capnp.Segment) (BytesMsg, error) { + st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 1}) + return BytesMsg(st), err +} + +func ReadRootBytesMsg(msg *capnp.Message) (BytesMsg, error) { + root, err := msg.Root() + return BytesMsg(root.Struct()), err +} + +func (s BytesMsg) String() string { + str, _ := text.Marshal(0xc40df8c769538d0a, capnp.Struct(s)) + return str +} + +func (s BytesMsg) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { + return capnp.Struct(s).EncodeAsPtr(seg) +} + +func (BytesMsg) DecodeFromPtr(p capnp.Ptr) BytesMsg { + return BytesMsg(capnp.Struct{}.DecodeFromPtr(p)) +} + +func (s BytesMsg) ToPtr() capnp.Ptr { + return capnp.Struct(s).ToPtr() +} +func (s BytesMsg) IsValid() bool { + return capnp.Struct(s).IsValid() +} + +func (s BytesMsg) Message() *capnp.Message { + return capnp.Struct(s).Message() +} + +func (s BytesMsg) Segment() *capnp.Segment { + return capnp.Struct(s).Segment() +} +func (s BytesMsg) Key() ([]byte, error) { + p, err := capnp.Struct(s).Ptr(0) + return []byte(p.Data()), err +} + +func (s BytesMsg) HasKey() bool { + return capnp.Struct(s).HasPtr(0) +} + +func (s BytesMsg) SetKey(v []byte) error { + return capnp.Struct(s).SetData(0, v) +} + +// BytesMsg_List is a list of BytesMsg. +type BytesMsg_List = capnp.StructList[BytesMsg] + +// NewBytesMsg creates a new list of BytesMsg. +func NewBytesMsg_List(s *capnp.Segment, sz int32) (BytesMsg_List, error) { + l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 1}, sz) + return capnp.StructList[BytesMsg](l), err +} + +// BytesMsg_Future is a wrapper for a BytesMsg promised by a client call. +type BytesMsg_Future struct{ *capnp.Future } + +func (f BytesMsg_Future) Struct() (BytesMsg, error) { + p, err := f.Future.Ptr() + return BytesMsg(p.Struct()), err +} + +const schema_bf8381dfd6a0d017 = "x\xda\x12\x08q`1\xe4\xdd\xcf\xc8\xc0\x14(\xc2\xca" + + "\xf6\x9f\xab78\xf3\xf8\x0f\xde#\x0c\x82\x0a\x8c\xff\xc5" + + "/,\xb8v\xbf\xb1y?\x03+#;\x03\x83\xe1Q" + + "/F\xc1\x9b\xec\x0c\x0c\x82W\xed\x19t\xff'\xa5\xe6" + + "%g\xe4&\x16\xb1d\xeb''\x16\xe4\x15\x14\xe5\x97" + + "\xe4\xeb\x83\x05\xf5\xc0|+\xa7\xca\x92\xd4b\xdf\xe2t" + + "\x06\x86\x00F\xc6@\x16f\x16\x06\x06\x16F\x06\x06A" + + "^%\x06\x86@\x0ef\xc6@\x11&F\xf6\xec\xd4J" + + "F^\x06&F^\x06F@\x00\x00\x00\xff\xffxk" + + "$m" + +func RegisterSchema(reg *schemas.Registry) { + reg.Register(&schemas.Schema{ + String: schema_bf8381dfd6a0d017, + Nodes: []uint64{ + 0xc40df8c769538d0a, + }, + Compressed: true, + }) +} diff --git a/benchmark/recordio_read_test.go b/benchmark/recordio_read_test.go index c0df03f..8333eab 100644 --- a/benchmark/recordio_read_test.go +++ b/benchmark/recordio_read_test.go @@ -1,10 +1,14 @@ package benchmark import ( + "capnproto.org/go/capnp/v3" "errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + bCapnProto "github.com/thomasjungblut/go-sstables/benchmark/capnproto" bProto "github.com/thomasjungblut/go-sstables/benchmark/proto" "github.com/thomasjungblut/go-sstables/recordio" + rCapnProto "github.com/thomasjungblut/go-sstables/recordio/capnproto" rProto "github.com/thomasjungblut/go-sstables/recordio/proto" "io" "os" @@ -117,3 +121,65 @@ func BenchmarkRecordIOProtoRead(b *testing.B) { } } + +func BenchmarkRecordIOCapnProtoRead(b *testing.B) { + benchmarks := []struct { + name string + fileSize int + }{ + {"32mb", 1024 * 1024 * 32}, + {"64mb", 1024 * 1024 * 64}, + {"128mb", 1024 * 1024 * 128}, + {"256mb", 1024 * 1024 * 256}, + {"512mb", 1024 * 1024 * 512}, + {"1024mb", 1024 * 1024 * 1024}, + {"2048mb", 1024 * 1024 * 1024 * 2}, + {"4096mb", 1024 * 1024 * 1024 * 4}, + {"8192mb", 1024 * 1024 * 1024 * 8}, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + bytes := randomRecordOfSize(1024) + tmpFile, err := os.CreateTemp("", "recordio_Bench") + assert.NoError(b, err) + defer os.Remove(tmpFile.Name()) + + writer, err := rCapnProto.NewWriter(rCapnProto.File(tmpFile)) + assert.NoError(b, err) + assert.NoError(b, writer.Open()) + + arena := capnp.SingleSegment(nil) + msg, seg, err := capnp.NewMessage(arena) + require.NoError(b, err) + bytesMsg, err := bCapnProto.NewBytesMsg(seg) + require.NoError(b, err) + require.NoError(b, bytesMsg.SetKey(bytes)) + for writer.Size() < uint64(bm.fileSize) { + _, _ = writer.Write(msg) + } + b.SetBytes(int64(writer.Size())) + assert.NoError(b, writer.Close()) + + b.ResetTimer() + for n := 0; n < b.N; n++ { + reader, err := rCapnProto.NewReader(rCapnProto.ReaderPath(tmpFile.Name())) + assert.NoError(b, err) + assert.NoError(b, reader.Open()) + + for { + msg, err := reader.ReadNext() + if errors.Is(err, io.EOF) { + break + } + + _, err = bCapnProto.ReadRootBytesMsg(msg) + require.NoError(b, err) + } + + assert.NoError(b, reader.Close()) + } + }) + } + +} diff --git a/go.mod b/go.mod index eaf88a7..46cab9b 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect + golang.org/x/sync v0.7.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8cddc58..f218478 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/ncw/directio v1.0.5 h1:JSUBhdjEvVaJvOoyPAbcW0fnd0tvRXD76wEfZ1KcQz4= github.com/ncw/directio v1.0.5/go.mod h1:rX/pKEYkOXBGOggmcyJeJGloCkleSvphPx2eV3t6ROk= +github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw= +github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE= @@ -18,10 +20,16 @@ github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 h1:njlZPzLwU639 github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3/go.mod h1:hpGUWaI9xL8pRQCTXQgocU38Qw1g0Us7n5PxxTwTCYU= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tinylib/msgp v1.1.9 h1:SHf3yoO2sGA0veCJeCBYLHuttAVFHGm2RHgNodW7wQU= +github.com/tinylib/msgp v1.1.9/go.mod h1:BCXGB54lDD8qUEPmiG0cQQUANC4IUQyB2ItS2UDlO/k= +github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk= +github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk= github.com/tjungblu/porcupine v0.0.0-20221116095144-377185aa0569 h1:acDBvgSBtnyBidmpmTEgxStjXeuYyfG3fF72khP24/Y= github.com/tjungblu/porcupine v0.0.0-20221116095144-377185aa0569/go.mod h1:+z336r1WR0gcwl1ALfoNBpDTCW06vO5DzBwunEcSvcs= golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/recordio/capnproto/capnproto_reader.go b/recordio/capnproto/capnproto_reader.go new file mode 100644 index 0000000..9fb7f79 --- /dev/null +++ b/recordio/capnproto/capnproto_reader.go @@ -0,0 +1,85 @@ +package capnproto + +import ( + "capnproto.org/go/capnp/v3" + "errors" + "os" + + "github.com/thomasjungblut/go-sstables/recordio" +) + +type Reader struct { + recordio.ReaderI +} + +func (r *Reader) ReadNext() (*capnp.Message, error) { + bytes, err := r.ReaderI.ReadNext() + if err != nil { + return nil, err + } + + return capnp.Unmarshal(bytes) +} + +// options + +type ReaderOptions struct { + path string + file *os.File + bufSizeBytes int +} + +type ReaderOption func(*ReaderOptions) + +func ReaderPath(p string) ReaderOption { + return func(args *ReaderOptions) { + args.path = p + } +} + +func ReaderFile(p *os.File) ReaderOption { + return func(args *ReaderOptions) { + args.file = p + } +} + +func ReadBufferSizeBytes(p int) ReaderOption { + return func(args *ReaderOptions) { + args.bufSizeBytes = p + } +} + +// NewReader creates a new reader with the given options. Either Path or File must be supplied +func NewReader(readerOptions ...ReaderOption) (ReaderI, error) { + opts := &ReaderOptions{ + path: "", + file: nil, + bufSizeBytes: 1024 * 1024 * 4, + } + + for _, readerOption := range readerOptions { + readerOption(opts) + } + + if (opts.file != nil) && (opts.path != "") { + return nil, errors.New("either os.File or string path must be supplied, never both") + } + + if opts.file == nil { + if opts.path == "" { + return nil, errors.New("path was not supplied") + } + } + reader, err := recordio.NewFileReader( + recordio.ReaderPath(opts.path), + recordio.ReaderFile(opts.file), + recordio.ReaderBufferSizeBytes(opts.bufSizeBytes)) + if err != nil { + return nil, err + } + + return &Reader{ + reader, + }, nil + +} diff --git a/recordio/capnproto/capnproto_writer.go b/recordio/capnproto/capnproto_writer.go new file mode 100644 index 0000000..d1d5646 --- /dev/null +++ b/recordio/capnproto/capnproto_writer.go @@ -0,0 +1,121 @@ +package capnproto + +import ( + "capnproto.org/go/capnp/v3" + "errors" + "os" + + "github.com/ncw/directio" + "github.com/thomasjungblut/go-sstables/recordio" +) + +type Writer struct { + recordio.WriterI +} + +func (w *Writer) Write(record *capnp.Message) (uint64, error) { + bytes, err := record.Marshal() + if err != nil { + return 0, err + } + return w.WriterI.Write(bytes) +} + +func (w *Writer) WriteSync(record *capnp.Message) (uint64, error) { + bytes, err := record.Marshal() + if err != nil { + return 0, err + } + return w.WriterI.WriteSync(bytes) +} + +// options + +type WriterOptions struct { + path string + file *os.File + compressionType int + bufSizeBytes int + useDirectIO bool +} + +type WriterOption func(*WriterOptions) + +func Path(p string) WriterOption { + return func(args *WriterOptions) { + args.path = p + } +} + +func File(p *os.File) WriterOption { + return func(args *WriterOptions) { + args.file = p + } +} + +func CompressionType(p int) WriterOption { + return func(args *WriterOptions) { + args.compressionType = p + } +} + +func WriteBufferSizeBytes(p int) WriterOption { + return func(args *WriterOptions) { + args.bufSizeBytes = p + } +} + +func DirectIO() WriterOption { + return func(args *WriterOptions) { + args.useDirectIO = true + } +} + +// NewWriter creates a new writer with the given options. Either Path or File must be supplied, compression is optional and +// turned off by default. +func NewWriter(writerOptions ...WriterOption) (WriterI, error) { + opts := &WriterOptions{ + path: "", + file: nil, + compressionType: recordio.CompressionTypeNone, + bufSizeBytes: 1024 * 1024 * 4, + useDirectIO: false, + } + + for _, writeOption := range writerOptions { + writeOption(opts) + } + + if (opts.file != nil) && (opts.path != "") { + return nil, errors.New("either os.File or string path must be supplied, never both") + } + + if opts.file == nil { + if opts.path == "" { + return nil, errors.New("path was not supplied") + } + if opts.useDirectIO { + f, err := directio.OpenFile(opts.path, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + return nil, err + } + opts.file = f + } else { + f, err := os.OpenFile(opts.path, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + return nil, err + } + opts.file = f + } + } + + writer, err := recordio.NewFileWriter( + recordio.File(opts.file), + recordio.CompressionType(opts.compressionType), + recordio.BufferSizeBytes(opts.bufSizeBytes)) + if err != nil { + return nil, err + } + + return &Writer{writer}, nil +} diff --git a/recordio/capnproto/mmap_capnproto_reader.go b/recordio/capnproto/mmap_capnproto_reader.go new file mode 100644 index 0000000..623c5ca --- /dev/null +++ b/recordio/capnproto/mmap_capnproto_reader.go @@ -0,0 +1,36 @@ +package capnproto + +import ( + "capnproto.org/go/capnp/v3" + "github.com/thomasjungblut/go-sstables/recordio" +) + +type MMapProtoReader struct { + reader recordio.ReadAtI +} + +func (r *MMapProtoReader) Open() error { + return r.reader.Open() +} + +func (r *MMapProtoReader) ReadNextAt(offset uint64) (*capnp.Message, error) { + bytes, err := r.reader.ReadNextAt(offset) + if err != nil { + return nil, err + } + + return capnp.Unmarshal(bytes) +} + +func (r *MMapProtoReader) Close() error { + return r.reader.Close() +} + +func NewMMapCapnProtoReaderWithPath(path string) (ReadAtI, error) { + r, err := recordio.NewMemoryMappedReaderWithPath(path) + if err != nil { + return nil, err + } + + return &MMapProtoReader{reader: r}, nil +} diff --git a/recordio/capnproto/recordio_capnproto.go b/recordio/capnproto/recordio_capnproto.go new file mode 100644 index 0000000..53a3237 --- /dev/null +++ b/recordio/capnproto/recordio_capnproto.go @@ -0,0 +1,31 @@ +package capnproto + +import ( + "capnproto.org/go/capnp/v3" + "github.com/thomasjungblut/go-sstables/recordio" +) + +type ReaderI interface { + recordio.OpenClosableI + // ReadNext reads the next record and returns the unmarshalled message, io.EOF error when it reaches the end signalled by (nil, io.EOF) + ReadNext() (*capnp.Message, error) + // SkipNext skips the next record, EOF error when it reaches the end signalled by io.EOF as the error + SkipNext() error +} + +type WriterI interface { + recordio.OpenClosableI + recordio.SizeI + // Write appends a record, returns the current offset this item was written to + Write(record *capnp.Message) (uint64, error) + // WriteSync appends a record and forces a disk sync, returns the current offset this item was written to + WriteSync(record *capnp.Message) (uint64, error) +} + +// ReadAtI implementors must make their implementation thread-safe +type ReadAtI interface { + recordio.OpenClosableI + // ReadNextAt reads the next record at the given offset, EOF error when it reaches the end signalled by (nil, io.EOF). + // It can be wrapped however, so always check using errors.Is(err, io.EOF). Implementation must be thread-safe. + ReadNextAt(offset uint64) (*capnp.Message, error) +} diff --git a/recordio/capnproto/recordio_capnproto_test.go b/recordio/capnproto/recordio_capnproto_test.go new file mode 100644 index 0000000..d19b90b --- /dev/null +++ b/recordio/capnproto/recordio_capnproto_test.go @@ -0,0 +1,186 @@ +package capnproto + +import ( + "bufio" + "math/rand" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thomasjungblut/go-sstables/recordio" + "github.com/thomasjungblut/go-sstables/recordio/test_files" + + "capnproto.org/go/capnp/v3" +) + +const TestFile = "../test_files/berlin52.tsp" + +func TestReadWriteEndToEndCapnProto(t *testing.T) { + tmpFile, err := os.CreateTemp("", "recordio_EndToEndCapnProto") + require.NoError(t, err) + defer func() { require.NoError(t, os.Remove(tmpFile.Name())) }() + writer, err := NewWriter(File(tmpFile)) + require.NoError(t, err) + + endToEndReadWriteCapnProto(writer, t, tmpFile) +} + +func TestReadWriteEndToEndGzipCapnProto(t *testing.T) { + tmpFile, err := os.CreateTemp("", "recordio_EndToEndGzipCapnProto") + require.NoError(t, err) + defer func() { require.NoError(t, os.Remove(tmpFile.Name())) }() + writer, err := NewWriter(File(tmpFile), CompressionType(recordio.CompressionTypeGZIP)) + require.NoError(t, err) + + endToEndReadWriteCapnProto(writer, t, tmpFile) +} + +func TestReadWriteEndToEndSnappyCapnProto(t *testing.T) { + tmpFile, err := os.CreateTemp("", "recordio_EndToEndSnappyCapnProto") + require.NoError(t, err) + defer func() { require.NoError(t, os.Remove(tmpFile.Name())) }() + writer, err := NewWriter(File(tmpFile), CompressionType(recordio.CompressionTypeSnappy)) + require.NoError(t, err) + + endToEndReadWriteCapnProto(writer, t, tmpFile) +} + +func endToEndReadWriteCapnProto(writer WriterI, t *testing.T, tmpFile *os.File) { + // we're reading the file line by line and try to read it back and assert the same content + inFile, err := os.Open(TestFile) + require.NoError(t, err) + require.NoError(t, writer.Open()) + + numRead := 0 + scanner := bufio.NewScanner(inFile) + scanner.Split(bufio.ScanLines) + arena := capnp.SingleSegment(nil) + msg, seg, err := capnp.NewMessage(arena) + require.NoError(t, err) + for scanner.Scan() { + lineMsg, err := test_files.NewRootTextLineCapnProto(seg) + require.NoError(t, err) + lineMsg.SetLineNumber(int32(numRead)) + require.NoError(t, lineMsg.SetLine(scanner.Text())) + _, err = writer.Write(msg) + require.NoError(t, err) + numRead++ + } + require.NoError(t, scanner.Err()) + assert.Equal(t, 59, numRead) + require.NoError(t, writer.Close()) + require.NoError(t, inFile.Close()) + + reader, err := NewReader(ReaderPath(tmpFile.Name())) + require.NoError(t, err) + require.NoError(t, reader.Open()) + + inFile, err = os.Open(TestFile) + require.NoError(t, err) + scanner = bufio.NewScanner(inFile) + scanner.Split(bufio.ScanLines) + numRead = 0 + for scanner.Scan() { + msg, err := reader.ReadNext() + require.NoError(t, err) + + textLine, err := test_files.ReadRootTextLineCapnProto(msg) + require.NoError(t, err) + + line, err := textLine.Line() + require.NoError(t, err) + assert.Equal(t, numRead, int(textLine.LineNumber())) + assert.Equal(t, scanner.Text(), line) + numRead++ + } + require.NoError(t, scanner.Err()) + assert.Equal(t, 59, numRead) + require.NoError(t, reader.Close()) + require.NoError(t, inFile.Close()) +} + +func TestRandomReadWriteEndToEndCapnProto(t *testing.T) { + tmpFile, err := os.CreateTemp("", "recordio_EndToEndCapnProto") + require.NoError(t, err) + defer func() { require.NoError(t, os.Remove(tmpFile.Name())) }() + writer, err := NewWriter(File(tmpFile)) + require.NoError(t, err) + + endToEndRandomReadWriteCapnProto(writer, t, tmpFile) +} + +func TestRandomReadWriteEndToEndGzipCapnProto(t *testing.T) { + tmpFile, err := os.CreateTemp("", "recordio_EndToEndGzipCapnProto") + require.NoError(t, err) + defer func() { require.NoError(t, os.Remove(tmpFile.Name())) }() + writer, err := NewWriter(File(tmpFile), CompressionType(recordio.CompressionTypeGZIP)) + require.NoError(t, err) + + endToEndRandomReadWriteCapnProto(writer, t, tmpFile) +} + +func TestRandomReadWriteEndToEndSnappyCapnProto(t *testing.T) { + tmpFile, err := os.CreateTemp("", "recordio_EndToEndSnappyCapnProto") + require.NoError(t, err) + defer func() { require.NoError(t, os.Remove(tmpFile.Name())) }() + writer, err := NewWriter(File(tmpFile), CompressionType(recordio.CompressionTypeSnappy)) + require.NoError(t, err) + + endToEndRandomReadWriteCapnProto(writer, t, tmpFile) +} + +func endToEndRandomReadWriteCapnProto(writer WriterI, t *testing.T, tmpFile *os.File) { + // same idea as above, but we're testing the random read via mmap + inFile, err := os.Open(TestFile) + require.NoError(t, err) + require.NoError(t, writer.Open()) + + var lines []string + offsetMap := make(map[string]uint64) + numRead := 0 + scanner := bufio.NewScanner(inFile) + scanner.Split(bufio.ScanLines) + arena := capnp.SingleSegment(nil) + msg, seg, err := capnp.NewMessage(arena) + require.NoError(t, err) + for scanner.Scan() { + line := scanner.Text() + lineMsg, err := test_files.NewRootTextLineCapnProto(seg) + require.NoError(t, err) + lineMsg.SetLineNumber(int32(numRead)) + require.NoError(t, lineMsg.SetLine(scanner.Text())) + offset, err := writer.Write(msg) + offsetMap[line] = offset + lines = append(lines, line) + require.NoError(t, err) + numRead++ + } + require.NoError(t, scanner.Err()) + assert.Equal(t, 59, numRead) + require.NoError(t, writer.Close()) + require.NoError(t, inFile.Close()) + + reader, err := NewMMapCapnProtoReaderWithPath(tmpFile.Name()) + require.NoError(t, err) + require.NoError(t, reader.Open()) + + // we shuffle the lines, so we can test the actual random read behaviour + rand.Shuffle(len(lines), func(i, j int) { + lines[i], lines[j] = lines[j], lines[i] + }) + + numRead = 0 + for _, s := range lines { + offset := offsetMap[s] + msg, err := reader.ReadNextAt(offset) + require.NoError(t, err) + textLine, err := test_files.ReadRootTextLineCapnProto(msg) + require.NoError(t, err) + line, err := textLine.Line() + assert.Equal(t, s, line) + numRead++ + } + assert.Equal(t, 59, numRead) + require.NoError(t, reader.Close()) +} diff --git a/recordio/test_files/text_line.capnp b/recordio/test_files/text_line.capnp new file mode 100644 index 0000000..8e8779f --- /dev/null +++ b/recordio/test_files/text_line.capnp @@ -0,0 +1,9 @@ +@0xbf8381dfd6a0d017; +using Go = import "/go.capnp"; +$Go.package("test_files"); +$Go.import("recordio/test_files"); + +struct TextLineCapnProto { + lineNumber @0 :Int32; + line @1 :Text; +} diff --git a/recordio/test_files/text_line.capnp.go b/recordio/test_files/text_line.capnp.go new file mode 100644 index 0000000..5db6a5a --- /dev/null +++ b/recordio/test_files/text_line.capnp.go @@ -0,0 +1,123 @@ +// Code generated by capnpc-go. DO NOT EDIT. + +package test_files + +import ( + capnp "capnproto.org/go/capnp/v3" + text "capnproto.org/go/capnp/v3/encoding/text" + schemas "capnproto.org/go/capnp/v3/schemas" +) + +type TextLineCapnProto capnp.Struct + +// TextLineCapnProto_TypeID is the unique identifier for the type TextLineCapnProto. +const TextLineCapnProto_TypeID = 0xc5ef92ce21f07260 + +func NewTextLineCapnProto(s *capnp.Segment) (TextLineCapnProto, error) { + st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 1}) + return TextLineCapnProto(st), err +} + +func NewRootTextLineCapnProto(s *capnp.Segment) (TextLineCapnProto, error) { + st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 1}) + return TextLineCapnProto(st), err +} + +func ReadRootTextLineCapnProto(msg *capnp.Message) (TextLineCapnProto, error) { + root, err := msg.Root() + return TextLineCapnProto(root.Struct()), err +} + +func (s TextLineCapnProto) String() string { + str, _ := text.Marshal(0xc5ef92ce21f07260, capnp.Struct(s)) + return str +} + +func (s TextLineCapnProto) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { + return capnp.Struct(s).EncodeAsPtr(seg) +} + +func (TextLineCapnProto) DecodeFromPtr(p capnp.Ptr) TextLineCapnProto { + return TextLineCapnProto(capnp.Struct{}.DecodeFromPtr(p)) +} + +func (s TextLineCapnProto) ToPtr() capnp.Ptr { + return capnp.Struct(s).ToPtr() +} +func (s TextLineCapnProto) IsValid() bool { + return capnp.Struct(s).IsValid() +} + +func (s TextLineCapnProto) Message() *capnp.Message { + return capnp.Struct(s).Message() +} + +func (s TextLineCapnProto) Segment() *capnp.Segment { + return capnp.Struct(s).Segment() +} +func (s TextLineCapnProto) LineNumber() int32 { + return int32(capnp.Struct(s).Uint32(0)) +} + +func (s TextLineCapnProto) SetLineNumber(v int32) { + capnp.Struct(s).SetUint32(0, uint32(v)) +} + +func (s TextLineCapnProto) Line() (string, error) { + p, err := capnp.Struct(s).Ptr(0) + return p.Text(), err +} + +func (s TextLineCapnProto) HasLine() bool { + return capnp.Struct(s).HasPtr(0) +} + +func (s TextLineCapnProto) LineBytes() ([]byte, error) { + p, err := capnp.Struct(s).Ptr(0) + return p.TextBytes(), err +} + +func (s TextLineCapnProto) SetLine(v string) error { + return capnp.Struct(s).SetText(0, v) +} + +// TextLineCapnProto_List is a list of TextLineCapnProto. +type TextLineCapnProto_List = capnp.StructList[TextLineCapnProto] + +// NewTextLineCapnProto creates a new list of TextLineCapnProto. +func NewTextLineCapnProto_List(s *capnp.Segment, sz int32) (TextLineCapnProto_List, error) { + l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 8, PointerCount: 1}, sz) + return capnp.StructList[TextLineCapnProto](l), err +} + +// TextLineCapnProto_Future is a wrapper for a TextLineCapnProto promised by a client call. +type TextLineCapnProto_Future struct{ *capnp.Future } + +func (f TextLineCapnProto_Future) Struct() (TextLineCapnProto, error) { + p, err := f.Future.Ptr() + return TextLineCapnProto(p.Struct()), err +} + +const schema_bf8381dfd6a0d017 = "x\xda\x12Hu`1\xe4\xdd\xcf\xc8\xc0\x14(\xc2\xca" + + "\xf6?\xa1\xe8\x83\xe2\xb9I\xef\x8f2\x04\xaa02\xfe" + + "\x17\xbf\xb0\xe0\xda\xfd\xc6\xe6\xfd\x0c\xac\x8c\xec\x0c\x0c\x86" + + "G71\x0a\xdeeg`\x10\xbcY\xce\xa0\xfb\xbf(" + + "59\xbf(%3\x9fU\xbf$\xb5\xb8$>-3" + + "'\xb5X\xbf$\xb5\xa2$>'3/U/9\xb1" + + " \xaf\xc0*$\xb5\xa2\xc4'3/\xd59\xb1 O" + + ">\xa0(\xbf$?\x80\x911\x90\x83\x99\x85\x81\x81\x85" + + "\x91\x81AP3\x8a\x81!P\x83\x991\xd0\x84\x89\x91" + + "\x91Q\x84\x11$f\xa8\xc5\xc0\x10\xa8\xc3\xcc\x18h\xc1" + + "\xc4\xf8\x1fd\x96_in\x12\x03sj\x11#\x0b\x03" + + "\x13#\x0b\x03#?H\x90\x91\x87\x81\x89\x91\x87\x81\x11" + + "\x10\x00\x00\xff\xff\x10\xa33\xb6" + +func RegisterSchema(reg *schemas.Registry) { + reg.Register(&schemas.Schema{ + String: schema_bf8381dfd6a0d017, + Nodes: []uint64{ + 0xc5ef92ce21f07260, + }, + Compressed: true, + }) +}