FilebeatはNanoPi NEO2にインストールというのしかやってなかったのでFreeBSDで簡単だけど実際に使えるようなのをやってみる。今回はFail2banが出力するログをFilebeatで監視し、認証を複数回失敗したなどでそのIPアドレスをBan(そのIPアドレスからの通信をブロック)したという記録が追加されたらそれをFilebeatでLogstashに送り、Logstashでそのデータを加工してelasticsearchに送る。さらにKibanaで可視化できる情報(BanされたIPアドレスの世界地図化)と国別ランキングを作る。
この記事のFilebeatとLogstashのファイルの配置構成はFreeBSDのportsでインストールしたものとなるので他の環境では読み替える。
FilebeatはMetricbeatをインストールしたら一緒にインストールされているので自動起動の有効化だけ追加。
/etc/rc.conf (追加1行)filebeat_enable="YES"
Filebeatの設定
/usr/local/etc/filebeat.yml1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | filebeat.prospectors:
- type: log
enabled: true
paths:
- /var/log/fail2ban.log
fields:
type: fail2ban
include_lines: [' Ban '] # " Ban "を含む行を抽出する
exclude_lines: ['Restore'] # 上で抽出した中でRestoreを含む行を除外
processors:
- drop_fields:
fields: ['offset', 'source'] #beat.name, offsetフィールドを出力しない
filebeat.config.modules:
path: ${path.config}/beats/file_*.yml
reload.enabled: false
#reload.period: 10s
output.logstash:
hosts: ["192.168.0.24:5043"] #Logstash稼働ホストのテスト用ポートに出力
# hosts: ["192.168.0.24:5044"] #Logstash稼働ホストの本番用ポート
logging.level: debug
logging.selectors: ["*"]
logging.to_syslog: false
logging.to_files: true
logging.files:
path: /var/log
name: filebeat.log
|
設定ファイルは行頭の字下げ、または前行と揃えるというのが重要なので疎かにしない。
読み込むログファイルはFail2banのログ /var/log/fail2ban とする。
本来は出力されるoffsetとsourceフィールドは出力しない。
Filebeatのモジュール関係の設定ファイルは /usr/local/etc/beats/file_*.ymlとする。(この記事では触れない)
Filebeatは前回読み込んだログの最終場所を憶えている(筈な)ので再読み込みテストを行う場合はregistryファイルを削除してFilebeatを再起動する。
設定を作って試してまた設定を変更して試すというときにはこれ忘れたらダメ。(以下3手順)
# service filebeat stop # rm /var/db/beats/filebeat/data/registry # service filebeat start
これでFail2banのログの中で抽出されてLogstashに送られるのは以下のような BanされたIPアドレスと引っ掛けたJailルール名が記された行だけになる筈。
2018-03-12 12:01:55,431 fail2ban.actions [27942]: NOTICE [jail-name] Ban 192.168.1.200
上のようなログが出現したときにFilebeatが出力するデータをLogstashで受信させると次のようになる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | {
"@timestamp" => 2018-03-12T12:32:44.180Z,
"message" => "2018-03-06 13:43:26,222 fail2ban.actions [27956]: NOTICE [jail-name] Ban 192.168.1.200",
"prospector" => {
"type" => "log"
},
"source" => "/var/log/fail2ban.log",
"host" => "hoge.localnet",
"@version" => "1",
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"beat" => {
"version" => "6.2.2",
"name" => "hoge.localnet",
"hostname" => "hoge.localnet"
},
"fields" => {
"type" => "fail2ban"
}
}
|
Logstashによるデータ整形
Logstashでデータ整形にあたりGrokプラグインの機能を使いたい。grokパターンデータは変数を組み合わせて作った定義のようなものなので自分で作るのでも良いが、楽をしたいので GitHubの /logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns から貰ってきて /usr/local/etc/logstash/patterns に置く。
# mkdir /usr/local/etc/logstash/patterns # wget https://raw.githubusercontent.com/logstash-plugins/logstash-patterns-core/master/patterns/grok-patterns
マッチパターンに使用する定義済の変数名が1つでも誤ってるとLogstashが一発で終了してしまうので注意。エラーメッセージを出すだけにするとかして貰わないキツいよねぇ。
テスト用Logstashの設定。/usr/local/etc/logstash/test.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | input {
beats {
port => 5043 #テスト用ポート
}
}
filter {
if [fields][type] == "fail2ban" {
grok {
patterns_dir => ["/usr/local/etc/logstash/patterns"]
match => {"message" => "%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME},%{NUMBER} fail2ban\.actions%{SPACE}\[%{NUMBER}\]\: NOTICE \[%{USERNAME:jail}\] Ban %{IP:ip}"}
remove_field => ["message", "beat", "tags"]
}
geoip {
source => "ip"
}
}
}
output {
stdout { codec => rubydebug } #テスト用コンソールに出力
#elasticsearch { hosts => [ "localhost:9200" ] }
}
|
Filebeatの設定でfieldsに type: failban というのを追加したのでそれを識別用として使う。fields =>{ type=> hoge }というデータは [fiedls][type] のように書く。ifでfieldsがtype: fail2banなデータだけをフィルタ処理させる。他のデータに影響させないのと処理を重くしないため。
patterns_dirには先程取得したGrokパターンデータを置いたディレクトリのPathを指定する。
マッチさせるデータとGrokパターンの対応は以下。
データ: 2018 - 03 - 06 13:43:26,222 fail2ban.actions [27956]: NOTICE [jail-name] Ban 192.168.1.200 パターン: %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME} ,%{NUMBER} fail2ban\.actions%{SPACE}\[%{NUMBER}\]\: NOTICE \[%{USERNAME:jail}\] Ban %{IP:ip}
jail-name (Fail2banのルール名)をマッチさせるのに普通なら文字列 %{WORD} を選びたくなるが、 %{WORD} ではハイフンなどが通らないので [a-zA-Z0-9._-]+ である %{USERNAME} を使用している。半角スペースが連続で並んでいるところは \s* である %{SPACE} を使用。
IPv4 & IPv6アドレスは %{IP} だが、それをipというフィールド名で出力したいので %{IP:ip} と指定。同様にJail名をjailというフィールド名で出力したいので %{USERNAME:jail} とした。幾つかの記号は\ (¥)でエスケープしている。
elasticsearchに出力する際にログそのものであるmessageフィールドは不要なのでremove_fieldで指定。beatやtagsフィールドも要らないと思ったので一緒に指定した。
IPアドレスを出力するようにしたは良いが、IPアドレスだけでは何の使い途もないのでGeoIPプラグインでそのIPアドレスの国(州など)の情報を追加する。そのソースとなるIPアドレスとしては、すぐ上で抽出&出力するようにしたipフィールドを使用。
なお、上のやり方は簡単だけどFail2Banで記録された元のBan時刻が失われてFilebeatがログを見つけてLogstashに送信する時間が記録されるので念の為。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | {
"host" => "hoge.localnet",
"prospector" => {
"type" => "log"
},
"geoip" => {
"city_name" => "Ibirite",
"latitude" => -20.015,
"region_code" => "MG",
"location" => {
"lon" => -44.0769,
"lat" => -20.015
},
"country_code3" => "BR",
"longitude" => -44.0769,
"continent_code" => "SA",
"postal_code" => "32400",
"timezone" => "America/Sao_Paulo",
"country_name" => "Brazil",
"ip" => "177.***.***.194",
"country_code2" => "BR",
"region_name" => "Minas Gerais"
},
"ip" => "177.***.***.194",
"jail" => "jail-name",
"@version" => "1",
"@timestamp" => 2018-03-13T02:47:11.703Z,
"fields" => {
"type" => "fail2ban"
}
}
|
意図したとおりip, jail, geoipフィールドが出力され、message, beat, tagsフィールドは出力されなくなった。
思ったとおりに設定が出来たら本番用Logstashの設定ファイルに作成したFilterルールを書き込んで本番用Logstashを再起動する。
Filebeatの設定ファイルの出力先ポートも本番用の5044に変更してFilebeat再起動。
本番用Logstashとテスト用Logstashを並列稼働させるなら以前の記事参照。
Kibanaでグラフ化とランキング化
Fail2banのログデータを加工してelasticsearchに流し込むところまでできたのでそのデータをぱっと見てわかるものに(可視化)する。どこの国から来たアクセスが(認証失敗)などでBanされたのかを地図で見たらわかりやすい。それとどこの国からがBanが多いのかをランキング表示させたい。
新しいフィールドが増えているがインデックスを更新しないとそれが反映されない。Kibanaの左列メニューの (Management)からIndex Patternsを選択し、右上の (Refresh)を押す。
画面1:
左列メニューのDiscoverからサーチオプジェクトを作成する。今回はホスト名とFail2Banのデータで絞れば良いので フィールド名 host で目的のホスト、フィールド名 fiels.typeでfail2banを見つけてそれぞれ右にある(絞込条件追加)を押す。そのとき右上の検索対象期間でBanの発生がある期間に拡げておかないと項目名が表示されないかも。これ油断して忘れがち。
上の画像ではfields.typeでfail2banを表示している。絞り込んだ条件名が上部に表示される。(上の画像の赤破線の四角)
2つの条件で絞り込んだらそれを任意の名前で保存する。(Visualizeのフィルターで絞り込むなら保存しなくても良い。)
画面2:
左列メニューのVisualizeからグラフを作成する。Visualizeのメニューで(新規作成)を押すとVisualization typeの選択画面が表示される。
今回は地図を作成したいのでRegion Mapを選択する。
画面3:
元データとなるサーチオブジェクトを選択する。つまり2つ上の画面1で作成・保存したサーチオブジェクトを選択する。
または
画面1でオブジェクト保存をしなかった場合はlogstash-* (またはLogstashのデータを持つインデックス) を選択する。(次の画面4でFilterの追加が必要になる)
画面4:
1つ前の画面3でサーチオブジェクトではなくインデックスを選択した場合はFilterの指定を行う。
左上にある Add a Filter を押す。
[host] [is] [ホスト名]
[fields.type] [is] [fail2ban]
この2つのフィルターをそれぞれ指定して[Save]を押す。
以下はサーチオブジェクトを選択した場合、インデックスを選択した場合共通
MetricsはCountを選択する。(初期値がCountなので変更不要)
その下のBucketsにshape fieldという表示があるのでそれを押す。
AggregationのドロップダウンメニューでTermsを選択する。
Fieldsのドロップダウンメニューでgeoip.country_code2.keywordを探すか直接入力する。このドロップダウンメニューには全てのフィールドが表示されるとは限らないので最初の数文字を入力すると表示されていないフィールドも表示される(筈)。
ここでも右上で表示対象期間を拡げておく。少なくともBanが1件以上発生している期間を選択しないと正しく表示できるか確認できない。
なお、この画面の設定だけでは (グラフ表示)を押しても白地図以外は表示されないので次へ。
画面5:
Optionsタグを押す。
Vector mapでWorld Countriesを選択する。ここの初期値が(Kibana6.2.2では)何故かCanada Provincesになっているので白地図しか表示されない。
Join Fieldは国名の表記の仕方で2文字, 3文字, 普通の国名から選択するが、World Countriesを選択すると初期値はTwo letter abbreviation (2文字国名)になるのでそのまま触らない。先に指定したgeoip.country_code2.keywordが2文字国名なのでそれで良い。
(グラフ表示)を押して地図上に黄色〜赤色の国が存在することを確認。
Visualzationを任意の名前で保存する。
画面6:
ランキング用のVisualizationを新規で作成する。
Visualizeのメニューで(新規作成)を押すとVisualization typeの選択画面が表示される。(上の画面2の画像)
今回はData Tableを選択する。
上の画面3と同じ状態になるので同じく画面1で作成したSearchオブジェクトを選択する。またはインデックスを選択する。インデックスを選択した場合はFilterを指定するのは上の地図の場合と同じ。
MetricsのAggregationは初期値のCountで良いので変更しない。
その下のBucketsでは今回はSplit Rowsを選択する。
BucketsのSplit RowsのAggregationのドロップダウンメニューではTermを選択する。
Fieldのドロップダウンメニューではgeoip.country_name.keywordを選択したいところだが、これもメニューに全フィールドが表示されるとは限らないので最初の数文字を入力するとメニューに出てくる筈。出てこない場合は「geoip.country_name.keyword」を全て入力。
Sizeはテーブルに何行表示させるかなので希望の数を指定する。トップテンにしたければ10にするなど。その左のOrderはリストの並び順を数が多い順(降順)にするか少ない順(昇順)にするか。
今回は国名のランキングだけどカラム名の初期値がgeoip.country_name.keyword: Descending などになる筈なのでCustom LabelにCountryなどという風に指定する。「国名」のように日本語も使えるがあまり勧めない。
(テーブル表示)を押してランキングが表示されることを確認。(これも右上の対象期間でBanが1件以上発生している範囲に調節する)
今回は単純な国別Banランキングにしたけど、sub-bucketsを追加しSplit TableでTermsを追加しFieldをjail.keywordにするとJailの種類別の国別ランキングもできる。
画面7:
ダッシュボードに上で作成したFail2Banのマップとランキングを追加した。
上の画像は縦画面に単に並べただけなので素敵さがないけど横画面なら見栄えがするように配置できるかも。
おまけ:
Kibanaの表示期間内で直近のBanを時系列で10件リスト表示した。これはVisualizeのData TableでMetricsにTop Hitを選択指定することで作成できる。
- TelegrafでElasticsearchにメトリクス送信+Kibanaで可視化 (後編)
- TelegrafでElasticsearchにメトリクス送信+Kibanaで可視化 (中編)
- TelegrafでElasticsearchにメトリクス送信+Kibanaで可視化 (前編)
- Metricbeatで収集したメトリクスデータをKibanaで可視化する
- Kibanaを操作する前にユーザーを作成する
- Metricbeat 8.6.0のインストールと設定
- Elastic Stack 8系をFreeBSDにインストール
- Elastic Stack 6.4.2への更新 FreeBSD ports用メモ
- WinlogbeatでWindowsイベントログを可視化 後編
- WinlogbeatでWindowsイベントログを可視化 中編
- WinlogbeatでWindowsイベントログを可視化 前編
- Elastic Stackを6.3.2に更新する
- Elastic Stackでシステム監視 Heartbeatで収集した死活情報をKibanaで可視化
- Elastic Stackでシステム監視 Heartbeatを使う準備
- Elastic Stackでシステム監視 FreeBSDのportsで6.2.3に更新
- ELK Stackでシステム監視 Filebeatで収集したVolumioのログから時系列の再生曲名リストを表示
- ELK Stackでシステム監視 Rspamd 1.7系のElasticsearchモジュールを試す
- ELK Stackでシステム監視 FilebeatでNTP統計ログ取得 Logstashで加工
- ELK Stackでシステム監視 FilebeatでRaspberry Pi Zero WのVolumio楽曲再生ランキング
- ELK Stackでシステム監視 MeticbeatでRaspberry Pi Zero WのVolumioを監視
- ELK Stackでシステム監視 FilebeatでFreeBSDのCPU温度取得+Kibanaグラフ化
- ELK Stackでシステム監視 FilebeatでFail2banのBan情報+地図表示
- ELK Stackでシステム監視 MetricbeatでNginxのステータス情報を取得+グラフ化
- ELK Stackでシステム監視 FreeBSDのportsでELK Stack6系をインストール
- ELK Stackでシステム監視 FreeBSDにMetricbeatをインストールしてみる
- ELK Stackでシステム監視 elasticsearchインデックスのスキーマが勝手に変わる対処 テンプレート作成
- NanoPi NEO2(arm64)用にFilebeatをビルド
- ELK Stackでシステム監視 kibanaでDNSサーバの情報表示
- ELK Stackでシステム監視 kibanaのTimelion,Timeseriesでグラフ作成
- ELK Stackでシステム監視 collectdでDNSサーバの情報収集