Skip to content

Commit c9b0228

Browse files
author
Alexander Lavrukov
committed
separate-projections-poc: MigrationProjectionCache
1 parent 9ef88c6 commit c9b0228

File tree

10 files changed

+110
-87
lines changed

10 files changed

+110
-87
lines changed

repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -405,25 +405,25 @@ public T insert(T tt) {
405405
T t = tt.preSave();
406406
transaction.getWatcher().markRowRead(type, t.getId());
407407
transaction.doInWriteTransaction("insert(" + t + ")", type, shard -> shard.insert(t));
408+
transaction.getTransactionLocal().projectionCache().save(transaction, t);
408409
transaction.getTransactionLocal().firstLevelCache().put(t);
409-
transaction.getTransactionLocal().projectionCache().save(t);
410410
return t;
411411
}
412412

413413
@Override
414414
public T save(T tt) {
415415
T t = tt.preSave();
416416
transaction.doInWriteTransaction("save(" + t + ")", type, shard -> shard.save(t));
417+
transaction.getTransactionLocal().projectionCache().save(transaction, t);
417418
transaction.getTransactionLocal().firstLevelCache().put(t);
418-
transaction.getTransactionLocal().projectionCache().save(t);
419419
return t;
420420
}
421421

422422
@Override
423423
public void delete(Entity.Id<T> id) {
424424
transaction.doInWriteTransaction("delete(" + id + ")", type, shard -> shard.delete(id));
425+
transaction.getTransactionLocal().projectionCache().delete(transaction, id);
425426
transaction.getTransactionLocal().firstLevelCache().putEmpty(id);
426-
transaction.getTransactionLocal().projectionCache().delete(id);
427427
}
428428

429429
@Override

repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java

+9-58
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
import tech.ydb.yoj.repository.db.EntitySchema;
1414
import tech.ydb.yoj.repository.db.Range;
1515
import tech.ydb.yoj.repository.db.Table;
16-
import tech.ydb.yoj.repository.db.Tx;
1716
import tech.ydb.yoj.repository.db.ViewSchema;
1817
import tech.ydb.yoj.repository.db.bulk.BulkParams;
1918
import tech.ydb.yoj.repository.db.cache.FirstLevelCache;
2019
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
2120
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
2221
import tech.ydb.yoj.repository.db.statement.Changeset;
22+
import tech.ydb.yoj.repository.ydb.YdbRepositoryTransaction;
2323
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
2424
import tech.ydb.yoj.repository.ydb.bulk.BulkMapperImpl;
2525
import tech.ydb.yoj.repository.ydb.readtable.EntityIdKeyMapper;
@@ -54,17 +54,17 @@
5454
import static tech.ydb.yoj.repository.db.EntityExpressions.defaultOrder;
5555

