|
1 | 1 | package alpakka.sse_to_elasticsearch;
|
2 | 2 |
|
3 |
| -import org.apache.commons.io.IOUtils; |
4 | 3 | import org.apache.hc.client5.http.classic.methods.HttpPost;
|
5 |
| -import org.apache.hc.client5.http.config.RequestConfig; |
| 4 | +import org.apache.hc.client5.http.config.ConnectionConfig; |
6 | 5 | import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy;
|
7 | 6 | import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
|
8 | 7 | import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
|
| 8 | +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; |
9 | 9 | import org.apache.hc.core5.http.ContentType;
|
| 10 | +import org.apache.hc.core5.http.HttpEntity; |
| 11 | +import org.apache.hc.core5.http.io.entity.EntityUtils; |
10 | 12 | import org.apache.hc.core5.http.io.entity.StringEntity;
|
11 | 13 | import org.apache.hc.core5.util.TimeValue;
|
12 | 14 | import org.apache.hc.core5.util.Timeout;
|
|
15 | 17 | import org.slf4j.LoggerFactory;
|
16 | 18 |
|
17 | 19 | import java.io.IOException;
|
18 |
| -import java.nio.charset.StandardCharsets; |
19 | 20 | import java.util.concurrent.TimeUnit;
|
20 | 21 |
|
21 | 22 | /**
|
@@ -47,24 +48,30 @@ public String run(String text) {
|
47 | 48 | // For NER this means we get not just 'Person' but also 'Organisation', 'Location'
|
48 | 49 | requestParams.put("temperature", 0.2);
|
49 | 50 |
|
50 |
| - HttpPost request = new HttpPost("https://api.openai.com/v1/completions"); |
| 51 | + String endpointURL = "https://api.openai.com/v1/completions"; |
| 52 | + HttpPost request = new HttpPost(endpointURL); |
51 | 53 | request.setHeader("Authorization", "Bearer " + API_KEY);
|
52 | 54 | StringEntity requestEntity = new StringEntity(
|
53 | 55 | requestParams.toString(),
|
54 | 56 | ContentType.APPLICATION_JSON);
|
55 | 57 | request.setEntity(requestEntity);
|
56 | 58 |
|
57 |
| - RequestConfig timeoutsConfig = RequestConfig.custom() |
58 |
| - .setConnectTimeout(Timeout.of(DELAY_TO_RETRY_SECONDS, TimeUnit.SECONDS)).build(); |
| 59 | + PoolingHttpClientConnectionManagerBuilder connectionManagerBuilder = PoolingHttpClientConnectionManagerBuilder.create(); |
| 60 | + connectionManagerBuilder.setDefaultConnectionConfig(ConnectionConfig.custom() |
| 61 | + .setSocketTimeout(Timeout.of(DELAY_TO_RETRY_SECONDS, TimeUnit.SECONDS)) |
| 62 | + .build()); |
59 | 63 |
|
60 | 64 | try (CloseableHttpClient httpClient = HttpClientBuilder.create()
|
61 |
| - .setDefaultRequestConfig(timeoutsConfig) |
| 65 | + .setConnectionManager(connectionManagerBuilder.build()) |
62 | 66 | .setRetryStrategy(new DefaultHttpRequestRetryStrategy(3, TimeValue.ofMinutes(1L)))
|
63 | 67 | .build()) {
|
64 |
| - return IOUtils.toString(httpClient.execute(request).getEntity().getContent(), StandardCharsets.UTF_8); |
| 68 | + return httpClient.execute(request, response -> { |
| 69 | + HttpEntity entity = response.getEntity(); |
| 70 | + return entity != null ? EntityUtils.toString(entity) : "N/A"; |
| 71 | + }); |
65 | 72 | } catch (IOException e) {
|
66 |
| - LOGGER.warn("Unable to get result from openai completions endpoint. Cause: ", e); |
67 |
| - return "N/A"; |
| 73 | + LOGGER.warn("Connection issue while accessing openai API endpoint: {}. Cause: ", endpointURL, e); |
| 74 | + throw new RuntimeException(e); |
68 | 75 | }
|
69 | 76 | }
|
70 | 77 | }
|
0 commit comments