Faust는 python기반 스트림프로세싱을 위한 library입니다. 아래 코드는 kafka로부터 데이터를 가지고 오는 code입니다.
Producer
from dataclasses import asdict, dataclass
import json
import faust # 모듈 import
@dataclass # 데이터 직렬화를 위한 json dataclass 선언 for faust
class ClickEventSanitized(faust.Record):
timestamp: str
uri: str
number: int
app = faust.App("exercise3", broker="kafka://localhost:9092") # 카프카 브로커
clickevents_topic = app.topic("com.udacity.streams.clickevents", # 전송하고자 하는 topic이름
value_type=ClickEventSanitized)
sanitized_topic = app.topic(
"com.udacity.streams.clickevents.sanitized",
key_type=str,
value_type=ClickEventSanitized,
)
@app.agent(clickevents_topic)
async def clickevent(clickevents):
async for clickevent in clickevents:
sanitized = ClickEventSanitized(
timestamp=clickevent.timestamp,
uri=clickevent.uri,
number=clickevent.number
)
await sanitized_topic.send(key=sanitized.uri, value=sanitized) # 전송!
if __name__ == "__main__":
app.main()
Producer는 json type의 데이터를 'com.udacity.streams.clickevents.sanitized' 라는 topic에 데이터를 전송합니다.
Consumer
from dataclasses import asdict, dataclass
import json
import faust
@dataclass
class ClickEvent(faust.Record):
email: str
timestamp: str
uri: str
number: int
app = faust.App("sample2", broker="kafka://localhost:9092")
clickevents_topic = app.topic(
"com.udacity.streams.clickevents", # 전송 받고자 하는 topic이름
key_type=str,
value_type=ClickEvent,
)
@app.agent(clickevents_topic) # 전달받기 with ClickEvent dataclass(역직렬화)
async def clickevent(clickevents):
async for ce in clickevents:
print(json.dumps(asdict(ce), indent=2))
if __name__ == "__main__":
app.main()
Consumer는 'com.udacity.streams.clickevents' 토픽으로부터 json type의 데이터를 가져옵니다.
Filtering with Consumer & Producer
Learn more or give us feedback
from dataclasses import asdict, dataclass
import json
import faust
@dataclass
class ClickEvent(faust.Record):
email: str
timestamp: str
uri: str
number: int
app = faust.App("exercise4", broker="kafka://localhost:9092")
clickevents_topic = app.topic("com.udacity.streams.clickevents", value_type=ClickEvent)
popular_uris_topic = app.topic(
"com.udacity.streams.clickevents.popular",
key_type=str,
value_type=ClickEvent,
)
@app.agent(clickevents_topic)
async def clickevent(clickevents):
# clickevents 중 number가 100보다 크거나 같은 경우에
# com.udacity.streams.clickevents.popular topic에 전달
async for clickevent in clickevents.filter(lambda x: x.number >= 100):
await popular_uris_topic.send(key=clickevent.uri, value=clickevent)
if __name__ == "__main__":
app.main()
'com.udacity.streams.clickevents' topic 중 number가 100보다 크거나 같으면 'com.udacity.streams.clickevents.popular' 로 보내는 역할을 합니다. consume뒤 일정 범위의 데이터를 filtering하여 produce하는 코드입니다.
반응형
'빅데이터' 카테고리의 다른 글
스트림 프로세싱 with Faust - Windows (0) | 2019.11.22 |
---|---|
스트림 프로세싱 with Faust - Table (0) | 2019.11.21 |
스트림 프로세싱 with Faust - Processors, Operations (0) | 2019.11.21 |
데이터파이프라인이란 무엇인가? (0) | 2019.10.07 |
AWS에서 공개한 Data validation library 소개 - Deequ (0) | 2019.09.27 |
Fluentd로 데이터파이프라인 구축하기 kafka→kafka→s3 (1) | 2019.09.17 |