@@ -62,17 +62,23 @@ type BlockService interface {
6262
6363 // DeleteBlock deletes the given block from the blockservice.
6464 DeleteBlock (ctx context.Context , o cid.Cid ) error
65- }
66-
67- // BoundedBlockService is a Blockservice bounded via strict multihash Allowlist.
68- type BoundedBlockService interface {
69- BlockService
7065
71- Allowlist () verifcid.Allowlist
66+ // NewSession creates a new session that allows for
67+ // controlled exchange of wantlists to decrease the bandwidth overhead.
68+ // If the current exchange is a [fetcher.SessionExchange], a new exchange
69+ // session will be created. Otherwise, the current exchange will be used
70+ // directly.
71+ // Sessions are lazily setup, this is cheap.
72+ NewSession (context.Context ) BlockGetter
73+
74+ // ContextWithSession is creates a context with an embded session,
75+ // future calls to [BlockService.GetBlock], [BlockService.GetBlocks] and [BlockService.NewSession]
76+ // will be redirected to this same session instead.
77+ // Sessions are lazily setup, this is cheap.
78+ // It wont make a new session if one exists already in the context.
79+ ContextWithSession (ctx context.Context ) context.Context
7280}
7381
74- var _ BoundedBlockService = (* blockService )(nil )
75-
7682type blockService struct {
7783 allowlist verifcid.Allowlist
7884 blockstore blockstore.Blockstore
@@ -141,24 +147,25 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
141147 return s .allowlist
142148}
143149
144- // NewSession creates a new session that allows for
145- // controlled exchange of wantlists to decrease the bandwidth overhead.
146- // If the current exchange is a SessionExchange, a new exchange
147- // session will be created. Otherwise, the current exchange will be used
148- // directly.
149- // Sessions are lazily setup, this is cheap.
150- func NewSession (ctx context.Context , bs BlockService ) * Session {
151- ses := grabSessionFromContext (ctx , bs )
150+ func (s * blockService ) NewSession (ctx context.Context ) BlockGetter {
151+ ses := s .grabSessionFromContext (ctx )
152152 if ses != nil {
153153 return ses
154154 }
155155
156- return newSession (ctx , bs )
156+ return s . newSession (ctx )
157157}
158158
159159// newSession is like [NewSession] but it does not attempt to reuse session from the existing context.
160- func newSession (ctx context.Context , bs BlockService ) * Session {
161- return & Session {bs : bs , sesctx : ctx }
160+ func (s * blockService ) newSession (ctx context.Context ) * session {
161+ return & session {bs : s , sesctx : ctx }
162+ }
163+
164+ func (s * blockService ) ContextWithSession (ctx context.Context ) context.Context {
165+ if s .grabSessionFromContext (ctx ) != nil {
166+ return ctx
167+ }
168+ return context .WithValue (ctx , s , s .newSession (ctx ))
162169}
163170
164171// AddBlock adds a particular block to the service, Putting it into the datastore.
@@ -240,30 +247,27 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
240247// GetBlock retrieves a particular block from the service,
241248// Getting it from the datastore using the key (hash).
242249func (s * blockService ) GetBlock (ctx context.Context , c cid.Cid ) (blocks.Block , error ) {
243- if ses := grabSessionFromContext (ctx , s ); ses != nil {
250+ if ses := s . grabSessionFromContext (ctx ); ses != nil {
244251 return ses .GetBlock (ctx , c )
245252 }
246253
247254 ctx , span := internal .StartSpan (ctx , "blockService.GetBlock" , trace .WithAttributes (attribute .Stringer ("CID" , c )))
248255 defer span .End ()
249256
250- return getBlock (ctx , c , s , s .getExchangeFetcher )
257+ return s . getBlock (ctx , c , s .getExchangeFetcher )
251258}
252259
253- // Look at what I have to do, no interface covariance :'(
254260func (s * blockService ) getExchangeFetcher () exchange.Fetcher {
255261 return s .exchange
256262}
257263
258- func getBlock (ctx context.Context , c cid.Cid , bs BlockService , fetchFactory func () exchange.Fetcher ) (blocks.Block , error ) {
259- err := verifcid .ValidateCid (grabAllowlistFromBlockservice ( bs ) , c ) // hash security
264+ func ( s * blockService ) getBlock (ctx context.Context , c cid.Cid , fetchFactory func () exchange.Fetcher ) (blocks.Block , error ) {
265+ err := verifcid .ValidateCid (s . allowlist , c ) // hash security
260266 if err != nil {
261267 return nil , err
262268 }
263269
264- blockstore := bs .Blockstore ()
265-
266- block , err := blockstore .Get (ctx , c )
270+ block , err := s .blockstore .Get (ctx , c )
267271 switch {
268272 case err == nil :
269273 return block , nil
@@ -285,12 +289,12 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
285289 return nil , err
286290 }
287291 // also write in the blockstore for caching, inform the exchange that the block is available
288- err = blockstore .Put (ctx , blk )
292+ err = s . blockstore .Put (ctx , blk )
289293 if err != nil {
290294 return nil , err
291295 }
292- if ex := bs . Exchange (); ex != nil {
293- err = ex .NotifyNewBlocks (ctx , blk )
296+ if s . exchange != nil {
297+ err = s . exchange .NotifyNewBlocks (ctx , blk )
294298 if err != nil {
295299 return nil , err
296300 }
@@ -303,28 +307,26 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
303307// the returned channel.
304308// NB: No guarantees are made about order.
305309func (s * blockService ) GetBlocks (ctx context.Context , ks []cid.Cid ) <- chan blocks.Block {
306- if ses := grabSessionFromContext (ctx , s ); ses != nil {
310+ if ses := s . grabSessionFromContext (ctx ); ses != nil {
307311 return ses .GetBlocks (ctx , ks )
308312 }
309313
310314 ctx , span := internal .StartSpan (ctx , "blockService.GetBlocks" )
311315 defer span .End ()
312316
313- return getBlocks (ctx , ks , s , s .getExchangeFetcher )
317+ return s . getBlocks (ctx , ks , s .getExchangeFetcher )
314318}
315319
316- func getBlocks (ctx context.Context , ks []cid.Cid , blockservice BlockService , fetchFactory func () exchange.Fetcher ) <- chan blocks.Block {
320+ func ( s * blockService ) getBlocks (ctx context.Context , ks []cid.Cid , fetchFactory func () exchange.Fetcher ) <- chan blocks.Block {
317321 out := make (chan blocks.Block )
318322
319323 go func () {
320324 defer close (out )
321325
322- allowlist := grabAllowlistFromBlockservice (blockservice )
323-
324326 var lastAllValidIndex int
325327 var c cid.Cid
326328 for lastAllValidIndex , c = range ks {
327- if err := verifcid .ValidateCid (allowlist , c ); err != nil {
329+ if err := verifcid .ValidateCid (s . allowlist , c ); err != nil {
328330 break
329331 }
330332 }
@@ -335,7 +337,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
335337 copy (ks2 , ks [:lastAllValidIndex ]) // fast path for already filtered elements
336338 for _ , c := range ks [lastAllValidIndex :] { // don't rescan already scanned elements
337339 // hash security
338- if err := verifcid .ValidateCid (allowlist , c ); err == nil {
340+ if err := verifcid .ValidateCid (s . allowlist , c ); err == nil {
339341 ks2 = append (ks2 , c )
340342 } else {
341343 logger .Errorf ("unsafe CID (%s) passed to blockService.GetBlocks: %s" , c , err )
@@ -344,11 +346,9 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
344346 ks = ks2
345347 }
346348
347- bs := blockservice .Blockstore ()
348-
349349 var misses []cid.Cid
350350 for _ , c := range ks {
351- hit , err := bs .Get (ctx , c )
351+ hit , err := s . blockstore .Get (ctx , c )
352352 if err != nil {
353353 misses = append (misses , c )
354354 continue
@@ -371,7 +371,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
371371 return
372372 }
373373
374- ex := blockservice .Exchange ()
375374 var cache [1 ]blocks.Block // preallocate once for all iterations
376375 for {
377376 var b blocks.Block
@@ -386,16 +385,16 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
386385 }
387386
388387 // write in the blockstore for caching
389- err = bs .Put (ctx , b )
388+ err = s . blockstore .Put (ctx , b )
390389 if err != nil {
391390 logger .Errorf ("could not write blocks from the network to the blockstore: %s" , err )
392391 return
393392 }
394393
395- if ex != nil {
394+ if s . exchange != nil {
396395 // inform the exchange that the blocks are available
397396 cache [0 ] = b
398- err = ex .NotifyNewBlocks (ctx , cache [:]... )
397+ err = s . exchange .NotifyNewBlocks (ctx , cache [:]... )
399398 if err != nil {
400399 logger .Errorf ("could not tell the exchange about new blocks: %s" , err )
401400 return
@@ -433,16 +432,16 @@ func (s *blockService) Close() error {
433432 return s .exchange .Close ()
434433}
435434
436- // Session is a helper type to provide higher level access to bitswap sessions
437- type Session struct {
435+ // session is a helper type to provide higher level access to bitswap sessions
436+ type session struct {
438437 createSession sync.Once
439- bs BlockService
438+ bs * blockService
440439 ses exchange.Fetcher
441440 sesctx context.Context
442441}
443442
444443// grabSession is used to lazily create sessions.
445- func (s * Session ) grabSession () exchange.Fetcher {
444+ func (s * session ) grabSession () exchange.Fetcher {
446445 s .createSession .Do (func () {
447446 defer func () {
448447 s .sesctx = nil // early gc
@@ -465,64 +464,38 @@ func (s *Session) grabSession() exchange.Fetcher {
465464}
466465
467466// GetBlock gets a block in the context of a request session
468- func (s * Session ) GetBlock (ctx context.Context , c cid.Cid ) (blocks.Block , error ) {
469- ctx , span := internal .StartSpan (ctx , "Session .GetBlock" , trace .WithAttributes (attribute .Stringer ("CID" , c )))
467+ func (s * session ) GetBlock (ctx context.Context , c cid.Cid ) (blocks.Block , error ) {
468+ ctx , span := internal .StartSpan (ctx , "session .GetBlock" , trace .WithAttributes (attribute .Stringer ("CID" , c )))
470469 defer span .End ()
471470
472- return getBlock (ctx , c , s . bs , s .grabSession )
471+ return s . bs . getBlock (ctx , c , s .grabSession )
473472}
474473
475474// GetBlocks gets blocks in the context of a request session
476- func (s * Session ) GetBlocks (ctx context.Context , ks []cid.Cid ) <- chan blocks.Block {
477- ctx , span := internal .StartSpan (ctx , "Session .GetBlocks" )
475+ func (s * session ) GetBlocks (ctx context.Context , ks []cid.Cid ) <- chan blocks.Block {
476+ ctx , span := internal .StartSpan (ctx , "session .GetBlocks" )
478477 defer span .End ()
479478
480- return getBlocks (ctx , ks , s .bs , s .grabSession )
481- }
482-
483- var _ BlockGetter = (* Session )(nil )
484-
485- // ContextWithSession is a helper which creates a context with an embded session,
486- // future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService]
487- // will be redirected to this same session instead.
488- // Sessions are lazily setup, this is cheap.
489- // It wont make a new session if one exists already in the context.
490- func ContextWithSession (ctx context.Context , bs BlockService ) context.Context {
491- if grabSessionFromContext (ctx , bs ) != nil {
492- return ctx
493- }
494- return EmbedSessionInContext (ctx , newSession (ctx , bs ))
479+ return s .bs .getBlocks (ctx , ks , s .grabSession )
495480}
496481
497- // EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session.
498- func EmbedSessionInContext (ctx context.Context , ses * Session ) context.Context {
499- // use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice.
500- return context .WithValue (ctx , ses .bs , ses )
501- }
482+ var _ BlockGetter = (* session )(nil )
502483
503484// grabSessionFromContext returns nil if the session was not found
504485// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety,
505- // if this API is public it is too easy to forget to pass a [BlockService] or [Session ] object around in your app.
486+ // if this API is public it is too easy to forget to pass a [BlockService] or [session ] object around in your app.
506487// By having this private we allow consumers to follow the trace of where the blockservice is passed and used.
507- func grabSessionFromContext (ctx context.Context , bs BlockService ) * Session {
508- s := ctx .Value (bs )
488+ func ( s * blockService ) grabSessionFromContext (ctx context.Context ) * session {
489+ ss := ctx .Value (s )
509490 if s == nil {
510491 return nil
511492 }
512493
513- ss , ok := s .(* Session )
494+ sss , ok := ss .(* session )
514495 if ! ok {
515496 // idk what to do here, that kinda sucks, giveup
516497 return nil
517498 }
518499
519- return ss
520- }
521-
522- // grabAllowlistFromBlockservice never returns nil
523- func grabAllowlistFromBlockservice (bs BlockService ) verifcid.Allowlist {
524- if bbs , ok := bs .(BoundedBlockService ); ok {
525- return bbs .Allowlist ()
526- }
527- return verifcid .DefaultAllowlist
500+ return sss
528501}
0 commit comments