こんにちは、Sumo Logic担当の新藤です。 2度目の緊急事態宣言が発令されめっぽうオフィスに行かなくなりました。そのため最近はオフィス近くの赤坂見附の美味しいランチ巡りがあまりできておらずとても寂しいです(´;ω;`)
ところで、読者の皆さんはSumo Logicのアラート機能では文字のみ送信可能で、円グラフや棒グラフなどを画像化したものをSlackなどのチャットツールに送信できないことはご存知でしょうか。なにかしらのアプリケーションにおいてユーザーの作成を検知したらアラートを送信する、などのイベントベースのアラートでは文字のみのアラートで問題ないとは思いますが、特定のアプリケーションの社内ユーザー全体の利用状況の統計情報などは文字ではなくグラフで送信したいですよね。
そこでSumo Logic APIを実行し、APIレスポンスをAWS Lambdaで整形し、Slackに送信してみたのでその手順について書きたいと思います。今回は特に、IDaaS製品としておなじみのOneLoginのユーザー利用状況グラフ(以下、OneLoginユーザーレポート)をSlackに送信するプログラムを作成しました。OneLoginに限らず、Sumo Logicとログ連携しているSaaSやサーバーであれば似たものは作成できますよ!
完成図
今回作成したOneLoginユーザーレポートでは、
logged into onelogin
:24時間以内のOneLoginへのログインlogged into app
:24時間以内のOneLoginから各アプリケーションへのログイン
の数を棒グラフにしています。
前提条件
必要なもの
- Sumo Logic APIが使用できるプラン(下記のいずれか)への加入
- Credits
- Trial
- Enterprise Operations
- Enterprise Security
- Enterprise Suite
- CloudFlex
- Enterprise
- Credits
- Slackアカウント
準備
まず、Sumo Logic APIを取得します。Sumo Logicにログインし、下記URLを開きます。
https://service.jp.sumologic.com/ui/#/preferences
そして、My Access Keysという項目の右端にある、「Add Access Key」という青いボタンを押します。 青いボタンを押すとポップアップが表示されます。Nameにわかりやすい名前を入力します。そして、右下のCreate Keyを押します。 Access IDとAccess Keyは後ほど用いるため、控えておきます。
※ Access Keyはここで表示された後は、2度とこちらの画面から値を参照できなくなるため、保管には気をつけてください。
次に、Slack API Tokenを取得します。
ブログ記事が冗長となってしまうため、ここではSlack側での操作の詳細は割愛させていただきます。以下のURLが参考になるかと思います。下記URLの通り、Bot Token Scopesにfiles:write
権限を付与し、Bot User OAuth Access Tokenを控えておきます。
smdbanana.hatenablog.com
次に、Slackを開き、先程作成したBot Userを画像を送信する予定のチャンネルに招待します。 そして、画像を送信する予定のチャンネルIDを取得します。
以上で準備は完了です。
Sumo Logic 検索ジョブ APIの使い方
今回はPythonでOneLoginユーザーレポートを作成したため、Pythonスクリプトを用いて説明したいと思います。 まず、必要なモジュールをインストールします。
$ pip install requests matplotlib pandas sumologic-sdk
Sumo Logic検索ジョブAPIはSumo Logicクエリと、APIを実行するためのスクリプトを用意することで簡単に実行することができます。たとえば以下のクエリを実行したいとします。
_sourceCategory="Dev/OneLogin/JP" and _collector="Demo - OneLogin" | json "event.event_type_id" as type_id | where type_id in ("5", "8") | timeslice 1h | if(type_id = 5, "Logged Into OneLogin", "Logged Into App") as event | count by _timeslice, event | fillmissing timeslice(1h) | where !isNull(event) | transpose row _timeslice column event
上記をquery.sumoql
という名前で保存します。そして上記のクエリをPythonで実行するには、以下のコードを記述します。
import json import datetime import os import time import requests from sumologic import SumoLogic class OneLoginReport: def __init__(self): jst = datetime.timezone(datetime.timedelta(hours=9)) dt_now = datetime.datetime.now(jst) self.dt_today = datetime.datetime(dt_now.year, dt_now.month, dt_now.day, tzinfo=jst) self.dt_yesterday = datetime.datetime(dt_now.year, dt_now.month, dt_now.day - 1, tzinfo=jst) def search_result_sumologic(self): access_id = os.environ['SUMOLOGIC_API_ACCESS_ID'] access_key = os.environ['SUMOLOGIC_API_ACCESS_KEY'] url_endpoint_jp = 'https://api.jp.sumologic.com/api' LIMIT = 42 today_for_sumologic = str(self.dt_today).replace(' ', 'T') yesterday_for_sumologic = str(self.dt_yesterday).replace(' ', 'T') sumologic = SumoLogic(access_id, access_key, url_endpoint_jp) from_time = yesterday_for_sumologic to_time = today_for_sumologic time_zone = 'JST' by_receipt_time = False delay = 5 with open('query.sumoql', 'r') as f: q = ' '.join(f.readlines()) sj = sumologic.search_job(q, from_time, to_time, time_zone, by_receipt_time) status = sumologic.search_job_status(sj) while status['state'] != 'DONE GATHERING RESULTS': if status['state'] == 'CANCELLED': break time.sleep(delay) status = sumologic.search_job_status(sj) if status['state'] == 'DONE GATHERING RESULTS': count = status['recordCount'] limit = count if count < LIMIT and count != 0 else LIMIT # compensate bad limit check r = sumologic.search_job_records(sj, limit=limit) print(r) return r test = OneLoginReport() test.search_result_sumologic()
上記をsumo_api.py
という名前でquery.sumoql
と同じディレクトリに保存します。次に、Sumo Logic API Access ID & Keyを環境変数に登録します。
$ export SUMOLOGIC_API_ACCESS_ID=*********** $ export SUMOLOGIC_API_ACCESS_KEY=*********************************
そして、sumo_api.py
を実行するとJSONレスポンスが返されます。上記のSumo Logic検索ジョブAPI実行のためのプログラムは下記URLを参考に作成しました。
github.com
レスポンスは以下の形式のようになります。
{ "fields": [ { "name": "_timeslice", "fieldType": "long", "keyField": true }, { "name": "logged into app", "fieldType": "int", "keyField": false }, { "name": "logged into onelogin", "fieldType": "int", "keyField": false } ], "records": [ { "map": { "_timeslice": "1612148400000", "logged into onelogin": "1", "logged into app": "" } }, { "map": { "_timeslice": "1612159200000", "logged into onelogin": "1", "logged into app": "1" } } ] }
フィールド名はfields
ブロックに格納され、値はrecords
ブロックに格納されます。こちらのJSONレスポンスを解析し、描画ライブラリでグラフ化したものをSlackに送信することでレポート画像を見ることが可能となります。
描画の処理を含めた完成版のスクリプトは以下のとおりです。
import json import datetime import os import time import logging import requests import pandas as pd import matplotlib.pyplot as plt from sumologic import SumoLogic class OneLoginReport: def __init__(self): jst = datetime.timezone(datetime.timedelta(hours=9)) dt_now = datetime.datetime.now(jst) self.dt_today = datetime.datetime(dt_now.year, dt_now.month, dt_now.day, tzinfo=jst) self.dt_yesterday = datetime.datetime(dt_now.year, dt_now.month, dt_now.day - 1, tzinfo=jst) self.search_result = self.search_result_sumologic() self.record_list = self.process_record_list() self.field_list = [ field['name'] for field in self.search_result['fields'] ]; self.field_list[0] = 'timeslice' self.file_name = f'/tmp/onelogin_user_monitoring_{str(self.dt_yesterday)[:10]}.png' self.save_fig() self.post_slack() def search_result_sumologic(self): access_id = os.environ['SUMOLOGIC_API_ACCESS_ID'] access_key = os.environ['SUMOLOGIC_API_ACCESS_KEY'] url_endpoint_jp = 'https://api.jp.sumologic.com/api' LIMIT = 42 today_for_sumologic = str(self.dt_today).replace(' ', 'T') yesterday_for_sumologic = str(self.dt_yesterday).replace(' ', 'T') sumologic = SumoLogic(access_id, access_key, url_endpoint_jp) from_time = yesterday_for_sumologic to_time = today_for_sumologic time_zone = 'JST' by_receipt_time = False delay = 5 with open('query.sumoql', 'r') as f: q = ' '.join(f.readlines()) sj = sumologic.search_job(q, from_time, to_time, time_zone, by_receipt_time) status = sumologic.search_job_status(sj) while status['state'] != 'DONE GATHERING RESULTS': if status['state'] == 'CANCELLED': break time.sleep(delay) status = sumologic.search_job_status(sj) if status['state'] == 'DONE GATHERING RESULTS': count = status['recordCount'] limit = count if count < LIMIT and count != 0 else LIMIT # compensate bad limit check r = sumologic.search_job_records(sj, limit=limit) # print(json.dumps(r, indent=4)) return r def process_record_list(self): exist_timeslice_list = [ int(int(record['map']['_timeslice']) / 1000) for record in self.search_result['records'] ] all_timeslice_list = [ self.dt_yesterday.timestamp() + 3600 * i for i in range(24) ] record_dic = {} for record in self.search_result['records']: timeslice = int(int(record['map']['_timeslice']) / 1000) login_app_count = 0 if record['map']['logged into app'] == '' else int(record['map']['logged into app']) login_onelogin_count = 0 if record['map']['logged into onelogin'] == '' else int(record['map']['logged into onelogin']) record_dic[timeslice] = { 'logged into app': login_app_count, 'logged into onelogin': login_onelogin_count } record_list = [] for timeslice in all_timeslice_list: if timeslice not in exist_timeslice_list: record_list.append([timeslice, 0, 0]) else: login_app_count = 0 if record_dic[timeslice]['logged into app'] == '' else int(record_dic[timeslice]['logged into app']) login_onelogin_count = 0 if record_dic[timeslice]['logged into onelogin'] == '' else int(record_dic[timeslice]['logged into onelogin']) record_list.append([timeslice, login_app_count, login_onelogin_count]) return record_list def save_fig(self): pd_data = pd.DataFrame(self.record_list, columns=self.field_list) plt.figure() pd_data.plot.bar(y=self.field_list[1:], figsize=(12,6), stacked=True) plt.title(f'OneLogin User Monitoring Daily Report ( {str(self.dt_yesterday)[:10]} )', size=12) plt.xlabel('hour') plt.xticks(rotation=0) plt.savefig(self.file_name) plt.close('all') def post_slack(self): slack_token = os.environ['SLACK_DAILY_REPORT_TOKEN'] channel_id = os.environ['SLACK_DAILY_REPORT_CHANNEL_ID'] files = {'file': open(self.file_name, 'rb')} param = { 'token': slack_token, 'channels': channel_id, 'filename': 'filename', 'initial_comment': 'OneLoginユーザーモニタリングレポートです。', 'title': self.file_name } requests.post(url='https://slack.com/api/files.upload', params=param, files=files) def lambda_handler(event, context): OneLoginReport() lambda_handler(1, 1)
終わりに
以上で終了です。お疲れさまでした!
本記事ではSumo Logic 検索ジョブ APIのレスポンスをグラフ化してSlackに送信するということをやってみました。Sumo Logicとその他のSaaS連携についてのブログ記事はこれからどんどんアップしていきたいと思います。どなたかの参考となれば幸いです。
それでは〜