Skip to content

Commit 8e069d7

Browse files
committed
TopK optimization to not keep all values, instead have max of k values per window
1 parent ada8d3a commit 8e069d7

File tree

8 files changed

+496
-15
lines changed

8 files changed

+496
-15
lines changed

Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ protected SortedMultisetAggregateBase(IComparerExpression<T> comparer, QueryCont
1717
Expression<Func<Func<SortedDictionary<T, long>>, SortedMultiSet<T>>> template
1818
= (g) => new SortedMultiSet<T>(g);
1919
var replaced = template.ReplaceParametersInBody(generator);
20-
initialState = Expression.Lambda<Func<SortedMultiSet<T>>>(replaced);
20+
this.initialState = Expression.Lambda<Func<SortedMultiSet<T>>>(replaced);
2121
}
2222

2323
private readonly Expression<Func<SortedMultiSet<T>>> initialState;

Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,37 @@
66
using System.Collections.Generic;
77
using System.Diagnostics.Contracts;
88
using System.Linq.Expressions;
9+
using Microsoft.StreamProcessing.Internal;
910

1011
namespace Microsoft.StreamProcessing.Aggregates
1112
{
12-
internal sealed class TopKAggregate<T> : SortedMultisetAggregateBase<T, List<RankedEvent<T>>>
13+
internal sealed class TopKAggregate<T> : IAggregate<T, ITopKState<T>, List<RankedEvent<T>>>
1314
{
1415
private readonly Comparison<T> compiledRankComparer;
1516
private readonly int k;
1617

17-
public TopKAggregate(int k, QueryContainer container) : this(k, ComparerExpression<T>.Default, container) { }
18+
// Convenience overload: forwards to the five-argument constructor, passing
// ComparerExpression<T>.Default as the overall comparer.
public TopKAggregate(int k, IComparerExpression<T> rankComparer, QueryContainer container, bool isHoppingWindow)
19+
: this(k, rankComparer, ComparerExpression<T>.Default, container, isHoppingWindow) { }
1820

19-
public TopKAggregate(int k, IComparerExpression<T> rankComparer, QueryContainer container)
20-
: this(k, rankComparer, ComparerExpression<T>.Default, container) { }
21-
22-
public TopKAggregate(int k, IComparerExpression<T> rankComparer, IComparerExpression<T> overallComparer, QueryContainer container)
23-
: base(ThenOrderBy(Reverse(rankComparer), overallComparer), container)
21+
// Main constructor. Stores k and the compiled (reversed) rank comparison, then builds the
// expression that creates the initial aggregate state: a HoppingTopKState<T> when
// isHoppingWindow is true, otherwise a SimpleTopKState<T>.
public TopKAggregate(int k, IComparerExpression<T> rankComparer, IComparerExpression<T> overallComparer,
22+
QueryContainer container, bool isHoppingWindow)
2423
{
2524
Contract.Requires(rankComparer != null);
2625
Contract.Requires(overallComparer != null);
2726
Contract.Requires(k > 0);
2827
// The rank comparer is reversed before being compiled to a delegate.
this.compiledRankComparer = Reverse(rankComparer).GetCompareExpr().Compile();
2928
this.k = k;
29+
30+
// Template lambda: given a sorted-dictionary factory "g", construct the state object.
Expression<Func<Func<SortedDictionary<T, long>>, ITopKState<T>>> template;
31+
if (isHoppingWindow)
32+
// The lambda refers to the k parameter and the compiledRankComparer field assigned above.
template = (g) => new HoppingTopKState<T>(k, compiledRankComparer, g);
33+
else
34+
template = (g) => new SimpleTopKState<T>(g);
35+
36+
// Sort key for the state: reversed rank first, then the overall comparer.
var combinedComparer = ThenOrderBy(Reverse(rankComparer), overallComparer);
37+
var generator = combinedComparer.CreateSortedDictionaryGenerator<T, long>(container);
38+
// Presumably substitutes the generator expression for "g" in the template body,
// leaving a parameterless body — TODO confirm against ReplaceParametersInBody.
var replaced = template.ReplaceParametersInBody(generator);
39+
this.initialState = Expression.Lambda<Func<ITopKState<T>>>(replaced);
3040
}
3141

3242
private static IComparerExpression<T> Reverse(IComparerExpression<T> comparer)
@@ -53,10 +63,11 @@ private static IComparerExpression<T> ThenOrderBy(IComparerExpression<T> compare
5363
return new ComparerExpression<T>(newExpression);
5464
}
5565

56-
public override Expression<Func<SortedMultiSet<T>, List<RankedEvent<T>>>> ComputeResult() => set => GetTopK(set);
66+
// Result selector: an expression that maps the aggregate state to its ranked list by calling GetTopK.
public Expression<Func<ITopKState<T>, List<RankedEvent<T>>>> ComputeResult() => set => GetTopK(set);
5767

58-
private List<RankedEvent<T>> GetTopK(SortedMultiSet<T> set)
68+
private List<RankedEvent<T>> GetTopK(ITopKState<T> state)
5969
{
70+
var set = state.GetSortedValues();
6071
int count = (int)Math.Min(this.k, set.TotalCount);
6172
var result = new List<RankedEvent<T>>(count);
6273
int nextRank = 1;
@@ -82,5 +93,20 @@ private List<RankedEvent<T>> GetTopK(SortedMultiSet<T> set)
8293

8394
return result;
8495
}
96+
97+
// Expression that creates a fresh state; assigned once in the constructor.
private readonly Expression<Func<ITopKState<T>>> initialState;
98+
// Returns the state-creation expression built by the constructor.
public Expression<Func<ITopKState<T>>> InitialState() => initialState;
99+
100+
// Accumulate: forwards to ITopKState<T>.Add. Note the argument order swap:
// the expression receives (state, timestamp, input) but Add takes (input, timestamp).
private static readonly Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> acc
101+
= (state, timestamp, input) => state.Add(input, timestamp);
102+
public Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> Accumulate() => acc;
103+
104+
// Deaccumulate: forwards to ITopKState<T>.Remove with the same argument order swap.
private static readonly Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> dec
105+
= (state, timestamp, input) => state.Remove(input, timestamp);
106+
public Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> Deaccumulate() => dec;
107+
108+
// Difference: removes the entries of the right state from the left state via RemoveAll.
private static readonly Expression<Func<ITopKState<T>, ITopKState<T>, ITopKState<T>>> diff
109+
= (leftState, rightState) => leftState.RemoveAll(rightState);
110+
public Expression<Func<ITopKState<T>, ITopKState<T>, ITopKState<T>>> Difference() => diff;
85111
}
86112
}
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
5+
namespace Microsoft.StreamProcessing.Aggregates
6+
{
7+
/// <summary>
8+
/// State used by the TopK aggregate. Each mutating member returns an
/// <see cref="ITopKState{T}"/> so that it can be used as the body of an aggregate expression.
9+
/// </summary>
10+
/// <typeparam name="T">Type of the values held in the state</typeparam>
11+
public interface ITopKState<T>
12+
{
13+
/// <summary>
14+
/// Adds a single entry
15+
/// </summary>
16+
/// <param name="input">The value to add</param>
17+
/// <param name="timestamp">The timestamp associated with the value</param>
/// <returns>The state after the entry has been added</returns>
18+
ITopKState<T> Add(T input, long timestamp);
19+
20+
/// <summary>
21+
/// Removes the specified entry
22+
/// </summary>
23+
/// <param name="input">The value to remove</param>
24+
/// <param name="timestamp">The timestamp associated with the value</param>
/// <returns>The state after the entry has been removed</returns>
25+
ITopKState<T> Remove(T input, long timestamp);
26+
27+
/// <summary>
28+
/// Removes from this state the entries held by another state
29+
/// </summary>
30+
/// <param name="other">The state whose entries are to be removed from this one</param>
/// <returns>The state after the entries have been removed</returns>
31+
ITopKState<T> RemoveAll(ITopKState<T> other);
32+
33+
/// <summary>
34+
/// Gets the values as a sorted multiset
35+
/// </summary>
36+
/// <returns>The sorted multiset holding the values of this state</returns>
37+
SortedMultiSet<T> GetSortedValues();
38+
39+
/// <summary>
40+
/// Returns the total number of values in the state
41+
/// </summary>
42+
long Count { get; }
43+
}
44+
45+
/// <summary>
/// TopK state that keeps every value it is given in one sorted multiset.
/// The timestamp arguments of Add and Remove are not used by this implementation.
/// </summary>
/// <typeparam name="T">Type of the values held in the state</typeparam>
internal class SimpleTopKState<T> : ITopKState<T>
{
    // The single multiset backing this state.
    private SortedMultiSet<T> values;

    /// <summary>
    /// Creates an empty state whose multiset is built from the given dictionary factory.
    /// </summary>
    /// <param name="generator">Factory handed to the SortedMultiSet constructor</param>
    public SimpleTopKState(Func<SortedDictionary<T, long>> generator)
    {
        this.values = new SortedMultiSet<T>(generator);
    }

    /// <summary>
    /// Total number of values held, as reported by the multiset's TotalCount.
    /// </summary>
    public long Count
    {
        get { return this.values.TotalCount; }
    }

    /// <summary>
    /// Returns the backing multiset itself (not a copy).
    /// </summary>
    public SortedMultiSet<T> GetSortedValues()
    {
        return this.values;
    }

    /// <summary>
    /// Adds one value to the multiset and returns this instance.
    /// </summary>
    public virtual ITopKState<T> Add(T input, long timestamp)
    {
        this.values.Add(input);
        return this;
    }

    /// <summary>
    /// Removes one value from the multiset and returns this instance.
    /// </summary>
    public ITopKState<T> Remove(T input, long timestamp)
    {
        this.values.Remove(input);
        return this;
    }

    /// <summary>
    /// Removes the sorted values of <paramref name="other"/> from the multiset and returns this instance.
    /// </summary>
    public ITopKState<T> RemoveAll(ITopKState<T> other)
    {
        var toSubtract = other.GetSortedValues();
        this.values.RemoveAll(toSubtract);
        return this;
    }
}
76+
77+
/// <summary>
/// TopK state that keeps the values of the latest timestamp in one multiset
/// (<c>currentValues</c>) and everything merged from earlier in another
/// (<c>previousValues</c>). Add trims <c>currentValues</c> based on <c>k</c>.
/// </summary>
/// <typeparam name="T">Type of the values held in the state</typeparam>
internal class HoppingTopKState<T> : ITopKState<T>
{
    // Timestamp that the entries in currentValues belong to.
    public long currentTimestamp;

    // Entries folded in by MergeCurrentToPrevious.
    public SortedMultiSet<T> previousValues;

    // Entries added at currentTimestamp.
    public SortedMultiSet<T> currentValues;

    // Size limit that Add applies to currentValues.
    public int k;

    // Stored by the constructor; no member of this class reads it.
    public Comparison<T> rankComparer;

    /// <summary>
    /// Creates an empty state; both multisets are built from the same dictionary factory.
    /// </summary>
    public HoppingTopKState(int k, Comparison<T> rankComparer, Func<SortedDictionary<T, long>> generator)
    {
        this.currentValues = new SortedMultiSet<T>(generator);
        this.previousValues = new SortedMultiSet<T>(generator);
        this.rankComparer = rankComparer;
        this.k = k;
    }

    /// <summary>
    /// Total number of values across both multisets.
    /// </summary>
    public long Count
    {
        get { return this.currentValues.TotalCount + this.previousValues.TotalCount; }
    }

    /// <summary>
    /// Adds a value at the given timestamp, which must not be older than the current timestamp.
    /// </summary>
    /// <exception cref="ArgumentException">The timestamp is earlier than the current timestamp</exception>
    /// <exception cref="InvalidOperationException">
    /// The excess over k is larger than the count of the entry returned by GetMinItem
    /// </exception>
    public ITopKState<T> Add(T input, long timestamp)
    {
        if (timestamp < this.currentTimestamp)
        {
            throw new ArgumentException("Invalid timestamp");
        }

        if (timestamp > this.currentTimestamp)
        {
            // Moving on to a later timestamp: fold the finished batch into previousValues first.
            MergeCurrentToPrevious();
            this.currentTimestamp = timestamp;
        }

        this.currentValues.Add(input);

        var excess = this.currentValues.TotalCount - this.k;
        if (excess <= 0)
        {
            return this;
        }

        // NOTE(review): GetMinItem is defined on SortedMultiSet; confirm that the entry it
        // returns is the lowest-ranked one under the comparer this state was created with.
        var smallest = this.currentValues.GetMinItem();
        if (excess > smallest.Count)
        {
            throw new InvalidOperationException("CurrentValues has more items than required");
        }

        // The entry is dropped only when removing all of its occurrences brings the total
        // back to exactly k; if it has more occurrences than the excess, nothing is removed.
        if (excess == smallest.Count)
        {
            this.currentValues.Remove(smallest.Item, smallest.Count);
        }

        return this;
    }

    /// <summary>
    /// Removes a value: from previousValues when the timestamp is older than the current one,
    /// from currentValues when it equals the current one.
    /// </summary>
    /// <exception cref="ArgumentException">The timestamp is later than the current timestamp</exception>
    public ITopKState<T> Remove(T input, long timestamp)
    {
        if (timestamp > this.currentTimestamp)
        {
            throw new ArgumentException("Invalid timestamp");
        }

        if (timestamp == this.currentTimestamp)
        {
            this.currentValues.Remove(input);
        }
        else
        {
            this.previousValues.Remove(input);
        }

        return this;
    }

    /// <summary>
    /// Removes both multisets of another HoppingTopKState from previousValues.
    /// An empty <paramref name="other"/> is accepted regardless of its type and is a no-op.
    /// </summary>
    /// <exception cref="InvalidOperationException">A non-empty other is not a HoppingTopKState</exception>
    /// <exception cref="ArgumentException">The other state's timestamp is not older than this one's</exception>
    public ITopKState<T> RemoveAll(ITopKState<T> other)
    {
        if (other.Count == 0)
        {
            return this;
        }

        if (!(other is HoppingTopKState<T> otherTopK))
        {
            throw new InvalidOperationException("Cannot remove non-HoppingTopKState from HoppingTopKState");
        }

        if (otherTopK.currentTimestamp >= this.currentTimestamp)
        {
            throw new ArgumentException("Cannot remove entries with current or future timestamp");
        }

        this.previousValues.RemoveAll(otherTopK.currentValues);
        this.previousValues.RemoveAll(otherTopK.previousValues);
        return this;
    }

    // This function merges the current values to previous and is expensive
    // Currently it is only called by ComputeResult
    // NOTE(review): when previousValues is non-empty this mutates the state (currentValues is
    // emptied by the merge) — confirm callers do not rely on currentValues afterwards.
    public SortedMultiSet<T> GetSortedValues()
    {
        if (this.previousValues.IsEmpty)
        {
            return this.currentValues;
        }

        MergeCurrentToPrevious();
        return this.previousValues;
    }

    // Folds currentValues into previousValues and clears currentValues.
    private void MergeCurrentToPrevious()
    {
        // Swap so we merge small onto larger
        bool currentIsLarger = this.previousValues.UniqueCount < this.currentValues.UniqueCount;
        if (currentIsLarger)
        {
            var larger = this.currentValues;
            this.currentValues = this.previousValues;
            this.previousValues = larger;
        }

        if (this.currentValues.IsEmpty)
        {
            return;
        }

        this.previousValues.AddAll(this.currentValues);
        this.currentValues.Clear();
    }
}
196+
}

