[Filter Plugins] CSV → Logstash → ES 저장 방법

    Logstash의 Filter 란?

    정형화되어있는 데이터(예 RDB, CSV 등)를 ES에 저장하기 위해 알맞게 가공하는 것으로 Logstash 7.6 기준으로 46개의 Filter Plugin을 지원합니다. 상세한 정보는 공식 홈페이지 참고하시기 바랍니다. 

     

    Filter plugins | Logstash Reference [7.6] | Elastic

     

    www.elastic.co

     

    CSV Filter 설정값

    CSV는 쉼표(,)로 구분된 정형화된 데이터입니다.

    CSV Filter Plugin을 사용하여 ES에 저장하기 위해서는 CSV Filter 설정값을 확인해야 합니다.

    설정값에 대한 설명은 첨부해드리는 링크 확인해 주시기 바라며, 많은 옵션중 우리가 사용할 옵션 위주로 살펴보도록 하겠습니다. (CSV Filter 설명 참고 : CSV Filter Configuration Options) 

    1) separator

    데이터를 구분하는 구분자를 지정하는 설정입니다.
    CSV는 기본적으로 쉼표(,)로 구분된 데이터이기 때문에 separator에 쉼표(,)를 설정하면 됩니다.
    추가로 separator의 기본값은 쉼표(,)이기 때문에 CSV 파일인 경우 별도 설정할 필요가 없습니다.

    (하지만 다른 사람도 알아보기 쉽게 명시적으로 설정함)

     

    2) columns

    CSV의 열 이름을 배열 형태로 작성하는 설정입니다. 작성방법은 표시되는 순서대로 작성합니다.
    예) 아래와 같은 CSV 파일에 대한 설정 방법(한 줄로 작성하며, 열 이름은 똑같이 작성할 필요 없음)

    columns => ["Object","Elapsed","Service","StartTime","EndTime","Cpu","SQL Count","SQL Time","Kbytes","IP","API Count","API Time"]

     

    3) Convert

    ES에 저장될 Type을 지정하는 설정입니다.
    지정하지 않은 경우 Dynamic Mapping으로 적용됩니다. aggregate 사용하기 위해서는 각 필드의 type을 지정해주면 됩니다. Type 지정하는 데는 2가지 방법이 있습니다. Convert를 사용하여 Type 지정하거나, Index Template 만들어서 사용하면 됩니다.

     

     

    CSV Filter 적용 예
    input {
      file {
        path => "D:/apps/elk/csv/scouter/error/*_(20200302).csv"
        start_position => "beginning"
        codec => plain { charset => "CP949" }
      }
    }
    filter {
      csv {
    		separator => ","
    		columns => ["Object","Elapsed","Service","StartTime","EndTime","Cpu","SQL Count","SQL Time","Kbytes","IP","API Count","API Time","Login","Desc","Text1","Text2","Dump","TMP","Txid","Gxid"]
    		convert => {
                 "Elapsed" => "integer"
                 "Cpu" => "integer"
                 "SQL Count" => "integer"
                 "SQL Time" => "integer"
                 "Kbytes" => "integer"
                 "API Count" => "integer"
                 "API Time" => "integer"
    		}
    	}
    	  
      date { 
             match => ["StartTime", "yyyyMMddHH:mm:ss.SSS"] 
             target => "@timestamp"
    	}
    	ruby {
    		code => "event.set('index_day',event.get('@timestamp').time.localtime('+09:00').strftime('%Y.%m.%d'))"
        }
    
    	mutate { remove_field => ["Login","Desc","Text1","Text2","TMP","host","read_date"] }
    
    }
    
    output {
        if "_grokparsefailure" in [tags] {
            file { path => "D:\apps\elk\logstash-7.5.2\logs\logstash-scouter-error-grokparsefailure-%{index_day}.log" }
        }else{
    		elasticsearch {
         hosts => ["http://xxx.xxx.xxx.xxx:9200"]
    	 index => "scouter-error-%{index_day}"
         manage_template => "false"
      }
     }
    }

     

    Kibana에서 데이터 확인

    댓글

    Designed by JB FACTORY