docker compose 로 Kafka + ELK stack 사용
여러 채용 공고 및 회사별 기술 스택을 보며 대부분 MSA아키텍처를 채택하며 이를 분석하는데 ELK Stack을 사용하는것을 보고 파이프라인을 구현해보았습니다.
ELK 스택 파이프라인
먼저 전체적인 파이프라인 및 디렉토리 구조입니다. ( 터미널에서 tree
혹은 tree -f
로 볼 수 있음 )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
├── docker-elk-stack # ELK 스택을 위한 Docker 구성 디렉토리
│ ├── docker-compose.yml # 전체 ELK 스택 서비스 정의 및 구성
│ ├── elasticsearch # Elasticsearch 관련 파일
│ │ ├── Dockerfile # Elasticsearch 컨테이너 빌드 스크립트
│ │ └── config
│ │ └── elasticsearch.yml # Elasticsearch 설정 파일
│ ├── filebeat # Filebeat 관련 파일
│ │ └── config
│ │ └── filebeat.yml # Filebeat 설정 파일 (로그 수집 및 전송 구성)
│ ├── kibana # Kibana 관련 파일
│ │ ├── Dockerfile # Kibana 컨테이너 빌드 스크립트
│ │ └── config
│ │ └── kibana.yml # Kibana 설정 파일
│ └── logstash # Logstash 관련 파일
│ ├── Dockerfile # Logstash 컨테이너 빌드 스크립트
│ ├── config
│ │ ├── logstash.yml # Logstash 주 설정 파일
│ │ └── pipelines.yml # Logstash 파이프라인 정의 파일
│ └── pipeline
│ └── logstash.conf # Logstash 파이프라인 설정 파일
├── logs
│ ├── application.log # 애플리케이션 로그 파일 (Filebeat가 수집할 대상)
올바른 ELK stack + Kafka 흐름
- 애플리케이션이 application.log에 로그를 작성합니다.
- Filebeat가 application.log 파일을 읽습니다.
- Filebeat가 로그 데이터를 Kafka(토픽: “OMG”)로 전송합니다.
- Logstash가 Kafka 토픽에서 데이터를 읽습니다.
- Logstash가 로그 데이터를 처리(필터 적용)하고 Elasticsearch로 전송합니다.
- Kibana가 Elasticsearch에서 로그 데이터를 읽어 시각화합니다.
일반적인 ELK 스택 구성과의 주요 차이점은 Filebeat와 Logstash 사이에 Kafka가 있다는 것입니다. 이 구성은 더 나은 확장성과 컴포넌트 간의 분리를 가능하게 합니다.
기본적으로 모두 도커로 띄워 설치관리하였습니다. 로컬에서만 테스트해볼 예정이라면 Docker Desktop 사용을 권장드립니다. 참고로 진행환경은 MacOS M3입니다.
다음으로 코드를 알아보겠습니다.
application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
logging:
file:
name: /app/logs/application.log
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss} - %msg%n"
file: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
level:
org.hibernate.SQL: debug
com.ssafy.omg: debug
kafka:
producer:
bootstrap-servers: kafka:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
bootstrap-servers: kafka:9092
group-id: your-group-id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
로컬에서 테스트하신다면 kafka 설정 부분에서 localhost:9092로 바꾸면 됩니다.
docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.1
container_name: elasticsearch
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
- xpack.security.enabled=true
- xpack.security.authc.token.enabled=true
- xpack.security.authc.api_key.enabled=true
ports:
- 9200:9200
- 9300:9300
volumes:
- elasticsearch_data:/usr/share/elasticsearch/data
healthcheck:
test: [ "CMD-SHELL", "curl -s -u elastic:${ELASTIC_PASSWORD} http://localhost:9200 >/dev/null || exit 1" ]
interval: 30s
timeout: 10s
retries: 5
networks:
- elk_network
logstash:
build:
context: ./logstash
dockerfile: Dockerfile
args:
ELK_VERSION: 8.11.1
container_name: logstash
ports:
- 5001:5000
volumes:
- ./logstash/pipeline:/usr/share/logstash/pipeline
- ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml
- ./logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml
- /home/ubuntu/logs:/usr/share/logstash/logs
depends_on:
elasticsearch:
condition: service_healthy
kafka:
condition: service_started
environment:
- SLACK_WEBHOOK_URL=${SLACK_WEBHOOK_URL}
- LANG=en_US.UTF-8
- LC_ALL=en_US.UTF-8
- LS_JAVA_OPTS=-Xmx1g -Xms1g
- ELASTIC_USERNAME=elastic
- ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
- LOG_LEVEL_DEBUG
healthcheck:
test: [ "CMD", "curl", "-f", "http://localhost:9600" ]
interval: 30s
timeout: 10s
retries: 5
networks:
- elk_network
kibana:
image: docker.elastic.co/kibana/kibana:8.11.1
container_name: kibana
ports:
- 5601:5601
depends_on:
elasticsearch:
condition: service_healthy
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
- ELASTICSEARCH_SERVICEACCOUNTTOKEN=${KIBANA_SERVICE_TOKEN}
- ELASTICSEARCH_SSL_VERIFICATIONMODE=none
networks:
- elk_network
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- 2181:2181
networks:
- elk_network
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
- "9093:9093"
volumes:
- /home/ubuntu/logs:/logs
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_NUM_PARTITIONS: 3
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
KAFKA_LOG_FLUSH_INTERVAL_MS: 1000
depends_on:
- zookeeper
networks:
- elk_network
filebeat:
image: docker.elastic.co/beats/filebeat:8.11.1
container_name: filebeat
volumes:
- ./filebeat/config/filebeat.yml:/usr/share/filebeat/filebeat.yml
- /home/ubuntu/logs:/logs
command: [ "filebeat", "-e", "-strict.perms=false" ]
depends_on:
- kafka
- logstash
networks:
- elk_network
volumes:
elasticsearch_data:
driver: local
networks:
elk_network:
driver: bridge
마찬가지로 로컬 환경에서 테스트 하신다면 volume에서 마운트 하는 경로를 /home/ubuntu 부분을 ../logs등으로 바꾸시면됩니다.
Filebeat
filebeat.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
filebeat.inputs:
- type: log
paths:
- /logs/*.log
close_inactive: 5s
scan_frequency: 1s
output.kafka:
hosts: [ "kafka:9092" ]
topic: "OMG"
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
codec.json:
pretty: false
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
queue.mem:
events: 1024
flush.min_events: 10
flush.timeout: "100ms"
Kafka
카프카는 이벤트 메시지 브로커로 도커로 이미지를 실행하여 진행했습니다. 차후 실행 방법과 함께 다루도록 하겠습니다.
logstash
logstash.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
## Default Logstash configuration from Logstash base image.
## https://github.com/elastic/logstash/blob/master/docker/data/logstash/config/logstash-full.yml
#
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "elasticsearch:9200" ]
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: "elastic"
xpack.monitoring.elasticsearch.password: ${ELASTIC_PASSWORD}
xpack.monitoring.collection.pipeline.details.enabled: true
#xpack.monitoring.elasticsearch.api_key: null
#xpack.monitoring.elasticsearch.data_stream: true
pipeline.ordered: false
pipeline.workers: 2
pipeline.batch.size: 125
pipeline.batch.delay: 50
log.level: debug
pipelines.yml
1
2
- pipeline.id: main
path.config: "/usr/share/logstash/pipeline/*.conf"
logstash.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
input {
kafka {
bootstrap_servers => "kafka:9092"
topics => ["OMG"]
group_id => "logstash"
auto_offset_reset => "latest"
consumer_threads => 3
decorate_events => true
}
}
filter {
json {
source => "message"
target => "parsed_json"
skip_on_invalid_json => true
}
if [parsed_json] {
mutate {
rename => { "[parsed_json][message]" => "log_entry" }
}
} else {
mutate {
rename => { "message" => "log_entry" }
}
}
grok {
match => {
"log_entry" => [
"%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:log_level} %{DATA:logger} - %{GREEDYDATA:log_message}",
"%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:log_level} %{GREEDYDATA:log_message}",
"%{GREEDYDATA:log_message}"
]
}
}
date {
match => [ "timestamp", "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd HH:mm:ss.SSS", "ISO8601" ]
target => "@timestamp"
}
if [log_level] == "ERROR" {
mutate {
add_field => {
"error_type" => "application_error"
"slack_message" => "🚨 *에러 발생* 🚨\n*Timestamp:* %{timestamp}\n*Thread:* %{thread}\n*Logger:* %{logger}\n*Message:* %{log_message}"
}
}
}
}
output {
if [log_level] == "ERROR" {
http {
url => "${SLACK_WEBHOOK_URL}"
http_method => "post"
content_type => "application/json"
format => "message"
message => '{"text":"%{slack_message}"}'
request_timeout => 60
retry_failed => true
retryable_codes => [500, 502, 503, 504]
}
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "omg-game-error-log-%{+YYYY.MM.dd}"
user => "${ELASTIC_USERNAME}"
password => "${ELASTIC_PASSWORD}"
}
} else {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "omg-game-log-%{+YYYY.MM.dd}"
user => "${ELASTIC_USERNAME}"
password => "${ELASTIC_PASSWORD}"
}
}
}
http output 부분은 slack 에러 수신 웹훅 메시징도 추가한 부분입니다.
Dockerfile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ARG ELK_VERSION
# https://www.docker.elastic.co/
FROM docker.elastic.co/logstash/logstash:${ELK_VERSION}
USER root
RUN apt-get update && apt-get install -y locales
RUN locale-gen en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LANGUAGE en_US:en
ENV LC_ALL en_US.UTF-8
# Add your logstash plugins setup here
# Example: RUN logstash-plugin install logstash-filter-json
RUN logstash-plugin install logstash-output-http
COPY config/logstash.yml /usr/share/logstash/config/logstash.yml
COPY config/pipelines.yml /usr/share/logstash/config/pipelines.yml
COPY pipeline /usr/share/logstash/pipeline
USER logstash
CMD ["logstash", "-f", "/usr/share/logstash/pipeline/logstash.conf", "--config.reload.automatic"]
elasticsearch
elasticsearch.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
## Default Elasticsearch configuration from Elasticsearch base image.
## https://github.com/elastic/elasticsearch/blob/master/distribution/docker/src/docker/config/elasticsearch.yml
#
cluster.name: "docker-cluster"
network.host: 0.0.0.0
## X-Pack settings
## see https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-xpack.html
#
xpack.license.self_generated.type: trial
xpack.security.enabled: true
xpack.security.authc.token.enabled: true
xpack.monitoring.collection.enabled: true
discovery.type: single-node
index.refresh_interval: "1s"
Dockerfile
1
2
3
4
5
6
7
ARG ELK_VERSION
# https://www.docker.elastic.co/
FROM docker.elastic.co/elasticsearch/elasticsearch:${ELK_VERSION}
# Add your elasticsearch plugins setup here
# Example: RUN elasticsearch-plugin install analysis-icu
kibana
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
## Default Kibana configuration from Kibana base image.
## https://github.com/elastic/kibana/blob/master/src/dev/build/tasks/os_packages/docker_generator/templates/kibana_yml.template.js
#
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
xpack.security.encryptionKey: "${KIBANA_ENCRYPTION_KEY}"
xpack.encryptedSavedObjects.encryptionKey: "${KIBANA_ENCRYPTION_KEY}"
xpack.reporting.encryptionKey: "${KIBANA_ENCRYPTION_KEY}"
## X-Pack security credentials
elasticsearch.serviceAccountToken: "${KIBANA_SERVICE_TOKEN}"
elasticsearch.requestTimeout: 30000
elasticsearch.ssl.verificationMode: none
password로 인증하는 방법도 있고 encryption키 토큰으로 인증하는 방법도 있는데 전 더 선호되는 기술을 사용해봤습니다.
Dockerfile
1
2
3
4
5
6
7
8
ARG ELK_VERSION
# https://www.docker.elastic.co/
FROM docker.elastic.co/kibana/kibana:${ELK_VERSION}
# Add your kibana plugins setup here
# Example: RUN kibana-plugin install <name|url>
환경변수 .env
1
2
3
4
5
6
ELASTIC_USERNAME=elastic
ELASTIC_PASSWORD=ssafya206
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/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
KIBANA_SERVICE_TOKEN=AAEAAWVsYXN0aWMva2liYW5hL2tpYmFuYS10b2tlbjp3cW02cFdSTlFsMlh5SDltakdzYjBn
이 환경변수 파일을 추가하는 방법도 아래 설명하겠습니다.
실행 방법
먼저 로컬에서 실행해봅시다.
Docker Desktop을 실행해봅시다. 물론 터미널에서도 진행 가능하지만 처음 여러 도커 이미지를 띄우고 실행하며, 종료됐을 때 docker logs [컨테이너]로 로그 분석 및 각 컨테이터 터미널을 쉽게 접속하려면 도커 데스크탑을 추천합니다. 물론 리눅스 개발 환경이 더 익숙하신 분들은 터미널로 똑같이 진행하시면 됩니다.
1. docker-elk-stack 이 있는 루트 디렉토리로 이동
2. docker로 실행
-
Elasticsearch만 먼저 시작합니다
docker-compose up -d elasticsearch
- Elasticsearch가 완전히 시작될 때까지 기다립니다.
서비스 계정 토큰을 생성하고 .env 파일에 설정합니다.
docker exec -it elasticsearch /bin/bash -c "bin/elasticsearch-service-tokens create elastic/kibana kibana-token"
- 나머지 서비스를 시작합니다
docker-compose up -d
- docker ps로 실행중인 컨테이너 확인합니다
docker ps
실행 후 로그 확인
각각 잘 실행되었는지, 뭔가가 exited되면 왜 종료되었는지를 확인하기 위해 logs를 봐야할 때가 있습니다. 이때는
1
2
3
4
5
docker-compose logs filebeat
docker-compose logs kafka
docker-compose logs logstash
docker-compose logs elasticsearch
docker-compose logs kibana
다음 5개 순서대로 플로우가 흘러가니 문제가 생긴 컨테이너의 터미널로 가 실행하여 오류를 로그를 통해 해결할 수 있습니다.
카프카의 경우 토픽에 제대로 로그가 들어오는지를 확인해야 하는 경우가 있기 때문에 복잡한 명령어를 미리 적어두었습니다.
카프카 터미널 명령어 (서버의 경우 kafka:9092, 로컬의 경우 localhost:9092)
-
Kafka 컨테이너에 접속하기 :
docker exec -it kafka /bin/bash
-
Kafka 스크립트 경로로 이동: 컨테이너에 접속한 후, Kafka 스크립트가 위치한 경로로 이동합니다. 일반적으로 Kafka의 스크립트는 /opt/kafka/bin에 위치합니다. :
cd /opt/kafka
-
토픽 목록 확인: 이제 kafka-topics.sh 스크립트를 실행하여 현재 Kafka 클러스터의 토픽 목록을 확인할 수 있습니다. :
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
사진의 경우 바로 cd /opt/kafka/bin으로 이동해 진행한 모습
-
토픽 리스트 확인:
./bin/kafka-topics.sh --list --bootstrap-server kafka:9092
- 특정 토픽의 메시지 확인 (예: “OMG” 토픽):
./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic OMG --from-beginning
이 명령어는 “OMG” 토픽의 모든 메시지를 처음부터 보여줍니다. -
실시간으로 새로운 메시지만 보고 싶다면:
./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic OMG
-
메시지와 함께 키, 타임스탬프 등을 보려면:
./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic OMG --property print.key=true --property print.timestamp=true --from-beginning
-
특정 파티션의 메시지만 보려면 (예: 파티션 0):
./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic OMG --partition 0 --from-beginning
- 메시지 생산을 테스트 :
./bin/kafka-console-producer.sh --bootstrap-server kafka:9092 --topic OMG
elastic 접속
localhost:5601(http://localhost:5601/
) 또는 도메인:5601(http://j11a206.p.ssafy.io:5601/
)로 접속합니다
이때 저희가 설정한 ELASTIC_USERNAME과 ELASTIC_PASSWORD로 로그인 합니다.
https 면 접속이 안되니까 주의!
인덱스 생성
좌측 탭에서 맨 아래 Stack Management로 들어간다.
이후 다시 좌측 탭을 보면 여러 카테고리가 변경되는데 여기서 Index Management로 들어간다.
아마 아무것도 없고 새로 생성해야할텐데 이미 로그를 생성할 때 logstash의 output에서
index => "omg-game-error-log-%{+YYYY.MM.dd}"
,
index => "omg-game-log-%{+YYYY.MM.dd}"
와 같이 인덱스를 지정했으므로 해당 인덱스를 적어준다. 둘 다 포함하기 위해 omg-game*과 같이 적어주었다.
만약 인덱스를 모르겠으면 에서 Dev Tools로 들어가 간단한 콘솔문으로 인덱스를 모두 출력해볼 수 있다.
GET _cat/indices?v
로그 분석
이제 로그를 확인해 볼 차례다. 결국 이 단계를 위해 모든 과정을 거친것이다.
Discover에 들어간다.
이런식으로 분석 페이지 및 로그 내용이 나온다. 우리 서비스를 많이 이용했는지 로그데이터가 130만건이 쌓였다.
어떤 로그가 있는지 보도록 해보자.
일단 우측상단 시간을 통해 원하는 시간대를 설정한다.
이런식으로 설정을 한 뒤 원하는 조건을 걸어보자. message 중 에러가 있는걸 보도록 하자.
이렇게 message : error과 같이 필터를 걸어주면 원하는 로그 데이터만 볼 수 있다. 분석해보니
웹소켓 연결을 계속 시도하면서 에러가 발생했음을 알 수 있다. 이렇게 특정 에러를 분석하고 바로 대응할 수 있다.
이에 특정 에러를 감지하여 수신 웹후크 앱을 통해 Slack으로 에러메시지를 실시간 전송하도록 추가했다.
먼저 슬랙을 통해 자동화 앱을 선택한다.
우측 상단 Slack 마켓플레이스를 선택한다.
찾아보기에서 web을 검색해 수신 웹후크를 선택.
이후 사용자화 한 뒤 사용
생성된 웹후크 URL은 .env에 추가하면된다.