Sources/Core/Microsoft.StreamProcessing/Collections/SortedMultiSet.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,5 +267,39 @@ public IEnumerable<T> GetEnumerable()
267267
yield return keyAndCount.Key;
268268
}
269269
}
270+
271+
/// <summary>
/// Get the smallest element (and its number of occurrences) in the collection,
/// taken as the first entry of <c>Elements</c>
/// </summary>
/// <returns>The smallest element and its count</returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public ItemAndCount<T> GetMinItem()
{
    // NOTE(review): FirstOrDefault yields the default entry when Elements is empty, so an
    // empty collection presumably produces default(T) with a count of 0 — confirm callers
    // only rely on this for non-empty sets.
    var head = this.Elements.FirstOrDefault();
    T item = head.Key;
    long occurrences = head.Value;
    return new ItemAndCount<T>(item, occurrences);
}
281+
}
282+
283+
/// <summary>
/// An item of a MultiSet paired with the number of times it occurs
/// </summary>
/// <typeparam name="T">Type of the item</typeparam>
public struct ItemAndCount<T>
{
    /// <summary>
    /// The item stored in the Multiset
    /// </summary>
    public readonly T Item;

    /// <summary>
    /// Number of occurrences of the item
    /// </summary>
    public readonly long Count;

    // Only code in this assembly creates instances; both fields are fixed at construction.
    internal ItemAndCount(T item, long count)
    {
        this.Item = item;
        this.Count = count;
    }
}
271305
}

Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,25 +162,29 @@ public IAggregate<TSource, MinMaxState<TValue>, TValue> Max<TValue>(Expression<F
162162
/// <summary>
163163
/// Computes a time-sensitive top-k aggregate using snapshot semantics based on a key selector.
164164
/// </summary>
165-
public IAggregate<TSource, SortedMultiSet<TSource>, List<RankedEvent<TSource>>> TopK<TOrderValue>(Expression<Func<TSource, TOrderValue>> orderer, int k)
165+
public IAggregate<TSource, ITopKState<TSource>, List<RankedEvent<TSource>>> TopK<TOrderValue>(Expression<Func<TSource, TOrderValue>> orderer, int k)
166166
{
167167
Invariant.IsNotNull(orderer, nameof(orderer));
168168
Invariant.IsPositive(k, nameof(k));
169169
// Order elements by the default comparer of the value that orderer selects.
var orderComparer = ComparerExpression<TOrderValue>.Default.TransformInput(orderer);
170-
var aggregate = new TopKAggregate<TSource>(k, orderComparer, this.Properties.QueryContainer);
170+
// The hopping-window state is chosen only when the window is not tumbling, duration and hop
// are both constant with known lengths, and the hop length is an exact multiple of the duration length.
// NOTE(review): the test is hop % duration; an overlapping hopping window would normally have a
// duration that is a multiple of the hop — confirm the operand order is intended.
bool isHoppingWindow = !Properties.IsTumbling && Properties.IsConstantDuration && Properties.ConstantDurationLength.HasValue &&
171+
Properties.IsConstantHop && Properties.ConstantHopLength.HasValue && (Properties.ConstantHopLength.Value % Properties.ConstantDurationLength.Value) == 0;
172+
var aggregate = new TopKAggregate<TSource>(k, orderComparer, this.Properties.QueryContainer, isHoppingWindow);
171173
// Wrap the aggregate so that nulls are skipped and this window's filter is applied.
return aggregate.SkipNulls().ApplyFilter(this.Filter);
172174
}
173175

174176
/// <summary>
175177
/// Computes a time-sensitive top-k aggregate using snapshot semantics based on a key selector with the provided ordering comparer.
176178
/// </summary>
177-
public IAggregate<TSource, SortedMultiSet<TSource>, List<RankedEvent<TSource>>> TopK<TOrderValue>(Expression<Func<TSource, TOrderValue>> orderer, IComparerExpression<TOrderValue> comparer, int k)
179+
public IAggregate<TSource, ITopKState<TSource>, List<RankedEvent<TSource>>> TopK<TOrderValue>(Expression<Func<TSource, TOrderValue>> orderer, IComparerExpression<TOrderValue> comparer, int k)
178180
{
179181
Invariant.IsNotNull(orderer, nameof(orderer));
180182
Invariant.IsNotNull(comparer, nameof(comparer));
181183
Invariant.IsPositive(k, nameof(k));
182184
// Order elements by applying the supplied comparer to the value that orderer selects.
var orderComparer = comparer.TransformInput(orderer);
183-
var aggregate = new TopKAggregate<TSource>(k, orderComparer, this.Properties.QueryContainer);
185+
// The hopping-window state is chosen only when the window is not tumbling, duration and hop
// are both constant with known lengths, and the hop length is an exact multiple of the duration length.
// NOTE(review): the test is hop % duration; an overlapping hopping window would normally have a
// duration that is a multiple of the hop — confirm the operand order is intended.
bool isHoppingWindow = !Properties.IsTumbling && Properties.IsConstantDuration && Properties.ConstantDurationLength.HasValue &&
186+
Properties.IsConstantHop && Properties.ConstantHopLength.HasValue && (Properties.ConstantHopLength.Value % Properties.ConstantDurationLength.Value) == 0;
187+
var aggregate = new TopKAggregate<TSource>(k, orderComparer, this.Properties.QueryContainer, isHoppingWindow);
184188
// Wrap the aggregate so that nulls are skipped and this window's filter is applied.
return aggregate.SkipNulls().ApplyFilter(this.Filter);
185189
}
186190

0 commit comments

Comments
 (0)