diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java
new file mode 100644
index 000000000..74669ec90
--- /dev/null
+++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright © 2017-2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.wrangler.api.parser;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonPrimitive;
+
+import java.util.Locale;
+
+/**
+ * Token for a byte-size literal such as {@code 10KB} or {@code 1.5mb}.
+ *
+ * <p>The text is trimmed and upper-cased once, at construction. Units are binary
+ * multiples: 1KB is 1024 bytes.
+ */
+public class ByteSize implements Token {
+  private static final long KILO = 1024L;
+
+  private final String rawValue;
+
+  /**
+   * @param value literal text, a number followed by KB, MB, GB or TB; must not be null
+   */
+  public ByteSize(String value) {
+    // Locale.ROOT keeps the case mapping independent of the JVM default locale.
+    this.rawValue = value.trim().toUpperCase(Locale.ROOT);
+  }
+
+  /**
+   * Converts the literal to bytes, truncating any fractional byte.
+   *
+   * @return the size in bytes
+   * @throws IllegalArgumentException if the unit is not KB, MB, GB or TB
+   * @throws NumberFormatException if the text before the unit is not a number
+   */
+  public long getBytes() {
+    if (rawValue.endsWith("KB")) {
+      return (long) (magnitude() * KILO);
+    } else if (rawValue.endsWith("MB")) {
+      return (long) (magnitude() * KILO * KILO);
+    } else if (rawValue.endsWith("GB")) {
+      return (long) (magnitude() * KILO * KILO * KILO);
+    } else if (rawValue.endsWith("TB")) {
+      return (long) (magnitude() * KILO * KILO * KILO * KILO);
+    }
+    throw new IllegalArgumentException("Unknown byte size unit in: " + rawValue);
+  }
+
+  /** Parses the number in front of the two-character unit suffix. */
+  private double magnitude() {
+    // Cut off only the trailing unit; String.replace would drop every occurrence.
+    return Double.parseDouble(rawValue.substring(0, rawValue.length() - 2));
+  }
+
+  /** Returns the normalized (trimmed, upper-cased) literal text. */
+  @Override
+  public Object value() {
+    return rawValue;
+  }
+
+  /**
+   * Reports {@link TokenType#TEXT}.
+   * NOTE(review): a dedicated BYTE_SIZE token type would distinguish this from plain text.
+   */
+  @Override
+  public TokenType type() {
+    return TokenType.TEXT;
+  }
+
+  /** Returns the literal as a JSON string primitive. */
+  @Override
+  public JsonElement toJson() {
+    return new JsonPrimitive(rawValue);
+  }
+
+  @Override
+  public String toString() {
+    return rawValue;
+  }
+}
diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java
new file mode 100644
index 000000000..a5459e810
--- /dev/null
+++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright © 2017-2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.wrangler.api.parser;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonPrimitive;
+
+import java.util.Locale;
+
+/**
+ * Token for a time-duration literal such as {@code 150ms}, {@code 2s}, {@code 5min} or {@code 1h}.
+ *
+ * <p>The text is trimmed and lower-cased once, at construction.
+ */
+public class TimeDuration implements Token {
+  private final String rawValue;
+
+  /**
+   * @param value literal text, a number followed by ms, s, min or h; must not be null
+   */
+  public TimeDuration(String value) {
+    // Locale.ROOT matters: under a Turkish default locale "MIN" lower-cases with a
+    // dotless i and would no longer end with "min".
+    this.rawValue = value.trim().toLowerCase(Locale.ROOT);
+  }
+
+  /**
+   * Converts the literal to milliseconds, truncating any fractional millisecond.
+   *
+   * @return the duration in milliseconds
+   * @throws IllegalArgumentException if the unit is not ms, s, min or h
+   * @throws NumberFormatException if the text before the unit is not a number
+   */
+  public long getMilliseconds() {
+    // "ms" has to be tested before "s" because it also ends with "s".
+    if (rawValue.endsWith("ms")) {
+      return (long) magnitude(2);
+    } else if (rawValue.endsWith("s")) {
+      return (long) (magnitude(1) * 1000);
+    } else if (rawValue.endsWith("min")) {
+      return (long) (magnitude(3) * 60 * 1000);
+    } else if (rawValue.endsWith("h")) {
+      return (long) (magnitude(1) * 3600 * 1000);
+    }
+    throw new IllegalArgumentException("Unknown time unit in: " + rawValue);
+  }
+
+  /** Parses the number in front of a unit suffix that is {@code unitLength} characters long. */
+  private double magnitude(int unitLength) {
+    // Cut off only the trailing unit; String.replace would drop every occurrence.
+    return Double.parseDouble(rawValue.substring(0, rawValue.length() - unitLength));
+  }
+
+  /** Returns the normalized (trimmed, lower-cased) literal text. */
+  @Override
+  public Object value() {
+    return rawValue;
+  }
+
+  /**
+   * Reports {@link TokenType#TEXT}.
+   * NOTE(review): a dedicated TIME_DURATION token type would distinguish this from plain text.
+   */
+  @Override
+  public TokenType type() {
+    return TokenType.TEXT;
+  }
+
+  /** Returns the literal as a JSON string primitive. */
+  @Override
+  public JsonElement toJson() {
+    return new JsonPrimitive(rawValue);
+  }
+
+  @Override
+  public String toString() {
+    return rawValue;
+  }
+}
diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/row/Row.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/row/Row.java
new file mode 100644
index 000000000..9e507a1e1
--- /dev/null
+++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/row/Row.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2017-2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.wrangler.api.row;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A mutable record that maps column names to values.
+ */
+public class Row {
+  private final Map<String, Object> fields = new HashMap<>();
+
+  /**
+   * Sets {@code column} to {@code value}, replacing any previous value.
+   *
+   * @return this row, so calls can be chained
+   */
+  public Row add(String column, Object value) {
+    fields.put(column, value);
+    return this;
+  }
+
+  /**
+   * @return the value stored for {@code column}, or {@code null} if it is absent or holds null
+   */
+  public Object getValue(String column) {
+    return fields.get(column);
+  }
+
+  /**
+   * @return the live backing map; changes made through it are visible in this row
+   */
+  public Map<String, Object> getFields() {
+    return fields;
+  }
+
+  @Override
+  public String toString() {
+    return fields.toString();
+  }
+}
diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4
index 7c517ed6a..c494f1112 100644
--- a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4
+++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4
@@ -140,7 +140,7 @@ numberRange
  ;
 
 value
