11package org.phpinnacle.toblerone
22
3+ import org.apache.kafka.common.cache.LRUCache
4+ import org.apache.kafka.common.cache.SynchronizedCache
35import org.apache.kafka.common.config.ConfigDef
46import org.apache.kafka.connect.connector.ConnectRecord
57import org.apache.kafka.connect.data.Schema
@@ -30,6 +32,8 @@ abstract class RadixTransform<R : ConnectRecord<R>?> : Transformation<R> {
3032 )
3133
3234 private const val PURPOSE = " base-convert-transform"
35+
36+ private val cache = SynchronizedCache (LRUCache <Schema , Schema >(16 ))
3337 }
3438
3539 private lateinit var fields: Map <String , Int >
@@ -76,30 +80,20 @@ abstract class RadixTransform<R : ConnectRecord<R>?> : Transformation<R> {
7680 val value = Requirements .requireStruct(operatingValue(record), PURPOSE )
7781 val schema = operatingSchema(record) ? : return record
7882
79- val outputValues = mutableMapOf< String , Any >( )
80- val outputSchema = SchemaUtil .copySchemaBasics(schema )
83+ val outputSchema = copySchema(schema )
84+ val outputValues = Struct (outputSchema )
8185
8286 for (field in schema.fields()) {
8387 val name = field.name()
8488
85- if (name !in fields.keys) {
86- outputValues[name] = value.get(field)
87- outputSchema.field(name, field.schema())
88-
89- continue
89+ if (name in fields.keys) {
90+ outputValues.put(name, convert(name, field.schema(), value))
91+ } else {
92+ outputValues.put(name, value.get(field))
9093 }
91-
92- val (newSchema, newValue) = convert(name, field.schema(), value)
93-
94- outputSchema.field(name, newSchema)
95- outputValues[name] = newValue
9694 }
9795
98- val outputStruct = Struct (outputSchema)
99-
100- outputValues.forEach { outputStruct.put(it.key, it.value) }
101-
102- return newRecord(record, outputSchema.schema(), outputStruct)
96+ return newRecord(record, outputSchema.schema(), outputValues)
10397 }
10498
10599 private fun convert (key : String , value : Any ): Any {
@@ -114,19 +108,55 @@ abstract class RadixTransform<R : ConnectRecord<R>?> : Transformation<R> {
114108 }
115109 }
116110
117- private fun convert (key : String , schema : Schema , value : Struct ): Pair < Schema , Any > {
118- val radix = fields[key] ? : return Pair (schema, value)
111+ private fun convert (key : String , schema : Schema , value : Struct ): Any {
112+ val radix = fields[key] ? : return value
119113
120114 return when (schema.type()) {
121- Schema .Type .INT8 -> Pair (Schema .STRING_SCHEMA , value.getInt16(key).toString(radix))
122- Schema .Type .INT16 -> Pair (Schema .STRING_SCHEMA , value.getInt16(key).toString(radix))
123- Schema .Type .INT32 -> Pair (Schema .STRING_SCHEMA , value.getInt32(key).toString(radix))
124- Schema .Type .INT64 -> Pair (Schema .STRING_SCHEMA , value.getInt64(key).toString(radix))
125- Schema .Type .STRING -> Pair (Schema .INT32_SCHEMA , value.getString(key).trim().toInt(radix))
126- else -> Pair (schema, value)
115+ Schema .Type .INT8 -> value.getInt16(key).toString(radix)
116+ Schema .Type .INT16 -> value.getInt16(key).toString(radix)
117+ Schema .Type .INT32 -> value.getInt32(key).toString(radix)
118+ Schema .Type .INT64 -> value.getInt64(key).toString(radix)
119+ Schema .Type .STRING -> value.getString(key).trim().toInt(radix)
120+ else -> value
121+ }
122+ }
123+
124+ private fun infer (schema : Schema ): Schema {
125+ return when (schema.type()) {
126+ Schema .Type .INT8 -> Schema .STRING_SCHEMA
127+ Schema .Type .INT16 -> Schema .STRING_SCHEMA
128+ Schema .Type .INT32 -> Schema .STRING_SCHEMA
129+ Schema .Type .INT64 -> Schema .STRING_SCHEMA
130+ Schema .Type .STRING -> Schema .INT32_SCHEMA
131+ else -> schema
127132 }
128133 }
129134
135+ private fun copySchema (schema : Schema ): Schema
136+ {
137+ val cached = cache.get(schema)
138+
139+ if (cached != null ) {
140+ return cached
141+ }
142+
143+ val output = SchemaUtil .copySchemaBasics(schema)
144+
145+ for (field in schema.fields()) {
146+ val name = field.name()
147+
148+ if (name in fields.keys) {
149+ output.field(name, infer(field.schema()))
150+ } else {
151+ output.field(name, field.schema())
152+ }
153+ }
154+
155+ cache.put(schema, output)
156+
157+ return output
158+ }
159+
130160 class Key <R : ConnectRecord <R >? > : RadixTransform <R >() {
131161 override fun operatingSchema (record : R ? ): Schema ? = record?.keySchema()
132162
0 commit comments