From d9e02249052083e94782cd75006ea4f8f35e0c77 Mon Sep 17 00:00:00 2001 From: xeraph Date: Thu, 27 Aug 2020 20:11:39 +0900 Subject: [PATCH 1/2] added field order and field summary to logpresso java client sdk, logpresso/ent-#3832 --- pom.xml | 4 +- .../com/logpresso/client/AbstractSession.java | 6 ++ .../java/com/logpresso/client/Cursor.java | 20 +++- .../com/logpresso/client/FieldSummary.java | 79 ++++++++++++++++ .../java/com/logpresso/client/Logpresso.java | 92 +++++++++++++++++-- src/main/java/com/logpresso/client/Query.java | 22 +++++ .../com/logpresso/client/QueryRequest.java | 42 +++++++++ .../client/http/impl/WebSocketSession.java | 2 +- 8 files changed, 251 insertions(+), 16 deletions(-) create mode 100644 src/main/java/com/logpresso/client/FieldSummary.java create mode 100644 src/main/java/com/logpresso/client/QueryRequest.java diff --git a/pom.xml b/pom.xml index 239d54f..01a69ee 100644 --- a/pom.xml +++ b/pom.xml @@ -5,12 +5,12 @@ 4.0.0 logpresso logpresso-sdk-java - 1.0.0-6 + 1.1.0 jar Logpresso SDK for Java - Eediom, Inc + Logpresso, Inc diff --git a/src/main/java/com/logpresso/client/AbstractSession.java b/src/main/java/com/logpresso/client/AbstractSession.java index 8a41973..3186ad1 100644 --- a/src/main/java/com/logpresso/client/AbstractSession.java +++ b/src/main/java/com/logpresso/client/AbstractSession.java @@ -136,10 +136,16 @@ public void unregisterTrap(String callbackName) throws IOException { } public void addListener(TrapListener listener) { + if (listener == null) + throw new IllegalArgumentException("trap listener should be not null"); + listeners.add(listener); } public void removeListener(TrapListener listener) { + if (listener == null) + throw new IllegalArgumentException("trap listener should be not null"); + listeners.remove(listener); } diff --git a/src/main/java/com/logpresso/client/Cursor.java b/src/main/java/com/logpresso/client/Cursor.java index bd864d7..845a1ba 100644 --- a/src/main/java/com/logpresso/client/Cursor.java +++ b/src/main/java/com/logpresso/client/Cursor.java @@ -17,13 +17,27 @@ import java.io.Closeable; import java.util.Iterator; +import java.util.List; /** * 커서는 쿼리 결과를 순회하는 인터페이스를 제공합니다. - * - * @author xeraph@eediom.com - * */ public interface Cursor extends Iterator, Closeable { + int getQueryId(); + + /** + * 필드 정렬 순서를 반환합니다. 필드 정렬 순서에 표시된 필드가 쿼리 결과에 존재하지 않을 수 있으며, 쿼리 결과의 모든 필드를 나열하지 + * 않습니다. 필드 정렬 순서는 실제 출력 필드에서 고려되어야 할 순서를 의미하므로, 필드 정렬 순서에 나타나지 않은 결과 필드는 사전순으로 + * 정렬해야 합니다. + * + * @return 필드 정렬 순서 목록. 필드 정렬 순서가 정의되지 않은 경우 null을 반환합니다. + */ + List getFieldOrder(); + /** + * queryWithSummary()를 사용하거나, 요약 정보를 생성하도록 설정한 쿼리의 경우 필드 요약 정보를 반환합니다. + * + * @return 필드 요약 정보. 요약 생성 옵션이 지정되지 않은 쿼리는 null을 반환합니다. + */ + List getSummary(); } diff --git a/src/main/java/com/logpresso/client/FieldSummary.java b/src/main/java/com/logpresso/client/FieldSummary.java new file mode 100644 index 0000000..0749a51 --- /dev/null +++ b/src/main/java/com/logpresso/client/FieldSummary.java @@ -0,0 +1,79 @@ +package com.logpresso.client; + +import java.util.Map; + +public class FieldSummary { + private String name; + private String type; + private long count; + private Object min; + private Object max; + private Double avg; + + public static FieldSummary parse(Map m) { + FieldSummary f = new FieldSummary(); + f.setName((String) m.get("name")); + f.setType((String) m.get("type")); + f.setCount(((Number) m.get("count")).longValue()); + f.setMin(m.get("min")); + f.setMax(m.get("max")); + if (m.get("avg") != null) + f.setAvg(((Number) m.get("avg")).doubleValue()); + return f; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public long getCount() { + return count; + } + + public void setCount(long count) { + this.count = count; + } + + public Object getMin() { + return min; + } + + public void setMin(Object min) { + this.min = min; + } + + public Object getMax() { + return max; + } + + public void setMax(Object max) { + this.max = max; + } + + public Double getAvg() { + return avg; + } + + public void setAvg(Double avg) { + this.avg = avg; + } + + @Override + public String toString() { + return String.format("field %s (type=%s, count=%d, min=%s, max=%s, avg=%f)", name, type, count, + min != null ? min.toString() : null, max != null ? max.toString() : null, avg); + } + +} diff --git a/src/main/java/com/logpresso/client/Logpresso.java b/src/main/java/com/logpresso/client/Logpresso.java index adbf6de..18964ef 100644 --- a/src/main/java/com/logpresso/client/Logpresso.java +++ b/src/main/java/com/logpresso/client/Logpresso.java @@ -1912,12 +1912,24 @@ public void removeJdbcProfile(String name) throws IOException { * createQuery(), startQuery(), stopQuery(), removeQuery(), getResult() 메소드 * 조합을 사용하십시오. * - * @param queryString - * 쿼리 문자열 (NULL 허용 안 함) + * @param queryString 쿼리 문자열 (NULL 허용 안 함) + * @param summary 쿼리 결과에 대한 요약 정보 생성 * @return 쿼리 결과를 조회할 수 있는 커서가 반환됩니다. */ public Cursor query(String queryString) throws IOException { - int id = createQuery(queryString); + return query(queryString, false); + } + + public Cursor queryWithSummary(String queryString) throws IOException { + return query(queryString, true); + } + + private Cursor query(String queryString, boolean summary) throws IOException { + QueryRequest req = new QueryRequest(); + req.setQueryString(queryString); + req.setUseSummary(summary); + + int id = createQuery(req); startQuery(id); Query q = queries.get(id); q.waitUntil(null); @@ -1932,12 +1944,15 @@ public Cursor query(String queryString) throws IOException { long total = q.getLoadedCount(); - return new LogCursorImpl(id, 0L, total, true, fetchSize); + if (summary) + q.setFieldSummary(getSummary(id)); + + return new LogCursorImpl(q, 0L, total, true, fetchSize); } private class LogCursorImpl implements Cursor { - private int id; + private Query query; private long offset; private long limit; private boolean removeOnClose; @@ -1949,8 +1964,8 @@ private class LogCursorImpl implements Cursor { private int fetchUnit; private Map prefetch; - public LogCursorImpl(int id, long offset, long limit, boolean removeOnClose, int fetchUnit) { - this.id = id; + public LogCursorImpl(Query query, long offset, long limit, boolean removeOnClose, int fetchUnit) { + this.query = query; this.offset = offset; this.limit = limit; this.removeOnClose = removeOnClose; @@ -1960,6 +1975,21 @@ public LogCursorImpl(int id, long offset, long limit, boolean removeOnClose, int this.fetchUnit = fetchUnit; } + @Override + public int getQueryId() { + return query.getId(); + } + + @Override + public List getFieldOrder() { + return query.getFieldOrder(); + } + + @Override + public List getSummary() { + return new ArrayList(query.getFieldSummary()); + } + @SuppressWarnings("unchecked") @Override public boolean hasNext() { @@ -1971,7 +2001,7 @@ public boolean hasNext() { try { if (cached == null || p >= currentCacheOffset + fetchUnit) { - cached = getResult(id, nextCacheOffset, fetchUnit); + cached = getResult(query.getId(), nextCacheOffset, fetchUnit); currentCacheOffset = nextCacheOffset; nextCacheOffset += fetchUnit; } @@ -2008,7 +2038,7 @@ public void remove() { @Override public void close() throws IOException { if (removeOnClose) - removeQuery(id); + removeQuery(query.getId()); } } @@ -2396,6 +2426,18 @@ public int createQuery(String queryString, StreamingResultSet rs) throws IOExcep * @since 0.9.1 */ public int createQuery(String queryString, StreamingResultSet rs, Map queryContext) throws IOException { + QueryRequest req = new QueryRequest(); + req.setQueryString(queryString); + req.setStreamingResultSet(rs); + req.setQueryContext(queryContext); + return createQuery(req); + } + + @SuppressWarnings("unchecked") + private int createQuery(QueryRequest req) throws IOException { + String queryString = req.getQueryString(); + Map queryContext = req.getQueryContext(); + StreamingResultSet rs = req.getStreamingResultSet(); String queryContextEncoded = null; if (queryContext != null) { @@ -2407,6 +2449,7 @@ public int createQuery(String queryString, StreamingResultSet rs, Map) resp.get("field_order")); + + Query old = queries.putIfAbsent(id, query); + if (old != null) + throw new IllegalStateException("duplicated query id " + id); + return id; } @@ -2895,6 +2946,27 @@ public Map getResult(int id, long offset, int limit) throws IOEx return decodeBinary(binary, uncompressedSize); } + @SuppressWarnings("unchecked") + public List getSummary(int id) throws IOException { + verifyQueryId(id); + + Map params = new HashMap(); + params.put("id", id); + + Message resp = rpc("org.araqne.logdb.msgbus.LogQueryPlugin.getSummary", params); + if (resp.get("summary") == null) + return null; + + List fields = new ArrayList(); + List l = (List) resp.get("summary"); + for (Object o : l) { + Map m = (Map) o; + fields.add(FieldSummary.parse(m)); + } + + return fields; + } + private Map decodeBinary(String binary, int uncompressedSize) { byte[] b = Base64.decode(binary); byte[] uncompressed = new byte[uncompressedSize]; diff --git a/src/main/java/com/logpresso/client/Query.java b/src/main/java/com/logpresso/client/Query.java index 1f7447e..1df58f2 100644 --- a/src/main/java/com/logpresso/client/Query.java +++ b/src/main/java/com/logpresso/client/Query.java @@ -55,6 +55,12 @@ public class Query { // @since 0.9.1 private List subQueries = new ArrayList(); + // @since 1.1.0 + private List fieldOrder; + + // @since 1.1.0 + private List fieldSummary; + public Query(Logpresso client, int id, String queryString) { this.client = client; this.id = id; @@ -348,6 +354,22 @@ public void setCancelReason(String cancelReason) { this.cancelReason = cancelReason; } + public List getFieldOrder() { + return fieldOrder; + } + + public void setFieldOrder(List fieldOrder) { + this.fieldOrder = fieldOrder; + } + + public List getFieldSummary() { + return fieldSummary; + } + + public void setFieldSummary(List fieldSummary) { + this.fieldSummary = fieldSummary; + } + private class WaitingCondition { private Long threshold; private Object signal = new Object(); diff --git a/src/main/java/com/logpresso/client/QueryRequest.java b/src/main/java/com/logpresso/client/QueryRequest.java new file mode 100644 index 0000000..0b5b2aa --- /dev/null +++ b/src/main/java/com/logpresso/client/QueryRequest.java @@ -0,0 +1,42 @@ +package com.logpresso.client; + +import java.util.Map; + +public class QueryRequest { + private String queryString; + private StreamingResultSet rs; + private Map queryContext; + private boolean useSummary; + + public String getQueryString() { + return queryString; + } + + public void setQueryString(String queryString) { + this.queryString = queryString; + } + + public StreamingResultSet getStreamingResultSet() { + return rs; + } + + public void setStreamingResultSet(StreamingResultSet rs) { + this.rs = rs; + } + + public Map getQueryContext() { + return queryContext; + } + + public void setQueryContext(Map queryContext) { + this.queryContext = queryContext; + } + + public boolean isUseSummary() { + return useSummary; + } + + public void setUseSummary(boolean useSummary) { + this.useSummary = useSummary; + } +} diff --git a/src/main/java/com/logpresso/client/http/impl/WebSocketSession.java b/src/main/java/com/logpresso/client/http/impl/WebSocketSession.java index ebd023d..e8f8add 100644 --- a/src/main/java/com/logpresso/client/http/impl/WebSocketSession.java +++ b/src/main/java/com/logpresso/client/http/impl/WebSocketSession.java @@ -189,7 +189,7 @@ public void run() { try { synchronized (sendLock) { // send msgbus ping - websocket.send("ping"); + websocket.sendPing(); } } catch (Throwable t) { // ignore ping fail From 9a6a2b83e8dc7bed72c32f9a4b6d886035eb8821 Mon Sep 17 00:00:00 2001 From: xeraph Date: Thu, 27 Aug 2020 20:36:47 +0900 Subject: [PATCH 2/2] added getFieldOrder() to Logpresso class. logpresso/ent-#3832 --- .../java/com/logpresso/client/Logpresso.java | 50 +++++++++++++++++-- .../com/logpresso/client/QueryRequest.java | 8 +-- 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/logpresso/client/Logpresso.java b/src/main/java/com/logpresso/client/Logpresso.java index 18964ef..a535207 100644 --- a/src/main/java/com/logpresso/client/Logpresso.java +++ b/src/main/java/com/logpresso/client/Logpresso.java @@ -1925,8 +1925,7 @@ public Cursor queryWithSummary(String queryString) throws IOException { } private Cursor query(String queryString, boolean summary) throws IOException { - QueryRequest req = new QueryRequest(); - req.setQueryString(queryString); + QueryRequest req = new QueryRequest(queryString); req.setUseSummary(summary); int id = createQuery(req); @@ -2426,15 +2425,23 @@ public int createQuery(String queryString, StreamingResultSet rs) throws IOExcep * @since 0.9.1 */ public int createQuery(String queryString, StreamingResultSet rs, Map queryContext) throws IOException { - QueryRequest req = new QueryRequest(); - req.setQueryString(queryString); + QueryRequest req = new QueryRequest(queryString); req.setStreamingResultSet(rs); req.setQueryContext(queryContext); return createQuery(req); } + /** + * 주어진 쿼리 요청 객체를 사용하여 새 쿼리를 생성합니다. 권한이 없거나 문법이 틀린 경우 예외가 발생합니다. + * + * @param req 쿼리 요청 (NULL 허용 안 함) + * @since 1.1.0 + */ @SuppressWarnings("unchecked") - private int createQuery(QueryRequest req) throws IOException { + public int createQuery(QueryRequest req) throws IOException { + if (req == null) + throw new IllegalArgumentException("req should be not null"); + String queryString = req.getQueryString(); Map queryContext = req.getQueryContext(); StreamingResultSet rs = req.getStreamingResultSet(); @@ -2892,6 +2899,15 @@ private void flushInternal() { } } + /** + * 특정 쿼리가 종료할 때까지 현재 스레드를 대기(blocking) 합니다. 쿼리가 완료 혹은 취소되면 스레드 대기 상태가 풀립니다. + * + * @param id 쿼리 ID + */ + public void waitUntil(int id) { + waitUntil(id, null); + } + /** * 특정 쿼리에 대해서 주어진 쿼리 결과 갯수가 조회 가능할 때까지 현재 스레드를 대기(blocking) 합니다. 주어진 쿼리 결과 * 갯수를 채우지 못하더라도 쿼리가 완료 혹은 취소되면 스레드 대기 상태가 풀립니다. 이 메소드를 이용하면 매번 getQuery()를 @@ -2946,6 +2962,30 @@ public Map getResult(int id, long offset, int limit) throws IOEx return decodeBinary(binary, uncompressedSize); } + /** + * 필드 정렬 순서를 반환합니다. 필드 정렬 순서에 표시된 필드가 쿼리 결과에 존재하지 않을 수 있으며, 쿼리 결과의 모든 필드를 나열하지 + * 않습니다. 필드 정렬 순서는 실제 출력 필드에서 고려되어야 할 순서를 의미하므로, 필드 정렬 순서에 나타나지 않은 결과 필드는 사전순으로 + * 정렬해야 합니다. + * + * @param id 쿼리 ID + * @return 필드 정렬 순서 목록. 필드 정렬 순서가 정의되지 않은 경우 null을 반환합니다. + */ + @SuppressWarnings("unchecked") + public List getFieldOrder(int id) throws IOException { + verifyQueryId(id); + + Map params = new HashMap(); + params.put("id", id); + params.put("offset", 0); + params.put("limit", 1); + + Message resp = rpc("org.araqne.logdb.msgbus.LogQueryPlugin.getResult", params); + if (resp.getParameters().size() == 0) + throw new MessageException("query-not-found", "", resp.getParameters()); + + return (List) resp.get("field_order"); + } + @SuppressWarnings("unchecked") public List getSummary(int id) throws IOException { verifyQueryId(id); diff --git a/src/main/java/com/logpresso/client/QueryRequest.java b/src/main/java/com/logpresso/client/QueryRequest.java index 0b5b2aa..20ddead 100644 --- a/src/main/java/com/logpresso/client/QueryRequest.java +++ b/src/main/java/com/logpresso/client/QueryRequest.java @@ -8,12 +8,12 @@ public class QueryRequest { private Map queryContext; private boolean useSummary; - public String getQueryString() { - return queryString; + public QueryRequest(String queryString) { + this.queryString = queryString; } - public void setQueryString(String queryString) { - this.queryString = queryString; + public String getQueryString() { + return queryString; } public StreamingResultSet getStreamingResultSet() {