- : String | Number | Column | Bool
+ : String | Number | Column | Bool | BYTE_SIZE | TIME_DURATION
  ;
 
 ecommand
@@ -195,6 +195,28 @@ identifierList
  : Identifier (',' Identifier)*
  ;
 
+BYTE_SIZE
+ : Digit+ ('.' Digit+)? BYTE_UNIT
+ ;
+
+TIME_DURATION
+ : Digit+ ('.' Digit+)? TIME_UNIT
+ ;
+
+fragment BYTE_UNIT
+ : [kK][bB]
+ | [mM][bB]
+ | [gG][bB]
+ | [tT][bB]
+ ;
+
+fragment TIME_UNIT
+ : [mM][sS]
+ | [sS]
+ | [mM][iI][nN]
+ | [hH]
+ ;
+
 
 /*
  * Following are the Lexer Rules used for tokenizing the recipe.
diff --git a/wrangler-core/src/main/java/io/cdap/directives/aggregates/AggregateStats.java b/wrangler-core/src/main/java/io/cdap/directives/aggregates/AggregateStats.java
new file mode 100644
index 000000000..429de7133
--- /dev/null
+++ b/wrangler-core/src/main/java/io/cdap/directives/aggregates/AggregateStats.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright © 2017-2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.directives.aggregates;
+
+import io.cdap.wrangler.api.Arguments;
+import io.cdap.wrangler.api.DirectiveExecutionException;
+import io.cdap.wrangler.api.Executor;
+import io.cdap.wrangler.api.ExecutorContext;
+import io.cdap.wrangler.api.parser.ByteSize;
+import io.cdap.wrangler.api.parser.TimeDuration;
+import io.cdap.wrangler.api.parser.Token;
+import io.cdap.wrangler.api.row.Row;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Directive to aggregate byte size and time duration columns.
+ *
+ * <p>{@code execute} adds up the configured size and time columns of the rows it is
+ * given; {@code finalize} reports the totals in megabytes and seconds.
+ */
+public class AggregateStats implements Executor {
+  private static final double BYTES_PER_MB = 1024.0 * 1024;
+  private static final double MILLIS_PER_SECOND = 1000.0;
+
+  private String sizeColumn;
+  private String timeColumn;
+  private String outputSizeColumn;
+  private String outputTimeColumn;
+
+  private long totalSizeBytes;
+  private long totalTimeMs;
+
+  /**
+   * Does nothing.
+   * NOTE(review): column names are only set by {@link #initialize(ExecutorContext, List)};
+   * confirm which of the two initialize methods the framework calls.
+   */
+  @Override
+  public void initialize(Arguments arguments) {
+    // Intentionally empty.
+  }
+
+  /**
+   * Reads the four column names from the directive arguments.
+   *
+   * @param context execution context; not used
+   * @param args size column, time column, output size column and output time column, in order
+   * @throws DirectiveExecutionException if fewer than four arguments are supplied
+   */
+  public void initialize(ExecutorContext context, List<? extends Token> args)
+    throws DirectiveExecutionException {
+    if (args == null || args.size() < 4) {
+      throw new DirectiveExecutionException("aggregate-stats requires 4 arguments.");
+    }
+    // Read through the Token interface so the cast cannot fail for non-Text tokens.
+    this.sizeColumn = args.get(0).value().toString();
+    this.timeColumn = args.get(1).value().toString();
+    this.outputSizeColumn = args.get(2).value().toString();
+    this.outputTimeColumn = args.get(3).value().toString();
+  }
+
+  /**
+   * Adds the size and time cells of every {@link Row} in {@code object} to the running totals.
+   *
+   * @param object expected to be a {@code List} of rows; anything else is ignored
+   * @param context execution context; not used
+   * @return always an empty list; the totals are reported by {@link #finalize(ExecutorContext)}
+   * @throws IllegalArgumentException if a cell is not a valid byte size or time duration
+   */
+  @Override
+  public List<Row> execute(Object object, ExecutorContext context) throws DirectiveExecutionException {
+    if (object instanceof List) {
+      for (Object item : (List<?>) object) {
+        if (item instanceof Row) {
+          accumulate((Row) item);
+        }
+      }
+    }
+    return Collections.emptyList();
+  }
+
+  /** Adds one row's size and time cells to the totals; absent or null cells are skipped. */
+  private void accumulate(Row row) {
+    Object size = row.getValue(sizeColumn);
+    if (size != null) {
+      ByteSize parsed = size instanceof ByteSize ? (ByteSize) size : new ByteSize(size.toString());
+      totalSizeBytes += parsed.getBytes();
+    }
+    Object time = row.getValue(timeColumn);
+    if (time != null) {
+      TimeDuration parsed =
+        time instanceof TimeDuration ? (TimeDuration) time : new TimeDuration(time.toString());
+      totalTimeMs += parsed.getMilliseconds();
+    }
+  }
+
+  /**
+   * Builds the single result row holding the totals.
+   * NOTE(review): this overloads {@link Object#finalize()}; the name is kept for existing callers.
+   *
+   * @param context execution context; not used
+   * @return a one-element list with the total size in megabytes and the total time in seconds
+   */
+  public List<Row> finalize(ExecutorContext context) throws DirectiveExecutionException {
+    Row result = new Row();
+    result.add(outputSizeColumn, totalSizeBytes / BYTES_PER_MB);
+    result.add(outputTimeColumn, totalTimeMs / MILLIS_PER_SECOND);
+    return Collections.singletonList(result);
+  }
+
+  @Override
+  public void destroy() {
+    // Nothing to release.
+  }
+}
diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java
index ac35e7a5e..7630bf811 100644
--- a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java
+++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java
@@ -16,6 +16,9 @@
 
 package io.cdap.wrangler.parser;
 
