-
Notifications
You must be signed in to change notification settings - Fork 62
Fixing Hibernate bug for @When annotations. #401
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,6 @@ import app.cash.backfila.client.misk.hibernate.HibernateBackfill | |
import app.cash.backfila.client.misk.hibernate.PartitionProvider | ||
import com.google.common.collect.Ordering | ||
import javax.persistence.Table | ||
import kotlin.streams.toList | ||
import misk.hibernate.DbEntity | ||
import misk.hibernate.Session | ||
import misk.hibernate.Transacter | ||
|
@@ -13,11 +12,30 @@ import misk.hibernate.transaction | |
import misk.vitess.Keyspace | ||
import org.hibernate.internal.SessionImpl | ||
|
||
/** | ||
* The queries that are provided by the strategy are used to establish a primary key slice of | ||
* the table off which the backfill criteria is applied. | ||
*/ | ||
interface BoundingRangeStrategy<E : DbEntity<E>, Pkey : Any> { | ||
/** | ||
* Computes the raw table min and max based on the primary key. Returns null if the table is empty. | ||
*/ | ||
fun computeAbsoluteMinMax( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
partitionName: String, | ||
): MinMax<Pkey>? | ||
|
||
/** | ||
* Computes a bound of size request.scan_size, to get a set of records that can be scanned for | ||
* records that match the criteria. | ||
* | ||
* Returns null if there is are no more records left in the table. | ||
* The return value must be greater than or equal to [backfillRangeStart] and less than or equal | ||
* to [backfillRangeEnd] and greater than [previousEndKey]. | ||
* | ||
* @param backfillRangeStart this is [MinMax.min] unless a specific range was specified. | ||
* @param backfillRangeEnd this is [MinMax.max] unless a specific range was specified. | ||
* @param previousEndKey is the null at the start or the result of a previous call to this function. | ||
*/ | ||
fun computeBoundingRangeMax( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
|
@@ -27,11 +45,46 @@ interface BoundingRangeStrategy<E : DbEntity<E>, Pkey : Any> { | |
backfillRangeEnd: Pkey, | ||
scanSize: Long?, | ||
): Pkey? | ||
|
||
/** | ||
* Gets the min and count for the range of records. | ||
* | ||
* The returned [MinCount.min] value must be greater than [previousEndKey] and greater than or equal to [backfillRangeStart]. | ||
* If [previousEndKey] is null: | ||
* The returned [MinCount.scannedCount] counts items in [backfillRangeStart] (inclusive) until [end] (inclusive) | ||
* | ||
* If [previousEndKey] is non-null: | ||
* The returned [MinCount.scannedCount] counts items in [previousEndKey] (exclusive) until [end] (inclusive). | ||
* | ||
* @param backfillRangeStart this is [MinMax.min] unless a specific range was specified. | ||
* @param end is the batch slice and is greater than or equal to [backfillRangeStart]. | ||
* @param previousEndKey is the null at the start or the result of a previous call [computeBoundingRangeMax]. | ||
*/ | ||
fun computeMinAndCountForRange( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
session: Session, | ||
previousEndKey: Pkey?, | ||
backfillRangeStart: Pkey, | ||
end: Pkey, | ||
): MinCount<Pkey> | ||
} | ||
|
||
class UnshardedHibernateBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>( | ||
private val partitionProvider: PartitionProvider, | ||
) : BoundingRangeStrategy<E, Pkey> { | ||
override fun computeAbsoluteMinMax( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
partitionName: String, | ||
): MinMax<Pkey>? { | ||
return partitionProvider.transaction(partitionName) { session -> | ||
selectMinAndMax( | ||
backfill, | ||
session, | ||
schemaAndTable(backfill), | ||
) | ||
} | ||
} | ||
|
||
override fun computeBoundingRangeMax( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
partitionName: String, | ||
|
@@ -42,21 +95,51 @@ class UnshardedHibernateBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>( | |
): Pkey? { | ||
return partitionProvider.transaction(partitionName) { session -> | ||
selectMaxBound( | ||
backfill, | ||
session, | ||
schemaAndTable(backfill), | ||
previousEndKey, | ||
backfillRangeStart, | ||
backfillRangeEnd, | ||
scanSize, | ||
backfill = backfill, | ||
session = session, | ||
schemaAndTable = schemaAndTable(backfill), | ||
previousEndKey = previousEndKey, | ||
backfillRangeStart = backfillRangeStart, | ||
backfillRangeEnd = backfillRangeEnd, | ||
scanSize = scanSize, | ||
) | ||
} | ||
} | ||
|
||
override fun computeMinAndCountForRange( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
session: Session, | ||
previousEndKey: Pkey?, | ||
backfillRangeStart: Pkey, | ||
end: Pkey, | ||
): MinCount<Pkey> { | ||
return selectMinAndCount( | ||
backfill = backfill, | ||
session = session, | ||
schemaAndTable = schemaAndTable(backfill), | ||
previousEndKey = previousEndKey, | ||
backfillRangeStart = backfillRangeStart, | ||
end = end, | ||
) | ||
} | ||
} | ||
|
||
class VitessShardedBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>( | ||
private val partitionProvider: PartitionProvider, | ||
) : BoundingRangeStrategy<E, Pkey> { | ||
override fun computeAbsoluteMinMax( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
partitionName: String, | ||
): MinMax<Pkey>? { | ||
return partitionProvider.transaction(partitionName) { session -> | ||
selectMinAndMax( | ||
backfill, | ||
session, | ||
onlyTable(backfill), | ||
) | ||
} | ||
} | ||
|
||
override fun computeBoundingRangeMax( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
partitionName: String, | ||
|
@@ -68,22 +151,51 @@ class VitessShardedBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>( | |
return partitionProvider.transaction(partitionName) { session -> | ||
// We don't provide a schema when pinned to a shard. | ||
selectMaxBound( | ||
backfill, | ||
session, | ||
onlyTable(backfill), | ||
previousEndKey, | ||
backfillRangeStart, | ||
backfillRangeEnd, | ||
scanSize, | ||
backfill = backfill, | ||
session = session, | ||
schemaAndTable = onlyTable(backfill), | ||
previousEndKey = previousEndKey, | ||
backfillRangeStart = backfillRangeStart, | ||
backfillRangeEnd = backfillRangeEnd, | ||
scanSize = scanSize, | ||
) | ||
} | ||
} | ||
|
||
override fun computeMinAndCountForRange( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
session: Session, | ||
previousEndKey: Pkey?, | ||
backfillRangeStart: Pkey, | ||
end: Pkey, | ||
): MinCount<Pkey> { | ||
return selectMinAndCount( | ||
backfill = backfill, | ||
session = session, | ||
schemaAndTable = onlyTable(backfill), | ||
previousEndKey = previousEndKey, | ||
backfillRangeStart = backfillRangeStart, | ||
end = end, | ||
) | ||
} | ||
} | ||
|
||
class VitessSingleCursorBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>( | ||
private val transacter: Transacter, | ||
private val keyspace: Keyspace, | ||
) : BoundingRangeStrategy<E, Pkey> { | ||
override fun computeAbsoluteMinMax( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
partitionName: String, | ||
): MinMax<Pkey>? { | ||
return transacter.transaction { session -> | ||
selectMinAndMax( | ||
backfill, | ||
session, | ||
onlyTable(backfill), | ||
) | ||
} | ||
} | ||
|
||
/** | ||
* Computes a bounding range by scanning all shards and returning the minimum of MAX(pkey). | ||
|
@@ -108,24 +220,81 @@ class VitessSingleCursorBoundingRangeStrategy<E : DbEntity<E>, Pkey : Any>( | |
transacter.transaction(it) { session -> | ||
// We don't provide a schema when pinned to a shard. | ||
selectMaxBound( | ||
backfill, | ||
session, | ||
onlyTable(backfill), | ||
previousEndKey, | ||
backfillRangeStart, | ||
backfillRangeEnd, | ||
scanSize, | ||
backfill = backfill, | ||
session = session, | ||
schemaAndTable = onlyTable(backfill), | ||
previousEndKey = previousEndKey, | ||
backfillRangeStart = backfillRangeStart, | ||
backfillRangeEnd = backfillRangeEnd, | ||
scanSize = scanSize, | ||
) | ||
} | ||
}.toList() | ||
.filterNotNull() | ||
// Pkey must have a natural ordering | ||
.minWithOrNull(Ordering.natural<Comparable<Pkey>>() as Comparator<Pkey>) | ||
} | ||
|
||
override fun computeMinAndCountForRange( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
session: Session, | ||
previousEndKey: Pkey?, | ||
backfillRangeStart: Pkey, | ||
end: Pkey, | ||
): MinCount<Pkey> { | ||
return selectMinAndCount( | ||
backfill = backfill, | ||
session = session, | ||
schemaAndTable = onlyTable(backfill), | ||
previousEndKey = previousEndKey, | ||
backfillRangeStart = backfillRangeStart, | ||
end = end, | ||
) | ||
} | ||
} | ||
|
||
class SingleCursorVitess | ||
|
||
private fun <E : DbEntity<E>, Pkey : Any> selectMinAndMax( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
session: Session, | ||
schemaAndTable: String, | ||
): MinMax<Pkey>? { | ||
// This query uses raw sql to avoid bumping into hibernate features such as @Where and | ||
// @SQLRestriction. | ||
// All of [selectMaxBound], [selectMinAndMax] and [selectMinAndCount] must be raw SQL since | ||
// they depend on each other having the same view of the table. | ||
val pkeyName = backfill.primaryKeyName() | ||
val sql = """ | ||
|SELECT MIN($pkeyName) as min, MAX($pkeyName) as max | ||
|FROM $schemaAndTable | ||
""".trimMargin() | ||
val minMax = session.useConnection { connection -> | ||
connection.prepareStatement(sql).use { ps -> | ||
val pkeyType = session.hibernateSession.typeHelper.basic(backfill.pkeyClass.java)!! | ||
|
||
val rs = ps.executeQuery() | ||
rs.next() | ||
val min = pkeyType.nullSafeGet(rs, "min", session.hibernateSession as SessionImpl, null) | ||
val max = pkeyType.nullSafeGet(rs, "max", session.hibernateSession as SessionImpl, null) | ||
if (min == null) { | ||
// Empty table, no work to do for this partition. | ||
return@use null | ||
} else { | ||
checkNotNull(max) { "Table max was null but min wasn't, this shouldn't happen" } | ||
@Suppress("UNCHECKED_CAST") // Return type from the query should always be Pkey. | ||
MinMax(min as Pkey, max as Pkey) | ||
} | ||
} | ||
} | ||
return minMax | ||
} | ||
|
||
data class MinMax<Pkey : Any>( | ||
val min: Pkey, | ||
val max: Pkey, | ||
) | ||
|
||
private fun <E : DbEntity<E>, Pkey : Any> selectMaxBound( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
session: Session, | ||
|
@@ -137,6 +306,8 @@ private fun <E : DbEntity<E>, Pkey : Any> selectMaxBound( | |
): Pkey? { | ||
// Hibernate doesn't support subqueries in FROM, and we don't want to read in 100k+ records, | ||
// so we use raw SQL here. | ||
// All of [selectMaxBound], [selectMinAndMax] and [selectMinAndCount] must be raw SQL since | ||
// they depend on each other having the same view of the table. | ||
val pkeyName = backfill.primaryKeyName() | ||
val params = mutableListOf<Pkey>() | ||
var where = when { | ||
|
@@ -178,6 +349,62 @@ private fun <E : DbEntity<E>, Pkey : Any> selectMaxBound( | |
return max as Pkey? | ||
} | ||
|
||
private fun <E : DbEntity<E>, Pkey : Any> selectMinAndCount( | ||
backfill: HibernateBackfill<E, Pkey, *>, | ||
session: Session, | ||
schemaAndTable: String, | ||
previousEndKey: Pkey?, | ||
backfillRangeStart: Pkey, | ||
end: Pkey, | ||
): MinCount<Pkey> { | ||
// This query uses raw sql to avoid bumping into hibernate features such as @Where and | ||
// @SQLRestriction. | ||
// All of [selectMaxBound], [selectMinAndMax] and [selectMinAndCount] must be raw SQL since | ||
// they depend on each other having the same view of the table. | ||
val pkeyName = backfill.primaryKeyName() | ||
val params = mutableListOf<Pkey>() | ||
var where = when { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ooooh now this lines up very nicely with the docs |
||
previousEndKey != null -> { | ||
params.add(previousEndKey) | ||
"WHERE $pkeyName > ?" | ||
} | ||
else -> { | ||
params.add(backfillRangeStart) | ||
"WHERE $pkeyName >= ?" | ||
} | ||
} | ||
params.add(end) | ||
where += " AND $pkeyName <= ?" | ||
val sql = """ | ||
|SELECT MIN($pkeyName) as start, COUNT(*) as scannedCount | ||
|FROM $schemaAndTable | ||
|$where | ||
""".trimMargin() | ||
val minCount = session.useConnection { connection -> | ||
connection.prepareStatement(sql).use { ps -> | ||
val pkeyType = session.hibernateSession.typeHelper.basic(backfill.pkeyClass.java)!! | ||
|
||
params.forEachIndexed { index, pkey -> | ||
pkeyType.nullSafeSet(ps, pkey, index + 1, session.hibernateSession as SessionImpl) | ||
} | ||
|
||
val rs = ps.executeQuery() | ||
rs.next() | ||
@Suppress("UNCHECKED_CAST") // Return type from the query should always be a Pkey and Long. | ||
MinCount( | ||
pkeyType.nullSafeGet(rs, "start", session.hibernateSession as SessionImpl, null) as Pkey, | ||
rs.getLong("scannedCount"), | ||
) | ||
} | ||
} | ||
return minCount | ||
} | ||
|
||
data class MinCount<Pkey : Any>( | ||
val min: Pkey, | ||
val scannedCount: Long, | ||
) | ||
|
||
private fun <E : DbEntity<E>, Pkey : Any> schemaAndTable(backfill: HibernateBackfill<E, Pkey, *>): String { | ||
val tableAnnotation = backfill.entityClass.java.getAnnotation(Table::class.java) | ||
val schema = tableAnnotation.schema | ||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice docs