Live Logging #711
Merged
Commits
77fe456 Live Logging (luv-bansal)
49ce2de Live logging and patch signature in local-runner (luv-bansal)
0ad4729 Fix patching model signature (luv-bansal)
36c27b8 Improve logging (luv-bansal)
dff66c6 Remove patch signatures (luv-bansal)
82d9659 fix edge case (luv-bansal)
a8d1db3 fix test (luv-bansal)
3e42f4a handle edge case (luv-bansal)
6f4b509 status in constant file (luv-bansal)
@@ -1,3 +1,4 @@
import time
from typing import Iterator

from clarifai_grpc.grpc.api import service_pb2

@@ -6,6 +7,7 @@
from clarifai_protocol.utils.health import HealthProbeRequestHandler

from clarifai.client.auth.helper import ClarifaiAuthHelper
from clarifai.utils.logging import get_req_id_from_context, logger

from ..utils.url_fetcher import ensure_urls_downloaded
from .model_class import ModelClass

@@ -106,13 +108,21 @@ def runner_item_predict(
            raise Exception("Unexpected work item type: {}".format(runner_item))
        request = runner_item.post_model_outputs_request
        ensure_urls_downloaded(request, auth_helper=self._auth_helper)
        start_time = time.time()
        req_id = get_req_id_from_context()
        status_str = "UNKNOWN"
        # Endpoint is always POST /v2/.../outputs for this runner
        endpoint = "POST /v2/.../outputs "

        resp = self.model.predict_wrapper(request)
        # if we have any non-successful code already it's an error we can return.
        if (
            resp.status.code != status_code_pb2.SUCCESS
            and resp.status.code != status_code_pb2.ZERO
        ):
            status_str = f"{resp.status.code} ERROR"
            duration_ms = (time.time() - start_time) * 1000
            logger.info(f"{endpoint} | {status_str} | {duration_ms:.2f}ms | req_id={req_id}")
            return service_pb2.RunnerItemOutput(multi_output_response=resp)
        successes = []
        for output in resp.outputs:

@@ -126,18 +136,23 @@ def runner_item_predict(
                code=status_code_pb2.SUCCESS,
                description="Success",
            )
            status_str = "200 OK"
        elif any(successes):
            status = status_pb2.Status(
                code=status_code_pb2.MIXED_STATUS,
                description="Mixed Status",
            )
            status_str = "207 MIXED"
        else:
            status = status_pb2.Status(
                code=status_code_pb2.FAILURE,
                description="Failed",
            )
            status_str = "500 FAIL"

        resp.status.CopyFrom(status)
        duration_ms = (time.time() - start_time) * 1000
        logger.info(f"{endpoint} | {status_str} | {duration_ms:.2f}ms | req_id={req_id}")
        return service_pb2.RunnerItemOutput(multi_output_response=resp)

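Reviewer note: the three-way branch above maps a list of per-output success flags to both a response status and a log label. A minimal sketch of that mapping as a standalone function follows. It assumes the elided first branch tests `all(successes)`, and `aggregate_status` is a hypothetical name that does not appear in the PR.

```python
from typing import Iterable, Tuple


def aggregate_status(successes: Iterable[bool]) -> Tuple[str, str]:
    """Return (status description, log label) for per-output success flags.

    Hypothetical helper, not part of the PR. The first branch assumes the
    elided condition in the diff is `all(successes)`.
    """
    flags = list(successes)
    if all(flags):
        # Note: all([]) is True, so an empty batch is labelled as a success.
        return "Success", "200 OK"
    if any(flags):
        return "Mixed Status", "207 MIXED"
    return "Failed", "500 FAIL"
```

Keeping the description and the log label in one place would let `runner_item_predict`, `runner_item_generate`, and `runner_item_stream` share the mapping instead of repeating the branch three times.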
    def runner_item_generate(

@@ -150,12 +165,21 @@ def runner_item_generate(
        request = runner_item.post_model_outputs_request
        ensure_urls_downloaded(request, auth_helper=self._auth_helper)

        # --- Live logging additions ---
        start_time = time.time()
        req_id = get_req_id_from_context()
        status_str = "UNKNOWN"
        endpoint = "POST /v2/.../outputs/generate"

Review comment (Copilot): [nitpick] The hardcoded endpoint string uses '...', which is unclear. Consider extracting these endpoint strings to constants to avoid duplication and improve maintainability.

        for resp in self.model.generate_wrapper(request):
            # if we have any non-successful code already it's an error we can return.
            if (
                resp.status.code != status_code_pb2.SUCCESS
                and resp.status.code != status_code_pb2.ZERO
            ):
                status_str = f"{resp.status.code} ERROR"
                duration_ms = (time.time() - start_time) * 1000
                logger.info(f"{endpoint} | {status_str} | {duration_ms:.2f}ms | req_id={req_id}")
                yield service_pb2.RunnerItemOutput(multi_output_response=resp)
                continue
            successes = []

@@ -170,30 +194,44 @@ def runner_item_generate(
                    code=status_code_pb2.SUCCESS,
                    description="Success",
                )
                status_str = "200 OK"
            elif any(successes):
                status = status_pb2.Status(
                    code=status_code_pb2.MIXED_STATUS,
                    description="Mixed Status",
                )
                status_str = "207 MIXED"
            else:
                status = status_pb2.Status(
                    code=status_code_pb2.FAILURE,
                    description="Failed",
                )
                status_str = "500 FAIL"
            resp.status.CopyFrom(status)

            yield service_pb2.RunnerItemOutput(multi_output_response=resp)

        duration_ms = (time.time() - start_time) * 1000
        logger.info(f"{endpoint} | {status_str} | {duration_ms:.2f}ms | req_id={req_id}")

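Reviewer note: in a Python generator, statements placed after the `for` loop do not run if the consumer closes the generator early or the wrapped iterator raises, so the final summary line in `runner_item_generate` could be skipped in those cases. A minimal sketch of one way to guard against that with `try`/`finally` follows. `timed_stream`, the logger name, and the plain-string items are illustrative assumptions rather than code from the PR, and `time.perf_counter()` is swapped in for `time.time()` because it is monotonic.

```python
import logging
import time
from typing import Iterable, Iterator

# Stand-in logger for the sketch; the PR uses clarifai.utils.logging.logger.
logger = logging.getLogger("live_logging_sketch")


def timed_stream(items: Iterable[str], endpoint: str, req_id: str) -> Iterator[str]:
    """Yield items and always emit one summary log line when the stream ends."""
    start = time.perf_counter()  # monotonic clock, suited to measuring durations
    status_str = "UNKNOWN"
    try:
        for item in items:
            status_str = "200 OK"  # placeholder for the per-response status logic
            yield item
    finally:
        # Runs on normal exhaustion, on generator.close(), and on exceptions.
        duration_ms = (time.perf_counter() - start) * 1000
        logger.info(f"{endpoint} | {status_str} | {duration_ms:.2f}ms | req_id={req_id}")
```

If the consumer stops after the first item and calls `close()` on the generator, the `finally` block still emits the summary line with the last status seen.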
    def runner_item_stream(
        self, runner_item_iterator: Iterator[service_pb2.RunnerItem]
    ) -> Iterator[service_pb2.RunnerItemOutput]:
        # Call the generate() method the underlying model implements.
        start_time = time.time()
        req_id = get_req_id_from_context()
        status_str = "UNKNOWN"
        endpoint = "POST /v2/.../outputs/stream "

        for resp in self.model.stream_wrapper(pmo_iterator(runner_item_iterator)):
            # if we have any non-successful code already it's an error we can return.
            if (
                resp.status.code != status_code_pb2.SUCCESS
                and resp.status.code != status_code_pb2.ZERO
            ):
                status_str = f"{resp.status.code} ERROR"
                duration_ms = (time.time() - start_time) * 1000
                logger.info(f"{endpoint} | {status_str} | {duration_ms:.2f}ms | req_id={req_id}")
                yield service_pb2.RunnerItemOutput(multi_output_response=resp)
                continue
            successes = []

@@ -208,20 +246,26 @@ def runner_item_stream(
                    code=status_code_pb2.SUCCESS,
                    description="Success",
                )
                status_str = "200 OK"
            elif any(successes):
                status = status_pb2.Status(
                    code=status_code_pb2.MIXED_STATUS,
                    description="Mixed Status",
                )
                status_str = "207 MIXED"
            else:
                status = status_pb2.Status(
                    code=status_code_pb2.FAILURE,
                    description="Failed",
                )
                status_str = "500 FAIL"
            resp.status.CopyFrom(status)

            yield service_pb2.RunnerItemOutput(multi_output_response=resp)

        duration_ms = (time.time() - start_time) * 1000
        logger.info(f"{endpoint} | {status_str} | {duration_ms:.2f}ms | req_id={req_id}")


def pmo_iterator(runner_item_iterator, auth_helper=None):
    for runner_item in runner_item_iterator:

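Reviewer note: the same timing and `logger.info` pair appears six times across the three runner methods. One possible way to factor it out is a small context manager, sketched below. `request_log`, the mutable `state` dictionary, and the logger name are hypothetical choices for illustration; nothing here is taken from the PR or from the Clarifai API.

```python
import logging
import time
from contextlib import contextmanager
from typing import Dict, Iterator

# Stand-in logger for the sketch; the PR uses clarifai.utils.logging.logger.
logger = logging.getLogger("live_logging_sketch")


@contextmanager
def request_log(endpoint: str, req_id: str) -> Iterator[Dict[str, str]]:
    """Time the enclosed block and log one summary line when it exits."""
    state = {"status": "UNKNOWN"}  # the caller overwrites this with the final status
    start = time.perf_counter()
    try:
        yield state
    finally:
        duration_ms = (time.perf_counter() - start) * 1000
        logger.info(f"{endpoint} | {state['status']} | {duration_ms:.2f}ms | req_id={req_id}")
```

A caller would wrap the predict body in `with request_log(endpoint, req_id) as state:` and set `state["status"]` where the diff currently assigns `status_str`. The line format stays the same as in the PR, and the early-return path no longer needs its own copy of the log call.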
Review comment (Copilot): The endpoint string contains trailing whitespace which appears to be for formatting alignment. Consider using a consistent string format without manual padding, or use a constant if alignment is needed for display purposes.
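Reviewer note: both review comments could be addressed together by defining the endpoint labels once, without trailing spaces, and letting the format specification pad them at log time. The sketch below is one hedged way to do that. The constant names, `_ENDPOINT_WIDTH`, and `format_log_line` are hypothetical, and the `...` inside the labels is kept exactly as it appears in the diff.

```python
# Hypothetical module-level constants; the labels are copied from the diff
# with the manual trailing padding removed.
ENDPOINT_PREDICT = "POST /v2/.../outputs"
ENDPOINT_GENERATE = "POST /v2/.../outputs/generate"
ENDPOINT_STREAM = "POST /v2/.../outputs/stream"

# Pad every label to the width of the longest one so the log columns line up.
_ENDPOINT_WIDTH = max(
    len(label) for label in (ENDPOINT_PREDICT, ENDPOINT_GENERATE, ENDPOINT_STREAM)
)


def format_log_line(endpoint: str, status_str: str, duration_ms: float, req_id: str) -> str:
    """Build the summary line, left-aligning the endpoint column via a format spec."""
    return (
        f"{endpoint:<{_ENDPOINT_WIDTH}} | {status_str} | "
        f"{duration_ms:.2f}ms | req_id={req_id}"
    )
```

With this approach the alignment lives in a single format spec, so adding a longer endpoint label later only changes the computed width rather than every hand-padded string.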