+import io.cdap.wrangler.api.parser.ByteSize;
+import io.cdap.wrangler.api.parser.TimeDuration;
+
 import io.cdap.wrangler.api.LazyNumber;
 import io.cdap.wrangler.api.RecipeSymbol;
 import io.cdap.wrangler.api.SourceInfo;
@@ -326,4 +329,31 @@ private SourceInfo getOriginalSource(ParserRuleContext ctx) {
     int column = ctx.getStart().getCharPositionInLine();
     return new SourceInfo(lineno, column, text);
   }
+
+  /**
+   * Adds the token matching whichever alternative of the {@code value} rule was parsed.
+   *
+   * @param ctx parse-tree node of the {@code value} rule
+   * @return the builder the token was added to
+   */
+  @Override
+  public RecipeSymbol.Builder visitValue(DirectivesParser.ValueContext ctx) {
+    if (ctx.String() != null) {
+      String text = ctx.String().getText();
+      // Drop the first and last character (presumably the surrounding quotes).
+      builder.addToken(new Text(text.substring(1, text.length() - 1)));
+    } else if (ctx.Number() != null) {
+      builder.addToken(new Numeric(new LazyNumber(ctx.Number().getText())));
+    } else if (ctx.Bool() != null) {
+      builder.addToken(new Bool(Boolean.parseBoolean(ctx.Bool().getText())));
+    } else if (ctx.BYTE_SIZE() != null) {
+      builder.addToken(new ByteSize(ctx.BYTE_SIZE().getText()));
+    } else if (ctx.TIME_DURATION() != null) {
+      builder.addToken(new TimeDuration(ctx.TIME_DURATION().getText()));
+    } else if (ctx.Column() != null) {
+      // Drop the leading character of the column reference.
+      builder.addToken(new ColumnName(ctx.Column().getText().substring(1)));
+    }
+    return builder;
+  }
 }