Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<modelVersion>4.0.0</modelVersion>
<groupId>logpresso</groupId>
<artifactId>logpresso-sdk-java</artifactId>
<version>1.0.0-6</version>
<version>1.1.0</version>
<packaging>jar</packaging>
<name>Logpresso SDK for Java</name>

<organization>
<name>Eediom, Inc</name>
<name>Logpresso, Inc</name>
</organization>

<properties>
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/logpresso/client/AbstractSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,16 @@ public void unregisterTrap(String callbackName) throws IOException {
}

public void addListener(TrapListener listener) {
if (listener == null)
throw new IllegalArgumentException("trap listener should be not null");

listeners.add(listener);
}

public void removeListener(TrapListener listener) {
if (listener == null)
throw new IllegalArgumentException("trap listener should be not null");

listeners.remove(listener);
}

Expand Down
20 changes: 17 additions & 3 deletions src/main/java/com/logpresso/client/Cursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,27 @@

import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

/**
* 커서는 쿼리 결과를 순회하는 인터페이스를 제공합니다.
*
* @author xeraph@eediom.com
*
*/
public interface Cursor extends Iterator<Tuple>, Closeable {
int getQueryId();

/**
* 필드 정렬 순서를 반환합니다. 필드 정렬 순서에 표시된 필드가 쿼리 결과에 존재하지 않을 수 있으며, 쿼리 결과의 모든 필드를 나열하지
* 않습니다. 필드 정렬 순서는 실제 출력 필드에서 고려되어야 할 순서를 의미하므로, 필드 정렬 순서에 나타나지 않은 결과 필드는 사전순으로
* 정렬해야 합니다.
*
* @return 필드 정렬 순서 목록. 필드 정렬 순서가 정의되지 않은 경우 null을 반환합니다.
*/
List<String> getFieldOrder();

/**
* queryWithSummary()를 사용하거나, 요약 정보를 생성하도록 설정한 쿼리의 경우 필드 요약 정보를 반환합니다.
*
* @return 필드 요약 정보. 요약 생성 옵션이 지정되지 않은 쿼리는 null을 반환합니다.
*/
List<FieldSummary> getSummary();
}
79 changes: 79 additions & 0 deletions src/main/java/com/logpresso/client/FieldSummary.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.logpresso.client;

import java.util.Map;

