Skip to content

Commit 145fb89

Browse files
author
Alexander Lavrukov
committed
separate-projections-poc: MigrationProjectionCache
1 parent 443869c commit 145fb89

File tree

8 files changed

+103
-146
lines changed

8 files changed

+103
-146
lines changed

repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -405,25 +405,25 @@ public T insert(T tt) {
405405
T t = tt.preSave();
406406
transaction.getWatcher().markRowRead(type, t.getId());
407407
transaction.doInWriteTransaction("insert(" + t + ")", type, shard -> shard.insert(t));
408+
transaction.getTransactionLocal().projectionCache().save(transaction, t);
408409
transaction.getTransactionLocal().firstLevelCache().put(t);
409-
transaction.getTransactionLocal().projectionCache().save(t);
410410
return t;
411411
}
412412

413413
@Override
414414
public T save(T tt) {
415415
T t = tt.preSave();
416416
transaction.doInWriteTransaction("save(" + t + ")", type, shard -> shard.save(t));
417+
transaction.getTransactionLocal().projectionCache().save(transaction, t);
417418
transaction.getTransactionLocal().firstLevelCache().put(t);
418-
transaction.getTransactionLocal().projectionCache().save(t);
419419
return t;
420420
}
421421

422422
@Override
423423
public void delete(Entity.Id<T> id) {
424424
transaction.doInWriteTransaction("delete(" + id + ")", type, shard -> shard.delete(id));
425+
transaction.getTransactionLocal().projectionCache().delete(transaction, id);
425426
transaction.getTransactionLocal().firstLevelCache().putEmpty(id);
426-
transaction.getTransactionLocal().projectionCache().delete(id);
427427
}
428428

429429
@Override

repository-ydb-v1/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java

+9-58
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
import tech.ydb.yoj.repository.db.EntitySchema;
1414
import tech.ydb.yoj.repository.db.Range;
1515
import tech.ydb.yoj.repository.db.Table;
16-
import tech.ydb.yoj.repository.db.Tx;
1716
import tech.ydb.yoj.repository.db.ViewSchema;
1817
import tech.ydb.yoj.repository.db.bulk.BulkParams;
1918
import tech.ydb.yoj.repository.db.cache.FirstLevelCache;
2019
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
2120
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
2221
import tech.ydb.yoj.repository.db.statement.Changeset;
22+
import tech.ydb.yoj.repository.ydb.YdbRepositoryTransaction;
2323
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
2424
import tech.ydb.yoj.repository.ydb.bulk.BulkMapperImpl;
2525
import tech.ydb.yoj.repository.ydb.readtable.EntityIdKeyMapper;
@@ -54,17 +54,17 @@
5454
import static tech.ydb.yoj.repository.db.EntityExpressions.defaultOrder;
5555

