წინა პოსტში განვიხილეთ, როგორ გავაგზავნოთ და მივიღოთ
შეტყობინებები Apache Kafka-ს მეშვეობით command line-ის
მეშვეობით. ახლა ცოტა წინ წავიწიოთ და ვნახოთ როგორ შეიძლება ეს ყველაფერი გავაკეთოთ
Java-ში,
კერძოდ კი Spring Boot აპლიკაციით. ჯერ საწყის ეტაპზე, შეგვიძლია შევქმნათ მხოლოდ Producer აპლიკაცია. ამისთვის შევიდეთ Spring-ის initializer-ზე (https://start.spring.io) და დავამატოთ შესაბამისი dependency-ები.
Generate Project ღილაკზე დაჭერით გადმოვწეროთ და ამოვაარქივოთ
გადმოწერილი zip ფაილი. შემდეგ გავხსნათ ჩვენთვის სასურველ IDE-ში (ამ შემთხვევაში
IntelliJ).
საწყის ეტაპზე application.properties
ფაილში ჩავწეროთ მისამართი იმ სერვერისა, რომელზეც დაინსტალირებულია Kafka.
ახლა შევქმნათ KafkaProducerConfig კლასი,
რომ დავაკონფიგურიროთ Producer. საწყის ეტაპზე გავაგზავნოთ მხოლოდ String ტიპის
შეტყობინებები, ხოლო შემდეგში ვნახავთ თუ როგორ გავაგზავნოთ ობიექტები.
ყურადღება მივაქციოთ იმას, რომ KEY_SERIALIZER_CLASS_CONFIG-სა
და VALUE_SERIALIZER_CLASS_CONFIG ის მნიშვნელობები ორივე არის StringSerializer.class,
ხოლო KafkaTemplate-ც შესაბამისად არის KafkaTemplate<String,String>.
bootstrapAddress არის მნიშვნელობა, რომელსაც @Value ანოტაციით ვკითხულობთ application.properties
ფაილიდან.
ახლა შევქმნათ REST endpoint. RestController, სადაც
მარტივად მივიღებთ HttpRequest-ს და გავაგზავნით შეტყობინებას, რომელსაც Consumer მიიღებს
(ამ ეტაპზე Consumer იქნება command line-ით).
თუ ჩვენს Spring Boot აპლიკაციას გავუშვებთ და გამოვიძახებთ
შემდეგ ბრძანებას (URL-ს) http://localhost:8080/publish/message?msg=Hello და პარალელურად Consumer დასტარტული გვექნება სერვერზე
(sudo kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_topic --from-beginning) , რომელიც მოუსმენს kafka_topic
TOPIC-ს, მაშინ Command line-ში მივიღებთ შემდეგ შეტყობინებას
ახლა ვნახოთ როგორ შეიძლება Spring Boot აპლიკაციით
გავაგზავნოთ Java ობიექტი. ამისთვის საჭიროა მცირედი ცვლილებების შეტანა კონფიგურაციაში.
VALUE_SERIALIZER_CLASS_CONFIG-ის მნიშვნელობა უნდა გავხადოთ JsonSerializer.class
და ProducerFactory უნდა იყოს ProducerFactory<String,User> , თუ გვინდა User
კლასის ტიპის ობიექტების გაგზავნა. შესაბამისად KafkaTemplate-ც უნდა იყოს KafkaTemplate<String,User>
ტიპის.
რა იცვლება Producer კლასში (RestController-ში) ? ამ
ეტაპისთვის, შევქმნათ User კლასის ობიექტი მეთოდშივე, რომლითაც გვინდა გაგზავნა. User
კლასს აქვს მხოლოდ ორივე ველი id და username და
შესაბამისად getter და setter მეთოდები.
თუ დავარესტარტებთ ჩვენს აპლიკაციას და გამოვიძახებთ
შემდეგ URL-ს
და Consumer API ძველებურად დასტარტულია და უსმენს kafka_topic
TOPIC-ს, მაშინ command line-ში უნდა მივიღოთ user ობიექტის შესაბამისი JSON
String.