본문 바로가기

빅데이터

스트림 프로세싱 with Faust - Processors, Operations

Faust에는 중요한개념 2가지 Processor과 Operation이 있습니다. 

 

Processors

스트림데이터는 끝없는 데이터의 연속적인 흐름입니다. 1개 이상의 Processor들은 callback형태로 동작하게 됩니다. Faust 기반의 application에서 동작하며, function을 사용하여 추가 library등을 조합하여 사용할 수도 있습니다.

def add_default_language(value: MyModel) -> MyModel:
    if not value.language:
        value.language = 'US'
    return value

async def add_client_info(value: MyModel) -> MyModel:
    value.client = await get_http_client_info(value.account_id)
    return value

s = app.stream(my_topic,
               processors=[add_default_language, add_client_info])

위 코드와 같이 my_topic에서 가져온 데이터를 2개의 function에서 처리할 수 있습니다. 위 2개의 function은 async, sync 2가지 방식으로 선언되어 있는데 Faust의 Processor는 2가지 방식 모두 지원합니다.

 

위와 같은 방식을 통해 function단위는 재사용가능하게 되고 코드의 직관성을 높여줍니다.

Operations

group_by, filter 등의 operation을 통해 다양한 데이터 처리를 할 수 있습니다. 

import faust

class Order(faust.Record):
    account_id: str
    product_id: str
    amount: float
    price: float

app = faust.App('group-by-example')
orders_topic = app.topic('orders', value_type=Order)

@app.agent(orders_topic)
async def process(orders):
    async for order in orders.group_by(Order.account_id):

위 코드는 group_by operation을 사용한 데이터 처리입니다. 

@app.agent()
async def process(stream):
    async for value in stream.filter(lambda: v > 1000).group_by(...):
        ...

위 코드는 filter operation을 사용한 데이터 처리입니다. 데이터 처리 전에 실질적으로 원하는 데이터만 뽑아오는데 사용할 수 있습니다.

반응형