https://docs.datastax.com/en/kafka/doc/kafka/kafkaStringJson.html
다음과 같은 레코드의 메시지 키/값이 있다고 가정하자.
key | value |
APPLE | {"symbol":"APPL", "value":208, "exchange":"NASDAQ", "industry":"TECH", "ts":"2018-11-26T19:26:27.483"} |
EXXON MOBIL | {"symbol":"M", "value":80, "exchange":"NYSE", "industry":"ENERGY", "ts":"2018-11-26T19:26:27.483"} |
GENERAL MOTORS | {"symbol":"GM", "value":38, "exchange":"NYSE", "industry":"AUTO", "ts":"2018-11-26T19:26:27.483"} |
AT&T | {"symbol":"AT&T", "value":33, "exchange":"NYSE", "industry":"TELECOM", "ts":"2018-11-26T19:26:27.483"} |
FORD MOTOR | {"symbol":"F", "value":10, "exchange":"NYSE", "industry":"AUTO", "ts":"2018-11-26T19:26:27.483"} |
상기 데이터를 테이블에 저장할 때 필요한 것
데이터 적재시 아래 2개의 컬럼을 만족하도록 한다.
- 각 카프카에서 사용하는 JSON의 필드는 테이블 컬럼의 데이터 타입과 동일하게 적용
- 각 카프카에서 사용하는 JSON의 필드 중 하나는 PK 컬럼에 매핑됨. null은 허용되지 않음.
Distributed connect에 적용 방법
1. connect-distributed.properties에 key.converter과 value.converter정상 적용 확인.
2-a. 키스페이스 생성
$ cqlsh -e "CREATE KEYSPACE stocks_keyspace \
WITH replication = {'class': 'NetworkTopologyStrategy',\
'Cassandra': 1};"
토폴로지 설정과 데이터센터 이름같은 것은 각 환경에 따라 다르게 설정한다.
2-b. 테이블 생성
stocks_table 이라는 이름의 테이블 을 생성한다.
cqlsh -e "CREATE TABLE stocks_keyspace.stocks_table ( \
symbol text, \
ts timestamp, \
exchange text, \
industry text, \
name text, \
value double, \
PRIMARY KEY (symbol, ts));"
symbol, ts, exchange, industry, name, value를 컬럼으로 설정. symbol과 ts를 pk로 묶어서 생성
3-a. 토픽 생성
파이프라인으로 사용할 토픽을 생성합니다. 여기서는 stocks_topic 이라는 이름의 토픽을 생성하여 활용합니다.
3-b. prefix를 포함한 토픽 테이블 매핑 확인
다음 신텍스가 기본 설정으로 매핑됩니다.
topic.topic_name.keyspace_name.table_name
3-c. field column 매핑 확인
레코드의 메시지 값을 기반으로 매핑하기 위해 value.symbol을 호출하면 레코드의 메시지 값에서 json 데이터 중 symbol의 값을 파싱하여 가져온다.
만약 레코드의 헤더의 값을 가져오고 싶다면 header.f4와 같이 수행하여 파싱 할 수 있다.
https://docs.datastax.com/en/kafka/doc/kafka/kafkaRecordHeaderToTable.html
4. cassandra sinnk connector 설정
분산 커넥트에 설정할 경우 json으로 설정
{
"name": "stocks-sink",
"config": {
"connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
"tasks.max": "1",
"topics": "stocks_topic",
"topic.stocks_topic.stocks_keyspace.stocks_table.mapping":
"symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value"
}
}
단일 커넥트에 설정할 경우 key/value로 설정
name=stocks-sink
connector.class=com.datastax.kafkaconnector.DseSinkConnector
tasks.max=1
topics=stocks_topic
topic.stocks_topic.stocks_keyspace.stocks_table.mapping = symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value
5. cassandra 데이터 전송 및 적재된 데이터 확인
$ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic stocks_topic \
--property "parse.key=true" --property "key.separator=:"
>APPLE:{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:26:27.483"}
>EXXON MOBIL:{"symbol":"M","value":80,"exchange":"NYSE","industry":"ENERGY","ts":"2018-11-26T19:26:27.483"}
>GENERAL MOTORS:{"symbol":"GM","value":38,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}
>AT&T:{"symbol":"AT&T","value":33,"exchange":"NYSE","industry":"TELECOM","ts":"2018-11-26T19:26:27.483"}
>FORD MOTOR:{"symbol":"F","value":10,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}
Table 데이터 확인
cqlsh> select * from stocks_table;
symbol | ts | exchange | industry | name | value
--------+---------------------------------+----------+----------+----------------+-------
M | 2018-11-26 19:26:27.483000+0000 | NYSE | ENERGY | EXXON MOBIL | 80
APPL | 2018-11-26 19:26:27.483000+0000 | NASDAQ | TECH | APPLE | 208
F | 2018-11-26 19:26:27.483000+0000 | NYSE | AUTO | FORD MOTOR | 10
AT&T | 2018-11-26 19:26:27.483000+0000 | NYSE | TELECOM | AT&T | 33
GM | 2018-11-26 19:26:27.483000+0000 | NYSE | AUTO | GENERAL MOTORS | 38
(5 rows)
cqlsh> select * from stocks_table where symbol='APPL';
symbol | ts | exchange | industry | name | value
--------+---------------------------------+----------+----------+-------+-------
APPL | 2018-11-26 19:26:27.483000+0000 | NASDAQ | TECH | APPLE | 208
(1 rows)
cqlsh> select * from stocks_table where exchange='NASDAQ';
InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"
당연하게도 PK가 걸리지 않은 컬럼에 대해서는 where 조건이 걸리지 않는다.
데이터 추가 및 확인
$ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic stocks_topic \
--property "parse.key=true" --property "key.separator=:"
>APPLE:{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:27:27.483"}
>APPLE:{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:28:27.483"}
>APPLE:{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:29:27.483"}
>APPLE:{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:30:27.483"}
cqlsh> select * from stocks_table where symbol='APPL' order by ts desc;
symbol | ts | exchange | industry | name | value
--------+---------------------------------+----------+----------+-------+-------
APPL | 2018-11-26 19:30:27.483000+0000 | NASDAQ | TECH | APPLE | 208
APPL | 2018-11-26 19:29:27.483000+0000 | NASDAQ | TECH | APPLE | 208
APPL | 2018-11-26 19:28:27.483000+0000 | NASDAQ | TECH | APPLE | 208
APPL | 2018-11-26 19:27:27.483000+0000 | NASDAQ | TECH | APPLE | 208
APPL | 2018-11-26 19:26:27.483000+0000 | NASDAQ | TECH | APPLE | 208
(5 rows)
cqlsh> select * from stocks_table where symbol='APPL' and ts = '2018-11-26 19:27:27.483000+0000';
InvalidRequest: Error from server: code=2200 [Invalid query] message="Unable to coerce '2018-11-26 19:27:27.483000+0000' to a formatted date (long)"
cqlsh:cory> select * from stocks_table where symbol='APPL' and ts = '2018-11-26 19:27:27.483';
symbol | ts | exchange | industry | name | value
--------+---------------------------------+----------+----------+-------+-------
APPL | 2018-11-26 19:27:27.483000+0000 | NASDAQ | TECH | APPLE | 208
(1 rows)
cqlsh> select * from stocks_table where symbol='APPL' and ts > '2018-11-26 19:27:27.483';
symbol | ts | exchange | industry | name | value
--------+---------------------------------+----------+----------+-------+-------
APPL | 2018-11-26 19:28:27.483000+0000 | NASDAQ | TECH | APPLE | 208
APPL | 2018-11-26 19:29:27.483000+0000 | NASDAQ | TECH | APPLE | 208
APPL | 2018-11-26 19:30:27.483000+0000 | NASDAQ | TECH | APPLE | 208
(3 rows)
2018-11-26 19:27:27.483000+0000로 조회가 되지 않는 이유 > Cassandra timestamp types only support milliseconds
https://stackoverflow.com/a/36454277/9634545
'빅데이터 > cassandra' 카테고리의 다른 글
아파치 카산드라 다이나모, 일관된 해싱, 복제 개념 살펴보기 (0) | 2022.02.03 |
---|---|
아파치 카산드라 살펴보기, 설명, 기본 개념 (0) | 2022.02.03 |
카산드라 TTL에 따른 데이터 삭제 정리 (0) | 2022.01.20 |
카산드라 모델링 분석하기 좋은 테이블 구성하기 (0) | 2021.11.11 |
macos에서 카산드라 테스트 방법 (0) | 2021.11.03 |
NoSQL강의) 모델링 예제로 알아보는 Cassandra Query Language (CQL) (375) | 2019.07.25 |