12
12
import org .apache .kafka .clients .consumer .ConsumerConfig ;
13
13
import org .apache .kafka .clients .consumer .KafkaConsumer ;
14
14
import org .apache .kafka .clients .consumer .OffsetAndTimestamp ;
15
+ import org .apache .kafka .clients .consumer .OffsetResetStrategy ;
15
16
import org .apache .kafka .common .TopicPartition ;
16
17
import org .apache .kafka .common .serialization .StringDeserializer ;
17
18
@@ -82,8 +83,12 @@ public KafkaConsumer getKafkaConsumer(String streamName) throws Exception {
82
83
if (kafkaSchema == null ) {
83
84
throw new Exception ("Kafka Schema not Found for Stream: " + streamName );
84
85
}
85
- kafkaConsumer = getConsumer (kafkaSchema );
86
- kafkaConsumer .subscribe (Collections .singletonList (streamName + ".stream" ));
86
+ kafkaConsumer = getConsumer (kafkaSchema , streamName );
87
+ TopicPartition topicPartition = new TopicPartition (streamName + ".stream" ,0 );
88
+ kafkaConsumer .assign (Collections .singletonList (topicPartition ));
89
+ if (kafkaProps .get (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG ).equals (OffsetResetStrategy .EARLIEST .toString ().toLowerCase ())) {
90
+ return seekToMidNight (topicPartition );
91
+ }
87
92
}
88
93
catch (Exception e ) {
89
94
throw (e );
@@ -106,7 +111,7 @@ public KafkaConsumer getKafkaConsumer(String streamName, long timestamp) throws
106
111
if (kafkaSchema == null ) {
107
112
throw new Exception ("Kafka Schema not Found for Stream: " + streamName );
108
113
}
109
- kafkaConsumer = getConsumer (kafkaSchema );
114
+ kafkaConsumer = getConsumer (kafkaSchema , streamName );
110
115
TopicPartition topicPartition = new TopicPartition (streamName + ".stream" ,0 );
111
116
kafkaConsumer .assign (Collections .singleton (topicPartition ));
112
117
@@ -137,7 +142,7 @@ public KafkaConsumer getKafkaConsumer(String streamName, long timestamp) throws
137
142
*/
138
143
139
144
140
- public KafkaAvroConsumer getConsumer (Schema avroSchema ) throws Exception {
145
+ public KafkaAvroConsumer getConsumer (Schema avroSchema , String streamName ) throws Exception {
141
146
try {
142
147
if (!IsItJunit .isJUnitTest ()) {
143
148
ConfigProperties .resolveAndExportToSystemProperties (securityProps );
@@ -147,9 +152,9 @@ public KafkaAvroConsumer getConsumer(Schema avroSchema) throws Exception {
147
152
kafkaProps .put ("key.deserializer" , StringDeserializer .class .getName ());
148
153
kafkaProps .put ("value.deserializer" , AvroDeserializer .class .getName ());
149
154
if (!kafkaProps .containsKey (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG )) {
150
- kafkaProps .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
155
+ kafkaProps .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , OffsetResetStrategy . EARLIEST . toString (). toLowerCase () );
151
156
}
152
- kafkaProps .put (ConsumerConfig .GROUP_ID_CONFIG , this .clientID + "_" +getDate () + "_" + UUID . randomUUID (). toString ());
157
+ kafkaProps .put (ConsumerConfig .GROUP_ID_CONFIG , this .clientID + "_" + streamName + "_" + getDate ());
153
158
ConfigProperties .resolve (kafkaProps );
154
159
return new KafkaAvroConsumer (kafkaProps , avroSchema );
155
160
}
@@ -201,8 +206,12 @@ public KafkaConsumer getNewsConsumer(String topic) throws Exception {
201
206
if (newsSchema == null ) {
202
207
throw new Exception ("News Schema not Found " );
203
208
}
204
- kafkaConsumer = getConsumer (newsSchema );
205
- kafkaConsumer .subscribe (Collections .singletonList (topic +".stream" ));
209
+ kafkaConsumer = getConsumer (newsSchema , topic );
210
+ TopicPartition topicPartition = new TopicPartition (topic + ".stream" ,0 );
211
+ kafkaConsumer .assign (Collections .singletonList (topicPartition ));
212
+ if (kafkaProps .get (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG ).equals (OffsetResetStrategy .EARLIEST .toString ().toLowerCase ())) {
213
+ return seekToMidNight (topicPartition );
214
+ }
206
215
return kafkaConsumer ;
207
216
}
208
217
catch (Exception e ){
@@ -217,4 +226,32 @@ private String getDate(){
217
226
String date = dateformat .format (new Date ());
218
227
return date ;
219
228
}
229
+
230
+ private KafkaConsumer seekToMidNight (TopicPartition topicPartition ){
231
+ Map <TopicPartition ,Long > timestmaps = new HashMap ();
232
+ timestmaps .put (topicPartition , getTodayMidNightTimeStamp ());
233
+ Map <TopicPartition , OffsetAndTimestamp > offsetsForTimes = kafkaConsumer .offsetsForTimes (timestmaps );
234
+ OffsetAndTimestamp offsetAndTimestamp = null ;
235
+ if (offsetsForTimes != null && (offsetAndTimestamp = offsetsForTimes .get (topicPartition )) != null ) {
236
+ kafkaConsumer .seek (topicPartition , offsetAndTimestamp .offset ());
237
+ } else {
238
+ kafkaConsumer .seekToBeginning (Collections .singleton (topicPartition ));
239
+ }
240
+ return kafkaConsumer ;
241
+ }
242
+
243
+ private long getTodayMidNightTimeStamp (){
244
+
245
+ TimeZone timeZone = TimeZone .getTimeZone ("America/New_York" );
246
+
247
+ Calendar today = Calendar .getInstance (timeZone );
248
+ today .set (Calendar .HOUR_OF_DAY , 0 );
249
+ today .set (Calendar .MINUTE , 0 );
250
+ today .set (Calendar .SECOND , 0 );
251
+
252
+ long timestampFromMidnight = today .getTimeInMillis ();
253
+
254
+ return timestampFromMidnight ;
255
+ }
256
+
220
257
}
0 commit comments