// Runs a single-source shortest temporal path computation from every vertex of
// the graph read from ./datasets/Testgraph2, prints each per-source result and
// finally prints the union of all of them.
//
// NOTE(review): the generic type arguments of DataSet / Tgraph / Vertex /
// MapFunction are not visible in this snippet and were reconstructed:
//   - Tuple4<String, String, Double, Double> follows from the .types(...) call;
//   - Tgraph<String, NullValue, NullValue, Double> and Vertex<String, NullValue>
//     are assumptions about what From4TupleNoEdgesNoVertexes returns -- confirm;
//   - Tuple2<String, String> is a PLACEHOLDER for the output type of
//     vertextestmapperdataset -- replace it with that mapper's real output type.

Configuration conf = new Configuration();
// The number of network buffers is an integer count, so store it as an integer.
// (It was previously stored with setFloat; a float value may not be parseable
// by an integer lookup, in which case the setting would silently be ignored.)
conf.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 16000);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

int maxIterations = 5;

DataSet<Tuple4<String, String, Double, Double>> temporalsetdoubles = env.readCsvFile("./datasets/Testgraph2")
        .fieldDelimiter(",")   // fields are separated by commas
        .ignoreComments("%")   // comment lines start with "%"
        // two String columns followed by two Double columns
        // (presumably source id, target id and two time values -- confirm)
        .types(String.class, String.class, Double.class, Double.class);

Tgraph<String, NullValue, NullValue, Double> TemporalGraph =
        Tgraph.From4TupleNoEdgesNoVertexes(temporalsetdoubles, env);

// Extract the vertex ids and bring them to the client so that one computation
// can be started per source vertex.
DataSet<String> setforlist = TemporalGraph.getVertices().map(new MapFunction<Vertex<String, NullValue>, String>() {
    @Override
    public String map(Vertex<String, NullValue> value) throws Exception {
        return value.getId();
    }
});
List<String> vertexcollection = setforlist.collect();

// Accumulates the per-source results. It starts out empty (null) instead of
// being seeded with the result for the first vertex: the loop below visits
// every vertex, so seeding duplicated the first vertex's results in the union
// and also failed with IndexOutOfBoundsException on a graph without vertices.
DataSet<Tuple2<String, String>> collectionDataSet = null;
for (String vertexid : vertexcollection) {
    DataSet<Tuple2<String, String>> iteratorDataSet = TemporalGraph
            .run(new SingleSourceShortestTemporalPathSTTJustPaths<>(vertexid, maxIterations))
            .map(new vertextestmapperdataset());
    if (collectionDataSet == null) {
        collectionDataSet = iteratorDataSet;
    } else {
        collectionDataSet = iteratorDataSet.union(collectionDataSet);
    }
    // NOTE(review): in the Flink DataSet API print() triggers execution, so this
    // runs one job per source vertex in addition to the final print below.
    iteratorDataSet.print();
}

// Nothing to print when the graph has no vertices.
if (collectionDataSet != null) {
    collectionDataSet.print();
}