2022년/Developement

nifi+kakfa+hdfs 연결해보기

위지원 2022. 8. 10. 17:52

2022.08.08 - [✎ 2022년/Developement] - Apache Nifi란?

2022.08.09 - [✎ 2022년/Developement] - Nifi 설치 및 실행, 예제 및 kafka, hdfs연결해보기

 

위에서 배운 것들을 활용해서 모두 합치려고하다가 How to integrate Apache Nifi with Kafka 이라는 글을 보고 나도 API를 활용해 해당 데이터를 nifi를 이용해서 kafka로 데이터를 수집 및 처리하고 hdfs에 저장해야겠다 라는 생각을 했다. 전체 흐름도는 8. NiFi Kafka to HDFS.md 도 참고하였다.

 

나는 요즘 날씨가 엉망이라서 날씨 데이터 API를 사용하려한다. 기상청_단기예보 ((구)_동네예보) 조회서비스 API를 사용하였다. API를 통해 데이터를 받아오는 Python 코드는OpenAPI 활용 - 날씨 정보를 사용한 맛집 추천 프로젝트를 참고하였다.

 

참고한 python 코드를 실행해본 결과 무리없이 데이터를 얻어올 수 있었다. 지금은 데이터 가공이나 변환 등이 필요없으므로 그냥 이 데이터 그대로 진행할 것이다.

 

 

python에서 hdfs 접근하는 방법은 다음과 같다. 

#pip install hdfs
>>> from hdfs import InsecureClient
>>> client_hdfs = InsecureClient('http://하둡 서버 IP주소:9870', user='weejw')
>>> client_hdfs.list('/user')
['weejw_hadoop']

 

OpenAPI 활용 - 날씨 정보를 사용한 맛집 추천 프로젝트에서 참고한 코드에서 hdfs를 접근하는 코드를 추가로 작성하여 업로드한다.  

items = res.json().get('response').get('body').get('items')

from hdfs import InsecureClient
client_hdfs = InsecureClient('http://192.168.1.33:9870', user='weejw')

now = datetime.datetime.now().strftime('%Y-%m-%d')
path = f'/user/weejw_hadoop/get_data_{now}'

client_hdfs.write(path, json.dumps(items), overwrite=True, encoding="UTF-8")

 

 

굳이? 왜이렇게 하느냐라고 생각이 들겠지마는,, 그냥 공부하는거니까 다 써보려고 하는 것이다 코쓱.. 😏 찾다보니까 python에서 바로 publish도 된다. kafka-python 라이브러리를 이용해 아래처럼 코드 간단하게 짜보고 진행해본결과 문제없이 잘된다. 그래도 공부를 하기위해 HDFS에 저장해서 꺼내서! publish를 해보자 크크

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
from pprint import pprint
pprint(items)
future = producer.send('weejw',json.dumps(items, indent=2).encode())
res = future.get(timeout=60)
producer.flush()

 

 nifi data flow는 간단하게 짰다 ㅋㅋ nifi 쓸 수록 물건이다 캬

 

전혀 무리없이 진행된다 😏👍