// Local execution environment with an enlarged pool of task manager network buffers.
Configuration conf = new Configuration();
// The buffer count is an integral setting, so store it with setInteger; a float-typed
// entry may not be readable by integer lookups of this key (verify against the Flink
// version in use).
conf.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 16000);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
// Iteration bound handed to every shortest-temporal-path run below.
int maxIterations = 5;
DataSet
<Tuple4
<String,
String,
Double, Double
>> temporalsetdoubles
= env.
readCsvFile("./datasets/Testgraph2")
.fieldDelimiter(",") // node IDs are separated by spaces
.ignoreComments("%") // comments start with "%"
// Build the temporal graph from the 4-field tuples read from the CSV file, using the
// same execution environment.
Tgraph<String, NullValue, NullValue, Double> TemporalGraph =
        Tgraph.From4TupleNoEdgesNoVertexes(temporalsetdoubles, env);
DataSet
<String
> setforlist
= TemporalGraph.
getVertices().
map(new MapFunction
<Vertex
<String, NullValue
>, String
>() {
@Override
return value.getId();
}
});
// Bring the vertex IDs to the client so that each one can serve as a source vertex.
List<String> vertexcollection = setforlist.collect();

// Union of the per-source results; remains null until the first source has been processed.
DataSet<ArrayList<String>> collectionDataSet = null;
for (String vertexid : vertexcollection) {
    // Run the shortest-temporal-path algorithm from this source and map its output.
    DataSet<ArrayList<String>> iteratorDataSet = TemporalGraph
            .run(new SingleSourceShortestTemporalPathSTTJustPaths<>(vertexid, maxIterations))
            .map(new vertextestmapperdataset());

    // Add each source exactly once. Seeding the union with the first vertex before the
    // loop would include that vertex's results twice, because the loop visits it too.
    collectionDataSet = (collectionDataSet == null)
            ? iteratorDataSet
            : iteratorDataSet.union(collectionDataSet);

    iteratorDataSet.print();
}

// With no vertices there is nothing to print (and no first element to seed from).
if (collectionDataSet != null) {
    collectionDataSet.print();
}