Logstash 설정 파일(logstash.conf)
# MySQL 데이터베이스에서 데이터를 읽어오기
input {
jdbc {
jdbc_driver_library => [MySQL JDBC 드라이버 라이브러리 경로]
jdbc_driver_class => [MySQL JDBC 드라이버 클래스]
jdbc_connection_string => [MySQL 접속 URL]
jdbc_user => [username]
jdbc_password => [password]
statement => [실행할 SQL 쿼리]
# 스케줄 주기 설정(CRON 표현식)
schedule => "*/1 * * * *"
# 증분 추적을 위한 설정
use_column_value => true # 이전 실행 이후 변경된 데이터를 기준으로 가져오기 위해 활성화, tracking_column에 지정된 컬럼의 값을 기준으로 데이터 추적
tracking_column => "updated_at" # 추적 컬럼(이 컬럼을 기준으로 데이터 변경 여부를 확인함)
tracking_column_type => "timestamp" # 추적 컬럼 타입
last_run_metadata_path => "/path/to/last_run_metadata" # 로그스태시의 마지막 실행 정보 저장 경로
}
}
# 데이터를 필터링하고, 필요한 필드를 가공하기
filter {
mutate {
# 제거할 필드
remove_field => ["@timestamp", "@version"]
# 데이터베이스에서 반환된 문자열 데이터를 배열로 변환
split => { "tag_names" => "," }
split => { "author_names" => "," }
split => { "category_names" => ">" }
split => { "image_paths" => ","}
}
}
# 처리된 데이터를 Elasticsearch에 저장
output {
stdout {
codec => "rubydebug"
}
elasticsearch {
hosts => [[서버 주소]]
index => [데이터를 저장할 인덱스 이름]
document_id => [문서의 고유 ID]
user => [Elasticsearch 사용자명]
password => [Elasticsearch 비밀번호]
action => "update" # Elasticsearch에서 기존 데이터를 업데이트하는 방식은 2가지임(Replace 방식, _update API 방식), 이 설정은 _update API 방식(부분 수정 또는 조건부 업데이트)으로 지정함
doc_as_upsert => true # 문서가 없으면 새로 삽입
}
}
Logstash
데이터를 수집하고 필터링한 뒤, 다른 저장소나 서비스로 전송하는 데 사용된다.
input
데이터를 수집하기 위한 시작 지점
다양한 데이터 소스로부터 데이터를 읽어 들이는 역할을 한다.
외부 소스에서 데이터를 읽어와 Logstash 파이프라인으로 전달한다.
input 구조
input {
[플러그인 이름] {
[플러그인별 설정]
}
}
JDBC input plugin - 증분 추적
데이터베이스에서 변경된 데이터만 읽어오는 방법
매번 전체 데이터를 읽지 않고, 이전 실행 이후에 변경된 데이터만 수집하여 효율성을 높인다.
증분 추적의 동작 원리
증분 추적은 tracking_column 값을 기준으로 작동하며, 이전 실행 이후 변경된 데이터만 가져온다.
이를 위해 sql_last_value와 tracking_column이라는 두 가지 개념이 사용된다.
1. tracking_column
데이터베이스 테이블에서 변경 사항을 감지하기 위한 기준 열이다.
일반적으로 timestamp 또는 증가하는 정수형 ID를 사용한다.
2. sql_last_value
logstash가 마지막으로 데이터를 읽은 시점 또는 마지막으로 읽은 열 값을 저장한다.
last_run_metadata_path 파일에 저장되며, 다음 실행 시 해당 값을 기준으로 새로운 데이터를 필터링한다.
filter
데이터를 변환, 정제, 가공하는 데 사용되는 단계
input에서 수집된 데이터를 처리하고 output으로 전달하기 전에 원하는 형식으로 변환하거나 특정 조건에 따라 데이터를 필터링한다.
filter plugin
1. mutate : 데이터를 변환하거나 가공하는 데 사용됨
rename : 필드 이름 변경
remove_field : 불필요한 필드 제거
add_field : 새 필드 추가
gsub : 정규식을 사용한 문자열 치환
split : 특정 구분자를 기준으로 문자열을 나누어 배열로 변환
filter {
mutate {
rename => { "old_field" => "new_field" }
remove_field => ["unwanted_field"]
add_field => { "new_field" => "value" }
gsub => ["message", "\s+", "_"]
split => { "tags" => "," }
}
}