Skip to content

Allow any of the subscribed GruleEngineListener to break out of Cycle during the start of a Cycle. #456

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions engine/GruleEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ package engine
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"sort"
"time"

"github.com/sirupsen/logrus"
"go.uber.org/zap"

"github.com/hyperjumptech/grule-rule-engine/ast"
"github.com/hyperjumptech/grule-rule-engine/logger"
)
Expand All @@ -48,14 +49,12 @@ func SetLogger(externalLog interface{}) {
case *zap.Logger:
log, ok := externalLog.(*zap.Logger)
if !ok {

return
}
entry = logger.NewZap(log)
case *logrus.Logger:
log, ok := externalLog.(*logrus.Logger)
if !ok {

return
}
entry = logger.NewLogrus(log)
Expand All @@ -70,7 +69,6 @@ func SetLogger(externalLog interface{}) {
// NewGruleEngine will create new instance of GruleEngine struct.
// It will set the max cycle to 5000
func NewGruleEngine() *GruleEngine {

return &GruleEngine{
MaxCycle: DefaultCycleCount,
}
Expand All @@ -85,7 +83,6 @@ type GruleEngine struct {

// Execute function is the same as ExecuteWithContext(context.Background())
func (g *GruleEngine) Execute(dataCtx ast.IDataContext, knowledge *ast.KnowledgeBase) error {

return g.ExecuteWithContext(context.Background(), dataCtx, knowledge)
}

Expand Down Expand Up @@ -121,7 +118,6 @@ func (g *GruleEngine) notifyBeginCycle(cycle uint64) {
// The engine also do conflict resolution of which rule to execute.
func (g *GruleEngine) ExecuteWithContext(ctx context.Context, dataCtx ast.IDataContext, knowledge *ast.KnowledgeBase) error {
if knowledge == nil || dataCtx == nil {

return fmt.Errorf("nil KnowledgeBase or DataContext is not allowed")
}

Expand Down Expand Up @@ -163,6 +159,11 @@ func (g *GruleEngine) ExecuteWithContext(ctx context.Context, dataCtx ast.IDataC

g.notifyBeginCycle(cycle + 1)

// If any listener wants to abort the cycle, we will break the loop.
if dataCtx.IsComplete() {
break
}

// Select all rule entry that can be executed.
log.Tracef("Select all rule entry that can be executed.")
runnable := make([]*ast.RuleEntry, 0)
Expand All @@ -178,7 +179,6 @@ func (g *GruleEngine) ExecuteWithContext(ctx context.Context, dataCtx ast.IDataC
if err != nil {
log.Errorf("Failed testing condition for rule : %s. Got error %v", ruleEntry.RuleName, err)
if g.ReturnErrOnFailedRuleEvaluation {

return err
}
}
Expand Down Expand Up @@ -249,7 +249,6 @@ func (g *GruleEngine) ExecuteWithContext(ctx context.Context, dataCtx ast.IDataC
// Returns []*ast.RuleEntry order by salience
func (g *GruleEngine) FetchMatchingRules(dataCtx ast.IDataContext, knowledge *ast.KnowledgeBase) ([]*ast.RuleEntry, error) {
if knowledge == nil || dataCtx == nil {

return nil, fmt.Errorf("nil KnowledgeBase or DataContext is not allowed")
}

Expand All @@ -269,7 +268,7 @@ func (g *GruleEngine) FetchMatchingRules(dataCtx ast.IDataContext, knowledge *as
log.Debugf("Initializing Context")
knowledge.InitializeContext(dataCtx)

//Loop through all the rule entries available in the knowledge base and add to the response list if it is able to evaluate
// Loop through all the rule entries available in the knowledge base and add to the response list if it is able to evaluate
// Select all rule entry that can be executed.
log.Tracef("Select all rule entry that can be executed.")
runnable := make([]*ast.RuleEntry, 0)
Expand All @@ -292,7 +291,6 @@ func (g *GruleEngine) FetchMatchingRules(dataCtx ast.IDataContext, knowledge *as
log.Debugf("Matching rules length %d.", len(runnable))
if len(runnable) > 1 {
sort.SliceStable(runnable, func(i, j int) bool {

return runnable[i].Salience > runnable[j].Salience
})
}
Expand Down
120 changes: 107 additions & 13 deletions engine/GruleEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ const complexRule2 = `rule ComplexRule "test complex rule" salience 10 {
}`

func TestEngine_ComplexRule2(t *testing.T) {

ts := &TestStruct{
Param1: false,
Param2: false,
Expand Down Expand Up @@ -274,7 +273,6 @@ const complexRule3 = `rule ComplexRule "test complex rule" salience 10 {
}`

func TestEngine_ComplexRule3(t *testing.T) {

ts := &TestStruct{
Param1: false,
Param2: false,
Expand Down Expand Up @@ -312,7 +310,6 @@ const complexRule4 = `rule ComplexRule "test complex rule" salience 10 {
}`

func TestEngine_ComplexRule4(t *testing.T) {

ts := &TestStruct{
Param1: true,
Param2: false,
Expand Down Expand Up @@ -347,7 +344,6 @@ const OpPresedenceRule = `rule OpPresedenceRule "test operator presedence" salie
}`

func TestEngine_OperatorPrecedence(t *testing.T) {

ts := &TestStruct{}

dctx := ast.NewDataContext()
Expand Down Expand Up @@ -511,7 +507,7 @@ Then
}`

func TestGruleEngine_FetchMatchingRules_Having_Same_Salience(t *testing.T) {
//Given
// Given
fact := &Fact{
Distance: 6000,
Duration: 123,
Expand All @@ -526,11 +522,11 @@ func TestGruleEngine_FetchMatchingRules_Having_Same_Salience(t *testing.T) {
kb, err := lib.NewKnowledgeBaseInstance("conflict_rules_test", "0.1.1")
assert.NoError(t, err)

//When
// When
engine := NewGruleEngine()
ruleEntries, err := engine.FetchMatchingRules(dctx, kb)

//Then
// Then
assert.NoError(t, err)
assert.Equal(t, 5, len(ruleEntries))
}
Expand Down Expand Up @@ -597,7 +593,7 @@ Then
}`

func TestGruleEngine_FetchMatchingRules_Having_Diff_Salience(t *testing.T) {
//Given
// Given
fact := &Fact{
Distance: 6000,
Duration: 121,
Expand All @@ -612,11 +608,11 @@ func TestGruleEngine_FetchMatchingRules_Having_Diff_Salience(t *testing.T) {
kb, err := lib.NewKnowledgeBaseInstance("conflict_rules_test", "0.1.1")
assert.NoError(t, err)

//When
// When
engine := NewGruleEngine()
ruleEntries, err := engine.FetchMatchingRules(dctx, kb)

//Then
// Then
assert.NoError(t, err)
assert.Equal(t, 4, len(ruleEntries))
assert.Equal(t, 8, ruleEntries[0].Salience)
Expand Down Expand Up @@ -659,7 +655,7 @@ type LogicalOperatorRuleFact struct {
}

func TestGruleEngine_Follows_logical_operator_precedence(t *testing.T) {
//Given
// Given
fact := &LogicalOperatorRuleFact{
Distance: 2000,
Duration: 121,
Expand All @@ -676,12 +672,110 @@ func TestGruleEngine_Follows_logical_operator_precedence(t *testing.T) {
kb, err := lib.NewKnowledgeBaseInstance("logical_operator_rules_test", "0.1.1")
assert.NoError(t, err)

//When
// When
engine := NewGruleEngine()
err = engine.Execute(dctx, kb)

//Then
// Then
assert.NoError(t, err)
assert.Equal(t, fact.Result, true)
assert.Equal(t, fact.NetAmount, float32(143.32))
}

const LevelRuleWithListener = `
rule ProcessThresholds "ProcessThresholds" Salience 1 {
when
DP.Index < DP.Thresholds.Len()
then
DP.ProcessThresholds(DP.Level.Value, DP.Thresholds[DP.Index]);
DP.Index = DP.Index + 1;
Log("Index: " + DP.Index);
}`

type Level struct {
Value int64
}

type Threshold struct {
MinLevel int64
}

type DataPoint struct {
Level *Level // Level that needs to be monitored.
Thresholds []*Threshold // Threshold configuration levels.
TurnedOn int64 // Turn something on for every threshold that breached.
Index int64 // Running index of the threshold that is being processed.
}

func (d *DataPoint) ProcessThresholds(level int64, t *Threshold) {
if d == nil {
return
}
if level > t.MinLevel {
d.TurnedOn++
}
}

type LevelListener struct {
kb *ast.KnowledgeBase
dc ast.IDataContext
dp *DataPoint
}

const MaxTurnOns = int64(2)

func (l *LevelListener) BeginCycle(cycle uint64) {
if l.dp.TurnedOn == MaxTurnOns { // Stop at MaxTurnOns.
l.dc.Complete() // Stop Cycle.
}
}

func (c *LevelListener) EvaluateRuleEntry(cycle uint64, entry *ast.RuleEntry, candidate bool) {
}

func (c *LevelListener) ExecuteRuleEntry(cycle uint64, entry *ast.RuleEntry) {
}

// TestGruleListener is a test function that verifies the behavior of the GruleEngineListener.
// It creates a DataPoint with some thresholds, sets up a GruleEngine with a custom Listener,
// and executes the engine. The test then verifies that the execution has aborted when a global
// condition is met and that rule cycle gets stopped, by validating DataPoint's TurnedOn and Index
// values are set to the expected MaxTurnOns value.
func TestGruleListener(t *testing.T) {
// Given
datapoint := &DataPoint{
Level: &Level{
Value: 1000,
},
Thresholds: []*Threshold{
{MinLevel: 200},
{MinLevel: 400},
{MinLevel: 600},
{MinLevel: 800},
},
Index: 0, // Initialize index to 0.
TurnedOn: int64(0), // Nothing turned on yet.
}

dctx := ast.NewDataContext()
err := dctx.Add("DP", datapoint)
assert.NoError(t, err)

lib := ast.NewKnowledgeLibrary()
rb := builder.NewRuleBuilder(lib)
err = rb.BuildRuleFromResource("TestListener", "0.0.1", pkg.NewBytesResource([]byte(LevelRuleWithListener)))
assert.NoError(t, err)
kb, err := lib.NewKnowledgeBaseInstance("TestListener", "0.0.1")
assert.NoError(t, err)

engine := NewGruleEngine()

// When
engine.Listeners = append(engine.Listeners, &LevelListener{kb: kb, dc: dctx, dp: datapoint})
err = engine.Execute(dctx, kb)
assert.NoError(t, err)

// Then
assert.Equal(t, MaxTurnOns, datapoint.TurnedOn) // Limit at MaxTurnOns.
assert.Equal(t, MaxTurnOns, datapoint.Index)
}