Skip to content

Commit b5600b3

Browse files
committed
fix: add flows
1 parent c3aa794 commit b5600b3

File tree

1 file changed

+102
-9
lines changed

1 file changed

+102
-9
lines changed

src/mcp_scan/printer.py

Lines changed: 102 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,15 @@
1414
ScalarToolLabels,
1515
ScanError,
1616
ScanPathResult,
17+
ServerScanResult,
1718
ToolAnnotationsWithLabels,
1819
entity_type_to_str,
1920
hash_entity,
2021
)
2122

23+
MAX_ENTITY_NAME_LENGTH = 25
24+
MAX_ENTITY_NAME_TOXIC_FLOW_LENGTH = 30
25+
2226

2327
def format_exception(e: Exception | None) -> tuple[str, rTraceback | None]:
2428
if e is None:
@@ -68,15 +72,15 @@ def format_scalar_labels(labels: ScalarToolLabels) -> str:
6872
"""
6973
label_parts = []
7074
if labels.is_public_sink > 0:
71-
label_parts.append(f"[gold1]Public sink: {str(labels.is_public_sink).rstrip('.0')}[/gold1]")
75+
label_parts.append("Public sink")
7276
if labels.destructive > 0:
73-
label_parts.append(f"[gold1]Destructive: {str(labels.destructive).rstrip('.0')}[/gold1]")
77+
label_parts.append("Destructive")
7478
if labels.untrusted_output > 0:
75-
label_parts.append(f"[gold1]Untrusted output: {str(labels.untrusted_output).rstrip('.0')}[/gold1]")
79+
label_parts.append("Untrusted output")
7680
if labels.private_data > 0:
77-
label_parts.append(f"[gold1]Private data: {str(labels.private_data).rstrip('.0')}[/gold1]")
81+
label_parts.append("Private data")
7882

79-
return " | ".join(label_parts)
83+
return "[gray62]" + " | ".join(label_parts) + "[/gray62]"
8084

8185

8286
def format_entity_line(entity: Entity, result: EntityScanResult | None = None) -> Text:
@@ -102,9 +106,9 @@ def format_entity_line(entity: Entity, result: EntityScanResult | None = None) -
102106

103107
# right-pad & truncate name
104108
name = entity.name
105-
if len(name) > 25:
106-
name = name[:22] + "..."
107-
name = name + " " * (25 - len(name))
109+
if len(name) > MAX_ENTITY_NAME_LENGTH:
110+
name = name[: (MAX_ENTITY_NAME_LENGTH - 3)] + "..."
111+
name = name + " " * (MAX_ENTITY_NAME_LENGTH - len(name))
108112

109113
# right-pad type
110114
type = entity_type_to_str(entity)
@@ -116,13 +120,14 @@ def format_entity_line(entity: Entity, result: EntityScanResult | None = None) -
116120
isinstance(entity, Tool)
117121
and entity.annotations is not None
118122
and isinstance(entity.annotations, ToolAnnotationsWithLabels)
123+
and is_verified is not False
119124
):
120125
if isinstance(entity.annotations.labels, ScalarToolLabels):
121126
labels = format_scalar_labels(entity.annotations.labels)
122127
elif isinstance(entity.annotations.labels, ErrorLabels):
123128
labels = f"[gray62]Error in labels computation: {entity.annotations.labels.error}[/gray62]"
124129

125-
text = f"{type} {color}[bold]{name}[/bold] {icon} {labels} {status}"
130+
text = f"{type} {color}[bold]{name}[/bold] {icon} {status} {labels}"
126131

127132
if include_description:
128133
if hasattr(entity, "description") and entity.description is not None:
@@ -148,6 +153,86 @@ def format_entity_line(entity: Entity, result: EntityScanResult | None = None) -
148153
return formatted_text
149154

150155

156+
def format_tool_flow(tool_name: str, server_name: str, value: float) -> Text:
157+
text = "{tool_name} {risk}"
158+
tool_name = f"{server_name}/{tool_name}"
159+
if len(tool_name) > MAX_ENTITY_NAME_TOXIC_FLOW_LENGTH:
160+
tool_name = tool_name[: (MAX_ENTITY_NAME_TOXIC_FLOW_LENGTH - 3)] + "..."
161+
tool_name = tool_name + " " * (MAX_ENTITY_NAME_TOXIC_FLOW_LENGTH - len(tool_name))
162+
163+
risk = "[gold1]Mild[/gold1]" if value <= 1.5 else "[red]High[/red]"
164+
return Text.from_markup(text.format(tool_name=tool_name, risk=risk))
165+
166+
167+
def format_toxic_flows(servers: list[ServerScanResult]) -> list[Tree]:
168+
"""
169+
Format toxic flows from the scan results into a tree structure.
170+
"""
171+
untrusted_output_tools: list[tuple[str, str, float]] = []
172+
destructive_tools: list[tuple[str, str, float]] = []
173+
private_data_tools: list[tuple[str, str, float]] = []
174+
is_public_sink_tools: list[tuple[str, str, float]] = []
175+
176+
for server in servers:
177+
if server.signature is None:
178+
continue
179+
for tool in server.signature.tools:
180+
if (
181+
tool.annotations is not None
182+
and isinstance(tool.annotations, ToolAnnotationsWithLabels)
183+
and isinstance(tool.annotations.labels, ScalarToolLabels)
184+
):
185+
if tool.annotations.labels.untrusted_output > 0:
186+
untrusted_output_tools.append(
187+
(tool.name, server.name or "", tool.annotations.labels.untrusted_output)
188+
)
189+
if tool.annotations.labels.destructive > 0:
190+
destructive_tools.append((tool.name, server.name or "", tool.annotations.labels.destructive))
191+
if tool.annotations.labels.private_data > 0:
192+
private_data_tools.append((tool.name, server.name or "", tool.annotations.labels.private_data))
193+
if tool.annotations.labels.is_public_sink > 0:
194+
is_public_sink_tools.append((tool.name, server.name or "", tool.annotations.labels.is_public_sink))
195+
196+
untrusted_output_tools.sort(key=lambda x: x[2], reverse=True)
197+
destructive_tools.sort(key=lambda x: x[2], reverse=True)
198+
private_data_tools.sort(key=lambda x: x[2], reverse=True)
199+
is_public_sink_tools.sort(key=lambda x: x[2], reverse=True)
200+
201+
toxic_flows: list[Tree] = []
202+
203+
# Flow 1: Untrusted output -> Private data -> Public sink
204+
leak_data_flow = Tree("[bold]Leak data flow[/bold]")
205+
untrusted_output_tree = Tree("[bold]Untrusted output[/bold]")
206+
private_data_tree = Tree("[bold]Private data[/bold]")
207+
public_sink_tree = Tree("[bold]Public sink[/bold]")
208+
for tool_name, server_name, value in untrusted_output_tools:
209+
untrusted_output_tree.add(format_tool_flow(tool_name, server_name, value))
210+
for tool_name, server_name, value in private_data_tools:
211+
private_data_tree.add(format_tool_flow(tool_name, server_name, value))
212+
for tool_name, server_name, value in is_public_sink_tools:
213+
public_sink_tree.add(format_tool_flow(tool_name, server_name, value))
214+
if len(untrusted_output_tools) > 0 and len(private_data_tools) > 0 and len(is_public_sink_tools) > 0:
215+
leak_data_flow.add(untrusted_output_tree)
216+
leak_data_flow.add(private_data_tree)
217+
leak_data_flow.add(public_sink_tree)
218+
toxic_flows.append(leak_data_flow)
219+
220+
# Flow 2: Untrusted output -> Destructive
221+
destructive_flow = Tree("[bold]Harm flow[/bold]")
222+
untrusted_output_tree = Tree("[bold]Untrusted output[/bold]")
223+
destructive_tree = Tree("[bold]Destructive[/bold]")
224+
for tool_name, server_name, value in untrusted_output_tools:
225+
untrusted_output_tree.add(format_tool_flow(tool_name, server_name, value))
226+
for tool_name, server_name, value in destructive_tools:
227+
destructive_tree.add(format_tool_flow(tool_name, server_name, value))
228+
if len(untrusted_output_tools) > 0 and len(destructive_tools) > 0:
229+
destructive_flow.add(untrusted_output_tree)
230+
destructive_flow.add(destructive_tree)
231+
toxic_flows.append(destructive_flow)
232+
233+
return toxic_flows
234+
235+
151236
def print_scan_path_result(result: ScanPathResult, print_errors: bool = False) -> None:
152237
if result.error is not None:
153238
err_status, traceback = format_error(result.error)
@@ -175,6 +260,14 @@ def print_scan_path_result(result: ScanPathResult, print_errors: bool = False) -
175260
if len(result.servers) > 0:
176261
rich.print(path_print_tree)
177262

263+
toxic_flows = format_toxic_flows(result.servers)
264+
if toxic_flows:
265+
toxic_flows_tree = Tree("● [bold][gold1]Toxic flows found:[/bold][/gold1]")
266+
for flow in toxic_flows:
267+
toxic_flows_tree.add(flow)
268+
rich.print()
269+
rich.print(toxic_flows_tree)
270+
178271
if print_errors and len(server_tracebacks) > 0:
179272
console = rich.console.Console()
180273
for server, traceback in server_tracebacks:

0 commit comments

Comments
 (0)