Skip to content

Commit 95aa2b1

Browse files
authored
netcdf: track spatial keys (#421)
* merge_cubes: support cube partitioned by tile, as provided by netcdf collection Open-EO/openeo-geopyspark-driver#1146 #43
* consistently implement spatial key provider Open-EO/openeo-geopyspark-driver#1146 #43
* consistently implement spatial key provider Open-EO/openeo-geopyspark-driver#1146 #43
* more testing of spatial key provider Open-EO/openeo-geopyspark-driver#1146 #43
* more testing of partitioner Open-EO/openeo-geopyspark-driver#1146 #43
* more testing of partitioner Open-EO/openeo-geopyspark-driver#1146 #43
* more testing of partitioner Open-EO/openeo-geopyspark-driver#1146 #43
* fix netcdf sample geometry lookup Open-EO/openeo-geopyspark-driver#1146
1 parent fbfdda4 commit 95aa2b1

File tree

4 files changed

+102
-46
lines changed

4 files changed

+102
-46
lines changed

geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/package.scala

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,23 @@ package object geotrelliscommon {
4646

4747
}
4848

49-
object ByTileSpacetimePartitioner extends PartitionerIndex[SpaceTimeKey] {
49+
/**
 * Partitioner index that maps every [[SpaceTimeKey]] to the Z2 (Z-order) index of its
 * column/row, ignoring the temporal component of the key.
 *
 * Instances compare equal when they carry the same set of spatial keys (element-wise,
 * ignoring order and duplicates). Without this, separately constructed instances would only
 * be equal by reference, so checks such as `leftPart.index == rightPart.index` could never
 * succeed for two independently created partitioners.
 *
 * @param theKeys optional spatial keys associated with this partitioner; exposed unchanged
 *                through [[spatialKeys]]. They do not influence `toIndex` or `indexRanges`.
 */
class ByTileSpacetimePartitioner(val theKeys: Option[Array[SpatialKey]] = Option.empty) extends PartitionerIndex[SpaceTimeKey] with SpatialKeysProvider {

  /** Z2 point for the spatial (col, row) part of the key; the time component is not used. */
  private def toZ(key: SpaceTimeKey): Z2 = Z2(key.col, key.row)

  /** Index of a key: the Z2 value of its column and row. */
  def toIndex(key: SpaceTimeKey): BigInt = toZ(key).z

  /** Z2 index ranges covering the rectangle spanned by the two keys of `keyRange`. */
  def indexRanges(keyRange: (SpaceTimeKey, SpaceTimeKey)): Seq[(BigInt, BigInt)] =
    Z2.zranges(ZRange(toZ(keyRange._1), toZ(keyRange._2))).map(r => (BigInt(r.lower), BigInt(r.upper)))

  /** The spatial keys passed at construction time, if any. */
  override def spatialKeys: Option[Array[SpatialKey]] = theKeys

  // Arrays use reference equality, so the keys are compared as a Set instead.
  // NOTE(review): relies on SpatialKey having value-based equals/hashCode — confirm.
  private def keySet: Option[Set[SpatialKey]] = theKeys.map(_.toSet)

  override def equals(other: Any): Boolean = other match {
    case that: ByTileSpacetimePartitioner => keySet == that.keySet
    case _ => false
  }

  // Kept consistent with equals: derived from the same key set.
  override def hashCode(): Int = keySet.hashCode()
}
5861

5962
/**
 * Index helpers that only look at the spatial (col, row) part of a key.
 *
 * The Z2 index is shifted right by `indexReduction` bits. Shifting by 8 removes the last
 * 8 bits: 256 tiles max in one partition.
 */
object SparseSpaceOnlyPartitioner {

  /** Reduced Z2 index of the column/row of a space-time key; time is ignored. */
  def toIndex(key: SpaceTimeKey, indexReduction: Int = 8): BigInt =
    reducedZ(key.col, key.row, indexReduction)

  /** Reduced Z2 index of a spatial key. */
  def toIndex(key: SpatialKey, indexReduction: Int): BigInt =
    reducedZ(key.col, key.row, indexReduction)

  // Shared computation for both overloads: Z2 value shifted right by `indexReduction` bits.
  private def reducedZ(col: Int, row: Int, indexReduction: Int): BigInt =
    Z2(col, row).z >> indexReduction
}
6367

6468
object SparseSpaceTimePartitioner {
@@ -68,7 +72,12 @@ package object geotrelliscommon {
6872
def toIndex(key: SpaceTimeKey, indexReduction:Int = 8): BigInt = keyIndex.toIndex(key) >> indexReduction
6973
}
7074

71-
class SparseSpaceTimePartitioner (val indices: Array[BigInt], val indexReduction:Int = 8, val theKeys: Option[Array[SpaceTimeKey]] = Option.empty) extends PartitionerIndex[SpaceTimeKey] {
75+
76+
/**
 * Mix-in for types that can expose a collection of spatial keys.
 */
trait SpatialKeysProvider {

  /** The spatial keys held by this provider, or an empty Option when none are available. */
  def spatialKeys: Option[Array[SpatialKey]]
}
79+
80+
class SparseSpaceTimePartitioner (val indices: Array[BigInt], val indexReduction:Int = 8, val theKeys: Option[Array[SpaceTimeKey]] = Option.empty) extends PartitionerIndex[SpaceTimeKey] with SpatialKeysProvider {
7281

7382
def toIndex(key: SpaceTimeKey): BigInt = SparseSpaceTimePartitioner.toIndex(key, indexReduction)
7483

@@ -99,9 +108,14 @@ package object geotrelliscommon {
99108

100109

101110
override def toString = s"SparseSpaceTimePartitioner ${indices.length} ${theKeys.isDefined}"
111+
112+
override def spatialKeys: Option[Array[SpatialKey]] = {
113+
theKeys.map(_.map(_.spatialKey).distinct)
114+
}
115+
102116
}
103117

104-
class SparseSpaceOnlyPartitioner (val indices: Array[BigInt], val indexReduction:Int = 8, val theKeys: Option[Array[SpaceTimeKey]] = Option.empty ) extends PartitionerIndex[SpaceTimeKey] {
118+
class SparseSpaceOnlyPartitioner (val indices: Array[BigInt], val indexReduction:Int = 8, val theKeys: Option[Array[SpaceTimeKey]] = Option.empty ) extends PartitionerIndex[SpaceTimeKey] with SpatialKeysProvider {
105119

106120
def toIndex(key: SpaceTimeKey): BigInt = SparseSpaceOnlyPartitioner.toIndex(key, indexReduction)
107121

@@ -123,9 +137,11 @@ package object geotrelliscommon {
123137
val state = Seq( indexReduction)
124138
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
125139
}
140+
141+
override def spatialKeys: Option[Array[SpatialKey]] = theKeys.map(_.map(_.spatialKey).distinct)
126142
}
127143

128-
class SparseSpatialPartitioner (val indices: Array[BigInt], val indexReduction:Int = 8, val theKeys: Option[Array[SpatialKey]] = Option.empty ) extends PartitionerIndex[SpatialKey] {
144+
class SparseSpatialPartitioner (val indices: Array[BigInt], val indexReduction:Int = 8, val theKeys: Option[Array[SpatialKey]] = Option.empty ) extends PartitionerIndex[SpatialKey] with SpatialKeysProvider {
129145

130146
def toIndex(key: SpatialKey): BigInt = Z2(key.col,key.row).z >> indexReduction
131147

@@ -147,6 +163,8 @@ package object geotrelliscommon {
147163
val state = Seq( indexReduction)
148164
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
149165
}
166+
167+
override def spatialKeys: Option[Array[SpatialKey]] = theKeys
150168
}
151169

152170
class ConfigurableSpatialPartitioner(val indexReduction:Int = 4) extends PartitionerIndex[SpatialKey] {

openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcesses.scala

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.spark.{Partitioner, SparkContext}
2727
import org.openeo.geotrellis.OpenEOProcessScriptBuilder.{MaxIgnoreNoData, MinIgnoreNoData, OpenEOProcess, safeConvert}
2828
import org.openeo.geotrellis.focal._
2929
import org.openeo.geotrellis.netcdf.NetCDFRDDWriter.ContextSeq
30-
import org.openeo.geotrelliscommon.{ByTileSpacetimePartitioner, ByTileSpatialPartitioner, ConfigurableSpaceTimePartitioner, ConfigurableSpatialPartitionerReduceZ, DatacubeSupport, FFTConvolve, OpenEORasterCube, OpenEORasterCubeMetadata, SCLConvolutionFilter, SpaceTimeByMonthPartitioner, SparseSpaceOnlyPartitioner, SparseSpaceTimePartitioner, SparseSpatialPartitioner}
30+
import org.openeo.geotrelliscommon.{ByTileSpacetimePartitioner, ByTileSpatialPartitioner, ConfigurableSpaceTimePartitioner, ConfigurableSpatialPartitionerReduceZ, DatacubeSupport, FFTConvolve, OpenEORasterCube, OpenEORasterCubeMetadata, SCLConvolutionFilter, SpaceTimeByMonthPartitioner, SparseSpaceOnlyPartitioner, SparseSpaceTimePartitioner, SparseSpatialPartitioner, SpatialKeysProvider}
3131
import org.slf4j.LoggerFactory
3232

3333
import java.io.File
@@ -173,8 +173,6 @@ class OpenEOProcesses extends Serializable {
173173
}
174174

175175
private def transformTimeDimension[KT](datacube: MultibandTileLayerRDD[SpaceTimeKey], scriptBuilder: OpenEOProcessScriptBuilder, context: util.Map[String, Any], reduce:Boolean=false): RDD[(KT, MultibandTile)] = {
176-
177-
178176
val expectedCellType = datacube.metadata.cellType
179177
val applyToTimeseries: Iterable[(SpaceTimeKey, MultibandTile)] => Map[KT, MultibandTile] =
180178
if(reduce){
@@ -191,8 +189,6 @@ class OpenEOProcesses extends Serializable {
191189
}
192190

193191
private def transformTimeDimension[KT](datacube: MultibandTileLayerRDD[SpaceTimeKey],applyToTimeseries: Iterable[(SpaceTimeKey, MultibandTile)] => Map[KT, MultibandTile], reduce:Boolean ): RDD[(KT, MultibandTile)] = {
194-
195-
196192
val index: Option[PartitionerIndex[SpaceTimeKey]] =
197193
if (datacube.partitioner.isDefined && datacube.partitioner.get.isInstanceOf[SpacePartitioner[SpaceTimeKey]]) {
198194
Some(datacube.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index)
@@ -201,15 +197,15 @@ class OpenEOProcesses extends Serializable {
201197
}
202198
logger.info(s"Applying callback on time dimension of cube with partitioner: ${datacube.partitioner.getOrElse("no partitioner")} - index: ${index.getOrElse("no index")} and metadata ${datacube.metadata}")
203199
val rdd: RDD[(SpaceTimeKey, MultibandTile)] =
204-
if (index.isDefined && (index.get.isInstanceOf[SparseSpaceOnlyPartitioner] || index.get == ByTileSpacetimePartitioner )) {
200+
if (index.isDefined && (index.get.isInstanceOf[SparseSpaceOnlyPartitioner] || index.get.isInstanceOf[ByTileSpacetimePartitioner] )) {
205201
datacube
206202
} else {
207-
val keys: Option[Array[SpaceTimeKey]] = findPartitionerKeys(datacube)
203+
val keys: Option[Array[SpatialKey]] = findPartitionerSpatialKeys(datacube)
208204
val spatiallyGroupingIndex =
209205
if(keys.isDefined){
210-
new SparseSpaceOnlyPartitioner(keys.get.map(SparseSpaceOnlyPartitioner.toIndex(_, indexReduction = 0)).distinct.sorted, 0, keys)
206+
new SparseSpaceOnlyPartitioner(keys.get.map(SparseSpaceOnlyPartitioner.toIndex(_, indexReduction = 0)).distinct.sorted, 0, findPartitionerKeys(datacube))
211207
}else{
212-
ByTileSpacetimePartitioner
208+
new ByTileSpacetimePartitioner()
213209
}
214210
logger.info(f"Regrouping data cube along the time dimension, with index $spatiallyGroupingIndex. Cube metadata: ${datacube.metadata}")
215211
val partitioner: Partitioner = new SpacePartitioner(datacube.metadata.bounds)(implicitly, implicitly, spatiallyGroupingIndex)
@@ -289,10 +285,10 @@ class OpenEOProcesses extends Serializable {
289285

290286
val newBounds = retiled.metadata.bounds.asInstanceOf[KeyBounds[SpaceTimeKey]].toSpatial
291287

292-
val keys = findPartitionerKeys(datacube)
288+
val keys = findPartitionerSpatialKeys(datacube)
293289
val spatiallyGroupingIndex =
294290
if(keys.isDefined){
295-
val spatialKeys: Array[SpatialKey] = keys.get.map(_.spatialKey).distinct
291+
val spatialKeys: Array[SpatialKey] = keys.get
296292
new SparseSpatialPartitioner(spatialKeys.map(ByTileSpatialPartitioner.toIndex).distinct.sorted, 0, Some(spatialKeys))
297293
}else{
298294
ByTileSpatialPartitioner
@@ -306,11 +302,11 @@ class OpenEOProcesses extends Serializable {
306302
private def groupOnTimeDimension(datacube: MultibandTileLayerRDD[SpaceTimeKey]) = {
307303
val targetBounds = datacube.metadata.bounds.asInstanceOf[KeyBounds[SpaceTimeKey]].toSpatial
308304

309-
val keys: Option[Array[SpaceTimeKey]] = findPartitionerKeys(datacube)
305+
val keys: Option[Array[SpatialKey]] = findPartitionerSpatialKeys(datacube)
310306

311307
val index =
312308
if (keys.isDefined) {
313-
new SparseSpatialPartitioner(keys.get.map(SparseSpaceOnlyPartitioner.toIndex(_, indexReduction = 0)).distinct.sorted, 0, keys.map(_.map(_.spatialKey)))
309+
new SparseSpatialPartitioner(keys.get.map(SparseSpaceOnlyPartitioner.toIndex(_, indexReduction = 0)).distinct.sorted, 0, keys)
314310
} else {
315311
ByTileSpatialPartitioner
316312
}
@@ -322,6 +318,20 @@ class OpenEOProcesses extends Serializable {
322318
groupedOnTime
323319
}
324320

321+
/**
 * Retrieves the spatial keys exposed by the partitioner index of a data cube.
 *
 * @param datacube the cube whose partitioner is inspected
 * @return the keys reported by the index when the cube is partitioned by a `SpacePartitioner`
 *         whose index implements `SpatialKeysProvider`; an empty Option in every other case
 *         (no partitioner, another partitioner type, or an index that does not provide keys)
 */
def findPartitionerSpatialKeys(datacube: MultibandTileLayerRDD[SpaceTimeKey]): Option[Array[SpatialKey]] =
  datacube.partitioner match {
    // The key type parameter is erased at runtime, so a wildcard match covers
    // space partitioners of any key type.
    case Some(spacePartitioner: SpacePartitioner[_]) =>
      spacePartitioner.index match {
        case provider: SpatialKeysProvider => provider.spatialKeys
        case _ => Option.empty
      }
    case _ =>
      Option.empty
  }
325335

326336
def findPartitionerKeys(datacube: MultibandTileLayerRDD[SpaceTimeKey]): Option[Array[SpaceTimeKey]] = {
327337
val keys: Option[Array[SpaceTimeKey]] = if (datacube.partitioner.isDefined && datacube.partitioner.get.isInstanceOf[SpacePartitioner[SpaceTimeKey]]) {
@@ -461,9 +471,9 @@ class OpenEOProcesses extends Serializable {
461471

462472
val filteredCube = filterNegativeSpatialKeys(datacube)
463473

464-
val keys = findPartitionerKeys(filteredCube).map(_.filter( k => k.row >= 0 && k.col >= 0 ))
474+
val keys = findPartitionerSpatialKeys(filteredCube).map(_.filter( k => k.row >= 0 && k.col >= 0 ))
465475
val allPossibleKeys: immutable.Seq[SpatialKey] = if(keys.isDefined) {
466-
keys.get.map(_.spatialKey).distinct.toList
476+
keys.get.distinct.toList
467477
} else{
468478
filteredCube.metadata.tileBounds
469479
.coordsIter
@@ -480,7 +490,7 @@ class OpenEOProcesses extends Serializable {
480490
}else{
481491
if (datacube.partitioner.isDefined && datacube.partitioner.get.isInstanceOf[SpacePartitioner[SpaceTimeKey]]) {
482492
val index = datacube.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index
483-
if (index.isInstanceOf[SparseSpaceOnlyPartitioner] || index == ByTileSpacetimePartitioner) {
493+
if (index.isInstanceOf[SparseSpaceOnlyPartitioner] || index.isInstanceOf[ByTileSpacetimePartitioner]) {
484494
index//a space only partitioner does not care about time, so can be reused as-is
485495
} else {
486496
SpaceTimeByMonthPartitioner
@@ -537,7 +547,7 @@ class OpenEOProcesses extends Serializable {
537547

538548
val tilesByInterval: RDD[(SpaceTimeKey, MultibandTile)] =
539549
if(reduce) {
540-
if(datacube.partitioner.isDefined && datacube.partitioner.get.isInstanceOf[SpacePartitioner[SpaceTimeKey]] && (datacube.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index.isInstanceOf[SparseSpaceOnlyPartitioner] || datacube.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index == ByTileSpacetimePartitioner )) {
550+
if(datacube.partitioner.isDefined && datacube.partitioner.get.isInstanceOf[SpacePartitioner[SpaceTimeKey]] && (datacube.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index.isInstanceOf[SparseSpaceOnlyPartitioner] || datacube.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index.isInstanceOf[ByTileSpacetimePartitioner] )) {
541551
filteredCube.mapPartitions(elements =>{
542552
val byNewKey= elements.flatMap(mapToNewKey).toStream.groupBy(_._1)
543553
byNewKey.mapValues(v=>aggregateTiles(v.map(_._2))).iterator
@@ -717,7 +727,7 @@ class OpenEOProcesses extends Serializable {
717727
implicit val newIndex: PartitionerIndex[K] = new SparseSpaceOnlyPartitioner(newIndices,leftPart.index.asInstanceOf[SparseSpaceOnlyPartitioner].indexReduction).asInstanceOf[PartitionerIndex[K]]
718728
SpacePartitioner[K](kb)(implicitly,implicitly,newIndex)
719729
}
720-
else if(leftPart.index == rightPart.index && (leftPart.index == ByTileSpatialPartitioner || leftPart.index == ByTileSpacetimePartitioner)) {
730+
else if(leftPart.index == rightPart.index && (leftPart.index == ByTileSpatialPartitioner || leftPart.index.isInstanceOf[ByTileSpacetimePartitioner])) {
721731
leftPart
722732
}
723733
else if(leftPart.index == rightPart.index && leftPart.index.isInstanceOf[ConfigurableSpaceTimePartitioner] ) {

openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/NetCDFCollection.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ object NetCDFCollection {
107107
val spatialBounds = KeyBounds(layout.mapTransform(extent))
108108
val temporalBounds = KeyBounds(SpaceTimeKey(spatialBounds.minKey,TemporalKey(LocalDate.of(1990,1,1).atStartOfDay(ZoneId.of("UTC")))),SpaceTimeKey(spatialBounds.maxKey,TemporalKey(LocalDate.now().atStartOfDay(ZoneId.of("UTC")))))
109109

110-
val partitioner: Partitioner = new SpacePartitioner(temporalBounds)(implicitly, implicitly, ByTileSpacetimePartitioner)
110+
val keys: Array[SpatialKey] = items.map(i => i.geometry.getOrElse(i.bbox.toPolygon())).clipToGrid(layout).map(_._1).distinct().collect()
111+
val partitioner: Partitioner = new SpacePartitioner(temporalBounds)(implicitly, implicitly, new ByTileSpacetimePartitioner(Some(keys)))
111112

112113
val metadata = TileLayerMetadata[SpaceTimeKey](cellType, layout, extent, crs(0), temporalBounds)
113114
val retiled: RDD[(SpaceTimeKey, MultibandTile)] = features.tileToLayout(metadata).partitionBy(partitioner)

0 commit comments

Comments
 (0)