~/paste/19924
~/paste/19924
~/paste/19924

  1. package Tgraphs;
  2.  
  3. import org.apache.flink.api.common.functions.MapFunction;
  4. import org.apache.flink.api.java.DataSet;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.api.java.tuple.Tuple3;
  7. import org.apache.flink.graph.Edge;
  8. import org.apache.flink.graph.Vertex;
  9. import org.apache.flink.graph.spargel.GatherFunction;
  10. import org.apache.flink.graph.spargel.MessageIterator;
  11. import org.apache.flink.graph.spargel.ScatterFunction;
  12. import org.apache.flink.types.NullValue;
  13.  
  14. /**
  15.  * Created by s133781 on 03-Nov-16.
  16.  *
  17.  * input:
  18.  * Tgraph <K,Double,EV,Double>
  19.  * output:
  20.  * Dataset<Vertex<K,tuple2<Double,Arraylist<Double>>>>
  21.  *
  22.  */
  23. public class SingleSourceShortestTemporalPathEAT3<K,EV> implements TGraphAlgorithm<K,NullValue,EV,Double,DataSet<Vertex<K,Double>>> {
  24.  
  25.     private final K srcVertexId;
  26.     private final Integer maxIterations;
  27.  
  28.     public SingleSourceShortestTemporalPathEAT3(K srcVertexId, Integer maxIterations) {
  29.         this.srcVertexId = srcVertexId;
  30.         this.maxIterations = maxIterations;
  31.     }
  32.     @Override
  33.     public DataSet<Vertex<K,Double>> run(Tgraph<K, NullValue, EV, Double> input) throws Exception {
  34.  
  35.         input.getGellyGraph().mapVertices(new InitVerticesMapper<K>(srcVertexId)).getVertices().print();
  36.         return null;
  37.  
  38. //        return input.getGellyGraph().mapVertices(new InitVerticesMapper<K>(srcVertexId)).runScatterGatherIteration(
  39. //                new MinDistanceMessengerforTuplewithpath<K,EV>(), new VertexDistanceUpdaterwithpath<K>(),
  40. //                maxIterations).getVertices();
  41.  
  42.     }
  43.  
  44.     public static final class InitVerticesMapper<K>     implements MapFunction<Vertex<K, NullValue>, Tuple2<K,Double>> {
  45.  
  46.         private K srcVertexId;
  47.  
  48.         public InitVerticesMapper(K srcId) {
  49.             this.srcVertexId = srcId;
  50.         }
  51.  
  52.         public Tuple2<K,Double> map(Vertex<K, NullValue> value) throws Exception {
  53.              if (value.f0.equals(srcVertexId)) {
  54.                 return new Tuple2<>(value.getId(),0D);
  55.             } else {
  56.                  return new Tuple2<>(value.getId(),Double.MAX_VALUE);
  57.             }
  58.         }
  59.     }
  60.  
  61.  
  62.     /*
  63.     * mindistance function from scatterfunction with:
  64.     * K as K
  65.     * VV as Double
  66.     * Message as Tuple2<Double,ArrayList<K>>
  67.     * EV as Tuple3<EV, Double, Double>
  68.     *
  69.     * checks if vertexvalue is < inf, then sends a message to neighboor vertexes
  70.     * with end time and such
  71.     * */
  72.     private static final class MinDistanceMessengerforTuplewithpath<K,EV> extends ScatterFunction<K, Double, Double, Tuple3<EV, Double, Double>> {
  73.         @Override
  74.         public void sendMessages(Vertex<K, Double> vertex) {
  75.             if (vertex.getValue() < Double.POSITIVE_INFINITY) { //Checks if it has been passed for the first time
  76.                 for (Edge<K, Tuple3<EV,Double,Double>> edge : getEdges()) {
  77.                     if (edge.getValue().f1 >= vertex.getValue()) { //If starting time of the edge
  78.  
  79.                         sendMessageTo(edge.getTarget(), edge.getValue().f2.doubleValue());
  80.                     }
  81.                 }
  82.             }
  83.         }
  84.     }
  85.     /**
  86.      * K as K
  87.      * VV as Tuple2<Double,ArrayList<K>>
  88.      * Message as Tuple2<Double,ArrayList<K>>
  89.      *
  90.      * @param <K>
  91.      */
  92.  
  93.  
  94.     private static final class VertexDistanceUpdaterwithpath<K> extends GatherFunction<K, Double, Double> {
  95.  
  96.         @Override
  97.         public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
  98.  
  99.             Double minDistance = Double.MAX_VALUE;
  100.             for (Double msg : inMessages) {
  101.                 if (msg < minDistance) {
  102.                     minDistance = msg;
  103.                 }
  104.             }
  105.  
  106.             if (vertex.getValue() > minDistance) {
  107.                 setNewVertexValue(minDistance);
  108.             }
  109.         }
  110.  
  111.  
  112.     }
  113.  
  114.  
  115. }
  116.  
Language: java
Posted by Anonymous at 17 Nov 2016, 12:40:39 UTC