5656
public class YdbTable<T extends Entity<T>> implements Table<T> {
57-
private final QueryExecutor executor;
57+
private final YdbRepositoryTransaction<?> executor;
5858
@Getter
5959
private final Class<T> type;
6060

61-
public YdbTable(Class<T> type, QueryExecutor executor) {
61+
public YdbTable(Class<T> type, YdbRepositoryTransaction<?> executor) {
6262
this.type = type;
63-
this.executor = new CheckingQueryExecutor(executor);
63+
this.executor = executor;
6464
}
6565

6666
protected YdbTable(QueryExecutor executor) {
67-
this.executor = new CheckingQueryExecutor(executor);
67+
this.executor = (YdbRepositoryTransaction<?>) executor;
6868
this.type = resolveEntityType();
6969
}
7070

@@ -421,7 +421,7 @@ public T insert(T t) {
421421
T entityToSave = t.preSave();
422422
executor.pendingExecute(YqlStatement.insert(type), entityToSave);
423423
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
424-
executor.getTransactionLocal().projectionCache().save(entityToSave);
424+
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
425425
return t;
426426
}
427427

@@ -430,15 +430,15 @@ public T save(T t) {
430430
T entityToSave = t.preSave();
431431
executor.pendingExecute(YqlStatement.save(type), entityToSave);
432432
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
433-
executor.getTransactionLocal().projectionCache().save(entityToSave);
433+
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
434434
return t;
435435
}
436436

437437
@Override
438438
public void delete(Entity.Id<T> id) {
439439
executor.pendingExecute(YqlStatement.delete(type), id);
440440
executor.getTransactionLocal().firstLevelCache().putEmpty(id);
441-
executor.getTransactionLocal().projectionCache().delete(id);
441+
executor.getTransactionLocal().projectionCache().delete(executor, id);
442442
}
443443

444444
/**
@@ -458,7 +458,7 @@ public <ID extends Id<T>> void migrate(ID id) {
458458
T rawEntity = foundRaw.get(0);
459459
T entityToSave = rawEntity.postLoad().preSave();
460460
executor.pendingExecute(YqlStatement.save(type), entityToSave);
461-
executor.getTransactionLocal().projectionCache().save(entityToSave);
461+
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
462462
}
463463

464464
@Override
@@ -494,55 +494,6 @@ default <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams p
494494
TransactionLocal getTransactionLocal();
495495
}
496496

497-
public static class CheckingQueryExecutor implements QueryExecutor {
498-
private final QueryExecutor delegate;
499-
private final Tx originTx;
500-
501-
public CheckingQueryExecutor(QueryExecutor delegate) {
502-
this.delegate = delegate;
503-
this.originTx = Tx.Current.exists() ? Tx.Current.get() : null;
504-
}
505-
506-
private void check() {
507-
Tx.checkSameTx(originTx);
508-
}
509-
510-
@Override
511-
public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params) {
512-
check();
513-
return delegate.execute(statement, params);
514-
}
515-
516-
@Override
517-
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
518-
return delegate.executeScanQuery(statement, params);
519-
}
520-
521-
@Override
522-
public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value) {
523-
check();
524-
delegate.pendingExecute(statement, value);
525-
}
526-
527-
@Override
528-
public <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams params) {
529-
check();
530-
delegate.bulkUpsert(mapper, input, params);
531-
}
532-
533-
@Override
534-
public <IN, OUT> Stream<OUT> readTable(ReadTableMapper<IN, OUT> mapper, ReadTableParams<IN> params) {
535-
check();
536-
return delegate.readTable(mapper, params);
537-
}
538-
539-
@Override
540-
public TransactionLocal getTransactionLocal() {
541-
check();
542-
return delegate.getTransactionLocal();
543-
}
544-
}
545-
546497
public <ID extends Id<T>> void updateIn(Collection<ID> ids, Changeset changeset) {
547498
var params = new UpdateInStatement.UpdateInStatementInput<>(ids, changeset.toMap());
548499

repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java

+5
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@ public TxManager immediateWrites() {
133133
return withOptions(this.options.withImmediateWrites(true));
134134
}
135135

136+
@Override
137+
public TxManager separateProjections() {
138+
return withOptions(this.options.withSeparateProjections(true));
139+
}
140+
136141
@Override
137142
public TxManager noFirstLevelCache() {
138143
return withOptions(this.options.withFirstLevelCache(false));

repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public interface TxManager {
4141
*/
4242
TxManager immediateWrites();
4343

44+
TxManager separateProjections();
45+
4446
/**
4547
* Turn off first level cache
4648
*/

repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java

+2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public class TxOptions {
3333

3434
boolean immediateWrites;
3535

36+
boolean separateProjections;
37+
3638
public static TxOptions create(@NonNull IsolationLevel isolationLevel) {
3739
return builder()
3840
.isolationLevel(isolationLevel)

repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java

+22-20
Original file line numberDiff line numberDiff line change
@@ -3,46 +3,48 @@
33
import lombok.NonNull;
44
import tech.ydb.yoj.repository.BaseDb;
55
import tech.ydb.yoj.repository.db.TxOptions;
6+
import tech.ydb.yoj.repository.db.projection.MigrationProjectionCache;
67
import tech.ydb.yoj.repository.db.projection.ProjectionCache;
78
import tech.ydb.yoj.repository.db.projection.RoProjectionCache;
89
import tech.ydb.yoj.repository.db.projection.RwProjectionCache;
910

10-
import java.util.IdentityHashMap;
11-
import java.util.Map;
12-
import java.util.function.Supplier;
13-
1411
public class TransactionLocal {
15-
private final Map<Supplier<?>, Object> singletons = new IdentityHashMap<>();
16-
17-
private final Supplier<FirstLevelCache> firstLevelCacheSupplier;
18-
private final Supplier<ProjectionCache> projectionCacheSupplier;
19-
private final Supplier<TransactionLog> logSupplier;
12+
private final FirstLevelCache firstLevelCache;
13+
private final ProjectionCache projectionCache;
14+
private final TransactionLog log;
2015

2116
public TransactionLocal(@NonNull TxOptions options) {
22-
this.firstLevelCacheSupplier = options.isFirstLevelCache() ? FirstLevelCache::create : FirstLevelCache::empty;
23-
this.projectionCacheSupplier = options.isMutable() ? RwProjectionCache::new : RoProjectionCache::new;
24-
this.logSupplier = () -> new TransactionLog(options.getLogLevel());
17+
this.firstLevelCache = options.isFirstLevelCache() ? FirstLevelCache.create() : FirstLevelCache.empty();
18+
this.projectionCache = createProjectionCache(firstLevelCache, options);
19+
this.log = new TransactionLog(options.getLogLevel());
2520
}
2621

27-
public static TransactionLocal get() {
28-
return BaseDb.current(Holder.class).getTransactionLocal();
22+
private static ProjectionCache createProjectionCache(FirstLevelCache firstLevelCache, TxOptions options) {
23+
if (options.isMutable()) {
24+
if (options.isSeparateProjections()) {
25+
return new MigrationProjectionCache(firstLevelCache);
26+
}
27+
28+
return new RwProjectionCache();
29+
}
30+
31+
return new RoProjectionCache();
2932
}
3033

31-
@SuppressWarnings("unchecked")
32-
public <X> X instance(@NonNull Supplier<X> supplier) {
33-
return (X) singletons.computeIfAbsent(supplier, Supplier::get);
34+
public static TransactionLocal get() {
35+
return BaseDb.current(Holder.class).getTransactionLocal();
3436
}
3537

3638
public ProjectionCache projectionCache() {
37-
return instance(projectionCacheSupplier);
39+
return projectionCache;
3840
}
3941

4042
public FirstLevelCache firstLevelCache() {
41-
return instance(firstLevelCacheSupplier);
43+
return firstLevelCache;
4244
}
4345

4446
public TransactionLog log() {
45-
return instance(logSupplier);
47+
return log;
4648
}
4749

4850
public interface Holder {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package tech.ydb.yoj.repository.db.projection;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import tech.ydb.yoj.repository.db.Entity;
5+
import tech.ydb.yoj.repository.db.RepositoryTransaction;
6+
import tech.ydb.yoj.repository.db.cache.FirstLevelCache;
7+
8+
import java.util.List;
9+
import java.util.NoSuchElementException;
10+
import java.util.Optional;
11+
12+
@RequiredArgsConstructor
13+
public class MigrationProjectionCache implements ProjectionCache {
14+
private final FirstLevelCache cache;
15+
16+
@Override
17+
public void load(Entity<?> entity) {
18+
}
19+
20+
@Override
21+
public void save(RepositoryTransaction transaction, Entity<?> entity) {
22+
delete(transaction, entity.getId());
23+
24+
List<Entity<?>> newProjections = entity.createProjections();
25+
for (Entity<?> projection : newProjections) {
26+
saveEntity(transaction, projection);
27+
}
28+
}
29+
30+
@Override
31+
public void delete(RepositoryTransaction transaction, Entity.Id<?> id) {
32+
Optional<? extends Entity<?>> oldEntity;
33+
try {
34+
oldEntity = cache.peek(id);
35+
} catch (NoSuchElementException e) {
36+
return;
37+
}
38+
39+
if (oldEntity.isPresent()) {
40+
List<Entity<?>> oldProjections = oldEntity.get().createProjections();
41+
for (Entity<?> projection : oldProjections) {
42+
deleteEntity(transaction, projection.getId());
43+
}
44+
}
45+
}
46+
47+
@Override
48+
public void applyProjectionChanges(RepositoryTransaction transaction) {
49+
}
50+
51+
private <T extends Entity<T>> void deleteEntity(RepositoryTransaction transaction, Entity.Id<T> entityId) {
52+
transaction.table(entityId.getType()).delete(entityId);
53+
}
54+
55+
private <T extends Entity<T>> void saveEntity(RepositoryTransaction transaction, Entity<T> entity) {
56+
@SuppressWarnings("unchecked")
57+
T castedEntity = (T) entity;
58+
59+
transaction.table(entity.getId().getType()).save(castedEntity);
60+
}
61+
}

repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionCache.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
public interface ProjectionCache {
88
void load(Entity<?> entity);
99

10-
void save(Entity<?> entity);
10+
void save(RepositoryTransaction transaction, Entity<?> entity);
1111

12-
void delete(Entity.Id<?> id);
12+
void delete(RepositoryTransaction transaction, Entity.Id<?> id);
1313

1414
void applyProjectionChanges(RepositoryTransaction transaction);
1515
}

repository/src/main/java/tech/ydb/yoj/repository/db/projection/RoProjectionCache.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ public void load(Entity<?> entity) {
99
}
1010

1111
@Override
12-
public void save(Entity<?> entity) {
12+
public void save(RepositoryTransaction transaction, Entity<?> entity) {
1313
throw new UnsupportedOperationException("Should not be invoked in RO");
1414
}
1515

1616
@Override
17-
public void delete(Entity.Id<?> id) {
17+
public void delete(RepositoryTransaction transaction, Entity.Id<?> id) {
1818
throw new UnsupportedOperationException("Should not be invoked in RO");
1919
}
2020

repository/src/main/java/tech/ydb/yoj/repository/db/projection/RwProjectionCache.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ public void load(Entity<?> entity) {
2222
}
2323

2424
@Override
25-
public void save(Entity<?> entity) {
25+
public void save(RepositoryTransaction transaction, Entity<?> entity) {
2626
row(entity.getId()).save(entity);
2727
}
2828

2929
@Override
30-
public void delete(Entity.Id<?> id) {
30+
public void delete(RepositoryTransaction transaction, Entity.Id<?> id) {
3131
row(id).delete();
3232
}
3333

0 commit comments

Comments
 (0)