diff --git a/pom.xml b/pom.xml
index 1d45db2..b73ca6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,11 @@
 			spring-boot-starter-test
 			test
 		
+		
+			org.apache.kafka
+			kafka-clients
+			3.1.1
+		
 	
 
 	
diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KafkaConfiguration.java b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KafkaConfiguration.java
new file mode 100644
index 0000000..b759b73
--- /dev/null
+++ b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KafkaConfiguration.java
@@ -0,0 +1,69 @@
+package com.techprimers.kafka.springbootkafkaproducerexample.config;
+
+import com.techprimers.kafka.springbootkafkaproducerexample.model.User;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@EnableKafka
+@Configuration
+public class KafkaConfiguration {
+
+    public static final String KAFKA_EXAMPLE = "Kafka_Example";
+    public static final String KAFKA_EXAMPLE_JSON = "Kafka_Example_User";
+
+    @Value("${kafka.bootstrapAddress}")
+    private String bootstrapAddress;
+
+    // Create related kafka topics
+    @Bean
+    public NewTopic testTopic() {
+        return new NewTopic(KAFKA_EXAMPLE, 1, (short) 1);
+    }
+
+    @Bean
+    public NewTopic testJsonTopic() {
+        return new NewTopic(KAFKA_EXAMPLE_JSON, 1, (short) 1);
+    }
+
+    @Bean
+    public KafkaTemplate kafkaSimpleMessageTemplate() {
+        return new KafkaTemplate<>(simpleMessageProducerFactory());
+    }
+
+    // Default Factory is to send String messages
+    private ProducerFactory simpleMessageProducerFactory() {
+        final Map configProps = new HashMap<>();
+        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        return new DefaultKafkaProducerFactory<>(configProps);
+    }
+
+    // Object producer factory & template
+    @Bean
+    public KafkaTemplate kafkaObjectTemplate() {
+        return new KafkaTemplate<>(objectProducerFactory());
+    }
+
+    private ProducerFactory objectProducerFactory() {
+        final Map configProps = new HashMap<>();
+        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+        return new DefaultKafkaProducerFactory<>(configProps);
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KakfaConfiguration.java b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KakfaConfiguration.java
deleted file mode 100644
index 38165d4..0000000
--- a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KakfaConfiguration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.techprimers.kafka.springbootkafkaproducerexample.config;
-
-import com.techprimers.kafka.springbootkafkaproducerexample.model.User;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Configuration
-public class KakfaConfiguration {
-
-    @Bean
-    public ProducerFactory producerFactory() {
-        Map config = new HashMap<>();
-
-        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
-        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
-
-        return new DefaultKafkaProducerFactory<>(config);
-    }
-
-
-    @Bean
-    public KafkaTemplate kafkaTemplate() {
-        return new KafkaTemplate<>(producerFactory());
-    }
-
-
-}
diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/model/User.java b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/model/User.java
index 5b66a54..d738dc7 100644
--- a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/model/User.java
+++ b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/model/User.java
@@ -6,6 +6,9 @@ public class User {
     private String dept;
     private Long salary;
 
+    public User() {
+    }
+
     public User(String name, String dept, Long salary) {
         this.name = name;
         this.dept = dept;
@@ -35,4 +38,13 @@ public Long getSalary() {
     public void setSalary(Long salary) {
         this.salary = salary;
     }
+
+    @Override
+    public String toString() {
+        return "User{" +
+                "name='" + name +
+                ", dept='" + dept +
+                ", salary=" + salary +
+                '}';
+    }
 }
diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/resource/UserResource.java b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/resource/UserResource.java
index cd477ab..27b8c6b 100644
--- a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/resource/UserResource.java
+++ b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/resource/UserResource.java
@@ -1,27 +1,39 @@
 package com.techprimers.kafka.springbootkafkaproducerexample.resource;
 
+import com.techprimers.kafka.springbootkafkaproducerexample.config.KafkaConfiguration;
 import com.techprimers.kafka.springbootkafkaproducerexample.model.User;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
 @RestController
-@RequestMapping("kafka")
+@RequestMapping("/kafka")
 public class UserResource {
 
     @Autowired
-    private KafkaTemplate kafkaTemplate;
+    @Qualifier("kafkaSimpleMessageTemplate")
+    private KafkaTemplate kafkaSimpleMessageTemplate;
 
-    private static final String TOPIC = "Kafka_Example";
-
-    @GetMapping("/publish/{name}")
-    public String post(@PathVariable("name") final String name) {
+    @Autowired
+    @Qualifier("kafkaObjectTemplate")
+    private KafkaTemplate kafkaObjectTemplate;
 
-        kafkaTemplate.send(TOPIC, new User(name, "Technology", 12000L));
+    @GetMapping("/publish/{message}")
+    public String publishMessage(@PathVariable final String message) {
+        kafkaSimpleMessageTemplate.send(KafkaConfiguration.KAFKA_EXAMPLE, message);
+        return "Message : " + message + " published successfully";
+    }
 
-        return "Published successfully";
+    @PostMapping("/publish")
+    public String publishUser(@RequestBody final User user) {
+        kafkaObjectTemplate.send(KafkaConfiguration.KAFKA_EXAMPLE_JSON, user);
+        return "User : " + user + "published successfully";
     }
-}
+
+}
\ No newline at end of file
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index bafddce..a0c8a98 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -1 +1 @@
-server.port=8081
\ No newline at end of file
+kafka.bootstrapAddress=localhost:9092
\ No newline at end of file