package Tgraphs; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.GatherFunction; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.ScatterFunction; import org.apache.flink.types.NullValue; /** * Created by s133781 on 03-Nov-16. * * input: * Tgraph * output: * Dataset>>> * */ public class SingleSourceShortestTemporalPathEAT3 implements TGraphAlgorithm>> { private final K srcVertexId; private final Integer maxIterations; public SingleSourceShortestTemporalPathEAT3(K srcVertexId, Integer maxIterations) { this.srcVertexId = srcVertexId; this.maxIterations = maxIterations; } @Override public DataSet> run(Tgraph input) throws Exception { input.getGellyGraph().mapVertices(new InitVerticesMapper(srcVertexId)).getVertices().print(); return null; // return input.getGellyGraph().mapVertices(new InitVerticesMapper(srcVertexId)).runScatterGatherIteration( // new MinDistanceMessengerforTuplewithpath(), new VertexDistanceUpdaterwithpath(), // maxIterations).getVertices(); } public static final class InitVerticesMapper implements MapFunction, Tuple2> { private K srcVertexId; public InitVerticesMapper(K srcId) { this.srcVertexId = srcId; } public Tuple2 map(Vertex value) throws Exception { if (value.f0.equals(srcVertexId)) { return new Tuple2<>(value.getId(),0D); } else { return new Tuple2<>(value.getId(),Double.MAX_VALUE); } } } /* * mindistance function from scatterfunction with: * K as K * VV as Double * Message as Tuple2> * EV as Tuple3 * * checks if vertexvalue is < inf, then sends a message to neighboor vertexes * with end time and such * */ private static final class MinDistanceMessengerforTuplewithpath extends ScatterFunction> { @Override public void sendMessages(Vertex vertex) { if (vertex.getValue() < Double.POSITIVE_INFINITY) { //Checks if it has been passed for the first time for (Edge> edge : getEdges()) { if (edge.getValue().f1 >= vertex.getValue()) { //If starting time of the edge sendMessageTo(edge.getTarget(), edge.getValue().f2.doubleValue()); } } } } } /** * K as K * VV as Tuple2> * Message as Tuple2> * * @param */ private static final class VertexDistanceUpdaterwithpath extends GatherFunction { @Override public void updateVertex(Vertex vertex, MessageIterator inMessages) { Double minDistance = Double.MAX_VALUE; for (Double msg : inMessages) { if (msg < minDistance) { minDistance = msg; } } if (vertex.getValue() > minDistance) { setNewVertexValue(minDistance); } } } }