1
+ /*
2
+ * Licensed to the Apache Software Foundation (ASF) under one or more
3
+ * contributor license agreements. See the NOTICE file distributed with
4
+ * this work for additional information regarding copyright ownership.
5
+ * The ASF licenses this file to You under the Apache License, Version 2.0
6
+ * (the "License"); you may not use this file except in compliance with
7
+ * the License. You may obtain a copy of the License at
8
+ *
9
+ * http://www.apache.org/licenses/LICENSE-2.0
10
+ *
11
+ * Unless required by applicable law or agreed to in writing, software
12
+ * distributed under the License is distributed on an "AS IS" BASIS,
13
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
+ * See the License for the specific language governing permissions and
15
+ * limitations under the License.
16
+ */
17
+ package com .alibaba .cloud .ai .graph .agent .a2a ;
18
+
19
+ import com .alibaba .cloud .ai .graph .OverAllState ;
20
+ import com .alibaba .cloud .ai .graph .action .NodeAction ;
21
+ import com .alibaba .fastjson .JSON ;
22
+ import com .fasterxml .jackson .databind .ObjectMapper ;
23
+ import io .a2a .spec .AgentCard ;
24
+ import org .apache .http .HttpEntity ;
25
+ import org .apache .http .client .methods .CloseableHttpResponse ;
26
+ import org .apache .http .client .methods .HttpPost ;
27
+ import org .apache .http .entity .ContentType ;
28
+ import org .apache .http .entity .StringEntity ;
29
+ import org .apache .http .impl .client .CloseableHttpClient ;
30
+ import org .apache .http .impl .client .HttpClients ;
31
+ import org .apache .http .util .EntityUtils ;
32
+
33
+ import java .util .HashMap ;
34
+ import java .util .List ;
35
+ import java .util .Map ;
36
+ import java .util .UUID ;
37
+
38
+ public class A2aNode implements NodeAction {
39
+
40
+ private final AgentCard agentCard ;
41
+
42
+ private final String inputKeyFromParent ;
43
+
44
+ private final String outputKeyToParent ;
45
+
46
+ private final boolean streaming ;
47
+
48
+ private final ObjectMapper objectMapper = new ObjectMapper ();
49
+
50
+ public A2aNode (AgentCard agentCard , String inputKeyFromParent , String outputKeyToParent ) {
51
+ this (agentCard , inputKeyFromParent , outputKeyToParent , false );
52
+ }
53
+
54
+ public A2aNode (AgentCard agentCard , String inputKeyFromParent , String outputKeyToParent , boolean streaming ) {
55
+ this .agentCard = agentCard ;
56
+ this .inputKeyFromParent = inputKeyFromParent ;
57
+ this .outputKeyToParent = outputKeyToParent ;
58
+ this .streaming = streaming ;
59
+ }
60
+
61
+ @ Override
62
+ public Map <String , Object > apply (OverAllState state ) throws Exception {
63
+ String requestPayload = this .streaming ? buildSendStreamingMessageRequest (state , this .inputKeyFromParent )
64
+ : buildSendMessageRequest (state , this .inputKeyFromParent );
65
+
66
+ String resultText = sendMessageToServer (this .agentCard , requestPayload );
67
+
68
+ Map <String , Object > resultMap = JSON .parseObject (resultText , Map .class );
69
+ Map <String , Object > result = (Map <String , Object >) resultMap .get ("result" );
70
+
71
+ String responseText = extractResponseText (result );
72
+ return Map .of (this .outputKeyToParent , responseText );
73
+ }
74
+
75
+ private String extractResponseText (Map <String , Object > result ) {
76
+ if (result .containsKey ("artifacts" )) {
77
+ List <Object > artifacts = (List <Object >) result .get ("artifacts" );
78
+ StringBuilder responseBuilder = new StringBuilder ();
79
+ for (Object artifact : artifacts ) {
80
+ if (artifact instanceof Map ) {
81
+ List <Object > parts = (List <Object >) ((Map <String , Object >) artifact ).get ("parts" );
82
+ for (Object part : parts ) {
83
+ if (part instanceof Map ) {
84
+ String text = (String ) ((Map <String , Object >) part ).get ("text" );
85
+ if (text != null ) {
86
+ responseBuilder .append (text );
87
+ }
88
+ }
89
+ }
90
+ }
91
+ }
92
+ return responseBuilder .toString ();
93
+ } else {
94
+ List <Object > parts = (List <Object >) result .get ("parts" );
95
+ Map <String , Object > lastPart = (Map <String , Object >) parts .get (parts .size () - 1 );
96
+ return (String ) lastPart .get ("text" );
97
+ }
98
+ }
99
+
100
+ /**
101
+ * Build the JSON-RPC request payload to send to the A2A server.
102
+ *
103
+ * @param state Parent state
104
+ * @param inputKey Input key to retrieve user input from the state
105
+ * @return JSON string payload (e.g., JSON-RPC params)
106
+ */
107
+ private String buildSendMessageRequest (OverAllState state , String inputKey ) {
108
+ Object textValue = state .value (inputKey )
109
+ .orElseThrow (() -> new IllegalArgumentException ("Input key '" + inputKey + "' not found in state: " + state ));
110
+ String text = String .valueOf (textValue );
111
+
112
+ String id = UUID .randomUUID ().toString ();
113
+ String messageId = UUID .randomUUID ().toString ().replace ("-" , "" );
114
+
115
+ Map <String , Object > part = Map .of ("kind" , "text" , "text" , text );
116
+
117
+ Map <String , Object > message = new HashMap <>();
118
+ message .put ("kind" , "message" );
119
+ message .put ("messageId" , messageId );
120
+ message .put ("parts" , List .of (part ));
121
+ message .put ("role" , "user" );
122
+
123
+ Map <String , Object > params = Map .of ("message" , message );
124
+
125
+ Map <String , Object > root = new HashMap <>();
126
+ root .put ("id" , id );
127
+ root .put ("jsonrpc" , "2.0" );
128
+ root .put ("method" , "message/send" );
129
+ root .put ("params" , params );
130
+
131
+ try {
132
+ return objectMapper .writeValueAsString (root );
133
+ } catch (Exception e ) {
134
+ throw new IllegalStateException ("Failed to build JSON-RPC payload" , e );
135
+ }
136
+ }
137
+
138
+ /**
139
+ * Build the JSON-RPC streaming request payload (method: message/stream).
140
+ *
141
+ * @param state Parent state
142
+ * @param inputKey Input key to retrieve user input from the state
143
+ * @return JSON string payload for streaming
144
+ */
145
+ private String buildSendStreamingMessageRequest (OverAllState state , String inputKey ) {
146
+ Object textValue = state .value (inputKey )
147
+ .orElseThrow (() -> new IllegalArgumentException ("Input key '" + inputKey + "' not found in state: " + state ));
148
+ String text = String .valueOf (textValue );
149
+
150
+ String id = UUID .randomUUID ().toString ();
151
+ String messageId = UUID .randomUUID ().toString ().replace ("-" , "" );
152
+
153
+ Map <String , Object > part = Map .of ("kind" , "text" , "text" , text );
154
+
155
+ Map <String , Object > message = new HashMap <>();
156
+ message .put ("kind" , "message" );
157
+ message .put ("messageId" , messageId );
158
+ message .put ("parts" , List .of (part ));
159
+ message .put ("role" , "user" );
160
+
161
+ Map <String , Object > params = Map .of ("message" , message );
162
+
163
+ Map <String , Object > root = new HashMap <>();
164
+ root .put ("id" , id );
165
+ root .put ("jsonrpc" , "2.0" );
166
+ root .put ("method" , "message/stream" );
167
+ root .put ("params" , params );
168
+
169
+ try {
170
+ return objectMapper .writeValueAsString (root );
171
+ } catch (Exception e ) {
172
+ throw new IllegalStateException ("Failed to build JSON-RPC streaming payload" , e );
173
+ }
174
+ }
175
+
176
+ /**
177
+ * Send the request to the remote A2A server and return the non-streaming response.
178
+ *
179
+ * @param agentCard Agent card (source for server URL/metadata)
180
+ * @param requestPayload JSON string payload built by buildSendMessageRequest
181
+ * @return Response body as string
182
+ */
183
+ private String sendMessageToServer (AgentCard agentCard , String requestPayload ) throws Exception {
184
+ String baseUrl = resolveAgentBaseUrl (agentCard );
185
+ if (baseUrl == null || baseUrl .isBlank ()) {
186
+ throw new IllegalStateException ("AgentCard.url is empty" );
187
+ }
188
+
189
+ try (CloseableHttpClient httpClient = HttpClients .createDefault ()) {
190
+ HttpPost post = new HttpPost (baseUrl );
191
+ post .setHeader ("Content-Type" , "application/json" );
192
+ post .setEntity (new StringEntity (requestPayload , ContentType .APPLICATION_JSON ));
193
+
194
+ try (CloseableHttpResponse response = httpClient .execute (post )) {
195
+ int statusCode = response .getStatusLine ().getStatusCode ();
196
+ if (statusCode != 200 ) {
197
+ throw new IllegalStateException ("HTTP request failed, status: " + statusCode );
198
+ }
199
+ HttpEntity entity = response .getEntity ();
200
+ if (entity == null ) {
201
+ throw new IllegalStateException ("Empty HTTP entity" );
202
+ }
203
+ return EntityUtils .toString (entity , "UTF-8" );
204
+ }
205
+ }
206
+ }
207
+
208
+ /**
209
+ * Resolve base URL from the AgentCard.
210
+ */
211
+ private String resolveAgentBaseUrl (AgentCard agentCard ) {
212
+ return agentCard .url ();
213
+ }
214
+
215
+ }
0 commit comments