Tuesday, January 22, 2019

Spring Boot + Kafka Producer API


წინა პოსტში განვიხილეთ, როგორ გავაგზავნოთ და მივიღოთ შეტყობინებები Apache Kafka-ს მეშვეობით command line-ის მეშვეობით. ახლა ცოტა წინ წავიწიოთ და ვნახოთ როგორ შეიძლება ეს ყველაფერი გავაკეთოთ Java-ში, კერძოდ კი Spring Boot აპლიკაციით. ჯერ საწყის ეტაპზე, შეგვიძლია შევქმნათ მხოლოდ Producer აპლიკაცია. ამისთვის შევიდეთ Spring-ის initializer-ზე (https://start.spring.io)  და დავამატოთ შესაბამისი dependency-ები.


Generate Project ღილაკზე დაჭერით გადმოვწეროთ და ამოვაარქივოთ გადმოწერილი zip ფაილი. შემდეგ გავხსნათ ჩვენთვის სასურველ IDE-ში (ამ შემთხვევაში IntelliJ).
საწყის ეტაპზე application.properties ფაილში ჩავწეროთ მისამართი იმ სერვერისა, რომელზეც დაინსტალირებულია Kafka.



ახლა შევქმნათ KafkaProducerConfig კლასი, რომ დავაკონფიგურიროთ Producer. საწყის ეტაპზე გავაგზავნოთ მხოლოდ String ტიპის შეტყობინებები, ხოლო შემდეგში ვნახავთ თუ როგორ გავაგზავნოთ ობიექტები.


ყურადღება მივაქციოთ იმას, რომ KEY_SERIALIZER_CLASS_CONFIG-სა და VALUE_SERIALIZER_CLASS_CONFIG ის მნიშვნელობები ორივე არის StringSerializer.class, ხოლო KafkaTemplate-ც შესაბამისად არის KafkaTemplate<String,String>. bootstrapAddress არის მნიშვნელობა, რომელსაც @Value ანოტაციით ვკითხულობთ application.properties ფაილიდან.

ახლა შევქმნათ REST endpoint. RestController, სადაც მარტივად მივიღებთ HttpRequest-ს და გავაგზავნით შეტყობინებას, რომელსაც Consumer მიიღებს (ამ ეტაპზე Consumer იქნება command line-ით).


თუ ჩვენს Spring Boot აპლიკაციას გავუშვებთ და გამოვიძახებთ შემდეგ ბრძანებას (URL-ს) http://localhost:8080/publish/message?msg=Hello  და პარალელურად Consumer დასტარტული გვექნება სერვერზე (sudo kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_topic  --from-beginning) , რომელიც მოუსმენს kafka_topic TOPIC-ს, მაშინ Command line-ში მივიღებთ შემდეგ შეტყობინებას


ახლა ვნახოთ როგორ შეიძლება Spring Boot აპლიკაციით გავაგზავნოთ Java ობიექტი. ამისთვის საჭიროა მცირედი ცვლილებების შეტანა კონფიგურაციაში. VALUE_SERIALIZER_CLASS_CONFIG-ის მნიშვნელობა უნდა გავხადოთ JsonSerializer.class და ProducerFactory უნდა იყოს ProducerFactory<String,User> , თუ გვინდა User კლასის ტიპის ობიექტების გაგზავნა. შესაბამისად KafkaTemplate-ც უნდა იყოს KafkaTemplate<String,User> ტიპის.


რა იცვლება Producer კლასში (RestController-ში) ? ამ ეტაპისთვის, შევქმნათ User კლასის ობიექტი მეთოდშივე, რომლითაც გვინდა გაგზავნა. User კლასს აქვს მხოლოდ ორივე ველი id და username და შესაბამისად getter და setter მეთოდები.



თუ დავარესტარტებთ ჩვენს აპლიკაციას და გამოვიძახებთ შემდეგ URL-
და Consumer API ძველებურად დასტარტულია და უსმენს kafka_topic TOPIC-ს, მაშინ command line-ში უნდა მივიღოთ user ობიექტის შესაბამისი JSON String.









Monday, January 21, 2019

Apache Kafka-ს ინსტალაცია Ubuntu-ზე და Publish/Consume მაგალითი


რა არის Apache Kafka ?

Apache Kafka არის streaming პლათფორმა რომელსაც შეუძლია დაამუშავოს ტრილიონობით event-ები დღის განმავლობაში. თავდაპირველად ჩაფიქრებული იყო როგორც messaging queue. Kafka დაფუძნებულია distributed commit log-ის აბსტრაქციაზე. მას შემდეგ რაც შეიქმნა და open source გახდა LinkedIn-ის მიერ 2011 წელს, Kafka messaging queue-დან გარდაიქმნა სრულფასოვან streaming პლათფორმად.


Install Apache Kafka on Ubuntu
პირველ რიგში უნდა მოვახდინოთ package reposiotry cache-ის update შემდეგი ბრძანებით.
sudo apt-get update
Apache Kafka დამოკიდებულია Java-ზე, ამიტომ წინაპირობაა რომ სერვერზე, რომელზეც გვინდა Kafka-ს დაყენება დაინსტალირებული იყოს JDK. თუ უკვე არ გვაქვს დაინსტალირებული, ამის გაკეთება შეგვიძლია შემდეგი ბრძანებით:
sudo apt-get install openjdk-8-jdk
შემდეგი ეტაპი არის zookeeper-ის დაინსტალირება.
sudo apt-get install zookeeperd
იმის შესამოწმებლად დაინსტალირდა თუ არა zookeeper ჩვენს სერვერზე, შეგვიძლია გავუშვათ შემდეგი ბრძანება:
sudo systemctl status zookeeper


თუ რაიმე მიზეზის გამო, zookeeper არ არის started მდგომარეობაში, მაშინ შეგიძლიათ გაუშვათ შემდეგი ბრძანება:

sudo systemctl start zookeeper
და ასევე zookeeper რომ system startup-ზე მივაბათ საჭიროა გავუშვათ შემდეგი ბრძანება:
sudo systemctl enable zookeeper


ახლა უკვე დადგა დრო გადმოვწეროთ Apache Kafka. ამ კონკრეტული შემთხვევისთვის, შეგვიძლია გადმოვწეროთ 1.1.1 ვერსია.
წინა ბრძანება გადმოწერს Apache Kafka-ს კომპრესირებულ არქივს.
ახლა შეგიძლიათ შექმნათ Kafka ფოლდერი /opt ფოლდერში.
mkdir opt
mkdir opt/Kafka
ახლა კი შეგვიძლია extract გავუკეთოთ Kafka არქივს /opt/Kafka ფოლდერში
sudo tar xvzf kafka_2.12-1.1.1.tgz -C /opt/Kafka
წინა ოპერაციის წარმატებულობა შეგიძლია შეამოწმოთ შემდეგი ბრძანებით
ls /root/opt/kafka

ახლა გახსენით ~/.profile ფაილი
sudo nano ~/.profile
და ბოლოში დაამატეთ
export KAFKA_HOME=”/root/opt/Kafka/kafka_2.12-1.1.1”
export PATH=”$PATH:${KAFKA_HOME}/bin”
და შეინახეთ ფაილი.

ახლა გახსენით ~/.bashrc ფაილი
sudo nano ~/.bashrc
და ბოლოში დაამატეთ
Alias sudo=’sudo env PATH=”$PATH”’
და შეინახეთ.

შემდეგ კი დაარესტარტეთ სერვერი
sudo reboot

სისტემის ახლიდან ჩატვირთვის შემდეგ, შეამოწმეთ KAFKA_HOME
echo $KAFKA_HOME

შემდეგი ეტაპი არის ჩვენთვის „სიცოცხლის გამარტივება“, გავაკეთოთ სიმბოლური ლინკი Kafka server.properties ფაილის  შემდეგი ბრძანებით
sudo ln -s $KAFKA_HOME/config/server.properties /etc/kafka.properties

შემდეგი ეტაპი უკვე არის გატესტვა
შემდეგი ბრძანებით შეგვიძლია Kafka-ს სერვერი „დავქოქოთ“ ანუ start  გავაკეთოთ
sudo kafka-server-start.sh /etc/kafka.properties
როცა უკვე Kafka Server დაისტარტება შეგვიძლია შევქმნათ testing topic-ი. ამისთვის გავხსნათ ახალი command line tab და გავუშვათ შემდეგი ბრძანება
sudo kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testing
შედეგი უნდა იყოს შემდეგი


ახლა გავუშვათ Producer API,რომ შევძლოთ შეტყობინების გაგზავნა
sudo kafka-console-producer.sh --broker-list localhost:9092 --topic testing
როცა Enter-ს დავაწვებით გამოვა ინტერაქტიული ფანჯარა (>) იქნება, სადაც შეგვიძლია ავკრიფოთ შეტყობინება და Enter-ზე დაჭერით გავაგზავნოთ, მაგალითად


შემდეგ უნდა გავუშვათ Consumer API, რომ გაგზავნილი შეტყობინებები მივიღოთ. შეტყობინების მიღება ხდება მომენტალურად, ხოლო თუ გაგზავნი შემთხვევაში Consumer API არ არის დასტარტული, მაშინ დასტარტვამდე გაგზანილ ყველა შეტყობინებას ერთიანად მივიღებთ.
Consumer API-ც ახალ command line tab-ში დავსტარტოთ
sudo kafka-console-consumer.sh --zookeeper localhost:2181 --topic testing --from-beginning
ახლა შევამოწმოთ ჩვენი გაგზავნილი შეტყობინება თუ მივიღეთ, გადავიდეთ Consumer-ის TAB-ში და ვნახავთ რომ შეტყობინება მოვიდა.