@@ -4,7 +4,6 @@ import app.cash.backfila.client.misk.hibernate.HibernateBackfill
4
4
import app.cash.backfila.client.misk.hibernate.PartitionProvider
5
5
import com.google.common.collect.Ordering
6
6
import javax.persistence.Table
7
- import kotlin.streams.toList
8
7
import misk.hibernate.DbEntity
9
8
import misk.hibernate.Session
10
9
import misk.hibernate.Transacter
@@ -13,7 +12,19 @@ import misk.hibernate.transaction
13
12
import misk.vitess.Keyspace
14
13
import org.hibernate.internal.SessionImpl
15
14
15
+ /* *
16
+ * The queries that are provided by the strategy are used to establish a primary key slice of
17
+ * the table off which the backfill criteria is applied.
18
+ */
16
19
interface BoundingRangeStrategy <E : DbEntity <E >, Pkey : Any > {
20
+ /* *
21
+ * Computes the raw table min and max based on primary key. Returns null if the table is empty.
22
+ */
23
+ fun computeAbsoluteMinMax (
24
+ backfill : HibernateBackfill <E , Pkey , * >,
25
+ partitionName : String ,
26
+ ): MinMax <Pkey >?
27
+
17
28
/* *
18
29
* Computes a bound of size request.scan_size, to get a set of records that can be scanned for
19
30
* records that match the criteria.
@@ -27,11 +38,35 @@ interface BoundingRangeStrategy<E : DbEntity<E>, Pkey : Any> {
27
38
backfillRangeEnd : Pkey ,
28
39
scanSize : Long? ,
29
40
): Pkey ?
41
+
42
+ /* *
43
+ * Gets the min and count for the range of records.
44
+ */
45
+ fun computeMinAndCountForRange (
46
+ backfill : HibernateBackfill <E , Pkey , * >,
47
+ session : Session ,
48
+ previousEndKey : Pkey ? ,
49
+ backfillRangeStart : Pkey ,
50
+ end : Pkey ,
51
+ ): MinCount <Pkey >
30
52
}
31
53
32
54
class UnshardedHibernateBoundingRangeStrategy <E : DbEntity <E >, Pkey : Any >(
33
55
private val partitionProvider : PartitionProvider ,
34
56
) : BoundingRangeStrategy<E, Pkey> {
57
+ override fun computeAbsoluteMinMax (
58
+ backfill : HibernateBackfill <E , Pkey , * >,
59
+ partitionName : String ,
60
+ ): MinMax <Pkey >? {
61
+ return partitionProvider.transaction(partitionName) { session ->
62
+ selectMinAndMax(
63
+ backfill,
64
+ session,
65
+ schemaAndTable(backfill),
66
+ )
67
+ }
68
+ }
69
+
35
70
override fun computeBoundingRangeMax (
36
71
backfill : HibernateBackfill <E , Pkey , * >,
37
72
partitionName : String ,
@@ -42,21 +77,51 @@ class UnshardedHibernateBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>(
42
77
): Pkey ? {
43
78
return partitionProvider.transaction(partitionName) { session ->
44
79
selectMaxBound(
45
- backfill,
46
- session,
47
- schemaAndTable(backfill),
48
- previousEndKey,
49
- backfillRangeStart,
50
- backfillRangeEnd,
51
- scanSize,
80
+ backfill = backfill ,
81
+ session = session ,
82
+ schemaAndTable = schemaAndTable (backfill),
83
+ previousEndKey = previousEndKey ,
84
+ backfillRangeStart = backfillRangeStart ,
85
+ backfillRangeEnd = backfillRangeEnd ,
86
+ scanSize = scanSize ,
52
87
)
53
88
}
54
89
}
90
+
91
+ override fun computeMinAndCountForRange (
92
+ backfill : HibernateBackfill <E , Pkey , * >,
93
+ session : Session ,
94
+ previousEndKey : Pkey ? ,
95
+ backfillRangeStart : Pkey ,
96
+ end : Pkey ,
97
+ ): MinCount <Pkey > {
98
+ return selectMinAndCount(
99
+ backfill = backfill,
100
+ session = session,
101
+ schemaAndTable = schemaAndTable(backfill),
102
+ previousEndKey = previousEndKey,
103
+ backfillRangeStart = backfillRangeStart,
104
+ end = end,
105
+ )
106
+ }
55
107
}
56
108
57
109
class VitessShardedBoundingRangeStrategy <E : DbEntity <E >, Pkey : Any >(
58
110
private val partitionProvider : PartitionProvider ,
59
111
) : BoundingRangeStrategy<E, Pkey> {
112
+ override fun computeAbsoluteMinMax (
113
+ backfill : HibernateBackfill <E , Pkey , * >,
114
+ partitionName : String ,
115
+ ): MinMax <Pkey >? {
116
+ return partitionProvider.transaction(partitionName) { session ->
117
+ selectMinAndMax(
118
+ backfill,
119
+ session,
120
+ onlyTable(backfill),
121
+ )
122
+ }
123
+ }
124
+
60
125
override fun computeBoundingRangeMax (
61
126
backfill : HibernateBackfill <E , Pkey , * >,
62
127
partitionName : String ,
@@ -68,22 +133,51 @@ class VitessShardedBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>(
68
133
return partitionProvider.transaction(partitionName) { session ->
69
134
// We don't provide a schema when pinned to a shard.
70
135
selectMaxBound(
71
- backfill,
72
- session,
73
- onlyTable(backfill),
74
- previousEndKey,
75
- backfillRangeStart,
76
- backfillRangeEnd,
77
- scanSize,
136
+ backfill = backfill ,
137
+ session = session ,
138
+ schemaAndTable = onlyTable(backfill),
139
+ previousEndKey = previousEndKey ,
140
+ backfillRangeStart = backfillRangeStart ,
141
+ backfillRangeEnd = backfillRangeEnd ,
142
+ scanSize = scanSize ,
78
143
)
79
144
}
80
145
}
146
+
147
+ override fun computeMinAndCountForRange (
148
+ backfill : HibernateBackfill <E , Pkey , * >,
149
+ session : Session ,
150
+ previousEndKey : Pkey ? ,
151
+ backfillRangeStart : Pkey ,
152
+ end : Pkey ,
153
+ ): MinCount <Pkey > {
154
+ return selectMinAndCount(
155
+ backfill = backfill,
156
+ session = session,
157
+ schemaAndTable = onlyTable(backfill),
158
+ previousEndKey = previousEndKey,
159
+ backfillRangeStart = backfillRangeStart,
160
+ end = end,
161
+ )
162
+ }
81
163
}
82
164
83
165
class VitessSingleCursorBoundingRangeStrategy <E : DbEntity <E >, Pkey : Any >(
84
166
private val transacter : Transacter ,
85
167
private val keyspace : Keyspace ,
86
168
) : BoundingRangeStrategy<E, Pkey> {
169
+ override fun computeAbsoluteMinMax (
170
+ backfill : HibernateBackfill <E , Pkey , * >,
171
+ partitionName : String ,
172
+ ): MinMax <Pkey >? {
173
+ return transacter.transaction { session ->
174
+ selectMinAndMax(
175
+ backfill,
176
+ session,
177
+ onlyTable(backfill),
178
+ )
179
+ }
180
+ }
87
181
88
182
/* *
89
183
* Computes a bounding range by scanning all shards and returning the minimum of MAX(pkey).
@@ -108,24 +202,81 @@ class VitessSingleCursorBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>(
108
202
transacter.transaction(it) { session ->
109
203
// We don't provide a schema when pinned to a shard.
110
204
selectMaxBound(
111
- backfill,
112
- session,
113
- onlyTable(backfill),
114
- previousEndKey,
115
- backfillRangeStart,
116
- backfillRangeEnd,
117
- scanSize,
205
+ backfill = backfill ,
206
+ session = session ,
207
+ schemaAndTable = onlyTable(backfill),
208
+ previousEndKey = previousEndKey ,
209
+ backfillRangeStart = backfillRangeStart ,
210
+ backfillRangeEnd = backfillRangeEnd ,
211
+ scanSize = scanSize ,
118
212
)
119
213
}
120
214
}.toList()
121
215
.filterNotNull()
122
216
// Pkey must have a natural ordering
123
217
.minWithOrNull(Ordering .natural<Comparable <Pkey >>() as Comparator <Pkey >)
124
218
}
219
+
220
+ override fun computeMinAndCountForRange (
221
+ backfill : HibernateBackfill <E , Pkey , * >,
222
+ session : Session ,
223
+ previousEndKey : Pkey ? ,
224
+ backfillRangeStart : Pkey ,
225
+ end : Pkey ,
226
+ ): MinCount <Pkey > {
227
+ return selectMinAndCount(
228
+ backfill = backfill,
229
+ session = session,
230
+ schemaAndTable = onlyTable(backfill),
231
+ previousEndKey = previousEndKey,
232
+ backfillRangeStart = backfillRangeStart,
233
+ end = end,
234
+ )
235
+ }
125
236
}
126
237
127
238
class SingleCursorVitess
128
239
240
+ private fun <E : DbEntity <E >, Pkey : Any > selectMinAndMax (
241
+ backfill : HibernateBackfill <E , Pkey , * >,
242
+ session : Session ,
243
+ schemaAndTable : String ,
244
+ ): MinMax <Pkey >? {
245
+ // This query uses raw sql to avoid bumping into hibernate features such as @Where and
246
+ // @SQLRestriction.
247
+ // All of [selectMaxBound], [selectMinAndMax] and [selectMinAndCount] must be raw SQL since
248
+ // they depend on each other having the same view of the table.
249
+ val pkeyName = backfill.primaryKeyName()
250
+ val sql = """
251
+ |SELECT MIN($pkeyName ) as min, MAX($pkeyName ) as max
252
+ |FROM $schemaAndTable
253
+ """ .trimMargin()
254
+ val minMax = session.useConnection { connection ->
255
+ connection.prepareStatement(sql).use { ps ->
256
+ val pkeyType = session.hibernateSession.typeHelper.basic(backfill.pkeyClass.java)!!
257
+
258
+ val rs = ps.executeQuery()
259
+ rs.next()
260
+ val min = pkeyType.nullSafeGet(rs, " min" , session.hibernateSession as SessionImpl , null )
261
+ val max = pkeyType.nullSafeGet(rs, " max" , session.hibernateSession as SessionImpl , null )
262
+ if (min == null ) {
263
+ // Empty table, no work to do for this partition.
264
+ return @use null
265
+ } else {
266
+ checkNotNull(max) { " Table max was null but min wasn't, this shouldn't happen" }
267
+ @Suppress(" UNCHECKED_CAST" ) // Return type from the query should always be Pkey.
268
+ MinMax (min as Pkey , max as Pkey )
269
+ }
270
+ }
271
+ }
272
+ return minMax
273
+ }
274
+
275
+ data class MinMax <Pkey : Any >(
276
+ val min : Pkey ,
277
+ val max : Pkey ,
278
+ )
279
+
129
280
private fun <E : DbEntity <E >, Pkey : Any > selectMaxBound (
130
281
backfill : HibernateBackfill <E , Pkey , * >,
131
282
session : Session ,
@@ -137,6 +288,8 @@ private fun <E : DbEntity<E>, Pkey : Any> selectMaxBound(
137
288
): Pkey ? {
138
289
// Hibernate doesn't support subqueries in FROM, and we don't want to read in 100k+ records,
139
290
// so we use raw SQL here.
291
+ // All of [selectMaxBound], [selectMinAndMax] and [selectMinAndCount] must be raw SQL since
292
+ // they depend on each other having the same view of the table.
140
293
val pkeyName = backfill.primaryKeyName()
141
294
val params = mutableListOf<Pkey >()
142
295
var where = when {
@@ -178,6 +331,62 @@ private fun <E : DbEntity<E>, Pkey : Any> selectMaxBound(
178
331
return max as Pkey ?
179
332
}
180
333
334
+ private fun <E : DbEntity <E >, Pkey : Any > selectMinAndCount (
335
+ backfill : HibernateBackfill <E , Pkey , * >,
336
+ session : Session ,
337
+ schemaAndTable : String ,
338
+ previousEndKey : Pkey ? ,
339
+ backfillRangeStart : Pkey ,
340
+ end : Pkey ,
341
+ ): MinCount <Pkey > {
342
+ // This query uses raw sql to avoid bumping into hibernate features such as @Where and
343
+ // @SQLRestriction.
344
+ // All of [selectMaxBound], [selectMinAndMax] and [selectMinAndCount] must be raw SQL since
345
+ // they depend on each other having the same view of the table.
346
+ val pkeyName = backfill.primaryKeyName()
347
+ val params = mutableListOf<Pkey >()
348
+ var where = when {
349
+ previousEndKey != null -> {
350
+ params.add(previousEndKey)
351
+ " WHERE $pkeyName > ?"
352
+ }
353
+ else -> {
354
+ params.add(backfillRangeStart)
355
+ " WHERE $pkeyName >= ?"
356
+ }
357
+ }
358
+ params.add(end)
359
+ where + = " AND $pkeyName <= ?"
360
+ val sql = """
361
+ |SELECT MIN($pkeyName ) as start, COUNT(*) as scannedCount
362
+ |FROM $schemaAndTable
363
+ |$where
364
+ """ .trimMargin()
365
+ val minCount = session.useConnection { connection ->
366
+ connection.prepareStatement(sql).use { ps ->
367
+ val pkeyType = session.hibernateSession.typeHelper.basic(backfill.pkeyClass.java)!!
368
+
369
+ params.forEachIndexed { index, pkey ->
370
+ pkeyType.nullSafeSet(ps, pkey, index + 1 , session.hibernateSession as SessionImpl )
371
+ }
372
+
373
+ val rs = ps.executeQuery()
374
+ rs.next()
375
+ @Suppress(" UNCHECKED_CAST" ) // Return type from the query should always be a Pkey and Long.
376
+ MinCount (
377
+ pkeyType.nullSafeGet(rs, " start" , session.hibernateSession as SessionImpl , null ) as Pkey ,
378
+ rs.getLong(" scannedCount" ),
379
+ )
380
+ }
381
+ }
382
+ return minCount
383
+ }
384
+
385
+ data class MinCount <Pkey : Any >(
386
+ val min : Pkey ,
387
+ val scannedCount : Long ,
388
+ )
389
+
181
390
private fun <E : DbEntity <E >, Pkey : Any > schemaAndTable (backfill : HibernateBackfill <E , Pkey , * >): String {
182
391
val tableAnnotation = backfill.entityClass.java.getAnnotation(Table ::class .java)
183
392
val schema = tableAnnotation.schema
0 commit comments