Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion AntiPattern_Remediator/full_repo_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ def process_java_files_with_workflow(file_paths: list, settings, db_manager, pro
"code_review_times": 0,
"msgs": [],
"answer": None,
"current_file_path": file_path # Track current file being processed
"current_file_path": file_path, # Track current file being processed
"explanation_response_raw": None,
"explanation_json": None
}

try:
Expand Down
22 changes: 21 additions & 1 deletion AntiPattern_Remediator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
from colorama import Fore, Style
import os
from pathlib import Path
import json

from full_repo_workflow import run_full_repo_workflow
from workflow.results_manager import save_intermediate_results


def run_code_snippet_workflow(settings, db_manager, prompt_manager, langgraph):
Expand Down Expand Up @@ -66,7 +69,10 @@ def run_code_snippet_workflow(settings, db_manager, prompt_manager, langgraph):
"code_review_results": None,
"code_review_times": 0,
"msgs": [],
"answer": None
"answer": None,
"current_file_path": None, # Track current file being processed
"explanation_response_raw": None,
"explanation_json": None,
}

final_state = langgraph.invoke(initial_state)
Expand All @@ -78,6 +84,20 @@ def run_code_snippet_workflow(settings, db_manager, prompt_manager, langgraph):
print(f"Refactored code: {'Yes' if final_state.get('refactored_code') else 'No'}")
print(f"Code review results: {final_state.get('code_review_times')}")

# Show explanation from ExplainerAgent
if final_state.get("explanation_json"):
print(Fore.CYAN + "\n=== Explanation (JSON) ===" + Style.RESET_ALL)
print(json.dumps(final_state["explanation_json"], indent=2, ensure_ascii=False))
else:
print(Fore.RED + "\nNo explanation was generated." + Style.RESET_ALL)

save_intermediate_results(
file_path="java_code_snippet",
final_state=final_state,
settings=settings
)



def main():
"""Main function: Choose between code snippet analysis or full repository run"""
Expand Down
6 changes: 4 additions & 2 deletions AntiPattern_Remediator/src/core/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
from .refactor_strategist import RefactorStrategist
from .code_transformer import CodeTransformer
from .code_reviewer import CodeReviewerAgent
from .explainer import ExplainerAgent

__all__ = [
"AntipatternScanner",
"AntipatternScanner",
"RefactorStrategist",
"CodeTransformer",
"CodeReviewerAgent"
"CodeReviewerAgent",
"ExplainerAgent"
]
39 changes: 21 additions & 18 deletions AntiPattern_Remediator/src/core/agents/antipattern_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

class AntipatternScanner:
"""Antipattern scanner agent"""

def __init__(self, tool, model, prompt_manager: PromptManager):
self.prompt_manager = prompt_manager
self.tool = tool
Expand All @@ -28,33 +28,37 @@ def retrieve_context(self, state: AgentState):
search_query = f"Java antipatterns code analysis: {state['code'][:50]}"
# Use retriever_tool to get relevant context
context = self.tool.invoke({"query": search_query})

# Get current file path from state
current_file_path = state['current_file_path']

# Extract project key and relative file path from the current file path
project_key = None
relative_file_path = None

if current_file_path:
path_obj = Path(current_file_path)

# Find the repository name (project key) by looking for 'clones' directory
for i, part in enumerate(path_obj.parts):
if part == 'clones' and i + 1 < len(path_obj.parts):
project_key = path_obj.parts[i + 1] # Repository name as project key
# Get the relative path from the repository root
relative_file_path = str(Path(*path_obj.parts[i + 2:]))
break

api = SonarQubeAPI()
print(Fore.CYAN + f"Using SonarQube project: {project_key}, file: {relative_file_path}" + Style.RESET_ALL)
issues = api.get_issues_for_file(project_key=project_key, file_path=relative_file_path)
solutions = []
for issue in issues["issues"]:
solutions.append(api.get_rules_and_fix_method(rule_key=issue['rule']))
state["context"] = {"sonarqube_issues": issues, "search_context": context, "solutions": solutions}

api = SonarQubeAPI()
print(Fore.CYAN + f"Using SonarQube project: {project_key}, file: {relative_file_path}" + Style.RESET_ALL)
issues = api.get_issues_for_file(project_key=project_key, file_path=relative_file_path)
solutions = []
for issue in issues["issues"]:
solutions.append(api.get_rules_and_fix_method(rule_key=issue['rule']))
state["context"] = {"sonarqube_issues": issues, "search_context": context, "solutions": solutions}
else:
state["context"] = {"sonarqube_issues": None, "search_context": context, "solutions": []}

