
return items instead of files #410


Draft · wants to merge 10 commits into base: develop


@@ -17,6 +17,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.openeo.geotrellis.creo.CreoS3Utils
 import org.openeo.geotrellis.geotiff.preProcess
+import org.openeo.geotrellis.stac.{Asset, Item}
 import org.openeo.geotrellis.{OpenEOProcesses, ProjectedPolygons, TemporalResolution}
 import org.openeo.geotrelliscommon.ByKeyPartitioner
 import org.slf4j.LoggerFactory
@@ -33,7 +34,7 @@ import java.nio.file.{Files, Path, Paths}
 import java.time.format.DateTimeFormatter
 import java.time.{Duration, ZoneOffset, ZonedDateTime}
 import java.util
-import java.util.{ArrayList, Collections}
+import java.util.{ArrayList, Collections, UUID}
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag

@@ -71,7 +72,7 @@ object NetCDFRDDWriter
     override def iterator: Iterator[(K, V)] = tiles.iterator
   }

-  def writeRasters(rdd:Object,path:String,options:NetCDFOptions): java.util.List[String] = {
+  def writeRasters(rdd:Object,path:String,options:NetCDFOptions): java.util.List[Item] = {

     rdd match {
       case rdd1 if rdd.asInstanceOf[MultibandTileLayerRDD[SpaceTimeKey]].metadata.bounds.get.maxKey.isInstanceOf[SpatialKey] =>
@@ -89,7 +90,7 @@ object NetCDFRDDWriter
                        dimensionNames: java.util.Map[String,String],
                        attributes: java.util.Map[String,String],
                        zLevel:Int
-                      ): java.util.List[String] = {
+                      ): java.util.List[Item] = {
     saveSingleNetCDFGeneric(rdd,path,bandNames, dimensionNames, attributes, zLevel)
   }

@@ -99,12 +100,12 @@
                        dimensionNames: java.util.Map[String,String],
                        attributes: java.util.Map[String,String],
                        zLevel:Int
-                      ): java.util.List[String] = {
+                      ): java.util.List[Item] = {

     saveSingleNetCDFGeneric(rdd,path,bandNames, dimensionNames, attributes, zLevel)
   }

-  def saveSingleNetCDFGeneric[K: SpatialComponent: Boundable : ClassTag](rdd: MultibandTileLayerRDD[K], path:String, options:NetCDFOptions): java.util.List[String] = {
+  def saveSingleNetCDFGeneric[K: SpatialComponent: Boundable : ClassTag](rdd: MultibandTileLayerRDD[K], path:String, options:NetCDFOptions): java.util.List[Item] = {
     saveSingleNetCDFGeneric(rdd,path,options.bandNames.orNull,options.dimensionNames.orNull,options.attributes.orNull,options.zLevel,options.cropBounds)
   }

@@ -115,7 +116,7 @@
                        attributes: java.util.Map[String,String],
                        zLevel:Int,
                        cropBounds:Option[Extent]= None
-                      ): java.util.List[String] = {
+                      ): java.util.List[Item] = {

     val preProcessResult: (GridBounds[Int], Extent, RDD[(K, MultibandTile)] with Metadata[TileLayerMetadata[K]]) = preProcess(rdd,cropBounds)
     val extent = preProcessResult._2
@@ -243,22 +244,25 @@ object NetCDFRDDWriter
     cachedRDD.unpersist(blocking = false)

     val finalPath =
-    if (path.startsWith("s3:/")) {
-      // TODO: Change spark-jobs-staging-disabled back to spark-jobs-staging
-      if(rdd.context.getConf.get("spark.kubernetes.namespace","nothing").equals("spark-jobs-staging-disabled")) {
-        uploadToS3LargeFile(path, intermediatePath)
-      }else{
-        uploadToS3(path, intermediatePath)
-      }
+      if (path.startsWith("s3:/")) {
+        // TODO: Change spark-jobs-staging-disabled back to spark-jobs-staging
+        if(rdd.context.getConf.get("spark.kubernetes.namespace","nothing").equals("spark-jobs-staging-disabled")) {
+          uploadToS3LargeFile(path, intermediatePath)
+        }else{
+          uploadToS3(path, intermediatePath)
+        }
+      }else if(forceTempFile) {
+        Files.move(Paths.get(intermediatePath),Paths.get(path),java.nio.file.StandardCopyOption.REPLACE_EXISTING)
+        path
+      }
-    }else if(forceTempFile) {
-      Files.move(Paths.get(intermediatePath),Paths.get(path),java.nio.file.StandardCopyOption.REPLACE_EXISTING)
-      path
-    }
-    else{
-      path
-    }
+      else{
+        path
+      }

+    val item = Item(id = UUID.randomUUID().toString, bbox = extent, datetime = null,
+      assets = Collections.singletonMap("openEO", Asset(finalPath)))

-    return Collections.singletonList(finalPath)
+    Collections.singletonList(item)
   }

@@ -313,7 +317,7 @@ object NetCDFRDDWriter
                   polygons: ProjectedPolygons,
                   sampleNames: ArrayList[String],
                   bandNames: ArrayList[String],
-                 ): java.util.List[(String, Extent)] =
+                 ): java.util.List[Item] =
     saveSamples(rdd, path, polygons, sampleNames, bandNames, dimensionNames = null, attributes = null)

   // Overload to avoid: "multiple overloaded alternatives of method saveSamples define default arguments"
@@ -323,7 +327,7 @@
                   sampleNames: ArrayList[String],
                   bandNames: ArrayList[String],
                   filenamePrefix: Option[String],
-                 ): java.util.List[(String, Extent)] =
+                 ): java.util.List[Item] =
     saveSamples(rdd, path, polygons, sampleNames, bandNames, dimensionNames = null, attributes = null, filenamePrefix)

   def saveSamples(rdd: MultibandTileLayerRDD[SpaceTimeKey],
@@ -333,7 +337,7 @@
                   bandNames: ArrayList[String],
                   dimensionNames: java.util.Map[String, String],
                   attributes: java.util.Map[String, String],
-                 ): java.util.List[(String, Extent)] =
+                 ): java.util.List[Item] =
     saveSamples(rdd, path, polygons, sampleNames, bandNames, dimensionNames, attributes, None)

   def saveSamples(rdd: MultibandTileLayerRDD[SpaceTimeKey],
@@ -344,7 +348,7 @@
                   dimensionNames: java.util.Map[String,String],
                   attributes: java.util.Map[String,String],
                   filenamePrefix: Option[String],
-                 ): java.util.List[(String, Extent)] = {
+                 ): java.util.List[Item] = {
     val reprojected = ProjectedPolygons.reproject(polygons,rdd.metadata.crs)
     val features = sampleNames.asScala.zip(reprojected.polygons)
     logger.info(s"Using metadata: ${rdd.metadata}.")
@@ -360,7 +364,7 @@
                   dimensionNames: java.util.Map[String,String],
                   attributes: java.util.Map[String,String],
                   filenamePrefix: Option[String] = None,
-                 ): java.util.List[(String, Extent)] = {
+                 ): java.util.List[Item] = {
     val reprojected = ProjectedPolygons.reproject(polygons,rdd.metadata.crs)
     val features = sampleNames.asScala.toList.zip(reprojected.polygons.map(_.extent))
     groupByFeatureAndWriteToNetCDFSpatial(rdd, features,path,bandNames,dimensionNames,attributes, filenamePrefix)
@@ -371,7 +375,7 @@
                   dimensionNames: java.util.Map[String,String],
                   attributes: java.util.Map[String,String],
                   filenamePrefix: Option[String] = None,
-                 ): java.util.List[(String, Extent)] = {
+                 ): java.util.List[Item] = {
     val featuresBC: Broadcast[Seq[(String, Geometry)]] = SparkContext.getOrCreate().broadcast(features)

     val crs = rdd.metadata.crs
@@ -380,22 +384,21 @@
       //logger.info(s"Writing ${groupedBySample.keys.count()} samples to disk.")
       groupedBySample.map { case (name, tiles: Iterable[(Long, Raster[MultibandTile])]) =>
         val outputAsPath: Path = getSamplePath(name, path, filenamePrefix)
-        val filePath = outputAsPath.toString

         // Sort by date before writing.
-        val sorted = tiles.toSeq.sortBy(_._1)
-        val dates = sorted.map( t=> ZonedDateTime.ofInstant(t._1, ZoneOffset.UTC))
-        logger.info(s"Writing ${name} with dates ${dates}.")
+        val sorted = tiles.toSeq.sortBy { case (instant, _) => instant }
+        val dates = sorted.map { case (instant, _) => ZonedDateTime.ofInstant(instant, ZoneOffset.UTC) }
+        logger.info(s"Writing $name with dates $dates.")
         val extent = sorted.head._2.extent
-        try{
-          (writeToDisk(sorted.map(_._2), dates, filePath, bandNames, crs, dimensionNames, attributes),extent)
-        }catch {
-          case t: IOException => {
-            (handleSampleWriteError(t, name, outputAsPath),extent)
-          }
-          case t: Throwable => throw t

+        val assetPath = try {
+          writeToDisk(sorted.map(_._2), dates, outputAsPath.toString, bandNames, crs, dimensionNames, attributes)
+        } catch {
+          case e: IOException => handleSampleWriteError(e, name, outputAsPath)
         }
+
+        Item(id = UUID.randomUUID().toString, datetime = null, bbox = extent,
+          assets = Collections.singletonMap("openEO", Asset(path = assetPath)))
       }.collect()
         .toList.asJava
   }
@@ -464,7 +467,7 @@ object NetCDFRDDWriter
                   dimensionNames: java.util.Map[String,String],
                   attributes: java.util.Map[String,String],
                   filenamePrefix: Option[String],
-                 ): java.util.List[(String, Extent)] = {
+                 ): java.util.List[Item] = {
     val featuresBC: Broadcast[List[(String, Extent)]] = SparkContext.getOrCreate().broadcast(features)
     val layout = rdd.metadata.layout
     val crs = rdd.metadata.crs
@@ -474,17 +477,19 @@
       val outputAsPath: Path = getSamplePath(name, path, filenamePrefix)
       val sample: Raster[MultibandTile] = stitchAndCropTiles(tiles, extent, layout)

-      try{
-        (writeToDisk(Seq(sample), dates = null, outputAsPath.toString, bandNames, crs, dimensionNames, attributes),
-          extent.extent)
+      val assetPath = try {
+        writeToDisk(Seq(sample), dates = null, outputAsPath.toString, bandNames, crs, dimensionNames, attributes)
       } catch {
-        case t: IOException => (handleSampleWriteError(t, name, outputAsPath), extent.extent)
+        case e: IOException => handleSampleWriteError(e, name, outputAsPath)
       }
+
+      Item(id = UUID.randomUUID().toString, datetime = null, bbox = extent.extent,
+        assets = Collections.singletonMap("openEO", Asset(assetPath)))
     }.collect()
       .toList.asJava
   }

-  private def handleSampleWriteError(t: IOException, sampleName: String, outputAsPath: Path) = {
+  private def handleSampleWriteError(t: IOException, sampleName: String, outputAsPath: Path): String = {
     logger.error("Failed to write sample: " + sampleName, t)
     val theFile = outputAsPath.toFile
     if (theFile.exists()) {
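Note for reviewers: callers that previously consumed a flat list of output paths now unwrap them from the returned items. A minimal sketch of that unwrapping, assuming only the `Item` and `Asset` case classes introduced below (the `assetPaths` helper is hypothetical, not part of this PR):

```scala
import scala.collection.JavaConverters._

import org.openeo.geotrellis.stac.Item

// Hypothetical helper: flatten the assets of each returned item back into
// the plain file paths that the old java.util.List[String] API exposed.
def assetPaths(items: java.util.List[Item]): Seq[String] =
  items.asScala.flatMap(_.assets.values().asScala.map(_.path)).toSeq
```

This mirrors what the updated TileGridTest assertions below do with a for comprehension over `item.assets.values()`.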
@@ -0,0 +1,5 @@
+package org.openeo.geotrellis.stac
+
+import java.util
+
+case class Asset(path: String, bandIndices: util.List[Int] = null)
@@ -0,0 +1,7 @@
+package org.openeo.geotrellis.stac
+
+import geotrellis.vector.Extent
+
+import java.util
+
+case class Item(id: String, datetime: String, bbox: Extent, assets: util.Map[String, Asset]) // TODO: order assets by key?
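For illustration, a minimal sketch of how the writers above populate these two case classes: one `Item` per output file, registered under a single "openEO" asset key. The extent here is a placeholder, `datetime` stays null for results without a time dimension, and `bandIndices` keeps its null default when band subsetting does not apply.

```scala
import java.util.{Collections, UUID}

import geotrellis.vector.Extent
import org.openeo.geotrellis.stac.{Asset, Item}

// Placeholder bounding box standing in for the extent the writer computes.
val bbox = Extent(4.0, 51.0, 4.1, 51.1)

val item = Item(
  id = UUID.randomUUID().toString,
  datetime = null, // spatial-only results carry no timestamp
  bbox = bbox,
  assets = Collections.singletonMap("openEO", Asset("/tmp/openEO.nc"))
)
```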
@@ -12,6 +12,7 @@ import org.junit.jupiter.api.{BeforeAll, Test}
 import org.junit.{AfterClass, Assert}
 import org.openeo.geotrellis.LayerFixtures.rgbLayerProvider
 import org.openeo.geotrellis.png.PngTest
+import org.openeo.geotrellis.stac.Item
 import org.openeo.geotrellis.tile_grid.TileGrid
 import org.openeo.geotrellis.{LayerFixtures, geotiff}

@@ -71,7 +72,12 @@ class TileGridTest
     )

     // TODO: check if extents (in the layer CRS) are 10000m wide/high (in UTM)
-    Assert.assertEquals(expectedPaths, tiles.asScala.map { case (path, _) => path }.toSet)
+    val actualPaths = for {
+      item <- tiles.asScala
+      asset <- item.assets.values().asScala
+    } yield asset.path
+
+    Assert.assertEquals(expectedPaths, actualPaths.toSet)

     val extent = bbox.reproject(spatialLayer.metadata.crs)
     val cropBounds = mapAsJavaMap(Map("xmin" -> extent.xmin, "xmax" -> extent.xmax, "ymin" -> extent.ymin, "ymax" -> extent.ymax))
@@ -85,7 +91,12 @@
     )

     // TODO: also check extents
-    Assert.assertEquals(expectedCroppedPaths, croppedTiles.asScala.map { case (path, _) => path }.toSet)
+    val actualCroppedPaths = for {
+      item <- croppedTiles.asScala
+      asset <- item.assets.values().asScala
+    } yield asset.path
+
+    Assert.assertEquals(expectedCroppedPaths, actualCroppedPaths.toSet)
   }


@@ -148,7 +159,12 @@ class TileGridTest
       ("/tmp/openEO_2020-04-05Z_31UDS_2_5.tif", isoFormattedDate)
     )

-    Assert.assertEquals(expectedTiles, tiles.asScala.map { case (path, timestamp, _) => (path, timestamp) }.toSet)
+    val actualTiles = for {
+      item <- tiles.asScala
+      asset <- item.assets.values().asScala
+    } yield (asset.path, item.datetime)
+
+    Assert.assertEquals(expectedTiles, actualTiles.toSet)
   }

   @Test
@@ -168,7 +184,12 @@
       ("/tmp/testPrefix_2020-04-05Z_31UDS_2_5.tif", isoFormattedDate)
     )

-    Assert.assertEquals(expectedTiles, tiles.asScala.map { case (path, timestamp, _) => (path, timestamp) }.toSet)
+    val actualTiles = for {
+      item <- tiles.asScala
+      asset <- item.assets.values().asScala
+    } yield (asset.path, item.datetime)
+
+    Assert.assertEquals(expectedTiles, actualTiles.toSet)
   }

   @Test
@@ -547,7 +547,7 @@ class WriteRDDToGeotiffTest

     val ret = saveSamples(tileLayerRDD, outDir.toString, tiltedRectangle, sampleNames,
       DeflateCompression(BEST_COMPRESSION))
-    assertTrue(ret.get(0)._2.contains("T"))
+    assertTrue(ret.get(0).datetime.contains("T"))
   }

   @Test