
UDF checkpoints #1392

@ilongin

Description


Checkpoints for UDFs are needed to save and reuse partial UDF results between job runs when a UDF fails.
Important: the user can keep changing the UDF code until the UDF finishes successfully and still reuse the partial results. This addresses the most common failure scenario: a UDF fails with a user error, the user finds a small bug in their code and wants to rerun, while still reusing the partial UDF results from the previous run.

The idea is to create two checkpoints per UDF:

  1. Checkpoint before the UDF calculation starts - its hash covers the chain up to, but excluding, the UDF itself (the UDF function code), i.e. only the UDF inputs go into the hash calculation. This checkpoint is special: it stays active until the UDF finally finishes successfully, and the partial UDF tables are attached to it. We can also call it a partial checkpoint, since it holds partial results. Once the UDF finishes, this checkpoint is no longer needed and can be removed, as it is replaced by the second checkpoint.
  2. Checkpoint after the UDF is done - its hash covers the chain part before the UDF (the UDF inputs) plus the UDF itself (the function code), and it is connected to the final UDF result tables (we will create a special "checkpoint" dataset out of them later on). If something fails in the chain after this UDF, the next run starts from here without recalculating the UDF. Note that unlike the first (partial) checkpoint, which is not invalidated when the user changes the UDF code, this one is invalidated, because the UDF function itself is part of its hash calculation.
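The two hash rules above can be sketched as follows. This is a minimal illustration, not the DataChain implementation; the `_hash` helper and the string inputs are hypothetical:

```python
import hashlib


def _hash(*parts: str) -> str:
    """Combine string parts into one stable hex digest (hypothetical helper)."""
    h = hashlib.sha256()
    for part in parts:
        h.update(part.encode())
    return h.hexdigest()


# Checkpoint 1 ("partial"): only the UDF inputs (the chain before the UDF)
# go into the hash, so editing the UDF code does not invalidate it.
inputs_hash = _hash("read_values", "filter(num < 50)")

# Checkpoint 2 ("final"): the UDF function code is included in the hash,
# so changing the code produces a different key and forces a rerun.
udf_code_v1 = "def process_even(num): retrun num % 2 == 0"  # buggy version
udf_code_v2 = "def process_even(num): return num % 2 == 0"  # fixed version

final_v1 = _hash(inputs_hash, udf_code_v1)
final_v2 = _hash(inputs_hash, udf_code_v2)

assert final_v1 != final_v2   # final checkpoint is invalidated by a code change
```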

For example, these are the checkpoints created:

```python
chain = (
    dc
    .read_values(num=list(range(100)))
    .filter(C("num") < 50)
    # <-- Checkpoint 1: hash = hash(read_values) + hash(filter), data = UDF results (partial) -->
    .map(even=process_even)
    # <-- Checkpoint 2: hash = hash(Checkpoint 1) + hash(map/process_even), data = UDF results (final) -->
    .filter(C("num") < 20)
    # <-- Checkpoint 3: hash = hash(Checkpoint 2) + hash(filter), data = UDF results (partial) -->
    .gen(res=gen_double, output={"double": int, "even": int})
    # <-- Checkpoint 4: hash = hash(Checkpoint 3) + hash(gen/gen_double), data = UDF results (final) -->
    .save("numbers")
    # <-- Checkpoint 5: hash = hash(whole chain), data = created dataset (the final checkpoint in this chain) -->
)
```
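The resume behavior implied by this chain can be sketched as below. The names and data shapes are hypothetical, not the DataChain API: walk the chain's step hashes from the end and restart at the first step whose checkpoint is missing.

```python
def find_resume_point(step_hashes: list[str], saved: dict[str, object]) -> int:
    """Return the index of the first step that must be (re)computed.

    step_hashes: ordered hashes for each checkpoint in the chain.
    saved: checkpoint hashes persisted by a previous run, mapped to their data.
    """
    for i in range(len(step_hashes) - 1, -1, -1):
        if step_hashes[i] in saved:
            return i + 1  # everything up to and including step i is reusable
    return 0  # nothing reusable: run the whole chain from scratch


# Previous run persisted checkpoints 1 and 2, then failed during step 3.
saved = {"h1": "table_1", "h2": "table_2"}
assert find_resume_point(["h1", "h2", "h3", "h4", "h5"], saved) == 2
```

If the user edits the UDF behind checkpoint 2, its hash changes (say to `"h2'"`), so the lookup fails there and the run resumes from the earlier partial checkpoint instead.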

An alternative to the two checkpoint types is to create a single checkpoint and leave the UDF code out of the hash calculation entirely. In that case we would never detect that the user changed the UDF function code: chain/UDF checkpoints would always be reused, and to make sure code changes take effect the user would need to rerun the whole job from scratch.
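The downside of that alternative can be shown in a few lines. This is an illustrative sketch with a hypothetical key function, not actual DataChain code: since the UDF code is excluded from the hash, a buggy and a fixed version of the function map to the same checkpoint key, so stale results would be reused silently.

```python
import hashlib


def key_without_code(inputs: str) -> str:
    """Checkpoint key from UDF inputs only; the UDF code is deliberately excluded."""
    return hashlib.sha256(inputs.encode()).hexdigest()


old_key = key_without_code("read_values|filter")  # run with the buggy UDF
new_key = key_without_code("read_values|filter")  # rerun with the fixed UDF

assert old_key == new_key  # stale results are reused despite the code change
```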