5656
public class YdbTable<T extends Entity<T>> implements Table<T> {
57-
private final QueryExecutor executor;
57+
private final YdbRepositoryTransaction<?> executor;
5858
@Getter
5959
private final Class<T> type;
6060

61-
public YdbTable(Class<T> type, QueryExecutor executor) {
61+
public YdbTable(Class<T> type, YdbRepositoryTransaction<?> executor) {
6262
this.type = type;
63-
this.executor = new CheckingQueryExecutor(executor);
63+
this.executor = executor;
6464
}
6565

6666
protected YdbTable(QueryExecutor executor) {
67-
this.executor = new CheckingQueryExecutor(executor);
67+
this.executor = (YdbRepositoryTransaction<?>) executor;
6868
this.type = resolveEntityType();
6969
}
7070

@@ -420,25 +420,25 @@ public void update(Entity.Id<T> id, Changeset changeset) {
420420
public T insert(T t) {
421421
T entityToSave = t.preSave();
422422
executor.pendingExecute(YqlStatement.insert(type), entityToSave);
423+
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
423424
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
424-
executor.getTransactionLocal().projectionCache().save(entityToSave);
425425
return t;
426426
}
427427

428428
@Override
429429
public T save(T t) {
430430
T entityToSave = t.preSave();
431431
executor.pendingExecute(YqlStatement.save(type), entityToSave);
432+
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
432433
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
433-
executor.getTransactionLocal().projectionCache().save(entityToSave);
434434
return t;
435435
}
436436

437437
@Override
438438
public void delete(Entity.Id<T> id) {
439439
executor.pendingExecute(YqlStatement.delete(type), id);
440+
executor.getTransactionLocal().projectionCache().delete(executor, id);
440441
executor.getTransactionLocal().firstLevelCache().putEmpty(id);
441-
executor.getTransactionLocal().projectionCache().delete(id);
442442
}
443443

444444
/**
@@ -458,7 +458,7 @@ public <ID extends Id<T>> void migrate(ID id) {
458458
T rawEntity = foundRaw.get(0);
459459
T entityToSave = rawEntity.postLoad().preSave();
460460
executor.pendingExecute(YqlStatement.save(type), entityToSave);
461-
executor.getTransactionLocal().projectionCache().save(entityToSave);
461+
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
462462
}
463463

464464
@Override
@@ -494,55 +494,6 @@ default <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams p
494494
TransactionLocal getTransactionLocal();
495495
}
496496

497-
public static class CheckingQueryExecutor implements QueryExecutor {
498-
private final QueryExecutor delegate;
499-
private final Tx originTx;
500-
501-
public CheckingQueryExecutor(QueryExecutor delegate) {
502-
this.delegate = delegate;
503-
this.originTx = Tx.Current.exists() ? Tx.Current.get() : null;
504-
}
505-
506-
private void check() {
507-
Tx.checkSameTx(originTx);
508-
}
509-
510-
@Override
511-
public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params) {
512-
check();
513-
return delegate.execute(statement, params);
514-
}
515-
516-
@Override
517-
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
518-
return delegate.executeScanQuery(statement, params);
519-
}
520-
521-
@Override
522-
public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value) {
523-
check();
524-
delegate.pendingExecute(statement, value);
525-
}
526-
527-
@Override
528-
public <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams params) {
529-
check();
530-
delegate.bulkUpsert(mapper, input, params);
531-
}
532-
533-
@Override
534-
public <IN, OUT> Stream<OUT> readTable(ReadTableMapper<IN, OUT> mapper, ReadTableParams<IN> params) {
535-
check();
536-
return delegate.readTable(mapper, params);
537-
}
538-
539-
@Override
540-
public TransactionLocal getTransactionLocal() {
541-
check();
542-
return delegate.getTransactionLocal();
543-
}
544-
}
545-
546497
public <ID extends Id<T>> void updateIn(Collection<ID> ids, Changeset changeset) {
547498
var params = new UpdateInStatement.UpdateInStatementInput<>(ids, changeset.toMap());
548499

repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java

+10-58
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
import tech.ydb.yoj.repository.db.EntitySchema;
1414
import tech.ydb.yoj.repository.db.Range;
1515
import tech.ydb.yoj.repository.db.Table;
16-
import tech.ydb.yoj.repository.db.Tx;
1716
import tech.ydb.yoj.repository.db.ViewSchema;
1817
import tech.ydb.yoj.repository.db.bulk.BulkParams;
1918
import tech.ydb.yoj.repository.db.cache.FirstLevelCache;
2019
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
2120
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
2221
import tech.ydb.yoj.repository.db.statement.Changeset;
22+
import tech.ydb.yoj.repository.ydb.YdbRepositoryTransaction;
2323
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
2424
import tech.ydb.yoj.repository.ydb.bulk.BulkMapperImpl;
2525
import tech.ydb.yoj.repository.ydb.readtable.EntityIdKeyMapper;
@@ -54,20 +54,21 @@
5454
import static tech.ydb.yoj.repository.db.EntityExpressions.defaultOrder;
5555

5656
public class YdbTable<T extends Entity<T>> implements Table<T> {
57-
private final QueryExecutor executor;
57+
private final YdbRepositoryTransaction<?> executor;
5858
@Getter
5959
private final Class<T> type;
6060

61-
public YdbTable(Class<T> type, QueryExecutor executor) {
61+
public YdbTable(Class<T> type, YdbRepositoryTransaction<?> executor) {
6262
this.type = type;
63-
this.executor = new CheckingQueryExecutor(executor);
63+
this.executor = executor;
6464
}
6565

6666
protected YdbTable(QueryExecutor executor) {
67-
this.executor = new CheckingQueryExecutor(executor);
67+
this.executor = (YdbRepositoryTransaction<?>) executor;
6868
this.type = resolveEntityType();
6969
}
7070

71+
7172
@SuppressWarnings("unchecked")
7273
private Class<T> resolveEntityType() {
7374
return (Class<T>) (new TypeToken<T>(getClass()) {
@@ -421,7 +422,7 @@ public T insert(T t) {
421422
T entityToSave = t.preSave();
422423
executor.pendingExecute(YqlStatement.insert(type), entityToSave);
423424
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
424-
executor.getTransactionLocal().projectionCache().save(entityToSave);
425+
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
425426
return t;
426427
}
427428

@@ -430,15 +431,15 @@ public T save(T t) {
430431
T entityToSave = t.preSave();
431432
executor.pendingExecute(YqlStatement.save(type), entityToSave);
432433
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
433-
executor.getTransactionLocal().projectionCache().save(entityToSave);
434+
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
434435
return t;
435436
}
436437

437438
@Override
438439
public void delete(Entity.Id<T> id) {
439440
executor.pendingExecute(YqlStatement.delete(type), id);
440441
executor.getTransactionLocal().firstLevelCache().putEmpty(id);
441-
executor.getTransactionLocal().projectionCache().delete(id);
442+
executor.getTransactionLocal().projectionCache().delete(executor, id);
442443
}
443444

444445
/**
@@ -458,7 +459,7 @@ public <ID extends Id<T>> void migrate(ID id) {
458459
T rawEntity = foundRaw.get(0);
459460
T entityToSave = rawEntity.postLoad().preSave();
460461
executor.pendingExecute(YqlStatement.save(type), entityToSave);
461-
executor.getTransactionLocal().projectionCache().save(entityToSave);
462+
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
462463
}
463464

464465
@Override
@@ -494,55 +495,6 @@ default <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams p
494495
TransactionLocal getTransactionLocal();
495496
}
496497

497-
public static class CheckingQueryExecutor implements QueryExecutor {
498-
private final QueryExecutor delegate;
499-
private final Tx originTx;
500-
501-
public CheckingQueryExecutor(QueryExecutor delegate) {
502-
this.delegate = delegate;
503-
this.originTx = Tx.Current.exists() ? Tx.Current.get() : null;
504-
}
505-
506-
private void check() {
507-
Tx.checkSameTx(originTx);
508-
}
509-
510-
@Override
511-
public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params) {
512-
check();
513-
return delegate.execute(statement, params);
514-
}
515-
516-
@Override
517-
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
518-
return delegate.executeScanQuery(statement, params);
519-
}
520-
521-
@Override
522-
public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value) {
523-
check();
524-
delegate.pendingExecute(statement, value);
525-
}
526-
527-
@Override
528-
public <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams params) {
529-
check();
530-
delegate.bulkUpsert(mapper, input, params);
531-
}
532-
533-
@Override
534-
public <IN, OUT> Stream<OUT> readTable(ReadTableMapper<IN, OUT> mapper, ReadTableParams<IN> params) {
535-
check();
536-
return delegate.readTable(mapper, params);
537-
}
538-
539-
@Override
540-
public TransactionLocal getTransactionLocal() {
541-
check();
542-
return delegate.getTransactionLocal();
543-
}
544-
}
545-
546498
public <ID extends Id<T>> void updateIn(Collection<ID> ids, Changeset changeset) {
547499
var params = new UpdateInStatement.UpdateInStatementInput<>(ids, changeset.toMap());
548500

repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java

+14-21
Original file line numberDiff line numberDiff line change
@@ -3,46 +3,39 @@
33
import lombok.NonNull;
44
import tech.ydb.yoj.repository.BaseDb;
55
import tech.ydb.yoj.repository.db.TxOptions;
6+
import tech.ydb.yoj.repository.db.projection.MigrationProjectionCache;
67
import tech.ydb.yoj.repository.db.projection.ProjectionCache;
78
import tech.ydb.yoj.repository.db.projection.RoProjectionCache;
8-
import tech.ydb.yoj.repository.db.projection.RwProjectionCache;
9-
10-
import java.util.IdentityHashMap;
11-
import java.util.Map;
12-
import java.util.function.Supplier;
139

1410
public class TransactionLocal {
15-
private final Map<Supplier<?>, Object> singletons = new IdentityHashMap<>();
16-
17-
private final Supplier<FirstLevelCache> firstLevelCacheSupplier;
18-
private final Supplier<ProjectionCache> projectionCacheSupplier;
19-
private final Supplier<TransactionLog> logSupplier;
11+
private final FirstLevelCache firstLevelCache;
12+
private final ProjectionCache projectionCache;
13+
private final TransactionLog log;
2014

2115
public TransactionLocal(@NonNull TxOptions options) {
22-
this.firstLevelCacheSupplier = options.isFirstLevelCache() ? FirstLevelCache::create : FirstLevelCache::empty;
23-
this.projectionCacheSupplier = options.isMutable() ? RwProjectionCache::new : RoProjectionCache::new;
24-
this.logSupplier = () -> new TransactionLog(options.getLogLevel());
16+
this.firstLevelCache = options.isFirstLevelCache() ? FirstLevelCache.create() : FirstLevelCache.empty();
17+
if (options.isMutable()) {
18+
this.projectionCache = new MigrationProjectionCache(firstLevelCache);
19+
} else {
20+
this.projectionCache = new RoProjectionCache();
21+
}
22+
this.log = new TransactionLog(options.getLogLevel());
2523
}
2624

2725
public static TransactionLocal get() {
2826
return BaseDb.current(Holder.class).getTransactionLocal();
2927
}
3028

31-
@SuppressWarnings("unchecked")
32-
public <X> X instance(@NonNull Supplier<X> supplier) {
33-
return (X) singletons.computeIfAbsent(supplier, Supplier::get);
34-
}
35-
3629
public ProjectionCache projectionCache() {
37-
return instance(projectionCacheSupplier);
30+
return projectionCache;
3831
}
3932

4033
public FirstLevelCache firstLevelCache() {
41-
return instance(firstLevelCacheSupplier);
34+
return firstLevelCache;
4235
}
4336

4437
public TransactionLog log() {
45-
return instance(logSupplier);
38+
return log;
4639
}
4740

4841
public interface Holder {

0 commit comments

Comments
 (0)