[Kafka 공부] #3
Kafka랑 Python 연동하기
연동 전 JSON 데이터 처리해보기
파이썬으로 데이터 처리하기 위해서 JSON이라는 데이터 포맷을 알아야한다!!
source > Kafka Server > target 처럼 데이터가 이동할 때,
JSON 포맷으로 변환해서 사용할 것이기 때문이다.
이렇게 JSON 포맷으로 변환해서 보내고, JSON 포맷의 데이터를 받아서 컴퓨터가 필요한 데이터로 바꾸는 것을 직렬화/역직렬화라고 한다.
- JSON 파일 포맷 (참조 사이트: json.org/jason-ko.html) > json규격을 갖추고 있다.
1) object { 문자열 : value, 문자열 : value } * 공백은 무시가능
2) array [vaule, value, value]
3) value에 올수 있는 값: 문자열, 숫자, object, array, true/false, null
등등
JSON 값 처리해보기
VScode 열기 : json 연습해보기
1-1. simple.json 파일 만들기
{
"weather": [
{
"id":800,
"main": "Clear",
"description":"clear sky"
}
]
}
1-2. demo_json1.py
#1. 파일에서 json 값 읽어오기
import json
with open("simple.json","r") as jf:
sample_json=json.load(jf) #파일로 부터 데이터를 읽어와서 json형태로 만들겠다.
print(sample_json)
결과 : {'weather': [{'id': 800, 'main': 'Clear', 'description': 'clear sky'}]}
2. demo_json2.py
#파일로 존재하지 않는 것 읽어오기
#2. 메모리에서 읽어오기
import json
sample = {
"weather": [
{
"id":800,
"main": "Clear",
"description":"clear sky"
}
]
}
#메모리에 있는 값을 처리해주는 함수가 dumps
#들여쓰기 indent=2
in_memory_json=json.dumps(sample, indent=2)
loaded_json=json.loads(in_memory_json)
print(loaded_json)
#3. json 값 처리하기
print(loaded_json['weather'])
print(loaded_json['weather'][0])
print(loaded_json['weather'][0]['description'])
결과 :
{'weather': [{'id': 800, 'main': 'Clear', 'description': 'clear sky'}]}
[{'id': 800, 'main': 'Clear', 'description': 'clear sky'}]
{'id': 800, 'main': 'Clear', 'description': 'clear sky'}
clear sky
3-1. demo_json3.py
#4. 메모리의 값 파일에 저장하기
import json
sample = {
"weather": [
{
"id":800,
"main": "Clear",
"description":"clear sky"
}
]
}
with open('simple2.json','w') as jf:
json.dump(sample, jf)
print("Json file exported")
결과 : Json file exported 가 실행되면서 simple2.json에 값이 저장
3-2. simple2.json 파일 생성
{"weather": [{"id": 800, "main": "Clear", "description": "clear sky"}]}
4. demo_json4.py
#5. indent 유무
import json
sample = {
"weather": [
{
"id":800,
"main": "Clear",
"description":"clear sky"
}
]
}
in_memory_json=json.dumps(sample)
print(in_memory_json)
in_memory_json2=json.dumps(sample,indent=2)
print(in_memory_json2)
결과 :
{"weather": [{"id": 800, "main": "Clear", "description": "clear sky"}]}
{
"weather": [
{
"id": 800,
"main": "Clear",
"description": "clear sky"
}
]
}
파이썬이랑 Kafka 연동하기
목표 : Json 포맷의 파이썬 메시지를 kafka topic에 전달하기(producer), 받기(consumer)
이때,이미 Zookeeper 서버, Kafka 서버 모두 구동되어 있어야 한다.
producer > Kafka cluster(zookeeper, broker) > consumer
[ producer 만들기]
1. producer로 10개의 값 전달하기.
1) Python 코드 만들고 실행하기
from kafka import KafkaProducer
from json import dumps
producer=KafkaProducer(acks=0, #메시지 받은 사람이 메시지를 잘 받았는지 체크하는 옵션 (0은 그냥 보내기만 한다. 확인x)
compression_type='gzip', #메시지 전달할 때 압축
bootstrap_servers=['127.0.0.1:9092'], #전달하고자하는 카프카 브로커의 위치
value_serializer=lambda x: dumps(x).encode('utf-8') #직렬화 : 데이터 전송을 위해 byte단위로 바꿔주는 작업 :
#dumps 함수이용. dump : json 값을 메모리에 올려준다. encode를 통해서 올린다.
#x가 있으면, x를 dumps로 바꾸고 encode 한다.
)
for i in range(10): #10개의 값을
data={'name':'Dowon-'+str(i)}
producer.send('test-2021-09-05',value=data)
producer.flush() # 비우는 작업.
print("DONE")
2) Topic 리스트 확인하기
3) Topic에 값이 들어 왔는지 확인하기
2. producer에게 1000개의 값 전달하기.
1) 파이썬 코드 작성 및 실행하기
[Consumer 만들기]
3. consumer에 있는 값 가져오기
1) 파이썬 코드 작성 및 실행하기
from kafka import KafkaConsumer
from json import loads #읽어오는 함수
import time
import datetime
consumer=KafkaConsumer("test-2021-09-05", #읽어올 토픽의 이름 필요.
bootstrap_servers=['127.0.0.1:9092'], # 어떤 서버에서 읽어 올지 지정
auto_offset_reset="earliest", # 어디서부터 값을 읽어올지 (earlest 가장 처음 latest는 가장 최근)
enable_auto_commit=True, # 완료되었을 떄 문자 전송
group_id='my-group', # 그룹핑하여 토픽 지정할 수 있다 > 같은 컨슈머로 작업
value_deserializer=lambda x: loads(x.decode('utf-8')), # 역직렬화 ( 받을 떄 ) ; 메모리에서 읽어오므로 loads라는 함수를 이용한다. // 직렬화 (보낼 떄)
consumer_timeout_ms=1000 # 1000초 이후에 메시지가 오지 않으면 없는 것으로 취급.
)
start = time.time() # 현재 시간
print("START= ", start)
for message in consumer:
topic = message.topic
partition=message.partition
offset=message.offset
value=message.value
timestamp=message.timestamp
datetimeobj=datetime.datetime.fromtimestamp(timestamp/10000)
print("Topic:{}, partition:{}, offset:{}, value:{}, datetimeobj:{}".format(topic,partition,offset,value,datetimeobj))
print("Elapsed time= ",(time.time()-start)) # 걸리는 시간
파이썬 코드를 실행하면, Topic에 있는 데이터를 가지고와서 아래와 같이 출력된다.