Tuesday, January 22, 2019

Spring Boot + Kafka Producer API


წინა პოსტში განვიხილეთ, როგორ გავაგზავნოთ და მივიღოთ შეტყობინებები Apache Kafka-ს მეშვეობით command line-ის მეშვეობით. ახლა ცოტა წინ წავიწიოთ და ვნახოთ როგორ შეიძლება ეს ყველაფერი გავაკეთოთ Java-ში, კერძოდ კი Spring Boot აპლიკაციით. ჯერ საწყის ეტაპზე, შეგვიძლია შევქმნათ მხოლოდ Producer აპლიკაცია. ამისთვის შევიდეთ Spring-ის initializer-ზე (https://start.spring.io)  და დავამატოთ შესაბამისი dependency-ები.


Generate Project ღილაკზე დაჭერით გადმოვწეროთ და ამოვაარქივოთ გადმოწერილი zip ფაილი. შემდეგ გავხსნათ ჩვენთვის სასურველ IDE-ში (ამ შემთხვევაში IntelliJ).
საწყის ეტაპზე application.properties ფაილში ჩავწეროთ მისამართი იმ სერვერისა, რომელზეც დაინსტალირებულია Kafka.



ახლა შევქმნათ KafkaProducerConfig კლასი, რომ დავაკონფიგურიროთ Producer. საწყის ეტაპზე გავაგზავნოთ მხოლოდ String ტიპის შეტყობინებები, ხოლო შემდეგში ვნახავთ თუ როგორ გავაგზავნოთ ობიექტები.


ყურადღება მივაქციოთ იმას, რომ KEY_SERIALIZER_CLASS_CONFIG-სა და VALUE_SERIALIZER_CLASS_CONFIG ის მნიშვნელობები ორივე არის StringSerializer.class, ხოლო KafkaTemplate-ც შესაბამისად არის KafkaTemplate<String,String>. bootstrapAddress არის მნიშვნელობა, რომელსაც @Value ანოტაციით ვკითხულობთ application.properties ფაილიდან.

ახლა შევქმნათ REST endpoint. RestController, სადაც მარტივად მივიღებთ HttpRequest-ს და გავაგზავნით შეტყობინებას, რომელსაც Consumer მიიღებს (ამ ეტაპზე Consumer იქნება command line-ით).


თუ ჩვენს Spring Boot აპლიკაციას გავუშვებთ და გამოვიძახებთ შემდეგ ბრძანებას (URL-ს) http://localhost:8080/publish/message?msg=Hello  და პარალელურად Consumer დასტარტული გვექნება სერვერზე (sudo kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_topic  --from-beginning) , რომელიც მოუსმენს kafka_topic TOPIC-ს, მაშინ Command line-ში მივიღებთ შემდეგ შეტყობინებას


ახლა ვნახოთ როგორ შეიძლება Spring Boot აპლიკაციით გავაგზავნოთ Java ობიექტი. ამისთვის საჭიროა მცირედი ცვლილებების შეტანა კონფიგურაციაში. VALUE_SERIALIZER_CLASS_CONFIG-ის მნიშვნელობა უნდა გავხადოთ JsonSerializer.class და ProducerFactory უნდა იყოს ProducerFactory<String,User> , თუ გვინდა User კლასის ტიპის ობიექტების გაგზავნა. შესაბამისად KafkaTemplate-ც უნდა იყოს KafkaTemplate<String,User> ტიპის.


რა იცვლება Producer კლასში (RestController-ში) ? ამ ეტაპისთვის, შევქმნათ User კლასის ობიექტი მეთოდშივე, რომლითაც გვინდა გაგზავნა. User კლასს აქვს მხოლოდ ორივე ველი id და username და შესაბამისად getter და setter მეთოდები.



თუ დავარესტარტებთ ჩვენს აპლიკაციას და გამოვიძახებთ შემდეგ URL-
და Consumer API ძველებურად დასტარტულია და უსმენს kafka_topic TOPIC-ს, მაშინ command line-ში უნდა მივიღოთ user ობიექტის შესაბამისი JSON String.









No comments:

Post a Comment