파이썬 새프로젝트 생성
파이썬 > 카프카에게 메세지를 보내는 소스를 작성
아래소스는 파이썬코드로 만들어진 test 토픽인 프로듀서이다.
from kafka import KafkaProducer
import time
producer = KafkaProducer(
bootstrap_servers=['192.168.179.100:9092']
)
start = time.time()
for i in range(100):
producer.send('test', value="test".encode("utf-8"))
# 토픽 메세지
producer.flush()
print("elapsed :", time.time() - start)
실행하면 안된다
카프카 서버가 리스너가 #advertised.listeners=PLAINTEXT://your.host.name:9092
막혀있어서 아이피주소를 바꿔준다
그후
파이썬 코드로 만들어진 test토픽을 받아오는 컨슈머를 생성한다.
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
'test',
bootstrap_servers=['192.168.179.100:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')),
consumer_timeout_ms=10000
)
print('[begin] get consumer list')
for message in consumer:
print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % ( message.topic, message.partition, message.offset, message.key, message.value ))
print('[end] get consumer list')
이제 연습한것을 장고에 올려보자
from kafka import KafkaProducer
from json import dumps
# 토픽 방식은 아래 방식으로
'''
logging.post.like
logging.post.un_like
logging.post.create
보낼떄 제이슨 타입의 형식을 대체적으로 사용한다.
'''
producer = KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=['192.168.179.100:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
@login_required(login_url='/accounts/login')
def like(request, bid):
post = Post.objects.get(id=bid)
user = request.user
if post.like.filter(id=user.id).exists():
post.like.remove(user)
data = {'user': user.id, 'post': post.id, 'like_cnt': post.like.count()}
producer.send('logging.post.like', value=data)
producer.flush()
return JsonResponse({'message': 'deleted', 'like_cnt': post.like.count()})
else:
post.like.add(user)
data = {'user': user.id, 'post': post.id, 'like_cnt': post.like.count()}
producer.send('logging.post.like', value=data)
producer.flush()
return JsonResponse({'message': 'added', 'like_cnt': post.like.count()})
카프카에서 아래의 엔진(커스터머) 에게 보낸다.
Elasticsearch,Kibana,Logstash,Elastic Stack , Beats
Logstash > 카프카에게서 메세지를 받으면 Elasticsearch에 맞는 형태로 변환
Kibana > Elasticsearch 시각화툴
실습
가상머신 준비 3대
엘라스틱 서치 : 120
로그스테치 : 110
키바나 : 130
systemctl stop firewalld
sudo yum install net-tools
1번서버 셋팅(엘라스틱서치) ip 192.168.179.120
0. 가상머신 준비
cpu mem 프로그램
centos8 2 4 엘라스틱서치 ip:120
centos8 1 2 로그스태시 ip: 110
centos8 1 2 키바나 ip: 130
1. 엘라스틱 서치
1) 레포지토리 추가
cat > /etc/yum.repos.d/elasticsearch.repo <https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
2) 설치
dnf -y install elasticsearch
3) 실행
systemctl enable elasticsearch
systemctl restart elasticsearch
4) 확인
curl http://127.0.0.1:9200
외부접속허용하기
vi /etc/elasticsearch/elasticsearch.yml
56라인 주석제거및 허용아이피
network.host: 0.0.0.0
74라인주석제거
cluster.initial_master_nodes: ["node-1", "node-2"]
systemctl restart elasticsearch
2번서버 셋팅 ip 192.168.179.110
input {
file {
type => "seucure_log" #보안로그타입
path => "/var/log/secure" #로그 주소
}
}
filter {
grok {
add_tag => [ "sshd_fail" ] #sshd 접속오류태그
match => { "message" => "Failed %{WORD:sshd_auth_type} for %{USERNAME:sshd_invalid_user} from %{IP:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}" }
#메세지중 Failed 인녀석만 불러오는것이다.
}
}
output {
elasticsearch {
hosts => ["http://엘라스틱서치IP주소:9200"]
index => "sshd_fail-%{+YYYY.MM}"
}
}
2. 로그스태시
1) 레포지토리 추가
cat > /etc/yum.repos.d/elasticsearch.repo <https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
2) 설치
dnf -y install logstash
3) 간단한 파이프라인 설정
chgrp logstash /var/log/secure
chmod 640 /var/log/secure
vi /etc/logstash/conf.d/sshd.conf
input {
file {
type => "seucure_log"
path => "/var/log/secure"
}
}
filter {
grok {
add_tag => [ "sshd_fail" ]
match => { "message" => "Failed %{WORD:sshd_auth_type} for %{USERNAME:sshd_invalid_user} from %{IP:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}" }
}
}
output {
elasticsearch {
hosts => ["http://엘라스틱서치IP주소:9200"]
index => "sshd_fail-%{+YYYY.MM}"
}
}
4) 실행
systemctl enable logstash
systemctl restart logstash
5) 확인
curl 엘라스틱서치IP주소:9200/_cat/indices?v
3번서버까지 설치하면아래와 같이 정보가 나오고
2번서버에 sshd에러 로그를 엘라스틱 서치에 저장하게 하였으니
2번서버에 일부로 로그인에러를 만들면 아래와 같이 생길것이다
3번서버 키바나 서버 ip : 192.168.179.130
3. 키바나
1) 레포지토리 추가
cat > /etc/yum.repos.d/elasticsearch.repo <https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
2) 설치
dnf -y install kibana
3) 설정
vi /etc/kibana/kibana.yml
# 7번 라인 주석 해제 후 다음과 같이 설정
server.host: "0.0.0.0"
# 32번 라인 주석 해제 후
elasticsearch.hosts: ["http:// 엘라스틱 서치의 IP :9200"]
4) 실행
systemctl enable kibana
systemctl restart kibana
설정후 시간이좀 지난뒤 접속해보면 대쉬보드형태로 뜬다
설정에서 로그스태쉬에서 설정한 sshd에러를 불러오게 한다
그러고나면 대쉬보드에서 나타난다
여기까지가 기본이고 다음에는 kafka에서 데이타를 받아오게 설정한다.
logstash 서버에 설정하기
vi /etc/logstash/conf.d/kafka.conf
input {
kafka {
bootstrap_servers => "192.168.179.100:9092"
topics => ["logging.post.like"]
}
}
output {
file {
path => "/etc/logstash/data/kafka.log"
}
}
#kafka.log 로그파일 만들기
mkdir /etc/logstash/data
touch /etc/logstash/data/kafka.log
chmod 755 /etc/logstash/data/kafka.log
chown logstash:logstash /etc/logstash/data/kafka.log
systemctl stop logstash
systemctl start logstash
tail -f /etc/logstash/data/kafka.log <<계속읽어오기
output 수정하기
output {
elasticsearch {
hosts => ["http://192.168.179.120:9200"]
index => "post-like-%{+YYYY.MM}"
}
}
하고나면 완료....
고생하였다..
참고자료
https://needjarvis.tistory.com/607
https://jyoondev.tistory.com/185
. 파이썬 producer, consumer
1) 프로듀서
from kafka import KafkaProducer
import time
producer = KafkaProducer(
bootstrap_servers=['192.168.100.201:9092']
)
start = time.time()
for i in range(100):
producer.send('test', value="test".encode("utf-8"))
producer.flush()
print("elapsed :", time.time() - start)
2) 컨슈머
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
'test',
bootstrap_servers=['192.168.100.201:9092']
)
print('[begin] get consumer list')
for message in consumer:
print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % ( message.topic, message.partition, message.offset, message.key, message.value.decode('utf-8') ))
print('[end] get consumer list')
=========================================
json 주고 받기
=========================================
1. 프로듀서
from kafka import KafkaProducer
from json import dumps
import time
producer = KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=['192.168.100.201:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
start = time.time()
for i in range(100):
data = {'str' : 'result'+str(i)}
producer.send('test', value=data)
producer.flush()
print("elapsed :", time.time() - start)
2. 컨슈머
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
'test',
bootstrap_servers=['192.168.100.201:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')),
consumer_timeout_ms=10000
)
print('[begin] get consumer list')
for message in consumer:
print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % ( message.topic, message.partition, message.offset, message.key, message.value ))
print('[end] get consumer list')