AvroFlumeEvent포멧은 아래와 같은 특징을 가진다.
- Header : Map<CharSequence, CharSequence>
- Body : ByteBuffer
AvroFlumeEvent포멧을 사용하기 위해서 필요한 dependency
dependencies {
compile 'org.apache.flume:flume-ng-core:1.9.0'
}
DeserializeValue method :
private static Event deserializeValue(byte[] value) throws IOException {
Event e;
DatumReader<AvroFlumeEvent> reader = new SpecificDatumReader<>(AvroFlumeEvent.class);
ByteArrayInputStream in = new ByteArrayInputStream(value);
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
AvroFlumeEvent event = reader.read(null, decoder);
e = EventBuilder.withBody(event.getBody().array(), toStringMap(event.getHeaders()));
return e;
}
private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) {
Map<String, String> stringMap = new HashMap<String, String>();
for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
stringMap.put(entry.getKey().toString(), entry.getValue().toString());
}
return stringMap;
}
Use case :
Event event = deserializeValue(record.value());
Map<String, String> headers = event.getHeaders();
String body = event.getBody();
System.out.println("head : " + headers.toString());
System.out.println("body : " + new String(body));
Result :
=============
head : {version=v1, table=click_test}
body : test_data
=============
head : {version=v1, table=log_test}
body : log_data
=============
반응형
'빅데이터' 카테고리의 다른 글
macOS에 pyspark설치, pyspark실행시 jupyterlab 실행시키기 (0) | 2020.07.01 |
---|---|
pyspark 데이터프레임 조건절(when)로 데이터 처리하기 (0) | 2020.06.23 |
pyspark UDF(User Defined Functions) 만들기 방법 및 예제 (0) | 2020.02.13 |
빅데이터에서 사용하는 포멧 종류 및 설명 (0) | 2019.12.19 |
스트림 프로세싱 with Faust - Windows (0) | 2019.11.22 |
스트림 프로세싱 with Faust - Table (0) | 2019.11.21 |