Elasticsearch

Logstash 설정 파일(logstash.conf)

뚜sh뚜sh 2024. 11. 27. 09:46
# MySQL 데이터베이스에서 데이터를 읽어오기
input {
    jdbc {
        jdbc_driver_library => [MySQL JDBC 드라이버 라이브러리 경로]
        jdbc_driver_class => [MySQL JDBC 드라이버 클래스]
        jdbc_connection_string => [MySQL 접속 URL]
        jdbc_user => [username]
        jdbc_password => [password]

        statement => [실행할 SQL 쿼리]
        
        # 스케줄 주기 설정(CRON 표현식)
        schedule => "*/1 * * * *"

        # 증분 추적을 위한 설정
        use_column_value => true # 이전 실행 이후 변경된 데이터를 기준으로 가져오기 위해 활성화, tracking_column에 지정된 컬럼의 값을 기준으로 데이터 추적
        tracking_column => "updated_at" # 추적 컬럼(이 컬럼을 기준으로 데이터 변경 여부를 확인함)
        tracking_column_type => "timestamp" # 추적 컬럼 타입
        last_run_metadata_path => "/path/to/last_run_metadata" # 로그스태시의 마지막 실행 정보 저장 경로
    }
}

# 데이터를 필터링하고, 필요한 필드를 가공하기
filter {
    mutate {
    	# 제거할 필드
        remove_field => ["@timestamp", "@version"]
        
        # 데이터베이스에서 반환된 문자열 데이터를 배열로 변환
        split => { "tag_names" => "," }
        split => { "author_names" => "," }
        split => { "category_names" => ">" }
        split => { "image_paths" => ","}
    }
}

# 처리된 데이터를 Elasticsearch에 저장
output {
    stdout {
        codec => "rubydebug"
    }
    
    elasticsearch {
        hosts => [[서버 주소]]
        index => [데이터를 저장할 인덱스 이름]
        document_id => [문서의 고유 ID]
        user => [Elasticsearch 사용자명]
        password => [Elasticsearch 비밀번호]
        action => "update" # Elasticsearch에서 기존 데이터를 업데이트하는 방식은 2가지임(Replace 방식, _update API 방식), 이 설정은 _update API 방식(부분 수정 또는 조건부 업데이트)으로 지정함
        doc_as_upsert => true # 문서가 없으면 새로 삽입
    }
}

 

 

 

Logstash

데이터를 수집하고 필터링한 뒤, 다른 저장소나 서비스로 전송하는 데 사용된다.

 

 

input

데이터를 수집하기 위한 시작 지점

다양한 데이터 소스로부터 데이터를 읽어 들이는 역할을 한다.

외부 소스에서 데이터를 읽어와 Logstash 파이프라인으로 전달한다.

 

input 구조

input {
	[플러그인 이름] {
    	[플러그인별 설정]
    }
}

 

JDBC input plugin - 증분 추적

데이터베이스에서 변경된 데이터만 읽어오는 방법

매번 전체 데이터를 읽지 않고, 이전 실행 이후에 변경된 데이터만 수집하여 효율성을 높인다.

 

증분 추적의 동작 원리

증분 추적은 tracking_column 값을 기준으로 작동하며, 이전 실행 이후 변경된 데이터만 가져온다.

이를 위해 sql_last_value와 tracking_column이라는 두 가지 개념이 사용된다.

 

1. tracking_column

데이터베이스 테이블에서 변경 사항을 감지하기 위한 기준 열이다.

일반적으로 timestamp 또는 증가하는 정수형 ID를 사용한다.

 

2. sql_last_value

logstash가 마지막으로 데이터를 읽은 시점 또는 마지막으로 읽은 열 값을 저장한다.

last_run_metadata_path 파일에 저장되며, 다음 실행 시 해당 값을 기준으로 새로운 데이터를 필터링한다.

 

 

 

filter

데이터를 변환, 정제, 가공하는 데 사용되는 단계

input에서 수집된 데이터를 처리하고 output으로 전달하기 전에 원하는 형식으로 변환하거나 특정 조건에 따라 데이터를 필터링한다.

 

filter plugin

1. mutate : 데이터를 변환하거나 가공하는 데 사용됨

rename : 필드 이름 변경

remove_field : 불필요한 필드 제거

add_field : 새 필드 추가

gsub : 정규식을 사용한 문자열 치환

split : 특정 구분자를 기준으로 문자열을 나누어 배열로 변환

filter {
    mutate {
        rename => { "old_field" => "new_field" }
        remove_field => ["unwanted_field"]
        add_field => { "new_field" => "value" }
        gsub => ["message", "\s+", "_"]
        split => { "tags" => "," }
    }
}