본문 바로가기

빅데이터

아파치 플링크는 2.0 버전부터는 더이상 scala API를 지원하지 않습니다.

 

 

https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support

 

FLIP-265 Deprecate and remove Scala API support - Apache Flink - Apache Software Foundation

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). Motivation Apache Flink offers APIs for building your Flink application using the DataStream and Table API. These are offered in Java an

cwiki.apache.org

플링크1.15버전까지는 스칼라와 강결합되어 있는 코드를 제공했습니다. 그러나 점차 스칼라 관련 기여가 적어지고 있는 상황에서 더 이상 스칼라에 대한 직접적인 api 지원이 의미 없다고 생각되어서 더 이상 지원을 하지 않는 방향의 의견이 나왔습니다.

 

이에 따라 플링크 PMC(Project Management Committee)의 투표를 받았고, 거의 만장일치로 승인이 떨어졌습니다. vote 관련 링크는 다음에서 볼 수 있습니다. https://lists.apache.org/thread/qfz4opcbc2p59fhmymncxyzxb70cn098

 

https://lists.apache.org/thread/qfz4opcbc2p59fhmymncxyzxb70cn098

 

lists.apache.org

 

플링크 프로젝트 운영 규칙(Flink Bylaws)에 따라 PMC는 이런 사안에 대해 투표를 해야만 합니다. 투표에 대한 상세한 사항은 https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws 에서 확인할 수 있습니다.

 

무엇이 바뀌나?

가장 크게 바뀌는 부분은 직접적인 scala api가 없어진다는 부분입니다. scala는 tuple, 간결한 문법, collection 등 다양한 장점을 가진 언어이고 이를 지원함으로써 스칼라를 사용하는 유저들은 플링크를 더욱 scala 친화적으로 사용할 수 있었습니다. 동일한 jvm 라이브러리라고 하더라도 언어가 다르면 지저분해지기 때문에 이런 부분은 스칼라 유저들에게 친숙하게 다가왔습니다. 스파크를 사용하던 스칼라 유저는 이런 점이 반가웠으리라 생각됩니다.

 

Flink Scala API를 활용한 예제

import org.apache.flink.api.scala._

object WordCountScala {
  case class WordWithCount(word: String, count: Int)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?"
    )

    val counts = text
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map(word => WordWithCount(word, 1))
      .groupBy(_.word)
      .reduce { (a, b) => WordWithCount(a.word, a.count + b.count) }

    counts.print()
  }
}

 

위와 같이 scala flink로 사용한 구문은 무척이나 간결합니다. 그럼 자바 라이브러리로 구현하면 어떻게 될까요?

// Java Tuple2를 사용하는 예제
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object JavaStyleWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.socketTextStream("localhost", 9999)

    val counts = text
      .flatMap(new FlatMapFunction[String, Tuple2[String, Integer]] {
        override def flatMap(value: String, out: Collector[Tuple2[String, Integer]]): Unit = {
          for (word <- value.toLowerCase.split("\\W+")) {
            if (word.nonEmpty) out.collect(new Tuple2(word, 1))
          }
        }
      })
      .keyBy(0)
      .sum(1)

    counts.print()
    env.execute("Java Style Word Count in Scala")
  }
}

 

JAVA api를 그대로 사용하면 코드의 구조 자체가 자바에 의존적이기 때문에 상대적으로 읽기 어렵고 장황해집니다. 예를 들자면, Java의 Tuple 클래스를 사용하거나, Collector를 사용한 사용자 정의 함수 정의가 필요할 때 코드가 더 복잡해집니다. 그리고 Java POJO를 사용하면 타입 정보가 감소되고 간결한 코드 작성이 어려워집니다. Scala에서는 케이스 클래스를 사용하여 더욱 타입 안전하고 간결한 코드를 작성할 수 있습니다.

 

반응형