1
1
import ast
2
2
import json
3
-
3
+ import rich
4
4
import aiohttp
5
5
6
- from .models import EntityScanResult , ServerScanResult , entity_type_to_str
6
+ from .models import EntityScanResult , ScanPathResult
7
7
8
8
9
- async def verify_server (server_scan_result : ServerScanResult , base_url : str ) -> ServerScanResult :
10
- result = server_scan_result .model_copy (deep = True )
11
- if len (server_scan_result .entities ) == 0 :
12
- return result
13
- messages = [
14
- {
15
- "role" : "system" ,
16
- "content" : (
17
- f"{ entity_type_to_str (entity ).capitalize ()} Name:{ entity .name } \n "
18
- f"{ entity_type_to_str (entity ).capitalize ()} Description:{ entity .description } "
19
- ),
20
- }
21
- for entity in server_scan_result .entities
22
- ]
9
+ async def verify_server (scan_path : ScanPathResult , base_url : str ) -> ScanPathResult :
10
+ if len (scan_path .entities ) == 0 :
11
+ return scan_path .model_copy (deep = True )
23
12
url = base_url [:- 1 ] if base_url .endswith ("/" ) else base_url
24
- url = url + "/api/v1 /public/mcp"
13
+ url = url + "/api/v2 /public/mcp"
25
14
headers = {"Content-Type" : "application/json" }
26
- data = {
27
- "messages" : messages ,
28
- }
29
15
try :
30
16
async with aiohttp .ClientSession () as session :
31
- async with session .post (url , headers = headers , data = json . dumps ( data )) as response :
17
+ async with session .post (url , headers = headers , data = scan_path . model_dump_json ( )) as response :
32
18
if response .status == 200 :
33
- response_content : dict = await response .json ()
34
- result .result = [EntityScanResult (verified = True ) for _ in messages ]
35
- for error in response_content .get ("errors" , []):
36
- key = ast .literal_eval (error ["key" ])
37
- idx = key [1 ][0 ]
38
- result .result [idx ].verified = False
39
- result .result [idx ].status = "failed - " + " " .join (error ["args" ])
40
- return result
19
+ return ScanPathResult .model_validate_json (await response .read ())
41
20
else :
42
21
raise Exception (f"Error: { response .status } - { await response .text ()} " )
43
22
except Exception as e :
@@ -46,5 +25,7 @@ async def verify_server(server_scan_result: ServerScanResult, base_url: str) ->
46
25
errstr = errstr .splitlines ()[0 ]
47
26
except Exception :
48
27
errstr = ""
49
- result .result = [EntityScanResult (status = "could not reach verification server " + errstr ) for _ in messages ]
28
+ result = scan_path .model_copy (deep = True )
29
+ for server in result .servers :
30
+ server .result = [EntityScanResult (status = "could not reach verification server " + errstr ) for _ in server .entities ]
50
31
return result
0 commit comments