print(Fore.GREEN + f"Successfully retrieved relevant context" + Style.RESET_ALL)

except Exception as e:
print(Fore.RED + f"Error retrieving context: {e}" + Style.RESET_ALL)
state["context"] = "No additional context available due to retrieval error."
Expand All @@ -64,7 +68,7 @@ def analyze_antipatterns(self, state: AgentState):
print("Analyzing code for antipatterns...")
try:
prompt_template = self.prompt_manager.get_prompt(self.prompt_manager.ANTIPATTERN_SCANNER)

# Get historical messages from state, or use empty list if none exist
msgs = state.get('msgs', [])

Expand All @@ -74,20 +78,19 @@ def analyze_antipatterns(self, state: AgentState):
sonarqube_issues=state['context'].get('solutions', ''),
msgs=msgs
)

response = self.llm.invoke(formatted_messages)
state["antipatterns_scanner_results"] = response.content if hasattr(response, 'content') else str(response)
print(Fore.GREEN + "Analysis completed successfully" + Style.RESET_ALL)
except Exception as e:
print(Fore.RED + f"Error during analysis: {e}" + Style.RESET_ALL)
state["antipatterns_scanner_results"] = f"Error occurred during analysis: {e}"
return state
def display_antipatterns_results(self, state: AgentState):

def display_antipatterns_results(self, state: AgentState):
"""Display the final analysis results"""
print("\nANTIPATTERN ANALYSIS RESULTS")
print("=" * 60)
print(state.get("antipatterns_scanner_results", "No analysis results available."))
print("=" * 60)
return state

