diff --git a/pom.xml b/pom.xml index 970ad6d4..c754633b 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,11 @@ me@matthiasb.com http://matthiasb.com + + Robin Syihab + robin@digaku.com + http://robin.nosql.asia + 2012 diff --git a/src/main/java/com/thinkaurelius/faunus/FaunusElement.java b/src/main/java/com/thinkaurelius/faunus/FaunusElement.java index d8c933e8..c1d9dbd3 100644 --- a/src/main/java/com/thinkaurelius/faunus/FaunusElement.java +++ b/src/main/java/com/thinkaurelius/faunus/FaunusElement.java @@ -210,11 +210,17 @@ public void readFields(final DataInput in) throws IOException { public void write(final DataOutput out) throws IOException { WritableUtils.writeVLong(out, this.id); out.writeBoolean(this.pathEnabled); - if (this.pathEnabled) - ElementPaths.write(this.paths, out); - else - WritableUtils.writeVLong(out, this.pathCounter); - ElementProperties.write(this.properties, out); + + try { + if (this.pathEnabled) + ElementPaths.write(this.paths, out); + else + WritableUtils.writeVLong(out, this.pathCounter); + ElementProperties.write(this.properties, out); + } catch (com.esotericsoftware.kryo.KryoException e) { + throw new com.esotericsoftware.kryo.KryoException("Kryo failed when processing " + this.toString() + + ". " + e.getMessage()); + } } @Override diff --git a/src/main/java/com/thinkaurelius/faunus/formats/graphson/FaunusGraphSONUtility.java b/src/main/java/com/thinkaurelius/faunus/formats/graphson/FaunusGraphSONUtility.java index 3624a212..96e5ad68 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/graphson/FaunusGraphSONUtility.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/graphson/FaunusGraphSONUtility.java @@ -6,11 +6,7 @@ import com.tinkerpop.blueprints.Edge; import com.tinkerpop.blueprints.Element; import com.tinkerpop.blueprints.Vertex; -import com.tinkerpop.blueprints.util.io.graphson.ElementFactory; -import com.tinkerpop.blueprints.util.io.graphson.ElementPropertyConfig; -import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode; -import com.tinkerpop.blueprints.util.io.graphson.GraphSONTokens; -import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility; +import com.tinkerpop.blueprints.util.io.graphson.*; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -20,11 +16,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; +import java.util.*; import static com.tinkerpop.blueprints.Direction.IN; import static com.tinkerpop.blueprints.Direction.OUT; @@ -44,7 +36,11 @@ public class FaunusGraphSONUtility { private static final FaunusElementFactory elementFactory = new FaunusElementFactory(); - private static final GraphSONUtility graphson = new GraphSONUtility(GraphSONMode.COMPACT, elementFactory, + private static final GraphSONUtility graphsonCompact = new GraphSONUtility(GraphSONMode.COMPACT, elementFactory, + ElementPropertyConfig.ExcludeProperties(VERTEX_IGNORE, EDGE_IGNORE)); + private static final GraphSONUtility graphsonNormal = new GraphSONUtility(GraphSONMode.NORMAL, elementFactory, + ElementPropertyConfig.ExcludeProperties(VERTEX_IGNORE, EDGE_IGNORE)); + private static final GraphSONUtility graphsonExtended = new GraphSONUtility(GraphSONMode.EXTENDED, elementFactory, ElementPropertyConfig.ExcludeProperties(VERTEX_IGNORE, EDGE_IGNORE)); public static List fromJSON(final InputStream in) throws IOException { @@ -60,32 +56,60 @@ public static List fromJSON(final InputStream in) throws IOExcepti } public static FaunusVertex fromJSON(String line) throws IOException { + return fromJSON(line, GraphSONMode.COMPACT); + } + + private static GraphSONUtility getGraphSON(GraphSONMode mode){ + switch(mode){ + case EXTENDED: + return graphsonExtended; + case NORMAL: + return graphsonNormal; + default: + return graphsonCompact; + } + + } + + public static FaunusVertex fromJSON(String line, GraphSONMode mode) throws IOException { try { final JSONObject json = new JSONObject(new JSONTokener(line)); line = EMPTY_STRING; // clear up some memory - final FaunusVertex vertex = (FaunusVertex) graphson.vertexFromJson(json); + JSONArray outEArray = json.optJSONArray(_OUT_E); + JSONArray inEArray = json.optJSONArray(_IN_E); - fromJSONEdges(vertex, json.optJSONArray(_OUT_E), OUT); json.remove(_OUT_E); // clear up some memory - fromJSONEdges(vertex, json.optJSONArray(_IN_E), IN); json.remove(_IN_E); // clear up some memory + FaunusVertex vertex = (FaunusVertex) getGraphSON(mode).vertexFromJson(json); + + fromJSONEdges(vertex, outEArray, OUT, mode); + fromJSONEdges(vertex, inEArray, IN, mode); + + return vertex; + } catch (NullPointerException e){ + return null; } catch (Exception e) { throw new IOException(e.getMessage(), e); } } private static void fromJSONEdges(final FaunusVertex vertex, final JSONArray edges, final Direction direction) throws JSONException, IOException { + fromJSONEdges(vertex, edges, direction, GraphSONMode.COMPACT); + } + + private static void fromJSONEdges(final FaunusVertex vertex, final JSONArray edges, final Direction direction, GraphSONMode mode) throws JSONException, IOException { if (null != edges) { for (int i = 0; i < edges.length(); i++) { final JSONObject edge = edges.optJSONObject(i); FaunusEdge faunusEdge = null; + if (direction.equals(Direction.IN)) { - faunusEdge = (FaunusEdge) graphson.edgeFromJson(edge, new FaunusVertex(edge.optLong(GraphSONTokens._OUT_V)), vertex); + faunusEdge = (FaunusEdge) getGraphSON(mode).edgeFromJson(edge, new FaunusVertex(edge.optLong(GraphSONTokens._OUT_V)), vertex); } else if (direction.equals(Direction.OUT)) { - faunusEdge = (FaunusEdge) graphson.edgeFromJson(edge, vertex, new FaunusVertex(edge.optLong(GraphSONTokens._IN_V))); + faunusEdge = (FaunusEdge) getGraphSON(mode).edgeFromJson(edge, vertex, new FaunusVertex(edge.optLong(GraphSONTokens._IN_V))); } if (faunusEdge != null) { @@ -96,8 +120,12 @@ private static void fromJSONEdges(final FaunusVertex vertex, final JSONArray edg } public static JSONObject toJSON(final Vertex vertex) throws IOException { + return toJSON(vertex, GraphSONMode.COMPACT); + } + + public static JSONObject toJSON(final Vertex vertex, GraphSONMode mode) throws IOException { try { - final JSONObject object = GraphSONUtility.jsonFromElement(vertex, getElementPropertyKeys(vertex, false), GraphSONMode.COMPACT); + final JSONObject object = GraphSONUtility.jsonFromElement(vertex, getElementPropertyKeys(vertex, false), mode); // force the ID to long. with blueprints, most implementations will send back a long, but // some like TinkerGraph will return a string. the same is done for edges below @@ -107,7 +135,7 @@ public static JSONObject toJSON(final Vertex vertex) throws IOException { if (!edges.isEmpty()) { final JSONArray outEdgesArray = new JSONArray(); for (final Edge outEdge : edges) { - final JSONObject edgeObject = GraphSONUtility.jsonFromElement(outEdge, getElementPropertyKeys(outEdge, true), GraphSONMode.COMPACT); + final JSONObject edgeObject = GraphSONUtility.jsonFromElement(outEdge, getElementPropertyKeys(outEdge, true), mode); outEdgesArray.put(edgeObject); } object.put(_OUT_E, outEdgesArray); @@ -117,7 +145,7 @@ public static JSONObject toJSON(final Vertex vertex) throws IOException { if (!edges.isEmpty()) { final JSONArray inEdgesArray = new JSONArray(); for (final Edge inEdge : edges) { - final JSONObject edgeObject = GraphSONUtility.jsonFromElement(inEdge, getElementPropertyKeys(inEdge, false), GraphSONMode.COMPACT); + final JSONObject edgeObject = GraphSONUtility.jsonFromElement(inEdge, getElementPropertyKeys(inEdge, false), mode); inEdgesArray.put(edgeObject); } object.put(_IN_E, inEdgesArray); diff --git a/src/main/java/com/thinkaurelius/faunus/formats/graphson/GraphSONInputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/graphson/GraphSONInputFormat.java index 69cdb6f5..2c2ac729 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/graphson/GraphSONInputFormat.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/graphson/GraphSONInputFormat.java @@ -2,6 +2,7 @@ import com.thinkaurelius.faunus.FaunusVertex; import com.thinkaurelius.faunus.formats.VertexQueryFilter; +import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -25,7 +26,20 @@ public class GraphSONInputFormat extends FileInputFormat createRecordReader(final InputSplit split, final TaskAttemptContext context) { - return new GraphSONRecordReader(this.vertexQuery); + + final GraphSONMode mode; + + String modeStr = context.getConfiguration().getRaw("faunus.graphson.mode"); + + if (modeStr.equals("normal")){ + mode = GraphSONMode.NORMAL; + }else if (modeStr.equals("extended")){ + mode = GraphSONMode.EXTENDED; + }else{ + mode = GraphSONMode.COMPACT; + } + + return new GraphSONRecordReader(this.vertexQuery, mode); } @Override diff --git a/src/main/java/com/thinkaurelius/faunus/formats/graphson/GraphSONOutputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/graphson/GraphSONOutputFormat.java index 26d3a3ac..ad00c7e7 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/graphson/GraphSONOutputFormat.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/graphson/GraphSONOutputFormat.java @@ -2,10 +2,12 @@ import com.thinkaurelius.faunus.FaunusVertex; import com.thinkaurelius.faunus.formats.FaunusFileOutputFormat; +import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import java.io.DataOutputStream; import java.io.IOException; /** @@ -15,6 +17,29 @@ public class GraphSONOutputFormat extends FaunusFileOutputFormat { @Override public RecordWriter getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException { - return new GraphSONRecordWriter(super.getDataOuputStream(job)); + DataOutputStream os = super.getDataOuputStream(job); + final GraphSONMode mode; + + String modeStr = job.getConfiguration().getRaw("faunus.graphson.mode"); + + if (modeStr == "normal"){ + mode = GraphSONMode.NORMAL; + }else if (modeStr == "extended"){ + mode = GraphSONMode.EXTENDED; + }else{ + mode = GraphSONMode.COMPACT; + } + + return new GraphSONRecordWriter(os) { + + @Override + public void write(NullWritable key, FaunusVertex vertex) throws IOException { + if (null != vertex) { + this.out.write(FaunusGraphSONUtility.toJSON(vertex, mode).toString().getBytes("UTF-8")); + this.out.write(NEWLINE); + } + } + }; } -} \ No newline at end of file +} + diff --git a/src/main/java/com/thinkaurelius/faunus/formats/graphson/GraphSONRecordReader.java b/src/main/java/com/thinkaurelius/faunus/formats/graphson/GraphSONRecordReader.java index 90b72ebd..c2630dd6 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/graphson/GraphSONRecordReader.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/graphson/GraphSONRecordReader.java @@ -4,6 +4,7 @@ import com.thinkaurelius.faunus.FaunusVertex; import com.thinkaurelius.faunus.formats.VertexQueryFilter; import com.thinkaurelius.faunus.mapreduce.FaunusCompiler; +import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; @@ -21,10 +22,16 @@ public class GraphSONRecordReader extends RecordReader { - private static final String UTF8 = "UTF-8"; - private static final byte[] NEWLINE; + protected static final String UTF8 = "UTF-8"; + protected static final byte[] NEWLINE; protected DataOutputStream out; static {