Skip to content

Commit a47ff35

Browse files
committed
Add metrics
1 parent a158cb6 commit a47ff35

File tree

1 file changed

+45
-22
lines changed

1 file changed

+45
-22
lines changed

singlestoredb/functions/ext/asgi.py

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@
4040
import tempfile
4141
import textwrap
4242
import threading
43+
import time
4344
import typing
4445
import urllib
46+
import uuid
4547
import zipfile
4648
import zipimport
4749
from types import ModuleType
@@ -69,6 +71,7 @@
6971
from ..signature import signature_to_sql
7072
from ..typing import Masked
7173
from ..typing import Table
74+
from .timer import Timer
7275

7376
try:
7477
import cloudpickle
@@ -613,6 +616,16 @@ def cancel_all_tasks(tasks: Iterable[asyncio.Task[Any]]) -> None:
613616
pass
614617

615618

619+
def start_counter() -> float:
620+
"""Start a timer and return the start time."""
621+
return time.perf_counter()
622+
623+
624+
def end_counter(start: float) -> float:
625+
"""End a timer and return the elapsed time."""
626+
return time.perf_counter() - start
627+
628+
616629
class Application(object):
617630
"""
618631
Create an external function application.
@@ -939,6 +952,8 @@ async def __call__(
939952
Function to send response information
940953
941954
'''
955+
timer = Timer(id=str(uuid.uuid4()), timestamp=time.time())
956+
942957
assert scope['type'] == 'http'
943958

944959
method = scope['method']
@@ -964,12 +979,13 @@ async def __call__(
964979
returns_data_format = func_info['returns_data_format']
965980
data = []
966981
more_body = True
967-
while more_body:
968-
request = await receive()
969-
if request['type'] == 'http.disconnect':
970-
raise RuntimeError('client disconnected')
971-
data.append(request['body'])
972-
more_body = request.get('more_body', False)
982+
with timer('receive_data'):
983+
while more_body:
984+
request = await receive()
985+
if request['type'] == 'http.disconnect':
986+
raise RuntimeError('client disconnected')
987+
data.append(request['body'])
988+
more_body = request.get('more_body', False)
973989

974990
data_version = headers.get(b's2-ef-version', b'')
975991
input_handler = self.handlers[(content_type, data_version, args_data_format)]
@@ -981,17 +997,17 @@ async def __call__(
981997

982998
cancel_event = threading.Event()
983999

984-
func_args = [
985-
cancel_event,
986-
*input_handler['load']( # type: ignore
1000+
with timer('parse_input'):
1001+
inputs = input_handler['load']( # type: ignore
9871002
func_info['colspec'], b''.join(data),
988-
),
989-
]
1003+
)
9901004

9911005
func_task = asyncio.create_task(
992-
func(*func_args)
1006+
func(cancel_event, *inputs)
9931007
if func_info['is_async']
994-
else to_thread(lambda: asyncio.run(func(*func_args))),
1008+
else to_thread(
1009+
lambda: asyncio.run(func(cancel_event, *inputs)),
1010+
),
9951011
)
9961012
disconnect_task = asyncio.create_task(
9971013
cancel_on_disconnect(receive),
@@ -1002,9 +1018,10 @@ async def __call__(
10021018

10031019
all_tasks += [func_task, disconnect_task, timeout_task]
10041020

1005-
done, pending = await asyncio.wait(
1006-
all_tasks, return_when=asyncio.FIRST_COMPLETED,
1007-
)
1021+
with timer('function_call'):
1022+
done, pending = await asyncio.wait(
1023+
all_tasks, return_when=asyncio.FIRST_COMPLETED,
1024+
)
10081025

10091026
cancel_all_tasks(pending)
10101027

@@ -1024,9 +1041,10 @@ async def __call__(
10241041
elif task is func_task:
10251042
result.extend(task.result())
10261043

1027-
body = output_handler['dump'](
1028-
[x[1] for x in func_info['returns']], *result, # type: ignore
1029-
)
1044+
with timer('format_output'):
1045+
body = output_handler['dump'](
1046+
[x[1] for x in func_info['returns']], *result, # type: ignore
1047+
)
10301048

10311049
await send(output_handler['response'])
10321050

@@ -1089,9 +1107,14 @@ async def __call__(
10891107
await send(self.path_not_found_response_dict)
10901108

10911109
# Send body
1092-
out = self.body_response_dict.copy()
1093-
out['body'] = body
1094-
await send(out)
1110+
with timer('send_response'):
1111+
out = self.body_response_dict.copy()
1112+
out['body'] = body
1113+
await send(out)
1114+
1115+
timer.metadata['function'] = func_name.decode('utf-8') if func_name else ''
1116+
timer.finish()
1117+
timer.log_metrics()
10951118

10961119
def _create_link(
10971120
self,

0 commit comments

Comments
 (0)