168 changes: 168 additions & 0 deletions AntiPattern_Remediator/src/core/agents/explainer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""
ExplainerAgent — full-state returns, collision-safe
- Returns a full state dict but NEVER writes 'code' back to the graph.
- Uses PromptManager if available; otherwise falls back to an inline prompt.
"""
from __future__ import annotations
from typing import Dict, Any
import json

from langchain_core.language_models import BaseLanguageModel
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.prompts import PromptTemplate
from ..prompt import PromptManager
from src.core.utils import extract_first_json

PROMPT_KEY = "explainer"


class ExplainerAgent:
    """Produce a structured JSON explanation of detected antipatterns and the refactor.

    Reads analysis results from the graph state, asks the LLM for STRICT JSON,
    and writes both the raw response (``explanation_response_raw``) and the
    parsed payload (``explanation_json``) back into the state.

    Collision safety: every method returns a FULL state dict but never writes
    'code' back to the graph, to avoid concurrent updates on that LastValue
    channel.
    """

    def __init__(self, llm: BaseLanguageModel, prompt_manager: PromptManager):
        self.llm = llm
        self.prompt_manager = prompt_manager

    # Merge helper: return a FULL state but drop keys we must not rewrite.
    @staticmethod
    def _merge_return_state(state: Dict[str, Any], updates: Dict[str, Any], drop_keys=("code",)) -> Dict[str, Any]:
        """Return a copy of *state* with *updates* applied and *drop_keys* removed.

        'code' is dropped by default so this node never rewrites the graph's
        LastValue 'code' channel (would cause concurrent-update errors).
        *updates* may be None (treated as no updates).
        """
        merged = dict(state)
        for key in drop_keys:
            # pop with default: tolerate keys that are absent from the state
            merged.pop(key, None)
        merged.update(updates or {})
        return merged

    def explain_antipattern(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Generate explanation JSON for detected antipatterns and refactor.

        Never raises on LLM or parse failure: the raw text is kept as-is and
        a minimal fallback payload is substituted, so downstream consumers
        always find a dict under 'explanation_json'.
        """
        print("Preparing to Explain...")
        kwargs = dict(
            code=state.get("code", ""),
            context=state.get("context", ""),
            refactored_code=state.get("refactored_code", ""),
            refactoring_strategy=state.get("refactoring_strategy_results", ""),
            antipattern_name=state.get("antipatterns_scanner_results", "Unknown antipattern"),
            antipatterns_json=json.dumps(state.get("antipatterns_json", []), ensure_ascii=False),
            msgs=state.get("msgs", []),
        )

        messages = self._build_messages(**kwargs)

        try:
            response = self.llm.invoke(messages)
            # Chat models expose .content; fall back to str() for plain LLMs
            raw = getattr(response, "content", None) or str(response)
        except Exception as e:
            # Keep the error text as the "raw" output; parsing below will
            # fail and route us to the fallback payload.
            raw = f"LLM error: {e}"

        # Robust parse: tolerate prose around the JSON and malformed output.
        try:
            parsed = extract_first_json(raw)
        except Exception:
            parsed = None

        if isinstance(parsed, dict):
            exp_json = parsed
        elif isinstance(parsed, list):
            # Normalize a bare list to the schema's {"items": [...]} shape
            exp_json = {"items": parsed}
        else:
            exp_json = self._fallback_payload(state)

        updates = {
            "explanation_response_raw": raw,
            "explanation_json": exp_json,
        }
        # Return FULL state but with 'code' removed to avoid LastValue collision
        return self._merge_return_state(state, updates, drop_keys=("code",))

    def display_explanation(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Print the raw and (when available) parsed explanation, then pass the state on."""
        print("\n=== Explanation (raw) ===\n", state.get("explanation_response_raw", "N/A"))
        if state.get("explanation_json"):
            print(
                "\n=== Explanation (JSON) ===\n",
                json.dumps(state["explanation_json"], indent=2, ensure_ascii=False),
            )
        # Return FULL state but again ensure 'code' isn't echoed back
        return self._merge_return_state(state, {}, drop_keys=("code",))

    def _build_messages(self, **kwargs) -> Any:
        """Build the list of chat messages for the LLM.

        Prefers the PromptManager's 'explainer' prompt; falls back to an
        inline STRICT-JSON prompt when the manager has no usable prompt or
        its template's placeholders don't match our kwargs.
        """
        if "msgs" not in kwargs or kwargs["msgs"] is None:
            kwargs = {**kwargs, "msgs": []}

        # Try to get a prompt from the PromptManager
        prompt = None
        getp = getattr(self.prompt_manager, "get_prompt", None)
        if callable(getp):
            try:
                prompt = getp(PROMPT_KEY)
            except Exception:
                # FIX: a PromptManager without an 'explainer' entry may raise
                # (e.g. KeyError). Previously this escaped and crashed the
                # node; now we fall back to the inline prompt as documented.
                prompt = None

        if prompt is not None:
            # The prompt from PromptManager should already be a PromptTemplate
            # or a string that can be formatted safely.
            try:
                return prompt.format_messages(**kwargs)
            except (KeyError, ValueError):
                # FIX: placeholder mismatches surface as KeyError or
                # ValueError depending on the template; catch both and
                # fall through to the inline prompt.
                pass

        # ------------------------------------------------------------------
        # Inline fallback - the target output schema, injected as a JSON
        # string placeholder so curly braces never collide with the template.
        # ------------------------------------------------------------------
        schema = {
            "items": [{
                "antipattern_name": "",
                "antipattern_description": "",
                "impact": "",
                "why_it_is_bad": "",
                "how_we_fixed_it": "",
                "refactored_code": "",
                "summary": ""
            }],
            "what_changed": [],
            "why_better": [],
            "principles_applied": [],
            "trade_offs": [],
            "closing_summary": ""
        }

        # Prepare the JSON strings that will be inserted via placeholders
        json_input = json.dumps(
            {k: v for k, v in kwargs.items() if k != "msgs"},
            ensure_ascii=False
        )
        json_schema = json.dumps(schema, ensure_ascii=False)

        # Create the ChatPromptTemplate directly with string templates
        fallback = ChatPromptTemplate.from_messages([
            ("system", "Return STRICT JSON only. No commentary."),
            ("user", "Given inputs (JSON):\n{json_input}\nRespond with STRICT JSON using exactly this schema:\n{json_schema}"),
            MessagesPlaceholder("msgs"),
        ])

        # Format the messages with the prepared JSON strings
        return fallback.format_messages(
            json_input=json_input,
            json_schema=json_schema,
            msgs=kwargs["msgs"]
        )

    @staticmethod
    def _fallback_payload(state: Dict[str, Any]) -> Dict[str, Any]:
        """Minimal schema-shaped payload used when LLM output can't be parsed.

        NOTE(review): 'antipattern_name', 'antipattern_description' and
        'refactor_rationale' are presumably set by upstream agents; no code
        visible here writes them — confirm against the workflow state schema.
        """
        return {
            "items": [{
                "antipattern_name": state.get("antipattern_name", "Unknown antipattern"),
                "antipattern_description": state.get("antipattern_description", ""),
                "impact": "",
                "why_it_is_bad": "",
                "how_we_fixed_it": state.get("refactor_rationale", ""),
                "refactored_code": state.get("refactored_code", ""),
                "summary": "Auto-generated minimal explanation (parser fallback)."
            }],
            "what_changed": [],
            "why_better": [],
            "principles_applied": [],
            "trade_offs": [],
            "closing_summary": ""
        }
Loading