Cascading
Cascading은 opensource library로서 JVM에서 쉽게 bigdata처리를 가능캐 한다.
오픈소스이며 apache license를 보유하고있다.
# Website : https://www.cascading.org/
# Github : https://github.com/Cascading/cascading
# twitter : https://twitter.com/Cascading
<그림. pipeline 처럼 물흐르듯이 data를 처리하는 cascading library>
Source-pipe-sink 패러다임을 채용하여 객체지향 프로그래밍 언어에서도 data처리를 직관적으로 구현할 수 있다.
기존에 많이 사용하던 MapReduce code와 비교해보자.
Old MapReduce code
Bigdata를 다룰 때 맵리듀스(MapReduce)는 흩어져 있는 데이터를 수직화 하여 종류별로 모으고(MAP), Filtering과 Sorting을 통해 데이터를 뽑아(Reduce)낸다.
가장 유명한 MapReduce문제는 wordCount이다. 데이터의 word의 개수를 count하여 각각의 word가 얼마나 노출되었는지 확인해주는데, 코드는 아래와 같다.
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.* public class WordCount { public static class TokenizerMapper extends Mapper { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
그리 길지않지만, 직관적이지 않아서 읽기 어렵다. 하지만 cascading이라면?
WordCount program with Cascading library
아래와 같이 간단하고 명료하게 작성 가능하다.
cascading의 여러 기능을 통해 pipeline을 구성하여 replace, count, aggregate 등등 강력한 기능을 local / hadoop 할 것없이 유동적으로 작성 가능하다
package cascading.project; import java.util.Properties; import cascading.* public class MainLocal { public static void main(String[] args) { String inPath = "~/Documents/github/ProjectTemplate/testWord"; String outPath = "~/Documents/github/ProjectTemplate/test2"; LocalFlowConnector flowConnector = new LocalFlowConnector(new Properties()); // source and result field 정의 Tap srctap = new FileTap(new TextLine(new Fields("line")), inPath); Tap sinkTap = new FileTap(new TextLine(new Fields("word", "count")), outPath, SinkMode.REPLACE); // source에서 구분 기준을 Regex로 정의 Pipe words = new Each("start", new RegexSplitGenerator("\\s+")); Pipe wordCount = new Every(new GroupBy(words), new Count()); FlowDef flowDef = FlowDef.flowDef() .addSource(wordCount, srctap) // connect pipe and src .addTailSink(wordCount, sinkTap); // connect pipe and sink // run the flow flowConnector.connect(flowDef).complete(); } }
Cascading library를 사용한 MapReduce 결과물
Source file
아파치 하둡은 대량의 자료를 처리할 수 있는 큰 컴퓨터 클러스터에서 동작하는 분산 응용 프로그램을 지원하는 프리웨어 자바 소프부 프로젝트이다. 분산처리 시스템인 구글 파일 시스템을 대체할 수 있는 하둡 분산 파일 시스템과 맵리듀스를 구현한 것이다.
Result file
개발된 1 것으로, 1 것이다. 1 구글 1 구현한 1 너치의 1 대량의 1 대체할 1 동작하는 1 루씬의 1 맵리듀스를 1 분산 3 분산처리 1 소프트웨어 1 수 2 시스템과 1 시스템을 1 시스템인 1 아파치 2 원래 1 위해 1 응용 1 있는 2 자료를 1 자바 1 지원하기 1 지원하는 1 처리를 1 처리할 1 컴퓨터 1 큰 1 클러스터에서 1 파일 2 프레임워크이다. 1 프로그램을 1 프로젝트이다. 1 프리웨어 1 하둡 1 하둡은 1 하부 1
End of Document
반응형
'빅데이터' 카테고리의 다른 글
(번역)Netflix에서 데이터를 통해 유연하고, 안전한 클라우드 인프라로 활용하는 방법 (253) | 2019.03.22 |
---|---|
[Stream Process as a Platform] Netflix의 실시간 스트림 처리 플랫폼 Keystone 소개 (251) | 2019.01.10 |
모든 것을 측정하는 방법 - Bigdata시대에 부족한 data로 예측하기 (262) | 2018.12.16 |
Mongodb 기본 명령어 모음 (951) | 2018.10.30 |
가격 최적화를 위해 맞춤형 회귀 분석 모델을 활용한 airbnb (948) | 2018.09.19 |
HDFS(Hadoop Distributed File System) 아키텍쳐 개요 및 설명 (271) | 2018.09.17 |