public class FieldSummary {
private String name;
private String type;
private long count;
private Object min;
private Object max;
private Double avg;

public static FieldSummary parse(Map<String, Object> m) {
FieldSummary f = new FieldSummary();
f.setName((String) m.get("name"));
f.setType((String) m.get("type"));
f.setCount(((Number) m.get("count")).longValue());
f.setMin(m.get("min"));
f.setMax(m.get("max"));
if (m.get("avg") != null)
f.setAvg(((Number) m.get("avg")).doubleValue());
return f;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}

public long getCount() {
return count;
}

public void setCount(long count) {
this.count = count;
}

public Object getMin() {
return min;
}

public void setMin(Object min) {
this.min = min;
}

public Object getMax() {
return max;
}

public void setMax(Object max) {
this.max = max;
}

public Double getAvg() {
return avg;
}

public void setAvg(Double avg) {
this.avg = avg;
}

@Override
public String toString() {
return String.format("field %s (type=%s, count=%d, min=%s, max=%s, avg=%f)", name, type, count,
min != null ? min.toString() : null, max != null ? max.toString() : null, avg);
}

}
132 changes: 122 additions & 10 deletions src/main/java/com/logpresso/client/Logpresso.java
Original file line number Diff line number Diff line change
Expand Up @@ -1912,12 +1912,23 @@ public void removeJdbcProfile(String name) throws IOException {
* createQuery(), startQuery(), stopQuery(), removeQuery(), getResult() 메소드
* 조합을 사용하십시오.
*
* @param queryString
* 쿼리 문자열 (NULL 허용 안 함)
* @param queryString 쿼리 문자열 (NULL 허용 안 함)
* @param summary 쿼리 결과에 대한 요약 정보 생성
* @return 쿼리 결과를 조회할 수 있는 커서가 반환됩니다.
*/
public Cursor query(String queryString) throws IOException {
int id = createQuery(queryString);
return query(queryString, false);
}

public Cursor queryWithSummary(String queryString) throws IOException {
return query(queryString, true);
}

private Cursor query(String queryString, boolean summary) throws IOException {
QueryRequest req = new QueryRequest(queryString);
req.setUseSummary(summary);

int id = createQuery(req);
startQuery(id);
Query q = queries.get(id);
q.waitUntil(null);
Expand All @@ -1932,12 +1943,15 @@ public Cursor query(String queryString) throws IOException {

long total = q.getLoadedCount();

return new LogCursorImpl(id, 0L, total, true, fetchSize);
if (summary)
q.setFieldSummary(getSummary(id));

return new LogCursorImpl(q, 0L, total, true, fetchSize);
}

private class LogCursorImpl implements Cursor {

private int id;
private Query query;
private long offset;
private long limit;
private boolean removeOnClose;
Expand All @@ -1949,8 +1963,8 @@ private class LogCursorImpl implements Cursor {
private int fetchUnit;
private Map<String, Object> prefetch;

public LogCursorImpl(int id, long offset, long limit, boolean removeOnClose, int fetchUnit) {
this.id = id;
public LogCursorImpl(Query query, long offset, long limit, boolean removeOnClose, int fetchUnit) {
this.query = query;
this.offset = offset;
this.limit = limit;
this.removeOnClose = removeOnClose;
Expand All @@ -1960,6 +1974,21 @@ public LogCursorImpl(int id, long offset, long limit, boolean removeOnClose, int
this.fetchUnit = fetchUnit;
}

@Override
public int getQueryId() {
return query.getId();
}

@Override
public List<String> getFieldOrder() {
return query.getFieldOrder();
}

@Override
public List<FieldSummary> getSummary() {
return new ArrayList<FieldSummary>(query.getFieldSummary());
}

@SuppressWarnings("unchecked")
@Override
public boolean hasNext() {
Expand All @@ -1971,7 +2000,7 @@ public boolean hasNext() {

try {
if (cached == null || p >= currentCacheOffset + fetchUnit) {
cached = getResult(id, nextCacheOffset, fetchUnit);
cached = getResult(query.getId(), nextCacheOffset, fetchUnit);
currentCacheOffset = nextCacheOffset;
nextCacheOffset += fetchUnit;
}
Expand Down Expand Up @@ -2008,7 +2037,7 @@ public void remove() {
@Override
public void close() throws IOException {
if (removeOnClose)
removeQuery(id);
removeQuery(query.getId());
}
}

Expand Down Expand Up @@ -2396,6 +2425,26 @@ public int createQuery(String queryString, StreamingResultSet rs) throws IOExcep
* @since 0.9.1
*/
public int createQuery(String queryString, StreamingResultSet rs, Map<String, Object> queryContext) throws IOException {
QueryRequest req = new QueryRequest(queryString);
req.setStreamingResultSet(rs);
req.setQueryContext(queryContext);
return createQuery(req);
}

/**
* 주어진 쿼리 요청 객체를 사용하여 새 쿼리를 생성합니다. 권한이 없거나 문법이 틀린 경우 예외가 발생합니다.
*
* @param req 쿼리 요청 (NULL 허용 안 함)
* @since 1.1.0
*/
@SuppressWarnings("unchecked")
public int createQuery(QueryRequest req) throws IOException {
if (req == null)
throw new IllegalArgumentException("req should be not null");

String queryString = req.getQueryString();
Map<String, Object> queryContext = req.getQueryContext();
StreamingResultSet rs = req.getStreamingResultSet();

String queryContextEncoded = null;
if (queryContext != null) {
Expand All @@ -2407,6 +2456,7 @@ public int createQuery(String queryString, StreamingResultSet rs, Map<String, Ob
params.put("query", queryString);
params.put("source", "java-client");
params.put("context", queryContextEncoded);
params.put("use_summary", req.isUseSummary());

Message resp = rpc("org.araqne.logdb.msgbus.LogQueryPlugin.createQuery", params);
int id = resp.getInt("id");
Expand All @@ -2418,7 +2468,15 @@ public int createQuery(String queryString, StreamingResultSet rs, Map<String, Ob
session.registerTrap("logdb-query-result-" + id);
}

queries.putIfAbsent(id, new Query(this, id, queryString));
Query query = new Query(this, id, queryString);

if (resp.get("field_order") != null)
query.setFieldOrder((List<String>) resp.get("field_order"));

Query old = queries.putIfAbsent(id, query);
if (old != null)
throw new IllegalStateException("duplicated query id " + id);

return id;
}

Expand Down Expand Up @@ -2841,6 +2899,15 @@ private void flushInternal() {
}
}

/**
* 특정 쿼리가 종료할 때까지 현재 스레드를 대기(blocking) 합니다. 쿼리가 완료 혹은 취소되면 스레드 대기 상태가 풀립니다.
*
* @param id 쿼리 ID
*/
public void waitUntil(int id) {
waitUntil(id, null);
}

/**
* 특정 쿼리에 대해서 주어진 쿼리 결과 갯수가 조회 가능할 때까지 현재 스레드를 대기(blocking) 합니다. 주어진 쿼리 결과
* 갯수를 채우지 못하더라도 쿼리가 완료 혹은 취소되면 스레드 대기 상태가 풀립니다. 이 메소드를 이용하면 매번 getQuery()를
Expand Down Expand Up @@ -2895,6 +2962,51 @@ public Map<String, Object> getResult(int id, long offset, int limit) throws IOEx
return decodeBinary(binary, uncompressedSize);
}

/**
* 필드 정렬 순서를 반환합니다. 필드 정렬 순서에 표시된 필드가 쿼리 결과에 존재하지 않을 수 있으며, 쿼리 결과의 모든 필드를 나열하지
* 않습니다. 필드 정렬 순서는 실제 출력 필드에서 고려되어야 할 순서를 의미하므로, 필드 정렬 순서에 나타나지 않은 결과 필드는 사전순으로
* 정렬해야 합니다.
*
* @param id 쿼리 ID
* @return 필드 정렬 순서 목록. 필드 정렬 순서가 정의되지 않은 경우 null을 반환합니다.
*/
@SuppressWarnings("unchecked")
public List<String> getFieldOrder(int id) throws IOException {
verifyQueryId(id);

Map<String, Object> params = new HashMap<String, Object>();
params.put("id", id);
params.put("offset", 0);
params.put("limit", 1);

Message resp = rpc("org.araqne.logdb.msgbus.LogQueryPlugin.getResult", params);
if (resp.getParameters().size() == 0)
throw new MessageException("query-not-found", "", resp.getParameters());

return (List<String>) resp.get("field_order");
}

@SuppressWarnings("unchecked")
public List<FieldSummary> getSummary(int id) throws IOException {
verifyQueryId(id);

Map<String, Object> params = new HashMap<String, Object>();
params.put("id", id);

Message resp = rpc("org.araqne.logdb.msgbus.LogQueryPlugin.getSummary", params);
if (resp.get("summary") == null)
return null;

List<FieldSummary> fields = new ArrayList<FieldSummary>();
List<Object> l = (List<Object>) resp.get("summary");
for (Object o : l) {
Map<String, Object> m = (Map<String, Object>) o;
fields.add(FieldSummary.parse(m));
}

return fields;
}

private Map<String, Object> decodeBinary(String binary, int uncompressedSize) {
byte[] b = Base64.decode(binary);
byte[] uncompressed = new byte[uncompressedSize];
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/com/logpresso/client/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public class Query {
// @since 0.9.1
private List<SubQuery> subQueries = new ArrayList<SubQuery>();

// @since 1.1.0
private List<String> fieldOrder;

// @since 1.1.0
private List<FieldSummary> fieldSummary;

public Query(Logpresso client, int id, String queryString) {
this.client = client;
this.id = id;
Expand Down Expand Up @@ -348,6 +354,22 @@ public void setCancelReason(String cancelReason) {
this.cancelReason = cancelReason;
}

public List<String> getFieldOrder() {
return fieldOrder;
}

public void setFieldOrder(List<String> fieldOrder) {
this.fieldOrder = fieldOrder;
}

public List<FieldSummary> getFieldSummary() {
return fieldSummary;
}

public void setFieldSummary(List<FieldSummary> fieldSummary) {
this.fieldSummary = fieldSummary;
}

private class WaitingCondition {
private Long threshold;
private Object signal = new Object();
Expand Down
Loading