[Linux] Kafka
카프카를 이용한 데이터 분산
Kafka
용어
- Topic
- Partiton
- Producer
- Consumer
- Broker
- Zookeeper
- Consumer Group
- Replication
설치
가상 OS 3개 설치
- broker : 192.168.197.10
- producer : 192.168.197.20
-
consumer : 92.168.197.30
- 방화벽 해제(모두다)
systemctl stop firewalld systemctl disable firewalld setenforce 0
- jdk설치(모두다)
yum -y install java-1.8.0-openjdk-devel.x86_64
- Kafka 설치(모두다)
yum install -y wget wget https://archive.apache.org/dist/kafka/3.1.0/kafka_2.13-3.1.0.tgz tar -xzvf kafka_2.13-3.1.0.tgz mv kafka_2.13-3.1.0 /opt/kafka
- 호스트 이름설정
- 각 컴퓨터
vi /etc/hostname #각자 이름으로 바꾸고 재부팅
- 각 컴퓨터
- 호스트 끼리 연결하기
vi /etc/hosts
192.168.197.10 broker 192.168.197.20 producer 192.168.197.30 consumer
- 확인
- broker ```shell #broker 터미널을 두개 만들어 동시에 해야한다. /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties # 쥬키퍼 실행
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties # 카프카 실행
![image](/assets/img/image/Pub_Sub패턴(kafka)/1.png)<br/>
다음과 같이 화면이 출력되면 카프카 서버와 주키퍼 서버가 연결이 성공적으로 된것이다.<br/>
- 토픽 생성하기
```shell
/opt/kafka/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server broker:9092 # 토픽생성
/opt/kafka/bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server broker:9092 # 토픽 클러스터 확인
- 메세지 보내보기(producer)
/opt/kafka/bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server broker:9092 # producer의 콘솔이 하나 생성된다. 아무 거나 적고 엔터를 눌러 메세지를 카프카에 보내본다.
- 메세지 확인하기(consumer)
/opt/kafka/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server broker:9092
다음과 같이 producer에 입력했던 메세지들을 확인 할 수 있다.
파이썬과 연결하기
pip install kafka-python
main.py
from kafka import KafkaProducer
import time
producer = KafkaProducer(
bootstrap_servers=['192.168.197.10:9092']
)
start = time.time()
for i in range(100):
producer.send('test', value="test".encode("utf-8"))
producer.flush()
print("elapsed :", time.time() - start)
- 카프카 설정
vi /opt/kafka/config/server.properties
advertised.listeners=PLAINTEXT://192.168.197.10:9092 # 내 아이피로 수정 후 재시작
- 실행
장고 프로젝트와 연결하기
웹 프로젝트 실행
pip install kafka-python
board/view/py
추가
from kafka import KafkaProducer
import time
producer = KafkaProducer(
bootstrap_servers=['192.168.197.10:9092']
)
@login_required(login_url='/accounts/login')
def like(request, bid):
post = Post.objects.get(id=bid)
user = request.user
producer.send('test', value="test".encode("utf-8")) #추가
producer.flush() #추가
if post.like.filter(id=user.id).exists():
post.like.remove(user)
return JsonResponse({'message': 'deleted', 'like_cnt': post.like.count()})
else:
post.like.add(user)
return JsonResponse({'message': 'added', 'like_cnt': post.like.count()})
- 확인
카프카 프로젝트에서consumer.py
생성 ```python from kafka import KafkaConsumer from json import loads
consumer = KafkaConsumer( ‘test’, bootstrap_servers=[‘192.168.197.10:9092’] )
print(‘[begin] get consumer list’)
for message in consumer: print(“Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s” % ( message.topic, message.partition, message.offset, message.key, message.value.decode(‘utf-8’) ))
print(‘[end] get consumer list’)
장고 프로젝트 실행 후 좋아요를 누르면<br/>
![image](/assets/img/image/Pub_Sub패턴(kafka)/7.png)<br/>
다음과 같이 `consumer.py` 콘솔에 문구가 전송된다.<br/>
### JSON형태로 받기
장고 프로젝트 `board/view/py` 변경
```python
from kafka import KafkaProducer
from json import dumps
import time
producer = KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=['192.168.197.10:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
) # 추가
@login_required(login_url='/accounts/login')
def like(request, bid):
post = Post.objects.get(id=bid)
user = request.user
data = data = { 'user' : user.id, 'post_id' : post.id } # 추가
producer.send('logging.post.like', value= data)# 추가
producer.flush()# 추가
if post.like.filter(id=user.id).exists():
post.like.remove(user)
return JsonResponse({'message': 'deleted', 'like_cnt': post.like.count()})
else:
post.like.add(user)
return JsonResponse({'message': 'added', 'like_cnt': post.like.count()})
- 확인
카프카 프로젝트에서consumer.py
변경
```python from kafka import KafkaConsumer from json import loads
consumer = KafkaConsumer( ‘logging.post.like’, # 변경 후 실행 bootstrap_servers=[‘192.168.197.10:9092’] )
print(‘[begin] get consumer list’)
for message in consumer: print(“Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s” % ( message.topic, message.partition, message.offset, message.key, message.value.decode(‘utf-8’) ))
print(‘[end] get consumer list’)
```