2 changes: 1 addition & 1 deletion Dockerfile
@@ -44,4 +44,4 @@ COPY ./pipeline/hudi-connector/target/hudi-connector-1.0.0.jar $FLINK_HOME/custo

FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as cache-indexer-image
USER flink
COPY --from=build-pipeline /app/pipeline/cache-indexer/target/cache-indexer-1.0.0.jar $FLINK_HOME/lib
COPY --from=build-pipeline /app/pipeline/cache-indexer/target/cache-indexer-1.0.0.jar $FLINK_HOME/lib
2 changes: 1 addition & 1 deletion data-products/pom.xml
@@ -455,4 +455,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
@@ -56,8 +56,29 @@ object DatasetRegistryService {
}
}

def readAllDatasetSourceConfig(): Option[List[DatasetSourceConfig]] = {
def readDataset(id: String): Option[Dataset] = {
val postgresConnect = new PostgresConnect(postgresConfig)
var preparedStatement: PreparedStatement = null
var resultSet: ResultSet = null
try {
val query = "SELECT * FROM datasets WHERE id = ?"
preparedStatement = postgresConnect.prepareStatement(query)
preparedStatement.setString(1, id)
resultSet = postgresConnect.executeQuery(preparedStatement = preparedStatement)
if (resultSet.next()) {
Some(parseDataset(resultSet))
} else {
None
}
} finally {
if (resultSet != null) resultSet.close()
if (preparedStatement != null) preparedStatement.close()
postgresConnect.closeConnection()
}
}
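
A minimal caller sketch for the new single-dataset lookup (the dataset id below is illustrative, not taken from this change):

  // Hypothetical caller; "dataset3" is an invented id.
  DatasetRegistryService.readDataset("dataset3") match {
    case Some(dataset) => println(s"found dataset ${dataset.id}")
    case None => println("no dataset registered under that id")
  }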


def readAllDatasetSourceConfig(): Option[List[DatasetSourceConfig]] = {
val postgresConnect = new PostgresConnect(postgresConfig)
try {
val rs = postgresConnect.executeQuery("SELECT * FROM dataset_source_config")
@@ -84,6 +105,27 @@ object DatasetRegistryService {
}
}


def readDatasetSourceConfig(datasetId: String): Option[List[DatasetSourceConfig]] = {
val postgresConnect = new PostgresConnect(postgresConfig)
var preparedStatement: PreparedStatement = null
var resultSet: ResultSet = null
try {
val query = "SELECT * FROM dataset_source_config WHERE dataset_id = ?"
preparedStatement = postgresConnect.prepareStatement(query)
preparedStatement.setString(1, datasetId)
resultSet = postgresConnect.executeQuery(preparedStatement = preparedStatement)
Option(Iterator.continually((resultSet, resultSet.next)).takeWhile(f => f._2).map(f => f._1).map(result => {
val datasetSourceConfig = parseDatasetSourceConfig(result)
datasetSourceConfig
}).toList)
} finally {
if (resultSet != null) resultSet.close()
if (preparedStatement != null) preparedStatement.close()
postgresConnect.closeConnection()
}
}
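
A similar hedged sketch for fetching all source configs of one dataset (the id is invented, and field names such as connectorType are assumed from DatasetSourceConfig):

  // Sketch only; prints one line per connector configured for the dataset.
  DatasetRegistryService.readDatasetSourceConfig("dataset3").getOrElse(List.empty).foreach { sc =>
    println(s"source config ${sc.id} of type ${sc.connectorType}")
  }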

def readAllDatasetTransformations(): Map[String, List[DatasetTransformation]] = {

val postgresConnect = new PostgresConnect(postgresConfig)
@@ -114,12 +156,32 @@ object DatasetRegistryService {
def readAllDatasources(): Option[List[DataSource]] = {

val postgresConnect = new PostgresConnect(postgresConfig)
var preparedStatement: PreparedStatement = null
val query = "UPDATE datasources SET datasource_ref = ? WHERE datasource = ? AND dataset_id = ?"
try {
val rs = postgresConnect.executeQuery(s"SELECT * FROM datasources")
Option(Iterator.continually((rs, rs.next)).takeWhile(f => f._2).map(f => f._1).map(result => {
parseDatasource(result)
}).toList)
} finally {
if (preparedStatement != null) preparedStatement.close()
postgresConnect.closeConnection()
}
}

def updateConnectorStats(id: String, lastFetchTimestamp: Timestamp, records: Long): Int = {
val postgresConnect = new PostgresConnect(postgresConfig)
var preparedStatement: PreparedStatement = null
val query = "UPDATE dataset_source_config SET connector_stats = COALESCE(connector_stats, '{}')::jsonb || jsonb_build_object('records', COALESCE(connector_stats->>'records', '0')::int + ? ::int) || jsonb_build_object('last_fetch_timestamp', ? ::timestamp) || jsonb_build_object('last_run_timestamp', ? ::timestamp) WHERE id = ?;"
try {
preparedStatement = postgresConnect.prepareStatement(query)
preparedStatement.setString(1, records.toString)
preparedStatement.setTimestamp(2, lastFetchTimestamp)
preparedStatement.setTimestamp(3, new Timestamp(System.currentTimeMillis()))
preparedStatement.setString(4, id)
postgresConnect.executeUpdate(preparedStatement)
} finally {
if (preparedStatement != null) preparedStatement.close()
postgresConnect.closeConnection()
}
}
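
For reference, a hedged call sketch (the config id and record count are invented); the return value is the number of dataset_source_config rows updated:

  // Adds 250 to connector_stats.records and stamps last_fetch/last_run timestamps.
  val rowsUpdated = DatasetRegistryService.updateConnectorStats(
    id = "sc-1",
    lastFetchTimestamp = new Timestamp(System.currentTimeMillis()),
    records = 250L
  )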
@@ -156,6 +218,36 @@ object DatasetRegistryService {
}
}

def updateConnectorDisconnections(id: String, disconnections: Int): Int = {
val postgresConnect = new PostgresConnect(postgresConfig)
var preparedStatement: PreparedStatement = null
val query = "UPDATE dataset_source_config SET connector_stats = jsonb_set(coalesce(connector_stats, '{}')::jsonb, '{disconnections}', to_jsonb(?)) WHERE id = ?"
try {
preparedStatement = postgresConnect.prepareStatement(query)
preparedStatement.setInt(1, disconnections)
preparedStatement.setString(2, id)
postgresConnect.executeUpdate(preparedStatement)
} finally {
if (preparedStatement != null) preparedStatement.close()
postgresConnect.closeConnection()
}
}

def updateConnectorAvgBatchReadTime(id: String, avgReadTime: Long): Int = {
val postgresConnect = new PostgresConnect(postgresConfig)
var preparedStatement: PreparedStatement = null
val query = "UPDATE dataset_source_config SET connector_stats = jsonb_set(coalesce(connector_stats, '{}')::jsonb, '{avg_batch_read_time}', to_jsonb(?)) WHERE id = ?"
try {
preparedStatement = postgresConnect.prepareStatement(query)
preparedStatement.setLong(1, avgReadTime)
preparedStatement.setString(2, id)
postgresConnect.executeUpdate(preparedStatement)
} finally {
if (preparedStatement != null) preparedStatement.close()
postgresConnect.closeConnection()
}
}
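
The two helpers above follow the same pattern; a short usage sketch with invented values:

  // Record connector health metrics for a source config after a poll cycle.
  DatasetRegistryService.updateConnectorDisconnections("sc-1", disconnections = 2)
  DatasetRegistryService.updateConnectorAvgBatchReadTime("sc-1", avgReadTime = 1200L)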

private def parseDataset(rs: ResultSet): Dataset = {
val datasetId = rs.getString("id")
val datasetType = rs.getString("type")
@@ -28,11 +28,11 @@ trait SystemEventHandler {
}

private def getTime(timespans: Map[String, AnyRef], producer: Producer): Option[Long] = {
timespans.get(producer.toString).map(f => f.asInstanceOf[Long])
timespans.get(producer.toString).map(f => f.asInstanceOf[Number].longValue())
}

private def getStat(obsrvMeta: Map[String, AnyRef], stat: Stats): Option[Long] = {
obsrvMeta.get(stat.toString).map(f => f.asInstanceOf[Long])
obsrvMeta.get(stat.toString).map(f => f.asInstanceOf[Number].longValue())
}
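
The switch to Number is what makes these lookups safe when the JSON layer hands back an Integer instead of a Long; a minimal illustration:

  // A boxed Integer cannot be cast directly to scala.Long (ClassCastException),
  // but going through Number.longValue() works for Integer and Long alike.
  val v: AnyRef = Integer.valueOf(42)
  val asLong: Long = v.asInstanceOf[Number].longValue() // 42L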

def getError(error: ErrorConstants.Error, producer: Producer, functionalError: FunctionalError): Option[ErrorLog] = {
@@ -46,6 +46,40 @@ class MapDeserializationSchema extends KafkaRecordDeserializationSchema[mutable.

}

class TopicDeserializationSchema extends KafkaRecordDeserializationSchema[mutable.Map[String, AnyRef]] {

private val serialVersionUID = -3224825136576915426L

override def getProducedType: TypeInformation[mutable.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[mutable.Map[String, AnyRef]])

override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[mutable.Map[String, AnyRef]]): Unit = {
val msg = try {
val event = JSONUtil.deserialize[Map[String, AnyRef]](record.value())
mutable.Map[String, AnyRef](
"dataset" -> record.topic(),
"event" -> event
)
} catch {
case _: Exception =>
mutable.Map[String, AnyRef](Constants.INVALID_JSON -> new String(record.value, "UTF-8"))
}
initObsrvMeta(msg, record)
out.collect(msg)
}

private def initObsrvMeta(msg: mutable.Map[String, AnyRef], record: ConsumerRecord[Array[Byte], Array[Byte]]): Unit = {
if (!msg.contains("obsrv_meta")) {
msg.put("obsrv_meta", Map(
"syncts" -> record.timestamp(),
"processingStartTime" -> System.currentTimeMillis(),
"flags" -> Map(),
"timespans" -> Map(),
"error" -> Map()
))
}
}
}
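
To make the new envelope concrete, here is the shape of the element this schema emits per Kafka record (topic name and payload are invented; obsrv_meta is filled in by initObsrvMeta):

  import scala.collection.mutable
  val example: mutable.Map[String, AnyRef] = mutable.Map(
    "dataset"    -> "dataset3",                         // source topic name
    "event"      -> Map("code" -> "HYUN-CRE-D6"),       // parsed JSON payload
    "obsrv_meta" -> Map("syncts" -> Long.box(1700000000000L))
  )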

class StringDeserializationSchema extends KafkaRecordDeserializationSchema[String] {

private val serialVersionUID = -3224825136576915426L
@@ -90,4 +90,4 @@ abstract class BaseJobConfig[T](val config: Config, val jobName: String) extends
val CONST_OBSRV_META = "obsrv_meta"
val CONST_DATASET = "dataset"
val CONST_EVENT = "event"
}
}
@@ -38,6 +38,13 @@ abstract class BaseStreamTask[T] extends BaseStreamTaskSink[T] {
.rebalance()
}

def getTopicMapDataStream(env: StreamExecutionEnvironment, config: BaseJobConfig[T], kafkaTopics: List[String],
consumerSourceName: String, kafkaConnector: FlinkKafkaConnector): DataStream[mutable.Map[String, AnyRef]] = {
env.fromSource(kafkaConnector.kafkaTopicMapSource(kafkaTopics), WatermarkStrategy.noWatermarks[mutable.Map[String, AnyRef]](), consumerSourceName)
.uid(consumerSourceName).setParallelism(config.kafkaConsumerParallelism)
.rebalance()
}
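
A hedged wiring sketch for the new stream (env, config and kafkaConnector are assumed to come from the enclosing stream task, as in CacheIndexerStreamTask below; the topic names are invented):

  // Consumes every listed topic and tags each event with the topic it came from.
  val topics = List("dataset3", "dataset4")
  val stream = getTopicMapDataStream(env, config, topics,
    consumerSourceName = "topic-map-consumer", kafkaConnector)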

def getStringDataStream(env: StreamExecutionEnvironment, config: BaseJobConfig[T], kafkaConnector: FlinkKafkaConnector): DataStream[String] = {
env.fromSource(kafkaConnector.kafkaStringSource(config.inputTopic()), WatermarkStrategy.noWatermarks[String](), config.inputConsumer())
.uid(config.inputConsumer()).setParallelism(config.kafkaConsumerParallelism)
@@ -47,6 +47,15 @@ class FlinkKafkaConnector(config: BaseJobConfig[_]) extends Serializable {
.build()
}

def kafkaTopicMapSource(kafkaTopics: List[String]): KafkaSource[mutable.Map[String, AnyRef]] = {
KafkaSource.builder[mutable.Map[String, AnyRef]]()
.setTopics(kafkaTopics.asJava)
.setDeserializer(new TopicDeserializationSchema)
.setProperties(config.kafkaConsumerProperties())
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build()
}

def kafkaMapDynamicSink(): KafkaSink[mutable.Map[String, AnyRef]] = {
KafkaSink.builder[mutable.Map[String, AnyRef]]()
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
@@ -3,7 +3,7 @@ package org.sunbird.obsrv.core.util
import org.postgresql.ds.PGSimpleDataSource
import org.slf4j.LoggerFactory

import java.sql.{Connection, ResultSet, SQLException, Statement}
import java.sql.{Connection, PreparedStatement, ResultSet, SQLException, Statement}

final case class PostgresConnectionConfig(user: String, password: String, database: String, host: String, port: Int, maxConnections: Int)

12 changes: 0 additions & 12 deletions pipeline/cache-indexer/pom.xml
@@ -37,24 +37,12 @@
<groupId>org.sunbird.obsrv</groupId>
<artifactId>dataset-registry</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-native_${scala.maj.version}</artifactId>
<version>4.0.6</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.maj.version}</artifactId>
@@ -32,7 +32,7 @@ class CacheIndexerStreamTask(config: CacheIndexerConfig, kafkaConnector: FlinkKa

val datasets = DatasetRegistry.getAllDatasets(Some(DatasetType.master.toString))
val datasetIds = datasets.map(f => f.id)
val dataStream = getMapDataStream(env, config, datasetIds, config.kafkaConsumerProperties(), consumerSourceName = s"cache-indexer-consumer", kafkaConnector)
val dataStream = getTopicMapDataStream(env, config, datasetIds, consumerSourceName = s"cache-indexer-consumer", kafkaConnector)
processStream(dataStream)
}

@@ -1,9 +1,10 @@
package org.sunbird.obsrv.util

import org.json4s.native.JsonMethods._
import org.json4s.{JNothing, JValue}
import org.json4s.{JField, JNothing, JValue}
import org.slf4j.LoggerFactory
import org.sunbird.obsrv.core.cache.RedisConnect
import org.sunbird.obsrv.core.model.Constants.OBSRV_META
import org.sunbird.obsrv.model.DatasetModels.Dataset
import org.sunbird.obsrv.pipeline.task.CacheIndexerConfig
import redis.clients.jedis.Jedis
@@ -37,7 +38,11 @@ class MasterDataCache(val config: CacheIndexerConfig) {
def process(dataset: Dataset, key: String, event: JValue): (Int, Int) = {
val jedis = this.datasetPipelineMap(dataset.id)
val dataFromCache = getDataFromCache(dataset, key, jedis)
updateCache(dataset, dataFromCache, key, event, jedis)
val updatedEvent = event.removeField {
case JField(OBSRV_META, _) => true
case _ => false
}
updateCache(dataset, dataFromCache, key, updatedEvent, jedis)
(if (dataFromCache == null) 1 else 0, if (dataFromCache == null) 0 else 1)
}
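
A standalone illustration of the obsrv_meta strip-out that now happens before caching (the payload is invented; the real code uses the OBSRV_META constant instead of the literal key):

  import org.json4s.JField
  import org.json4s.native.JsonMethods.{compact, parse, render}

  val event = parse("""{"code":"HYUN-CRE-D6","obsrv_meta":{"syncts":1700000000000}}""")
  val cleaned = event.removeField {
    case JField("obsrv_meta", _) => true
    case _ => false
  }
  println(compact(render(cleaned))) // {"code":"HYUN-CRE-D6"}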

@@ -2,9 +2,9 @@ package org.sunbird.obsrv.fixture

object EventFixture {

val VALID_BATCH_EVENT_D3_INSERT = """{"dataset":"dataset3","event":{"code":"HYUN-CRE-D6","manufacturer":"Hyundai","model":"Creta","variant":"SX(O)","modelYear":"2023","price":"2200000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"john.doe@example.com","locationId":"KUN12345"}}}"""
val VALID_BATCH_EVENT_D3_INSERT_2 = """{"dataset":"dataset3","event":{"code":"HYUN-TUC-D6","manufacturer":"Hyundai","model":"Tucson","variant":"Signature","modelYear":"2023","price":"4000000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"admin.hyun@gmail.com","locationId":"KUN134567"}}}"""
val VALID_BATCH_EVENT_D3_UPDATE = """{"dataset":"dataset3","event":{"code":"HYUN-CRE-D6","dealer":{"email":"john.doe@example.com","locationId":"KUN12345"},"safety":"3 Star (Global NCAP)","seatingCapacity":5}}"""
val VALID_BATCH_EVENT_D4 = """{"dataset":"dataset4","event":{"code":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}}"""
val INVALID_BATCH_EVENT_D4 = """{"dataset":"dataset4","event":{"code1":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}}"""
val VALID_BATCH_EVENT_D3_INSERT = """{"code":"HYUN-CRE-D6","manufacturer":"Hyundai","model":"Creta","variant":"SX(O)","modelYear":"2023","price":"2200000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"john.doe@example.com","locationId":"KUN12345"}}"""
val VALID_BATCH_EVENT_D3_INSERT_2 = """{"code":"HYUN-TUC-D6","manufacturer":"Hyundai","model":"Tucson","variant":"Signature","modelYear":"2023","price":"4000000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"admin.hyun@gmail.com","locationId":"KUN134567"}}"""
val VALID_BATCH_EVENT_D3_UPDATE = """{"code":"HYUN-CRE-D6","dealer":{"email":"john.doe@example.com","locationId":"KUN12345"},"safety":"3 Star (Global NCAP)","seatingCapacity":5}"""
val VALID_BATCH_EVENT_D4 = """{"code":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}"""
val INVALID_BATCH_EVENT_D4 = """{"code1":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}"""
}
12 changes: 12 additions & 0 deletions pipeline/denormalizer/pom.xml
@@ -42,6 +42,18 @@
<artifactId>dataset-registry</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.maj.version}</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.sunbird.obsrv</groupId>
<artifactId>transformation-sdk</artifactId>