-
Notifications
You must be signed in to change notification settings - Fork 364
Open
Labels
status: waiting-for-feedbackWe need additional information before we can continueWe need additional information before we can continue
Description
pom.xml:
...
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
AbstractR2dbcConfiguration:
@Configuration
@Slf4j
public class H2Config extends AbstractR2dbcConfiguration {
private static final String SCHEMA = "schema.sql";
@Value(${spring.r2dbc.url})
private String r2dbcUrl;
@Override
public ConnectionFactory connectionFactory() {
return get(r2dbcUrl);
}
@Bean
ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) {
log.info("H2 using: {}", r2dbcUrl);
ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer();
initializer.setConnectionFactory(connectionFactory);
initializer.setDatabasePopulator(new ResourceDatabasePopulator(new ClassPathResource(SCHEMA)));
return initializer;
}
@Bean
public BeforeConvertCallback<FileMeta> fileIdGenerationCallback() {
return (entity, _) -> {
if (entity.pk() == null) {
var newId = new FileMeta.PK(UUID.randomUUID(), System.currentTimeMillis());
log.trace("New FileMeta.PK was generated: ({}, {})", newId.fileId(), newId.ts());
return Mono.just(new FileMeta(newId, entity.userId(), entity.fileName(), entity.vectorizable(), entity.vectorized(), entity.mimeType(), entity.uri()));
}
return Mono.just(entity);
};
}
@Override
public List<Object> getCustomConverters() {
return List.of(
new URIToStringConverter(), // (Converter<URI, String>) URI::toString,
new StringToURIConverter() // (Converter<String, URI>) URI::create
);
}
@ReadingConverter
private static class StringToURIConverter implements Converter<String, URI> {
@Override
public URI convert(String source) {
return URI.create(source);
}
}
@WritingConverter
private static class URIToStringConverter implements Converter<URI, String> {
@Override
public String convert(URI source) {
return source.toString();
}
}
}
FileMeta.java:
@Table(TABLE_FILE_META)
public record FileMeta(
@Id PK pk,
@Column(USER_ID_COLUMN) String userId,
@Column(FILE_NAME_COLUMN) String fileName,
@Column(VECTORIZABLE_COLUMN) boolean vectorizable,
@Column(VECTORIZED_COLUMN) boolean vectorized,
@Column(MIMETYPE_COLUMN) String mimeType,
@Column(URI_COLUMN) URI uri
) {
public record PK(@Column(FILE_ID_COLUMN) UUID fileId, @Column(TS_COLUMN) long ts) {
}
}
FileMetaRepository:
@Repository
public interface FileMetaRepository extends ReactiveCrudRepository<FileMeta, FileMeta.PK> {
Flux<FileMeta> findByUserId(String userId);
Flux<FileMeta> findByVectorizedAndVectorizable(boolean vectorized, boolean vectorizable);
}
schema.sql:
CREATE TABLE IF NOT EXISTS FILE_META
(
FILE_ID uuid not null,
TS BIGINT not null,
USER_ID varchar(64) not null,
FILE_NAME varchar(128),
VECTORIZABLE boolean not null,
VECTORIZED boolean not null,
MIMETYPE varchar(128),
URI varchar(256),
PRIMARY KEY (FILE_ID, TS)
);
log:
2025-07-20 01:14:21,499 [main] DEBUG c.e.r.agentic.AgenticApplication - Running with Spring Boot v3.3.13, Spring v6.1.21
2025-07-20 01:14:21,499 [main] INFO c.e.r.agentic.AgenticApplication - No active profile set, falling back to 1 default profile: "default"
2025-07-20 01:14:21,768 [main] INFO o.s.d.r.c.RepositoryConfigurationDelegate - Multiple Spring Data modules found, entering strict repository configuration mode
2025-07-20 01:14:21,768 [main] INFO o.s.d.r.c.RepositoryConfigurationDelegate - Bootstrapping Spring Data Reactive Cassandra repositories in DEFAULT mode.
2025-07-20 01:14:21,821 [main] INFO o.s.d.r.c.RepositoryConfigurationDelegate - Finished Spring Data repository scanning in 49 ms. Found 3 Reactive Cassandra repository interfaces.
2025-07-20 01:14:21,823 [main] INFO o.s.d.r.c.RepositoryConfigurationDelegate - Multiple Spring Data modules found, entering strict repository configuration mode
2025-07-20 01:14:21,823 [main] INFO o.s.d.r.c.RepositoryConfigurationDelegate - Bootstrapping Spring Data Cassandra repositories in DEFAULT mode.
2025-07-20 01:14:21,827 [main] INFO o.s.d.r.c.RepositoryConfigurationDelegate - Finished Spring Data repository scanning in 3 ms. Found 0 Cassandra repository interfaces.
2025-07-20 01:14:21,830 [main] INFO o.s.d.r.c.RepositoryConfigurationDelegate - Multiple Spring Data modules found, entering strict repository configuration mode
2025-07-20 01:14:21,830 [main] INFO o.s.d.r.c.RepositoryConfigurationDelegate - Bootstrapping Spring Data R2DBC repositories in DEFAULT mode.
2025-07-20 01:14:21,834 [main] INFO o.s.d.r.c.RepositoryConfigurationDelegate - Finished Spring Data repository scanning in 3 ms. Found 1 R2DBC repository interface.
...
2025-07-20 01:14:23,717 [single-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.r2dbc.BadSqlGrammarException: executeMany; bad SQL grammar [SELECT "FILE_META"."PK", "FILE_META"."USER_ID", "FILE_META"."FILE_NAME", "FILE_META"."VECTORIZABLE", "FILE_META"."VECTORIZED", "FILE_META"."MIMETYPE", "FILE_META"."URI" FROM "FILE_META" WHERE "FILE_META"."USER_ID" = $1]
Caused by: org.springframework.r2dbc.BadSqlGrammarException: executeMany; bad SQL grammar [SELECT "FILE_META"."PK", "FILE_META"."USER_ID", "FILE_META"."FILE_NAME", "FILE_META"."VECTORIZABLE", "FILE_META"."VECTORIZED", "FILE_META"."MIMETYPE", "FILE_META"."URI" FROM "FILE_META" WHERE "FILE_META"."USER_ID" = $1]
at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:253)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoError] :
Error has been observed at the following site(s):
*___Flux.onErrorMap ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:155)
|_ Flux.concatMap ⇢ at org.springframework.data.r2dbc.core.R2dbcEntityTemplate$EntityCallbackAdapter.all(R2dbcEntityTemplate.java:940)
*__Mono.flatMapMany ⇢ at org.springframework.data.r2dbc.repository.query.AbstractR2dbcQuery.lambda$execute$0(AbstractR2dbcQuery.java:78)
*__Mono.flatMapMany ⇢ at org.springframework.data.r2dbc.repository.query.AbstractR2dbcQuery.execute(AbstractR2dbcQuery.java:78)
*____Flux.usingWhen ⇢ at org.springframework.data.repository.core.support.RepositoryMethodInvoker$ReactiveInvocationListenerDecorator.decorate(RepositoryMethodInvoker.java:236)
|_ Flux.filter ⇢ at cn.easttrans.reaiot.agentic.service.rag.FileService.lambda$peekH2$2(FileService.java:82)
|_ Flux.map ⇢ at cn.easttrans.reaiot.agentic.service.rag.FileService.lambda$peekH2$2(FileService.java:83)
|_ Flux.doOnNext ⇢ at cn.easttrans.reaiot.agentic.service.rag.FileService.lambda$peekH2$2(FileService.java:95)
*________Flux.defer ⇢ at cn.easttrans.reaiot.agentic.service.rag.FileService.peekH2(FileService.java:79)
Original Stack Trace:
at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:253)
at org.springframework.r2dbc.core.DefaultDatabaseClient.lambda$inConnectionMany$8(DefaultDatabaseClient.java:156)
at reactor.core.publisher.Flux.lambda$onErrorMap$29(Flux.java:7310)
at reactor.core.publisher.Flux.lambda$onErrorResume$30(Flux.java:7363)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:403)
at reactor.core.publisher.FluxUsingWhen$RollbackInner.onComplete(FluxUsingWhen.java:480)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2230)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2230)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210)
at reactor.pool.SimpleDequePool.maybeRecycleAndDrain(SimpleDequePool.java:547)
at reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:788)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:901)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:239)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
at reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:159)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2366)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2240)
at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:48)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:340)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onError(FluxUsingWhen.java:368)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:612)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:592)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onError(FluxFlatMap.java:455)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:190)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83)
at reactor.core.publisher.Flux.subscribe(Flux.java:8848)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:202)
at reactor.core.publisher.FluxFlatMap.subscribeOrReturn(FluxFlatMap.java:94)
at reactor.core.publisher.Flux.subscribe(Flux.java:8833)
at reactor.core.publisher.FluxUsingWhen$ResourceSubscriber.onNext(FluxUsingWhen.java:198)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.FluxRetry$RetrySubscriber.onNext(FluxRetry.java:88)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)
at io.r2dbc.pool.MonoDiscardOnCancel$MonoDiscardOnCancelSubscriber.onNext(MonoDiscardOnCancel.java:92)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:294)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:188)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:237)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:239)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
at reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:159)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2366)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2240)
at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:48)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at io.r2dbc.pool.MonoDiscardOnCancel.subscribe(MonoDiscardOnCancel.java:50)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
at reactor.pool.AbstractPool$Borrower.deliver(AbstractPool.java:472)
at reactor.pool.SimpleDequePool.lambda$drainLoop$9(SimpleDequePool.java:435)
at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onNext(FluxDoOnEach.java:154)
at reactor.core.publisher.FluxDoOnEach$DoOnEachFuseableSubscriber.onNext(FluxDoOnEach.java:281)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onNext(MonoSubscribeOn.java:146)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
at reactor.core.publisher.MonoSupplier$MonoSupplierSubscription.request(MonoSupplier.java:145)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:189)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:134)
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:48)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
...
JDK version: 21
Spring Data version: 2025.1.0-M4
BTW: spring-boot-starter-data-jdbc does NOT give the same error with the same FileMeta (H2Config class that extends AbstractJdbcConfiguration is differnet tho).
Metadata
Metadata
Assignees
Labels
status: waiting-for-feedbackWe need additional information before we can continueWe need additional information before we can continue