forked from NVIDIA-AI-Blueprints/vulnerability-analysis
-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathcve_summarize.py
More file actions
67 lines (51 loc) · 2.81 KB
/
cve_summarize.py
File metadata and controls
67 lines (51 loc) · 2.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from aiq.builder.builder import Builder
from aiq.builder.framework_enum import LLMFrameworkEnum
from aiq.builder.function_info import FunctionInfo
from aiq.cli.register_workflow import register_function
from aiq.data_models.function import FunctionBaseConfig
from pydantic import Field
from exploit_iq_commons.utils.string_utils import get_checklist_item_string
from exploit_iq_commons.logging.loggers_factory import LoggingFactory, trace_id
# Module-level logger obtained from the shared LoggingFactory, keyed by this module's name.
logger = LoggingFactory.get_agent_logger(__name__)
# Configuration model for the `cve_summarize` function registered below.
# The docstring text is kept verbatim because pydantic models can expose the
# class docstring as the schema description at runtime.
class CVESummarizeToolConfig(FunctionBaseConfig, name="cve_summarize"):
    """
    Defines a function that generates concise, human-readable summarization paragraph from agent results.
    """

    # Passed to `builder.get_llm(llm_name=...)` to select the LLM that writes the summaries.
    llm_name: str = Field(
        description="The LLM model to use",
    )
@register_function(config_type=CVESummarizeToolConfig, framework_wrappers=[LLMFrameworkEnum.LANGCHAIN])
async def cve_summarize(config: CVESummarizeToolConfig, builder: Builder):
    """
    Build and yield the function that writes one summary per entry of ``state.checklist_results``.

    Args:
        config: Tool configuration; only ``config.llm_name`` is read here.
        builder: Builder used to obtain the LangChain-wrapped LLM.

    Yields:
        A ``FunctionInfo`` wrapping ``_arun``, which takes an ``AgentMorpheusEngineState``,
        fills in ``state.final_summaries`` and returns the same state object.
    """
    # Imported lazily inside the function, as in the original module layout.
    from langchain_core.prompts import PromptTemplate

    from vuln_analysis.data_models.state import AgentMorpheusEngineState
    from vuln_analysis.utils.prompting import SUMMARY_PROMPT

    summary_llm = await builder.get_llm(llm_name=config.llm_name, wrapper_type=LLMFrameworkEnum.LANGCHAIN)
    summary_prompt = PromptTemplate(input_variables=["response"], template=SUMMARY_PROMPT)
    # Prompt piped into the LLM: invoking the chain renders the template and then calls the model.
    summary_chain = summary_prompt | summary_llm

    async def _summarize_checklist(checklist_items):
        """Join the formatted checklist items and return the ``content`` of the chain's reply."""
        numbered_lines = []
        # Items are numbered starting at 1 when formatted.
        for position, item in enumerate(checklist_items, start=1):
            numbered_lines.append(get_checklist_item_string(position, item))

        llm_reply = await summary_chain.ainvoke({"response": '\n'.join(numbered_lines)})
        return llm_reply.content

    async def _arun(state: AgentMorpheusEngineState) -> AgentMorpheusEngineState:
        """Summarize every checklist in ``state`` concurrently and store the results on the state."""
        trace_id.set(state.original_input.input.scan.id)

        checklists = state.checklist_results
        # One coroutine per mapping entry; gather returns results in the order the awaitables were passed.
        pending = [_summarize_checklist(items) for items in checklists.values()]
        summaries = await asyncio.gather(*pending)

        # Re-associate each summary with the key its checklist was stored under.
        state.final_summaries = dict(zip(checklists.keys(), summaries))
        return state

    yield FunctionInfo.from_fn(
        _arun,
        input_schema=AgentMorpheusEngineState,
        description=("Generates concise, human-readable summarization paragraph from agent results."),
    )