Faust에는 중요한개념 2가지 Processor과 Operation이 있습니다.
Processors
스트림데이터는 끝없는 데이터의 연속적인 흐름입니다. 1개 이상의 Processor들은 callback형태로 동작하게 됩니다. Faust 기반의 application에서 동작하며, function을 사용하여 추가 library등을 조합하여 사용할 수도 있습니다.
def add_default_language(value: MyModel) -> MyModel:
if not value.language:
value.language = 'US'
return value
async def add_client_info(value: MyModel) -> MyModel:
value.client = await get_http_client_info(value.account_id)
return value
s = app.stream(my_topic,
processors=[add_default_language, add_client_info])
위 코드와 같이 my_topic에서 가져온 데이터를 2개의 function에서 처리할 수 있습니다. 위 2개의 function은 async, sync 2가지 방식으로 선언되어 있는데 Faust의 Processor는 2가지 방식 모두 지원합니다.
위와 같은 방식을 통해 function단위는 재사용가능하게 되고 코드의 직관성을 높여줍니다.
Operations
group_by, filter 등의 operation을 통해 다양한 데이터 처리를 할 수 있습니다.
import faust
class Order(faust.Record):
account_id: str
product_id: str
amount: float
price: float
app = faust.App('group-by-example')
orders_topic = app.topic('orders', value_type=Order)
@app.agent(orders_topic)
async def process(orders):
async for order in orders.group_by(Order.account_id):
위 코드는 group_by operation을 사용한 데이터 처리입니다.
@app.agent()
async def process(stream):
async for value in stream.filter(lambda: v > 1000).group_by(...):
...
위 코드는 filter operation을 사용한 데이터 처리입니다. 데이터 처리 전에 실질적으로 원하는 데이터만 뽑아오는데 사용할 수 있습니다.
반응형
'빅데이터' 카테고리의 다른 글
빅데이터에서 사용하는 포멧 종류 및 설명 (0) | 2019.12.19 |
---|---|
스트림 프로세싱 with Faust - Windows (0) | 2019.11.22 |
스트림 프로세싱 with Faust - Table (0) | 2019.11.21 |
스트림 프로세싱 with Faust - kafka consumer/producer (1) | 2019.11.21 |
데이터파이프라인이란 무엇인가? (0) | 2019.10.07 |
AWS에서 공개한 Data validation library 소개 - Deequ (0) | 2019.09.27 |