diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 3bd42dc31..a3f27636d 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -26,7 +26,7 @@ import org.apache.spark.util.AccumulatorV2 import org.openeo.geotrellis import org.openeo.geotrellis.creo.CreoS3Utils import org.openeo.geotrellis.netcdf.NetCDFRDDWriter.fixedTimeOffset -import org.openeo.geotrellis.stac.STACItem +import org.openeo.geotrellis.stac.{Item, Asset, STACItem} import org.openeo.geotrellis.tile_grid.TileGrid import org.openeo.geotrelliscommon.ByKeyPartitioner import org.slf4j.LoggerFactory @@ -39,7 +39,8 @@ import java.nio.file.StandardCopyOption.REPLACE_EXISTING import java.nio.file.attribute.PosixFilePermissions import java.time.Duration import java.time.format.DateTimeFormatter -import java.util.{ArrayList, Collections, Map, List => JList} +import java.util.{ArrayList, Collections, Map, UUID, List => JList} +import java.util.stream.Collectors import scala.collection.JavaConverters._ import scala.reflect._ @@ -78,16 +79,16 @@ package object geotiff { ) // ~ SpatialTiledRasterLayer in GeoPySpark but supports compression - def saveStitched(rdd: SRDD, path: String, compression: Compression): Extent = + def saveStitched(rdd: SRDD, path: String, compression: Compression): Item = saveStitched(rdd, path, None, None, compression) - def saveStitched(rdd: SRDD, path: String, cropBounds: Map[String, Double], compression: Compression): Extent = + def saveStitched(rdd: SRDD, path: String, cropBounds: Map[String, Double], compression: Compression): Item = saveStitched(rdd, path, Some(cropBounds), None, compression) - def saveStitchedTileGrid(rdd: SRDD, path: String, tileGrid: String, compression: Compression): java.util.List[(String, Extent)] = + def saveStitchedTileGrid(rdd: SRDD, path: String, tileGrid: String, compression: Compression): JList[Item] = saveStitchedTileGrid(rdd, path, tileGrid, None, None, compression) - def saveStitchedTileGrid(rdd: SRDD, path: String, tileGrid: String, cropBounds: Map[String, Double], compression: Compression): java.util.List[(String, Extent)] = + def saveStitchedTileGrid(rdd: SRDD, path: String, tileGrid: String, cropBounds: Map[String, Double], compression: Compression): JList[Item] = saveStitchedTileGrid(rdd, path, tileGrid, Some(cropBounds), None, compression) def saveRDDTiled(rdd:MultibandTileLayerRDD[SpaceTimeKey], path:String,zLevel:Int=6,cropBounds:Option[Extent]=Option.empty[Extent]):Unit = { @@ -104,16 +105,21 @@ package object geotiff { zLevel: Int = 6, cropBounds: Option[Extent] = Option.empty[Extent], formatOptions: GTiffOptions = new GTiffOptions - ): java.util.List[(String, String, Extent)] = { + ): JList[(String, String, Extent)] = { rdd.sparkContext.setCallSite(s"save_result(GTiff, temporal)") formatOptions.assertNoConflicts() - val ret = saveRDDTemporalAllowAssetPerBand(rdd, path, zLevel, cropBounds, formatOptions).asScala + val ret = saveRDDTemporalAllowAssetPerBand(rdd, path, zLevel, cropBounds, formatOptions) logger.warn("Calling backwards compatibility version for saveRDDTemporalConsiderAssetPerBand") // val duplicates = ret.groupBy(_._2).filter(_._2.size > 1) // if (duplicates.nonEmpty) { // throw new Exception(s"Multiple returned files with same timestamp: ${duplicates.keys.mkString(", ")}") // } - ret.map(t => (t._1, t._2, t._3)).asJava + ret.stream() + .flatMap { item => + item.assets.values().stream() + .map[(String, String, Extent)] { asset => (asset.path, item.datetime, item.bbox) } + } + .collect(Collectors.toList()) } private val executorAttemptDirectoryPrefix = "executorAttemptDirectory" @@ -198,7 +204,7 @@ package object geotiff { zLevel: Int = 6, cropBounds: Option[Extent] = Option.empty[Extent], formatOptions: GTiffOptions = new GTiffOptions - ): java.util.List[(String, String, Extent, java.util.List[Int])] = { + ): JList[Item] = { formatOptions.assertNoConflicts() val preProcessResult: (GridBounds[Int], Extent, RDD[(SpaceTimeKey, MultibandTile)] with Metadata[TileLayerMetadata[SpaceTimeKey]]) = preProcess(rdd,cropBounds) val gridBounds: GridBounds[Int] = preProcessResult._1 @@ -280,8 +286,21 @@ package object geotiff { val res = geotiffResults.map { case (geoTiffResultObject, timestamp, croppedExtent, bandIndices) => val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path), geoTiffResultObject) - (destinationPath.toString, timestamp, croppedExtent, bandIndices) - }.toList.asJava + (destinationPath, timestamp, croppedExtent, bandIndices) + } + + val items = res + .groupBy { case (_, timestamp, _, _) => timestamp } + .map { case (timestamp, geotiffs) => + val assets = geotiffs + .map { case (path, _, _, bandIndices) => + val assetKey = if (formatOptions.separateAssetPerBand) f"openEO_${bandLabels(bandIndices.get(0))}" else "openEO" + assetKey -> Asset(path, bandIndices) + } + .toMap + + Item(id = s"${UUID.randomUUID()}_$timestamp", datetime=timestamp, bbox = croppedExtent, assets.asJava) + } for ((geotiffResult, _, _, _) <- geotiffResults) { val successfulExecutorAttemptDirectory = extractExecutorAttemptDirectory(Path.of(path), geotiffResult) @@ -289,7 +308,7 @@ package object geotiff { } toBeGrouped.unpersist() - res + items.toList.asJava } @@ -299,14 +318,19 @@ package object geotiff { zLevel: Int = 6, cropBounds: Option[Extent] = Option.empty[Extent], formatOptions: GTiffOptions = new GTiffOptions - ): java.util.List[String] = { + ): JList[String] = { rdd.sparkContext.setCallSite(s"save_result(GTiff, spatial, $bandCount)") - val tmp = saveRDDAllowAssetPerBand(rdd, bandCount, path, zLevel, cropBounds, formatOptions).asScala + val tmp = saveRDDAllowAssetPerBand(rdd, bandCount, path, zLevel, cropBounds, formatOptions) logger.warn("Calling backwards compatibility version for saveRDDAllowAssetPerBand") // if (tmp.size() > 1) { // throw new Exception("Multiple returned files, probably meant to call saveRDDAllowAssetPerBand") // } - tmp.map(_._1).asJava + tmp.stream() + .flatMap { item => + item.assets.values().stream() + .map[String] { asset => asset.path } + } + .collect(Collectors.toList()) } //noinspection ScalaWeakerAccess @@ -316,7 +340,7 @@ package object geotiff { zLevel: Int = 6, cropBounds: Option[Extent] = Option.empty[Extent], formatOptions: GTiffOptions = new GTiffOptions - ): java.util.List[(String, Extent, java.util.List[Int])] = { + ): JList[Item] = { formatOptions.assertNoConflicts() if (formatOptions.separateAssetPerBand) { val bandLabels = formatOptions.tags.bandTags.map(_("DESCRIPTION")) @@ -376,7 +400,7 @@ package object geotiff { } else { (geoTiffResultObject.correctPath, extent, bandIndices) } - }.toList.sortBy(_._1).asJava + } if (path.endsWith("out")) { val beforeOut = path.substring(0, path.length - "out".length) @@ -386,12 +410,18 @@ package object geotiff { } } - res + val assets = res.map { case (path, _, bandIndices) => + val bandNames = bandIndices.asScala.map(bandLabels.apply) + s"openEO_${bandNames mkString "_"}" -> Asset(path, bandIndices) + }.toMap.asJava + + Collections.singletonList(Item(id = UUID.randomUUID().toString, datetime = null, bbox = extent, assets)) + // TODO: restore asset ordering? } else { - val tiffPaths = saveRDDGeneric(rdd, bandCount, path, zLevel, cropBounds, formatOptions).asScala - tiffPaths.map { case (tiffPath, extent) => - (tiffPath, extent, (0 until bandCount).toList.asJava) - }.asJava + val (tiffPath, extent) = saveRDDGeneric(rdd, bandCount, path, zLevel, cropBounds, formatOptions) + val assets = Collections.singletonMap("openEO", Asset(tiffPath, (0 until bandCount).asJava)) + + Collections.singletonList(Item(id = UUID.randomUUID().toString, datetime = null, bbox = extent, assets)) } } @@ -484,7 +514,7 @@ package object geotiff { def levelFor(extent: Extent, cellSize: CellSize): LayoutLevel = ??? } - def saveRDDGeneric[K: SpatialComponent: Boundable : ClassTag](rdd: MultibandTileLayerRDD[K], bandCount: Int, path: String, zLevel: Int = 6, cropBounds: Option[Extent] = None, formatOptions: GTiffOptions = new GTiffOptions): java.util.List[(String, Extent)] = { + private def saveRDDGeneric[K: SpatialComponent: Boundable : ClassTag](rdd: MultibandTileLayerRDD[K], bandCount: Int, path: String, zLevel: Int = 6, cropBounds: Option[Extent] = None, formatOptions: GTiffOptions = new GTiffOptions): (String, Extent) = { val preProcessResult: (GridBounds[Int], Extent, RDD[(K, MultibandTile)] with Metadata[TileLayerMetadata[K]]) = preProcess(rdd,cropBounds) val gridBounds: GridBounds[Int] = preProcessResult._1 val croppedExtent: Extent = preProcessResult._2 @@ -558,13 +588,10 @@ package object geotiff { case None => // do nothing } - Collections.singletonList((geoTiffResultObject.correctPath, croppedExtent)) - }finally { + (geoTiffResultObject.correctPath, croppedExtent) + } finally { preprocessedRdd.unpersist() } - - - } private def getCompressedTiles[K: SpatialComponent : Boundable : ClassTag](preprocessedRdd: RDD[(K, MultibandTile)] with Metadata[TileLayerMetadata[K]],gridBounds: GridBounds[Int], compression: Compression): (collection.Map[Int, Array[Byte]], CellType, Double, Int) = { @@ -784,7 +811,7 @@ package object geotiff { path: String, cropBounds: Option[Map[String, Double]], cropDimensions: Option[ArrayList[Int]], - compression: Compression): Extent = { + compression: Compression): Item = { val contextRDD = ContextRDD(rdd, rdd.metadata) val stitched: Raster[MultibandTile] = contextRDD.stitch() @@ -811,7 +838,9 @@ package object geotiff { .withOverviews(NearestNeighbor) writeGeoTiff(geoTiff, path, gtiffOptions = None) - adjusted.extent + + Item(id = UUID.randomUUID().toString, datetime = null, bbox = adjusted.extent, + Collections.singletonMap("openEO", Asset(path))) } def saveStitchedTileGrid( @@ -821,7 +850,7 @@ package object geotiff { cropBounds: Option[Map[String, Double]], cropDimensions: Option[ArrayList[Int]], compression: Compression) - : java.util.List[(String, Extent)] = { + : JList[Item] = { val features = TileGrid.computeFeaturesForTileGrid(tileGrid, ProjectedExtent(rdd.metadata.extent,rdd.metadata.crs)) def newFilePath(path: String, tileId: String) = { @@ -850,19 +879,25 @@ package object geotiff { val executorAttemptDirectory = createExecutorAttemptDirectory(Path.of(path).getParent) val filePath = executorAttemptDirectory + "/" + newFilePath(Path.of(path).getFileName.toString, tileId) - (stitchAndWriteToTiff(tiles, filePath, layout, crs, extent, croppedExtent, cropDimensions, compression), extent) + (stitchAndWriteToTiff(tiles, filePath, layout, crs, extent, croppedExtent, cropDimensions, compression), tileId, extent) }.collect() val res = geotiffResults.map { - case (geoTiffResultObject, croppedExtent) => + case (geoTiffResultObject, tileId, croppedExtent) => val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path).getParent, geoTiffResultObject) - (destinationPath.toString, croppedExtent) - }.toList.asJava - for ((geotiffResult, _) <- geotiffResults) { + (destinationPath, tileId, croppedExtent) + } + + val items = res.map { case (path, tileId, extent) => + Item(id = s"${UUID.randomUUID()}_$tileId", datetime = null, bbox = extent, + assets = Collections.singletonMap("openEO", Asset(path))) + } + + for ((geotiffResult, _, _) <- geotiffResults) { val successfulExecutorAttemptDirectory = extractExecutorAttemptDirectory(Path.of(path).getParent, geotiffResult) CreoS3Utils.assetDeleteFolders(List(successfulExecutorAttemptDirectory)) } - res + items.toList.asJava } private def stitchAndWriteToTiff(tiles: Iterable[(SpatialKey, MultibandTile)], filePath: String, @@ -932,47 +967,22 @@ package object geotiff { polygons: ProjectedPolygons, sampleNames: JList[String], compression: Compression, - ): JList[(String, String, Extent)] = - saveSamples(rdd, path, polygons, sampleNames, compression, None) - - def saveSamples(rdd: MultibandTileLayerRDD[SpaceTimeKey], - path: String, - polygons: ProjectedPolygons, - sampleNames: JList[String], - compression: Compression, - filenamePrefix: Option[String], - ): JList[(String, String, Extent)] = { + filenamePrefix: Option[String] = None, + ): JList[Item] = { val reprojected = ProjectedPolygons.reproject(polygons, rdd.metadata.crs) val features = sampleNames.asScala.zip(reprojected.polygons) - groupByFeatureAndWriteToTiff(rdd, Option.empty, features, path, Option.empty, compression, filenamePrefix) + groupByFeatureAndWriteToTiff(rdd, cropBounds = None, features, path, cropDimensions = None, compression, filenamePrefix) } def saveStitchedTileGridTemporal(rdd: MultibandTileLayerRDD[SpaceTimeKey], path: String, tileGrid: String, compression: Compression, - filenamePrefix: Option[String], - ): java.util.List[(String, String, Extent)] = - geotrellis.geotiff.saveStitchedTileGridTemporal(rdd, path, tileGrid, Option.empty, Option.empty, compression, filenamePrefix) - - def saveStitchedTileGridTemporal(rdd: MultibandTileLayerRDD[SpaceTimeKey], - path: String, - tileGrid: String, - compression: Compression, - ): java.util.List[(String, String, Extent)] = - geotrellis.geotiff.saveStitchedTileGridTemporal(rdd, path, tileGrid, Option.empty, Option.empty, compression) - - def saveStitchedTileGridTemporal(rdd: MultibandTileLayerRDD[SpaceTimeKey], - path: String, - tileGrid: String, - cropBounds: Option[Map[String, Double]], - cropDimensions: Option[ArrayList[Int]], - compression: Compression, filenamePrefix: Option[String] = None, - ): java.util.List[(String, String, Extent)] = { + ): JList[Item] = { val features = TileGrid.computeFeaturesForTileGrid(tileGrid, ProjectedExtent(rdd.metadata.extent, rdd.metadata.crs)) - .map { case (s, extent) => (s, extent.toPolygon()) } - groupByFeatureAndWriteToTiff(rdd, cropBounds, features, path, cropDimensions, compression, filenamePrefix) + .map { case (name, extent) => (name, extent.toPolygon()) } + groupByFeatureAndWriteToTiff(rdd, cropBounds = None, features, path, cropDimensions = None, compression, filenamePrefix) } private def groupByFeatureAndWriteToTiff(rdd: MultibandTileLayerRDD[SpaceTimeKey], @@ -982,7 +992,7 @@ package object geotiff { cropDimensions: Option[ArrayList[Int]], compression: Compression, filenamePrefix: Option[String] = None, - ): java.util.List[(String, String, Extent)] = { + ): JList[Item] = { val featuresBC: Broadcast[Seq[(String, Geometry)]] = SparkContext.getOrCreate().broadcast(features) val croppedExtent = cropBounds.map(toExtent) @@ -990,7 +1000,7 @@ package object geotiff { val layout = rdd.metadata.layout val crs = rdd.metadata.crs - rdd + val ret = rdd .flatMap { case (key, tile) => featuresBC.value .filter { case (_, geometry) => layout.mapTransform.keysForGeometry(geometry) contains key.spatialKey } .map { case (name, geometry) => ((name, (geometry, key.time)), (key.spatialKey, tile)) } @@ -1001,10 +1011,16 @@ package object geotiff { val filePath = Paths.get(path).resolve(filename).toString val timestamp = time format DateTimeFormatter.ISO_ZONED_DATE_TIME (stitchAndWriteToTiff(tiles, filePath, layout, crs, geometry, croppedExtent, cropDimensions, compression).correctPath, - timestamp, geometry.extent) + timestamp, geometry.extent, name) } .collect() - .toList.asJava + + val items = for { + (path, timestamp, extent, name) <- ret + } yield Item(id = f"${UUID.randomUUID()}_${timestamp}_$name", datetime = timestamp, bbox = extent, + assets = Collections.singletonMap("openEO", Asset(path))) + + items.toList.asJava } private[geotrellis] case class GeoTiffResultObject(correctPath: String, fileExists: Boolean, gdalInfoPath: Option[String]) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/netcdf/NetCDFRDDWriter.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/netcdf/NetCDFRDDWriter.scala index 9fb915cd4..7b71662e8 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/netcdf/NetCDFRDDWriter.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/netcdf/NetCDFRDDWriter.scala @@ -17,6 +17,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.openeo.geotrellis.creo.CreoS3Utils import org.openeo.geotrellis.geotiff.preProcess +import org.openeo.geotrellis.stac.{Asset, Item} import org.openeo.geotrellis.{OpenEOProcesses, ProjectedPolygons, TemporalResolution} import org.openeo.geotrelliscommon.ByKeyPartitioner import org.slf4j.LoggerFactory @@ -33,7 +34,7 @@ import java.nio.file.{Files, Path, Paths} import java.time.format.DateTimeFormatter import java.time.{Duration, ZoneOffset, ZonedDateTime} import java.util -import java.util.{ArrayList, Collections} +import java.util.{ArrayList, Collections, UUID} import scala.collection.JavaConverters._ import scala.reflect.ClassTag @@ -71,7 +72,7 @@ object NetCDFRDDWriter { override def iterator: Iterator[(K, V)] = tiles.iterator } - def writeRasters(rdd:Object,path:String,options:NetCDFOptions): java.util.List[String] = { + def writeRasters(rdd:Object,path:String,options:NetCDFOptions): java.util.List[Item] = { rdd match { case rdd1 if rdd.asInstanceOf[MultibandTileLayerRDD[SpaceTimeKey]].metadata.bounds.get.maxKey.isInstanceOf[SpatialKey] => @@ -89,7 +90,7 @@ object NetCDFRDDWriter { dimensionNames: java.util.Map[String,String], attributes: java.util.Map[String,String], zLevel:Int - ): java.util.List[String] = { + ): java.util.List[Item] = { saveSingleNetCDFGeneric(rdd,path,bandNames, dimensionNames, attributes, zLevel) } @@ -99,12 +100,12 @@ object NetCDFRDDWriter { dimensionNames: java.util.Map[String,String], attributes: java.util.Map[String,String], zLevel:Int - ): java.util.List[String] = { + ): java.util.List[Item] = { saveSingleNetCDFGeneric(rdd,path,bandNames, dimensionNames, attributes, zLevel) } - def saveSingleNetCDFGeneric[K: SpatialComponent: Boundable : ClassTag](rdd: MultibandTileLayerRDD[K], path:String, options:NetCDFOptions): java.util.List[String] = { + def saveSingleNetCDFGeneric[K: SpatialComponent: Boundable : ClassTag](rdd: MultibandTileLayerRDD[K], path:String, options:NetCDFOptions): java.util.List[Item] = { saveSingleNetCDFGeneric(rdd,path,options.bandNames.orNull,options.dimensionNames.orNull,options.attributes.orNull,options.zLevel,options.cropBounds) } @@ -115,7 +116,7 @@ object NetCDFRDDWriter { attributes: java.util.Map[String,String], zLevel:Int, cropBounds:Option[Extent]= None - ): java.util.List[String] = { + ): java.util.List[Item] = { val preProcessResult: (GridBounds[Int], Extent, RDD[(K, MultibandTile)] with Metadata[TileLayerMetadata[K]]) = preProcess(rdd,cropBounds) val extent = preProcessResult._2 @@ -243,22 +244,25 @@ object NetCDFRDDWriter { cachedRDD.unpersist(blocking = false) val finalPath = - if (path.startsWith("s3:/")) { - // TODO: Change spark-jobs-staging-disabled back to spark-jobs-staging - if(rdd.context.getConf.get("spark.kubernetes.namespace","nothing").equals("spark-jobs-staging-disabled")) { - uploadToS3LargeFile(path, intermediatePath) - }else{ - uploadToS3(path, intermediatePath) + if (path.startsWith("s3:/")) { + // TODO: Change spark-jobs-staging-disabled back to spark-jobs-staging + if(rdd.context.getConf.get("spark.kubernetes.namespace","nothing").equals("spark-jobs-staging-disabled")) { + uploadToS3LargeFile(path, intermediatePath) + }else{ + uploadToS3(path, intermediatePath) + } + }else if(forceTempFile) { + Files.move(Paths.get(intermediatePath),Paths.get(path),java.nio.file.StandardCopyOption.REPLACE_EXISTING) + path } - }else if(forceTempFile) { - Files.move(Paths.get(intermediatePath),Paths.get(path),java.nio.file.StandardCopyOption.REPLACE_EXISTING) - path - } - else{ - path - } + else{ + path + } + + val item = Item(id = UUID.randomUUID().toString, bbox = extent, datetime = null, + assets = Collections.singletonMap("openEO", Asset(finalPath))) - return Collections.singletonList(finalPath) + Collections.singletonList(item) } @@ -313,7 +317,7 @@ object NetCDFRDDWriter { polygons: ProjectedPolygons, sampleNames: ArrayList[String], bandNames: ArrayList[String], - ): java.util.List[(String, Extent)] = + ): java.util.List[Item] = saveSamples(rdd, path, polygons, sampleNames, bandNames, dimensionNames = null, attributes = null) // Overload to avoid: "multiple overloaded alternatives of method saveSamples define default arguments" @@ -323,7 +327,7 @@ object NetCDFRDDWriter { sampleNames: ArrayList[String], bandNames: ArrayList[String], filenamePrefix: Option[String], - ): java.util.List[(String, Extent)] = + ): java.util.List[Item] = saveSamples(rdd, path, polygons, sampleNames, bandNames, dimensionNames = null, attributes = null, filenamePrefix) def saveSamples(rdd: MultibandTileLayerRDD[SpaceTimeKey], @@ -333,7 +337,7 @@ object NetCDFRDDWriter { bandNames: ArrayList[String], dimensionNames: java.util.Map[String, String], attributes: java.util.Map[String, String], - ): java.util.List[(String, Extent)] = + ): java.util.List[Item] = saveSamples(rdd, path, polygons, sampleNames, bandNames, dimensionNames, attributes, None) def saveSamples(rdd: MultibandTileLayerRDD[SpaceTimeKey], @@ -344,7 +348,7 @@ object NetCDFRDDWriter { dimensionNames: java.util.Map[String,String], attributes: java.util.Map[String,String], filenamePrefix: Option[String], - ): java.util.List[(String, Extent)] = { + ): java.util.List[Item] = { val reprojected = ProjectedPolygons.reproject(polygons,rdd.metadata.crs) val features = sampleNames.asScala.zip(reprojected.polygons) logger.info(s"Using metadata: ${rdd.metadata}.") @@ -360,7 +364,7 @@ object NetCDFRDDWriter { dimensionNames: java.util.Map[String,String], attributes: java.util.Map[String,String], filenamePrefix: Option[String] = None, - ): java.util.List[(String, Extent)] = { + ): java.util.List[Item] = { val reprojected = ProjectedPolygons.reproject(polygons,rdd.metadata.crs) val features = sampleNames.asScala.toList.zip(reprojected.polygons.map(_.extent)) groupByFeatureAndWriteToNetCDFSpatial(rdd, features,path,bandNames,dimensionNames,attributes, filenamePrefix) @@ -371,7 +375,7 @@ object NetCDFRDDWriter { dimensionNames: java.util.Map[String,String], attributes: java.util.Map[String,String], filenamePrefix: Option[String] = None, - ): java.util.List[(String, Extent)] = { + ): java.util.List[Item] = { val featuresBC: Broadcast[Seq[(String, Geometry)]] = SparkContext.getOrCreate().broadcast(features) val crs = rdd.metadata.crs @@ -380,22 +384,21 @@ object NetCDFRDDWriter { //logger.info(s"Writing ${groupedBySample.keys.count()} samples to disk.") groupedBySample.map { case (name, tiles: Iterable[(Long, Raster[MultibandTile])]) => val outputAsPath: Path = getSamplePath(name, path, filenamePrefix) - val filePath = outputAsPath.toString // Sort by date before writing. - val sorted = tiles.toSeq.sortBy(_._1) - val dates = sorted.map( t=> ZonedDateTime.ofInstant(t._1, ZoneOffset.UTC)) - logger.info(s"Writing ${name} with dates ${dates}.") + val sorted = tiles.toSeq.sortBy { case (instant, _) => instant } + val dates = sorted.map { case (instant, _) => ZonedDateTime.ofInstant(instant, ZoneOffset.UTC) } + logger.info(s"Writing $name with dates $dates.") val extent = sorted.head._2.extent - try{ - (writeToDisk(sorted.map(_._2), dates, filePath, bandNames, crs, dimensionNames, attributes),extent) - }catch { - case t: IOException => { - (handleSampleWriteError(t, name, outputAsPath),extent) - } - case t: Throwable => throw t + + val assetPath = try { + writeToDisk(sorted.map(_._2), dates, outputAsPath.toString, bandNames, crs, dimensionNames, attributes) + } catch { + case e: IOException => handleSampleWriteError(e, name, outputAsPath) } + Item(id = UUID.randomUUID().toString, datetime = null , bbox = extent, + assets = Collections.singletonMap("openEO", Asset(path = assetPath))) }.collect() .toList.asJava } @@ -464,7 +467,7 @@ object NetCDFRDDWriter { dimensionNames: java.util.Map[String,String], attributes: java.util.Map[String,String], filenamePrefix: Option[String], - ): java.util.List[(String, Extent)] = { + ): java.util.List[Item] = { val featuresBC: Broadcast[List[(String, Extent)]] = SparkContext.getOrCreate().broadcast(features) val layout = rdd.metadata.layout val crs = rdd.metadata.crs @@ -474,17 +477,19 @@ object NetCDFRDDWriter { val outputAsPath: Path = getSamplePath(name, path, filenamePrefix) val sample: Raster[MultibandTile] = stitchAndCropTiles(tiles, extent, layout) - try{ - (writeToDisk(Seq(sample), dates = null, outputAsPath.toString, bandNames, crs, dimensionNames, attributes), - extent.extent) + val assetPath = try { + writeToDisk(Seq(sample), dates = null, outputAsPath.toString, bandNames, crs, dimensionNames, attributes) } catch { - case t: IOException => (handleSampleWriteError(t, name, outputAsPath), extent.extent) + case e: IOException => handleSampleWriteError(e, name, outputAsPath) } + + Item(id = UUID.randomUUID().toString, datetime = null, bbox = extent.extent, + assets = Collections.singletonMap("openEO", Asset(assetPath))) }.collect() .toList.asJava } - private def handleSampleWriteError(t: IOException, sampleName: String, outputAsPath: Path) = { + private def handleSampleWriteError(t: IOException, sampleName: String, outputAsPath: Path): String = { logger.error("Failed to write sample: " + sampleName, t) val theFile = outputAsPath.toFile if (theFile.exists()) { diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/Asset.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/Asset.scala new file mode 100644 index 000000000..a3ff096a0 --- /dev/null +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/Asset.scala @@ -0,0 +1,5 @@ +package org.openeo.geotrellis.stac + +import java.util + +case class Asset(path: String, bandIndices: util.List[Int] = null) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/Item.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/Item.scala new file mode 100644 index 000000000..1cc6c6206 --- /dev/null +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/Item.scala @@ -0,0 +1,7 @@ +package org.openeo.geotrellis.stac + +import geotrellis.vector.Extent + +import java.util + +case class Item(id: String, datetime: String, bbox: Extent, assets: util.Map[String, Asset]) // TODO: order assets by key? diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/TileGridTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/TileGridTest.scala index 6559be1f6..bc7928c42 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/TileGridTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/TileGridTest.scala @@ -12,6 +12,7 @@ import org.junit.jupiter.api.{BeforeAll, Test} import org.junit.{AfterClass, Assert} import org.openeo.geotrellis.LayerFixtures.rgbLayerProvider import org.openeo.geotrellis.png.PngTest +import org.openeo.geotrellis.stac.Item import org.openeo.geotrellis.tile_grid.TileGrid import org.openeo.geotrellis.{LayerFixtures, geotiff} @@ -71,7 +72,12 @@ class TileGridTest { ) // TODO: check if extents (in the layer CRS) are 10000m wide/high (in UTM) - Assert.assertEquals(expectedPaths, tiles.asScala.map { case (path, _) => path }.toSet) + val actualPaths = for { + item <- tiles.asScala + asset <- item.assets.values().asScala + } yield asset.path + + Assert.assertEquals(expectedPaths, actualPaths.toSet) val extent = bbox.reproject(spatialLayer.metadata.crs) val cropBounds = mapAsJavaMap(Map("xmin" -> extent.xmin, "xmax" -> extent.xmax, "ymin" -> extent.ymin, "ymax" -> extent.ymax)) @@ -85,7 +91,12 @@ class TileGridTest { ) // TODO: also check extents - Assert.assertEquals(expectedCroppedPaths, croppedTiles.asScala.map { case (path, _) => path }.toSet) + val actualCroppedPaths = for { + item <- croppedTiles.asScala + asset <- item.assets.values().asScala + } yield asset.path + + Assert.assertEquals(expectedCroppedPaths, actualCroppedPaths.toSet) } @@ -148,7 +159,12 @@ class TileGridTest { ("/tmp/openEO_2020-04-05Z_31UDS_2_5.tif", isoFormattedDate) ) - Assert.assertEquals(expectedTiles, tiles.asScala.map { case (path, timestamp, _) => (path, timestamp) }.toSet) + val actualTiles = for { + item <- tiles.asScala + asset <- item.assets.values().asScala + } yield (asset.path, item.datetime) + + Assert.assertEquals(expectedTiles, actualTiles.toSet) } @Test @@ -168,7 +184,12 @@ class TileGridTest { ("/tmp/testPrefix_2020-04-05Z_31UDS_2_5.tif", isoFormattedDate) ) - Assert.assertEquals(expectedTiles, tiles.asScala.map { case (path, timestamp, _) => (path, timestamp) }.toSet) + val actualTiles = for { + item <- tiles.asScala + asset <- item.assets.values().asScala + } yield (asset.path, item.datetime) + + Assert.assertEquals(expectedTiles, actualTiles.toSet) } @Test diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala index ad9eecb38..c99348650 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala @@ -547,7 +547,7 @@ class WriteRDDToGeotiffTest { val ret = saveSamples(tileLayerRDD, outDir.toString, tiltedRectangle, sampleNames, DeflateCompression(BEST_COMPRESSION)) - assertTrue(ret.get(0)._2.contains("T")) + assertTrue(ret.get(0).datetime.contains("T")) } @Test diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/netcdf/NetCDFRDDWriterTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/netcdf/NetCDFRDDWriterTest.scala index 7863d2b84..4895e3440 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/netcdf/NetCDFRDDWriterTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/netcdf/NetCDFRDDWriterTest.scala @@ -17,6 +17,7 @@ import org.junit.Assert.{assertFalse, assertTrue} import org.junit._ import org.junit.rules.TemporaryFolder import org.openeo.geotrellis.TemporalResolution +import org.openeo.geotrellis.stac.Item import org.openeo.geotrellis.{LayerFixtures, ProjectedPolygons} import org.openeo.geotrelliscommon.{ByKeyPartitioner, DataCubeParameters, SparseSpaceTimePartitioner} import org.slf4j.LoggerFactory @@ -27,7 +28,7 @@ import java.time.ZoneOffset.UTC import java.time.{LocalDate, ZonedDateTime} import java.util import scala.annotation.meta.getter -import scala.collection.JavaConverters.asScalaBufferConverter +import scala.collection.JavaConverters._ import scala.io.Source @@ -97,18 +98,17 @@ class NetCDFRDDWriterTest extends RasterMatchers{ val targetDir = temporaryFolder.getRoot.toString - val sampleFilenames: util.List[(String, Extent)] = NetCDFRDDWriter.saveSamples(layer, targetDir, polygonsUTM31, + val sampleFilenames: util.List[String] = assetFileNames(NetCDFRDDWriter.saveSamples(layer, targetDir, polygonsUTM31, sampleNameList, new util.ArrayList(util.Arrays.asList("TOC-B04_10M", "TOC-B03_10M", "TOC-B02_10M", - "SCENECLASSIFICATION_20M")), - Some("prefixTest")) + "SCENECLASSIFICATION_20M")), Some("prefixTest"))) - val expectedPaths = List(s"$targetDir/prefixTest_0.nc", s"$targetDir/prefixTest_1.nc") + val expectedPaths = util.Arrays.asList(s"$targetDir/prefixTest_0.nc", s"$targetDir/prefixTest_1.nc") - Assert.assertEquals(sampleFilenames.asScala.map(_._1).groupBy(identity), expectedPaths.groupBy(identity)) + Assert.assertEquals(expectedPaths, sampleFilenames) // note: tests first geometry only val bandName = "TOC-B04_10M" - val rasterSource = GDALRasterSource(s"""NETCDF:"${expectedPaths.head}":$bandName""") + val rasterSource = GDALRasterSource(s"""NETCDF:"${expectedPaths.get(0)}":$bandName""") val Some(multiBandRaster) = rasterSource.read() val raster = multiBandRaster.mapTile(_.band(0)) // first timestamp @@ -176,12 +176,12 @@ class NetCDFRDDWriterTest extends RasterMatchers{ val targetDir = temporaryFolder.getRoot.toString - val sampleFilenames: util.List[(String, Extent)] = NetCDFRDDWriter.saveSamples( + val sampleFilenames: util.List[String] = assetFileNames(NetCDFRDDWriter.saveSamples( layer, targetDir, polygons, sampleNameList, bandNames - ) + )) - val raster1: Raster[MultibandTile] = GDALRasterSource(s"""NETCDF:${sampleFilenames.get(0)._1}:TOC-B04_10M""").read().get - val raster2: Raster[MultibandTile] = GDALRasterSource(s"""NETCDF:${sampleFilenames.get(1)._1}:TOC-B04_10M""").read().get + val raster1: Raster[MultibandTile] = GDALRasterSource(s"""NETCDF:${sampleFilenames.get(0)}:TOC-B04_10M""").read().get + val raster2: Raster[MultibandTile] = GDALRasterSource(s"""NETCDF:${sampleFilenames.get(1)}:TOC-B04_10M""").read().get // Compare raster extents. //assert(raster1.extent.width == 2560.0) @@ -241,14 +241,20 @@ class NetCDFRDDWriterTest extends RasterMatchers{ null, null, Some("prefixTest"), - ).asScala + ).stream() + .flatMap { item => + item.assets.values().stream().map[(String, Extent)] { asset => + (asset.path, item.bbox) + } + } + .collect(util.stream.Collectors.toSet()) val expectedSamples = Set( ("/tmp/prefixTest_0.nc", polygonsUTM31.polygons(0).extent), ("/tmp/prefixTest_1.nc", polygonsUTM31.polygons(1).extent), ) - Assert.assertEquals(expectedSamples, samples.toSet) + Assert.assertEquals(expectedSamples.asJava, samples) } @Ignore @@ -266,12 +272,19 @@ class NetCDFRDDWriterTest extends RasterMatchers{ val layer = LayerFixtures.sentinel2TocLayerProviderUTM.readMultibandTileLayer(date,date.plusDays(10),bbox,Array(MultiPolygon(bbox.extent.toPolygon())),bbox.crs,13,sc,datacubeParams = Some(dcParams)) - val sampleFilenames: util.List[String] = NetCDFRDDWriter.saveSingleNetCDF(layer,"/tmp/stitched.nc", new util.ArrayList(util.Arrays.asList("TOC-B04_10M", "TOC-B03_10M", "TOC-B02_10M", "SCENECLASSIFICATION_20M")),null,null,6) - val expectedPaths = List("/tmp/stitched.nc") + val sampleFilenames: util.List[String] = assetFileNames(NetCDFRDDWriter.saveSingleNetCDF(layer,"/tmp/stitched.nc", new util.ArrayList(util.Arrays.asList("TOC-B04_10M", "TOC-B03_10M", "TOC-B02_10M", "SCENECLASSIFICATION_20M")),null,null,6)) + val expectedPaths = util.Collections.singletonList("/tmp/stitched.nc") - Assert.assertEquals(sampleFilenames.asScala.groupBy(identity), expectedPaths.groupBy(identity)) + Assert.assertEquals(expectedPaths, sampleFilenames) } + private def assetFileNames(items: util.List[Item]): util.List[String] = + items.stream() + .flatMap { item => + item.assets.values().stream().map[String] { asset => asset.path } + } + .collect(util.stream.Collectors.toList()) + @Test def testWriteSingleNetCDFLarge(): Unit = { @@ -282,10 +295,10 @@ class NetCDFRDDWriterTest extends RasterMatchers{ val options = new NetCDFOptions options.setBandNames(new util.ArrayList(util.Arrays.asList("TOC-B04_10M", "TOC-B03_10M", "TOC-B02_10M"))) - val sampleFilenames: util.List[String] = NetCDFRDDWriter.writeRasters(layer,"/tmp/stitched.nc",options) - val expectedPaths = List("/tmp/stitched.nc") + val sampleFilenames: util.List[String] = assetFileNames(NetCDFRDDWriter.writeRasters(layer,"/tmp/stitched.nc",options)) + val expectedPaths = util.Collections.singletonList("/tmp/stitched.nc") - Assert.assertEquals(sampleFilenames.asScala.groupBy(identity), expectedPaths.groupBy(identity)) + Assert.assertEquals(expectedPaths, sampleFilenames) val ds = NetcdfDataset.openDataset("/tmp/stitched.nc",true,null) val b04 = ds.findVariable("TOC-B04_10M") @@ -300,9 +313,9 @@ class NetCDFRDDWriterTest extends RasterMatchers{ options.setBandNames(new util.ArrayList(util.Arrays.asList("TOC-B04_10M", "TOC-B03_10M", "TOC-B02_10M"))) val layerDefault= LayerFixtures.aSpacetimeTileLayerRddShortFillValue(20,20) - val sampleFilenames: util.List[String] = NetCDFRDDWriter.writeRasters(layerDefault,"/tmp/stitched.nc",options) - val expectedPaths = List("/tmp/stitched.nc") - Assert.assertEquals(sampleFilenames.asScala.groupBy(identity), expectedPaths.groupBy(identity)) + val sampleFilenames: util.List[String] = assetFileNames(NetCDFRDDWriter.writeRasters(layerDefault,"/tmp/stitched.nc",options)) + val expectedPaths = util.Collections.singletonList("/tmp/stitched.nc") + Assert.assertEquals(expectedPaths, sampleFilenames) val ds = NetcdfDataset.openDataset("/tmp/stitched.nc",true,null) val b04 = ds.findVariable("TOC-B04_10M") @@ -344,8 +357,8 @@ class NetCDFRDDWriterTest extends RasterMatchers{ val layerChosen= LayerFixtures.aSpacetimeTileLayerRddShortFillValue(20,20,fillValue = 9) - val sampleFilenamesChosen: util.List[String] = NetCDFRDDWriter.writeRasters(layerChosen,"/tmp/stitched.nc",options) - Assert.assertEquals(sampleFilenamesChosen.asScala.groupBy(identity), expectedPaths.groupBy(identity)) + val sampleFilenamesChosen: util.List[String] = assetFileNames(NetCDFRDDWriter.writeRasters(layerChosen,"/tmp/stitched.nc",options)) + Assert.assertEquals(expectedPaths, sampleFilenamesChosen) val dsChosen = NetcdfDataset.openDataset("/tmp/stitched.nc",true,null) val b04Chosen = dsChosen.findVariable("TOC-B04_10M") @@ -392,10 +405,10 @@ class NetCDFRDDWriterTest extends RasterMatchers{ val (image,layer) = LayerFixtures.createLayerWithGaps(5,5) - val sampleFilenames: util.List[String] = NetCDFRDDWriter.saveSingleNetCDFSpatial(layer,"/tmp/stitched.nc", new util.ArrayList(util.Arrays.asList("TOC-B04_10M", "TOC-B03_10M", "TOC-B02_10M")),null,null,6) - val expectedPaths = List("/tmp/stitched.nc") + val sampleFilenames: util.List[String] = assetFileNames(NetCDFRDDWriter.saveSingleNetCDFSpatial(layer,"/tmp/stitched.nc", new util.ArrayList(util.Arrays.asList("TOC-B04_10M", "TOC-B03_10M", "TOC-B02_10M")),null,null,6)) + val expectedPaths = util.Collections.singletonList("/tmp/stitched.nc") - Assert.assertEquals(sampleFilenames.asScala.groupBy(identity), expectedPaths.groupBy(identity)) + Assert.assertEquals(expectedPaths, sampleFilenames) val ds = NetcdfDataset.openDataset("/tmp/stitched.nc",true,null) val b04 = ds.findVariable("TOC-B04_10M") @@ -424,12 +437,12 @@ class NetCDFRDDWriterTest extends RasterMatchers{ crs = CRS.fromName("EPSG:3857"), dates = Some(dates) ) - val sampleFilenames: util.List[String] = NetCDFRDDWriter.saveSingleNetCDF( + val sampleFilenames: util.List[String] = assetFileNames(NetCDFRDDWriter.saveSingleNetCDF( dataCubeContextRDD, "tmp/testWriteSingleNetCDFMultipleSamplesOnADay_" + date2.toString.replace(":", "_") + ".nc", new util.ArrayList(util.Arrays.asList("band")), null, null, 6 - ) + )) val ds = NetcdfDataset.openDataset(sampleFilenames.get(0), true, null) // When the samples are in the same day, they should still be separate Assert.assertEquals(dates.length, ds.findDimension("t").getLength) @@ -451,7 +464,7 @@ class NetCDFRDDWriterTest extends RasterMatchers{ val options = new NetCDFOptions options.setBandNames(new util.ArrayList(util.Arrays.asList("NDVI"))) - val sampleFilenames: util.List[String] = NetCDFRDDWriter.writeRasters(layer,"/tmp/cgls_ndvi300.nc",options) + val sampleFilenames: util.List[String] = assetFileNames(NetCDFRDDWriter.writeRasters(layer,"/tmp/cgls_ndvi300.nc",options)) val referenceTile = GeoTiffRasterSource("https://artifactory.vgt.vito.be/artifactory/testdata-public/cgls_ndvi300.tiff").read().get val actualTile = GDALRasterSource("/tmp/cgls_ndvi300.nc").read().get