Description
Checkpoints for UDFs are needed to save and reuse partial UDF results between job runs when a UDF fails.
Important: the user can keep changing the UDF code until the UDF successfully finishes and still reuse the partial results. This solves the most common scenario: a UDF fails with a user error, the user finds a small bug in their code and wants to rerun while still reusing the partial UDF results from the previous run.
The idea is to create two checkpoints per UDF:
- Checkpoint before the UDF calculation starts: its hash covers the chain up to, but not including, the UDF itself (the UDF function code), i.e. only the UDF's inputs. This checkpoint is special: it stays active until the UDF finally succeeds, and the partial UDF tables are attached to it. We can also call it a partial (or "reset") checkpoint, since it holds partial results. Once the UDF finishes, this checkpoint is no longer needed and can be removed, as it is replaced by the second checkpoint.
- Checkpoint after the UDF is done: its hash covers the chain part before the UDF (the UDF inputs) plus the UDF itself (its function code), and it is connected to the final UDF result tables (we will later create a special "checkpoint" dataset out of it). If something fails in the chain after this UDF, the next run starts from here without recalculating the UDF. Note that unlike the partial "reset" checkpoint, which survives UDF code changes, this one is invalidated when the user changes the UDF code, since the hash calculation includes the UDF function itself.
In this example, these are the checkpoints created:

```python
chain = (
    dc
    .read_values(num=list(range(100)))
    .filter(C("num") < 50)
    # <-- Checkpoint 1, hash = hash(read_values) + hash(filter), data = UDF results (partial) -->
    .map(even=process_even)
    # <-- Checkpoint 2, hash = hash(Checkpoint 1) + hash(map/process_even), data = UDF results (final) -->
    .filter(C("num") < 20)
    # <-- Checkpoint 3, hash = hash(Checkpoint 2) + hash(filter), data = UDF results (partial) -->
    .gen(res=gen_double, output={"double": int, "even": int})
    # <-- Checkpoint 4, hash = hash(Checkpoint 3) + hash(gen/gen_double), data = UDF results (final) -->
    .save("numbers")
    # <-- Checkpoint 5, hash = hash(whole chain), data = created dataset (this is the final checkpoint in this chain) -->
)
```
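On a re-run, the scheduler would walk the saved checkpoints in chain order and resume from the first step whose checkpoint hash no longer matches. The sketch below is hypothetical (the `find_resume_point` helper and the flat hash-list representation are assumptions, not the proposed implementation):

```python
def find_resume_point(step_hashes, saved_checkpoints):
    """Return the index of the first step that must be (re)computed.

    step_hashes: ordered checkpoint hashes computed for the current chain.
    saved_checkpoints: set of checkpoint hashes persisted by a previous run.
    """
    resume_at = 0
    for i, h in enumerate(step_hashes):
        if h in saved_checkpoints:
            resume_at = i + 1  # this step's results can be reused
        else:
            break  # a hash mismatch invalidates everything downstream
    return resume_at


# Checkpoints 1-3 from the previous run are still valid; the gen UDF
# was edited, so its final checkpoint hash changed and the run resumes
# at step index 3 (Checkpoint 4).
current = ["cp1", "cp2", "cp3", "cp4-new", "cp5-new"]
saved = {"cp1", "cp2", "cp3", "cp4-old", "cp5-old"}
assert find_resume_point(current, saved) == 3
```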
An alternative to the two checkpoint types is to create just a single checkpoint and exclude the UDF code from the hash calculation entirely. This means we would never detect that the user changed the UDF function code: chain/UDF checkpoints would always be reused, and to make code changes take effect the user would have to re-run the whole job from scratch.
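The drawback of that alternative is easy to see in a toy cache lookup (a hypothetical sketch; `single_checkpoint_hash` and the dict-as-checkpoint-store are illustration only):

```python
def single_checkpoint_hash(inputs_hash: str) -> str:
    # Alternative scheme: the UDF code is never hashed, only its inputs.
    return inputs_hash


cache = {}  # checkpoint hash -> stored results
inputs_hash = "abc123"
cache[single_checkpoint_hash(inputs_hash)] = "results from old UDF code"

# The user edits the UDF body, but the hash is unchanged, so the stale
# results are silently reused; only a full from-scratch re-run would
# pick up the new code.
assert cache[single_checkpoint_hash(inputs_hash)] == "results from old UDF code"
```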