-
Notifications
You must be signed in to change notification settings - Fork 562
feat(server): adoption for off-heap memory management in kout module #2704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #2704 +/- ##
============================================
- Coverage 46.99% 37.56% -9.43%
+ Complexity 821 588 -233
============================================
Files 745 756 +11
Lines 60062 62108 +2046
Branches 7670 8068 +398
============================================
- Hits 28225 23330 -4895
- Misses 29014 36266 +7252
+ Partials 2823 2512 -311 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| "'{}', max degree '{}', capacity '{}' and limit '{}'", | ||
| graph, source, direction, edgeLabel, depth, | ||
| nearest, maxDegree, capacity, limit); | ||
| MemoryPool queryPool = MemoryManager.getInstance().addQueryMemoryPool(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we bind a MemoryPool for each graph, or bind MemoryManager to graph?
| .bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(), | ||
| (TaskMemoryPool) currentTaskPool); | ||
| MemoryPool currentOperationPool = currentTaskPool.addChildPool("kout-main-operation"); | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a method like graph.switchToMemoryPool("kout", "main")?
| }); | ||
|
|
||
| ApiMeasurer measure = new ApiMeasurer(); | ||
| try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer to add a wrapper method for MemoryPool init-and-gc, then call the original method?
| return QueryResults.emptyIterator(); | ||
| } | ||
| if (needCacheVertex(vertex)) { | ||
| vertex.convertIdToOnHeapIfNeeded(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's ok to just call convert in HeapCache.update(), at the same time, we avoid modifying the code everywhere
| // NOTE: it's slower performance to use: | ||
| // String.format("%x-%s", type.code(), name) | ||
| return IdGenerator.of(type.string() + "-" + id.asString()); | ||
| return new IdGenerator.StringId(type.string() + "-" + id.asString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a OnHeapIdGenerator.of() class?
| Thread.currentThread() | ||
| .getName()); | ||
| return new BinaryIdOffHeap(bytes, null, | ||
| return taskMemoryPool == null ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
expect this.taskMemoryPool style
| TaskMemoryPool taskMemoryPool = (TaskMemoryPool) MemoryManager.getInstance() | ||
| .getCorrespondingTaskMemoryPool( | ||
| Thread.currentThread() | ||
| .getName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer by Thread id instead of Thread name to avoid duplicate names
| return null; | ||
| } | ||
|
|
||
| int count = queryMemoryPools.size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
expect this.xx stype
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the difference between QueryMemoryPool and MemoryPool? if no difference, just naming MemoryPool is ok.
and can we make some special names for CorrespondingTaskMemoryPool and CurrentWorkingOperatorMemoryPool, such as RequestMemoryPool and RequestStageMemoryPool
| private final ExecutorService arbitrateExecutor; | ||
|
|
||
| private static MemoryMode MEMORY_MODE = MemoryMode.ENABLE_OFF_HEAP_MANAGEMENT; | ||
| private static MemoryMode MEMORY_MODE = MemoryMode.DISABLE_MEMORY_MANAGEMENT; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MEMORY_MODE style is only for const var, and if there is only a single MemoryManager, also remove static mark: private MemoryMode memoryMode
| return new BinaryIdOffHeap(bytes, null, | ||
| return taskMemoryPool == null ? | ||
| new BinaryBackendEntry.BinaryId(bytes, id) : | ||
| new BinaryIdOffHeap(bytes, null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can directly get the memory pool at one time MemoryManager.getInstance().currentMemoryPool(), it does the 2 steps:
getCorrespondingTaskMemoryPool, getCurrentWorkingOperatorMemoryPool.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and can we rename some methods for a more concise and consistent style:
- getCorrespondingTaskMemoryPool => currentTaskMemoryPool
- getCurrentWorkingOperatorMemoryPool => currentOperatorMemoryPool
- MemoryPool.getSnapShot() => MemoryPool.getStats()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also move MemoryPoolStats out from impl?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer to get MemoryPool from threadlocal
|
Due to the lack of activity, the current pr is marked as stale and will be closed after 180 days, any update will remove the stale label |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a great improvement to our memory performance, looking forward to merging it soon.
| .bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(), | ||
| (TaskMemoryPool) currentTaskPool); | ||
| MemoryPool currentOperationPool = | ||
| currentTaskPool.addChildPool("kout-consume-operation"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the Consumers is a general class, seems it's not appropriate to put the upper-level kout code here
| this.runningFutures.add( | ||
| this.executor.submit(new ContextCallable<>(this::runAndDone))); | ||
| this.executor.submit( | ||
| new ContextCallable<>(() -> this.runAndDone(MemoryManager.getInstance() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we move this TaskMemoryPool-bind code into ContextCallable?
| return new BinaryIdOffHeap(bytes, null, | ||
| return taskMemoryPool == null ? | ||
| new BinaryBackendEntry.BinaryId(bytes, id) : | ||
| new BinaryIdOffHeap(bytes, null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and can we rename some methods for a more concise and consistent style:
- getCorrespondingTaskMemoryPool => currentTaskMemoryPool
- getCurrentWorkingOperatorMemoryPool => currentOperatorMemoryPool
- MemoryPool.getSnapShot() => MemoryPool.getStats()
| MemoryManager.getInstance() | ||
| .bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(), | ||
| (TaskMemoryPool) currentTaskPool); | ||
| MemoryPool currentOperationPool = currentTaskPool.addChildPool("kout-main-operation"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can let:
- MemoryPool.addChildPool return QueryMemoryPool
- QueryMemoryPool.addChildPool return TaskMemoryPool
- TaskMemoryPooll.addChildPool return OperatorMemoryPool
| return new BinaryIdOffHeap(bytes, null, | ||
| return taskMemoryPool == null ? | ||
| new BinaryBackendEntry.BinaryId(bytes, id) : | ||
| new BinaryIdOffHeap(bytes, null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also move MemoryPoolStats out from impl?
| } | ||
| return manager.serializer(g, measure.measures()).writeList("vertices", ids); | ||
| } finally { | ||
| Optional.ofNullable(queryPool) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer if-style
| return new BinaryIdOffHeap(bytes, null, | ||
| return taskMemoryPool == null ? | ||
| new BinaryBackendEntry.BinaryId(bytes, id) : | ||
| new BinaryIdOffHeap(bytes, null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer to get MemoryPool from threadlocal
| this.isOutEdge = true; | ||
| } | ||
|
|
||
| public void convertIdToOnHeapIfNeeded() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move to super class?
| return new HugeEdgeProperty<>(this, pkey, val); | ||
| Class<V> valueType = (Class<V>) val.getClass(); | ||
| PropertyFactory<V> propertyFactory = PropertyFactory.getInstance(valueType); | ||
| return propertyFactory.newHugeEdgeProperty(this, pkey, val); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have no OffheapEdge?
| new HugeVertexProperty<>(owner, key, value) : | ||
| new HugeVertexPropertyOffHeap<>( | ||
| taskMemoryPool.getCurrentWorkingOperatorMemoryPool(), owner, key, | ||
| value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we avoid bytes copy in serializeSelfToByteBuf
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- preter to let OffHeapObject.serializeSelfToByteBuf return ByteBuf.
- can we rename OffHeapObject.zeroCopyReadFromByteBuf to readAsHeapObject?
- OffHeapObject.getAllMemoryBlock is useless? will we release an object individually?
- HugeVertexPropertyOffHeap.isPresent optimize with 'valueOffHeap != null'
wip...