Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import app.cash.backfila.client.misk.hibernate.HibernateBackfill
import app.cash.backfila.client.misk.hibernate.PartitionProvider
import com.google.common.collect.Ordering
import javax.persistence.Table
import kotlin.streams.toList
import misk.hibernate.DbEntity
import misk.hibernate.Session
import misk.hibernate.Transacter
Expand All @@ -13,11 +12,30 @@ import misk.hibernate.transaction
import misk.vitess.Keyspace
import org.hibernate.internal.SessionImpl

/**
* The queries that are provided by the strategy are used to establish a primary key slice of
* the table off which the backfill criteria is applied.
*/
interface BoundingRangeStrategy<E : DbEntity<E>, Pkey : Any> {
/**
* Computes the raw table min and max based on the primary key. Returns null if the table is empty.
*/
fun computeAbsoluteMinMax(
backfill: HibernateBackfill<E, Pkey, *>,
partitionName: String,
): MinMax<Pkey>?

/**
* Computes a bound of size request.scan_size, to get a set of records that can be scanned for
* records that match the criteria.
*
* Returns null if there is are no more records left in the table.
* The return value must be greater than or equal to [backfillRangeStart] and less than or equal
* to [backfillRangeEnd] and greater than [previousEndKey].
*
* @param backfillRangeStart this is [MinMax.min] unless a specific range was specified.
* @param backfillRangeEnd this is [MinMax.max] unless a specific range was specified.
* @param previousEndKey is the null at the start or the result of a previous call to this function.
*/
fun computeBoundingRangeMax(
backfill: HibernateBackfill<E, Pkey, *>,
Expand All @@ -27,11 +45,46 @@ interface BoundingRangeStrategy<E : DbEntity<E>, Pkey : Any> {
backfillRangeEnd: Pkey,
scanSize: Long?,
): Pkey?

/**
* Gets the min and count for the range of records.
*
* The returned [MinCount.min] value must be greater than [previousEndKey] and greater than or equal to [backfillRangeStart].
* If [previousEndKey] is null:
* The returned [MinCount.scannedCount] counts items in [backfillRangeStart] (inclusive) until [end] (inclusive)
*
* If [previousEndKey] is non-null:
* The returned [MinCount.scannedCount] counts items in [previousEndKey] (exclusive) until [end] (inclusive).
*
* @param backfillRangeStart this is [MinMax.min] unless a specific range was specified.
* @param end is the batch slice and is greater than or equal to [backfillRangeStart].
* @param previousEndKey is the null at the start or the result of a previous call [computeBoundingRangeMax].
*/
fun computeMinAndCountForRange(
backfill: HibernateBackfill<E, Pkey, *>,
session: Session,
previousEndKey: Pkey?,
backfillRangeStart: Pkey,
end: Pkey,
): MinCount<Pkey>
}

class UnshardedHibernateBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>(
private val partitionProvider: PartitionProvider,
) : BoundingRangeStrategy<E, Pkey> {
override fun computeAbsoluteMinMax(
backfill: HibernateBackfill<E, Pkey, *>,
partitionName: String,
): MinMax<Pkey>? {
return partitionProvider.transaction(partitionName) { session ->
selectMinAndMax(
backfill,
session,
schemaAndTable(backfill),
)
}
}

override fun computeBoundingRangeMax(
backfill: HibernateBackfill<E, Pkey, *>,
partitionName: String,
Expand All @@ -42,21 +95,51 @@ class UnshardedHibernateBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>(
): Pkey? {
return partitionProvider.transaction(partitionName) { session ->
selectMaxBound(
backfill,
session,
schemaAndTable(backfill),
previousEndKey,
backfillRangeStart,
backfillRangeEnd,
scanSize,
backfill = backfill,
session = session,
schemaAndTable = schemaAndTable(backfill),
previousEndKey = previousEndKey,
backfillRangeStart = backfillRangeStart,
backfillRangeEnd = backfillRangeEnd,
scanSize = scanSize,
)
}
}

override fun computeMinAndCountForRange(
backfill: HibernateBackfill<E, Pkey, *>,
session: Session,
previousEndKey: Pkey?,
backfillRangeStart: Pkey,
end: Pkey,
): MinCount<Pkey> {
return selectMinAndCount(
backfill = backfill,
session = session,
schemaAndTable = schemaAndTable(backfill),
previousEndKey = previousEndKey,
backfillRangeStart = backfillRangeStart,
end = end,
)
}
}

class VitessShardedBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>(
private val partitionProvider: PartitionProvider,
) : BoundingRangeStrategy<E, Pkey> {
override fun computeAbsoluteMinMax(
backfill: HibernateBackfill<E, Pkey, *>,
partitionName: String,
): MinMax<Pkey>? {
return partitionProvider.transaction(partitionName) { session ->
selectMinAndMax(
backfill,
session,
onlyTable(backfill),
)
}
}

override fun computeBoundingRangeMax(
backfill: HibernateBackfill<E, Pkey, *>,
partitionName: String,
Expand All @@ -68,22 +151,51 @@ class VitessShardedBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>(
return partitionProvider.transaction(partitionName) { session ->
// We don't provide a schema when pinned to a shard.
selectMaxBound(
backfill,
session,
onlyTable(backfill),
previousEndKey,
backfillRangeStart,
backfillRangeEnd,
scanSize,
backfill = backfill,
session = session,
schemaAndTable = onlyTable(backfill),
previousEndKey = previousEndKey,
backfillRangeStart = backfillRangeStart,
backfillRangeEnd = backfillRangeEnd,
scanSize = scanSize,
)
}
}

override fun computeMinAndCountForRange(
backfill: HibernateBackfill<E, Pkey, *>,
session: Session,
previousEndKey: Pkey?,
backfillRangeStart: Pkey,
end: Pkey,
): MinCount<Pkey> {
return selectMinAndCount(
backfill = backfill,
session = session,
schemaAndTable = onlyTable(backfill),
previousEndKey = previousEndKey,
backfillRangeStart = backfillRangeStart,
end = end,
)
}
}

class VitessSingleCursorBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>(
private val transacter: Transacter,
private val keyspace: Keyspace,
) : BoundingRangeStrategy<E, Pkey> {
override fun computeAbsoluteMinMax(
backfill: HibernateBackfill<E, Pkey, *>,
partitionName: String,
): MinMax<Pkey>? {
return transacter.transaction { session ->
selectMinAndMax(
backfill,
session,
onlyTable(backfill),
)
}
}

/**
* Computes a bounding range by scanning all shards and returning the minimum of MAX(pkey).
Expand All @@ -108,24 +220,81 @@ class VitessSingleCursorBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>(
transacter.transaction(it) { session ->
// We don't provide a schema when pinned to a shard.
selectMaxBound(
backfill,
session,
onlyTable(backfill),
previousEndKey,
backfillRangeStart,
backfillRangeEnd,
scanSize,
backfill = backfill,
session = session,
schemaAndTable = onlyTable(backfill),
previousEndKey = previousEndKey,
backfillRangeStart = backfillRangeStart,
backfillRangeEnd = backfillRangeEnd,
scanSize = scanSize,
)
}
}.toList()
.filterNotNull()
// Pkey must have a natural ordering
.minWithOrNull(Ordering.natural<Comparable<Pkey>>() as Comparator<Pkey>)
}

override fun computeMinAndCountForRange(
backfill: HibernateBackfill<E, Pkey, *>,
session: Session,
previousEndKey: Pkey?,
backfillRangeStart: Pkey,
end: Pkey,
): MinCount<Pkey> {
return selectMinAndCount(
backfill = backfill,
session = session,
schemaAndTable = onlyTable(backfill),
previousEndKey = previousEndKey,
backfillRangeStart = backfillRangeStart,
end = end,
)
}
}

class SingleCursorVitess

private fun <E : DbEntity<E>, Pkey : Any> selectMinAndMax(
backfill: HibernateBackfill<E, Pkey, *>,
session: Session,
schemaAndTable: String,
): MinMax<Pkey>? {
// This query uses raw sql to avoid bumping into hibernate features such as @Where and
// @SQLRestriction.
// All of [selectMaxBound], [selectMinAndMax] and [selectMinAndCount] must be raw SQL since
// they depend on each other having the same view of the table.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice docs

val pkeyName = backfill.primaryKeyName()
val sql = """
|SELECT MIN($pkeyName) as min, MAX($pkeyName) as max
|FROM $schemaAndTable
""".trimMargin()
val minMax = session.useConnection { connection ->
connection.prepareStatement(sql).use { ps ->
val pkeyType = session.hibernateSession.typeHelper.basic(backfill.pkeyClass.java)!!

val rs = ps.executeQuery()
rs.next()
val min = pkeyType.nullSafeGet(rs, "min", session.hibernateSession as SessionImpl, null)
val max = pkeyType.nullSafeGet(rs, "max", session.hibernateSession as SessionImpl, null)
if (min == null) {
// Empty table, no work to do for this partition.
return@use null
} else {
checkNotNull(max) { "Table max was null but min wasn't, this shouldn't happen" }
@Suppress("UNCHECKED_CAST") // Return type from the query should always be Pkey.
MinMax(min as Pkey, max as Pkey)
}
}
}
return minMax
}

data class MinMax<Pkey : Any>(
val min: Pkey,
val max: Pkey,
)

private fun <E : DbEntity<E>, Pkey : Any> selectMaxBound(
backfill: HibernateBackfill<E, Pkey, *>,
session: Session,
Expand All @@ -137,6 +306,8 @@ private fun <E : DbEntity<E>, Pkey : Any> selectMaxBound(
): Pkey? {
// Hibernate doesn't support subqueries in FROM, and we don't want to read in 100k+ records,
// so we use raw SQL here.
// All of [selectMaxBound], [selectMinAndMax] and [selectMinAndCount] must be raw SQL since
// they depend on each other having the same view of the table.
val pkeyName = backfill.primaryKeyName()
val params = mutableListOf<Pkey>()
var where = when {
Expand Down Expand Up @@ -178,6 +349,62 @@ private fun <E : DbEntity<E>, Pkey : Any> selectMaxBound(
return max as Pkey?
}

private fun <E : DbEntity<E>, Pkey : Any> selectMinAndCount(
backfill: HibernateBackfill<E, Pkey, *>,
session: Session,
schemaAndTable: String,
previousEndKey: Pkey?,
backfillRangeStart: Pkey,
end: Pkey,
): MinCount<Pkey> {
// This query uses raw sql to avoid bumping into hibernate features such as @Where and
// @SQLRestriction.
// All of [selectMaxBound], [selectMinAndMax] and [selectMinAndCount] must be raw SQL since
// they depend on each other having the same view of the table.
val pkeyName = backfill.primaryKeyName()
val params = mutableListOf<Pkey>()
var where = when {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooooh now this lines up very nicely with the docs

previousEndKey != null -> {
params.add(previousEndKey)
"WHERE $pkeyName > ?"
}
else -> {
params.add(backfillRangeStart)
"WHERE $pkeyName >= ?"
}
}
params.add(end)
where += " AND $pkeyName <= ?"
val sql = """
|SELECT MIN($pkeyName) as start, COUNT(*) as scannedCount
|FROM $schemaAndTable
|$where
""".trimMargin()
val minCount = session.useConnection { connection ->
connection.prepareStatement(sql).use { ps ->
val pkeyType = session.hibernateSession.typeHelper.basic(backfill.pkeyClass.java)!!

params.forEachIndexed { index, pkey ->
pkeyType.nullSafeSet(ps, pkey, index + 1, session.hibernateSession as SessionImpl)
}

val rs = ps.executeQuery()
rs.next()
@Suppress("UNCHECKED_CAST") // Return type from the query should always be a Pkey and Long.
MinCount(
pkeyType.nullSafeGet(rs, "start", session.hibernateSession as SessionImpl, null) as Pkey,
rs.getLong("scannedCount"),
)
}
}
return minCount
}

data class MinCount<Pkey : Any>(
val min: Pkey,
val scannedCount: Long,
)

private fun <E : DbEntity<E>, Pkey : Any> schemaAndTable(backfill: HibernateBackfill<E, Pkey, *>): String {
val tableAnnotation = backfill.entityClass.java.getAnnotation(Table::class.java)
val schema = tableAnnotation.schema
Expand Down
Loading
Loading