본문 바로가기

빅데이터/Kafka

kafka spark structured stream 예제코드 및 실행

카프카와 연동하는 스파크 스트림은 2가지 방식으로 구현할 수 있습니다.

 

1. 구조적 스트림(DataFrame readStream)

2. 스파크 스트리밍(DStream)

 

그 중 구조적 스트림 방식을 gradle + scala로 구현하기 위한 방법을 설명합니다.

 

build.gradle

plugins {
    id 'idea'
    id 'java'
    id 'scala'
}

group 'com.example'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenCentral()
    jcenter()
}

ext {
    scalaVersion = '2.12.14'
    sparkVersion = '3.1.2'
}

dependencies {
    compile "org.apache.spark:spark-core_2.12:$sparkVersion"
    compile "org.apache.spark:spark-sql_2.12:$sparkVersion"
    compile "org.apache.spark:spark-sql-kafka-0-10_2.12:$sparkVersion"

    compile "org.scala-lang:scala-library:$scalaVersion"
}

여기서 중요한점은 스칼라와 스파크 호환성을 따져보고 작성해야 한다는 것입니다. 스파크 3.1.2는 스칼라 2.12와 호환됩니다.

 

Main.java

package com.example

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger.ProcessingTime

object Main {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("Spark Structured Streaming Example")
      .master("local[4]")
      .getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .option("kafka.group.id", "test-group")
      .load()


    import spark.implicits._
    val values = df.select($"value".cast("STRING").as("json"))

    values.writeStream
      .trigger(ProcessingTime("5 seconds"))
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}

localhost의 카프카로부터 test토픽의 레코드를 가져와서 메시지 값을 json형태로 읽고 지속적으로 프린팅하도록 동작합니다. 컨슈머 그룹은 test-group 입니다.

 

테스트를 위해 스파크를 RUN하고, kafka-console-producer.sh 로 데이터를 추가합니다.

$ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
> {"hello":"world"}
> {"hello":"world"}

애플리케이션 로그를 확인하면 마이크로 배치형태로 데이터를 가져와 처리하는 모습을 볼 수 있습니다.

21/06/04 00:18:56 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "53309be6-eacb-4c6f-9053-4e5e9d68e975",
  "runId" : "de5e7c32-f6d0-44ae-9dbd-7efb17b093df",
  "name" : null,
  "timestamp" : "2021-06-04T00:18:55.003Z",
  "batchId" : 1,
  "numInputRows" : 5,
  "inputRowsPerSecond" : 0.9994003597841296,
  "processedRowsPerSecond" : 4.317789291882557,
  "durationMs" : {
    "addBatch" : 1072,
    "getBatch" : 0,
    "latestOffset" : 19,
    "queryPlanning" : 11,
    "triggerExecution" : 1158,
    "walCommit" : 32
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[test]]",
    "startOffset" : {
      "test" : {
        "2" : 3501800,
        "1" : 3511881,
        "3" : 3618022,
        "0" : 4136345
      }
    },
    "endOffset" : {
      "test" : {
        "2" : 3501800,
        "1" : 3511881,
        "3" : 3618022,
        "0" : 4136350
      }
    },
    "numInputRows" : 2,
    "inputRowsPerSecond" : 0.9994003597841296,
    "processedRowsPerSecond" : 4.317789291882557
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@60ca6fef",
    "numOutputRows" : 2
  }
}
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+
|                json|
+--------------------+
|{"hello":"world"}   |
|{"hello":"world"}   |
+--------------------+

흥미롭게도 스파크 구조적 스트림을 사용하면 마이크로 배치로 가져오는 토픽의 각 파티션별 시작, 마지막 오프셋이 로그로 남습니다. 그리고 몇개의 row를 가져와서 처리했는지? 시간은 얼마나 걸렸는지 확인할 수 있습니다.

 

반응형