웹로그 분석을 위한 Apache Log Format 과 Logstash Filter 설정
ElasticStack (ELK) 을 활용한 웹서비스 로그 분석 인프라 구축 작업을 진행하였다. 소프트웨어 설치와 기본적인 가이드는 Elastic 공식 사이트나 구글링을 통해서 충분히 습득이 가능했다. 하지만 실제 우리 상황에 맞는 웹서비스 분석을 위해 로그 형식을 정의하고 이를 Parsing 하기 위해 Logstash Filter를 구현하는 부분에서는 많은 시행 착오가 있었다. (실력이 부족하여..) 관련한 내용을 공유해본다.
Apache Log Format
- 웹서버(Apache,Jboss-EWS) 의 로그포멧은 아래 형식으로 표준 설정
- 기본적인 Combined Log 에
쿠키
,수행시간
,x-request-with
추가
LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\" \"%{Cookie}i\" %D \"%{x-requested-with}i\"" combined
형식문자 | 설명 |
---|---|
%...h? | 원격 호스트 |
%...l | (있다면 identd가 제공한) 원격 로그인명. mod_ident가 있고 IdentityCheck가 On이 아니면 빼기기호를 기록한다. |
%...u | 원격 사용자 (auth가 제공하며, 상태(%s)가 401인 경우 이상한 값을 나올 수 있음) |
%...t | common log format 시간 형식(표준 영어 형식)의 시간 |
%...r | 요청의 첫번째 줄 |
%...s | 상태(status). 내부 리다이렉션된 요청의 경우 원래 요청의 상태이다. 최종 요청의 상태는 %...>s. |
%...b | HTTP 헤더를 제외한 전송 바이트수. CLF 형식과 같이 전송한 내용이 없는 경우 0 대신 '-'가 나온다. |
%{Referer} | Referer 정보 |
%{User-Agent} | Agent 정보 |
%{Cookie} | 쿠키정보 |
%...D | 요청을 처리하는데 걸린 시간 (마이크로초 단위). |
%{x-requested-with} | HTTP 표준헤더가 아닌 커스텀 헤더이며, Ajax 요청을 판별할 수 있다. |
Logstash Configuration
Logstash grok-patterns 을 참고하면 이해가 쉽다.
-
filter 구현 내용
- grok
- 기본 message 필드를 grok 패턴을 이용해 분해
- request 에서 querystring 제외한 부분만 별도 추출 (uripath)
- referrer 정보 세분화
- date
- 로그파일에 있는 timestamp 를 @timestamp 로 사용했다.
- kv
- request 정보에서 Kev-Value 추출 (include_keys 에 정한 항목)
- cookie 정보에서 Kev-Value 추출 (include_keys 에 정한 항목)
- user-agent
- OS,Browser,Device 등에 대한 정보 추출 (알아서 해준다!)
- geo-ip
- 위치기반 분석 정보 추출
- geo 정보 database 파일 필요
- ruby
- @timestamp 필드에서 년월일을 추출함. (날짜별 index 명을 부여하기 위해)
- 서버 시간으로 사용시 로그 파일에 기록된 시간과의 차이로 혼선이 있었다.
- mutate
- convert
- index template 을 전체 string 에 not_analyzed 로 설정했다.
- 따라서, 숫자형으로 처리가 필요한 항목에 대해 명시적으로 type을 정해줬다.
- remove_field : 불필요한 필드 제거
- convert
- grok
input {
beats {
port => 5044
}
}
filter {
grok {
match => { "message" => ["%{COMBINEDAPACHELOG} %{QS:cookie} %{NUMBER:time} %{QS:x-requested-with}"] }
}
grok {
match => { "request" => ["%{URIPATH:uripath}"] }
}
grok {
match => { "referrer" => ["%{URIPROTO}://%{IPORHOST:referrer_host}(?::%{POSINT})?%{URIPATH:referrer_uripath}"] }
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
target => "@timestamp"
}
kv {
source => "request"
field_split => "&?"
include_keys => ["name"]
}
kv {
source => "cookie"
field_split => "; "
include_keys => [ "id"]
}
useragent {
source => "agent"
prefix => "a_"
}
geoip {
source => "clientip"
target => "geoip"
database => "/home/hello/logstash-2.4.0/dat/GeoLiteCity.dat"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
ruby {
code => "event['index_day'] = event['@timestamp'].time.localtime.strftime('%Y%m%d')"
}
mutate {
convert => {
"[geoip][coordinates]" => "float"
"time" => "integer"
"bytes" => "integer"
}
remove_field => ["@version","timestamp","tags","input_type","type","offset","fields","beat","count","cookie"]
}
}
output {
elasticsearch {
hosts => ["http://my_elastic_server"]
index => "web-log-%{index_day}"
}
}
ElasticSearch
- Index Tempate 은 아래와 같이 설정하였다.
"web-log" : {
"order" : 0,
"template" : "web-log-*",
"settings" : { },
"mappings" : {
"_default_" : {
"dynamic_templates" : [ {
"string_fields" : {
"mapping" : {
"index" : "not_analyzed",
"type" : "string"
},
"match_mapping_type" : "string",
"match" : "*"
}
} ],
"properties" : {
"geoip" : {
"dynamic" : true,
"properties" : {
"location" : {
"type" : "geo_point"
},
"longitude" : {
"type" : "float"
},
"latitude" : {
"type" : "float"
},
"ip" : {
"type" : "ip"
}
}
}
}
}
},
"aliases" : { }
}
}