Sumo Logic APIのレスポンスをグラフ化してSlackに送信してみた

こんにちは、Sumo Logic担当の新藤です。 2度目の緊急事態宣言が発令されめっぽうオフィスに行かなくなりました。そのため最近はオフィス近くの赤坂見附の美味しいランチ巡りがあまりできておらずとても寂しいです(´;ω;`)

ところで、読者の皆さんはSumo Logicのアラート機能では文字のみ送信可能で、円グラフや棒グラフなどを画像化したものをSlackなどのチャットツールに送信できないことはご存知でしょうか。なにかしらのアプリケーションにおいてユーザーの作成を検知したらアラートを送信する、などのイベントベースのアラートでは文字のみのアラートで問題ないとは思いますが、特定のアプリケーションの社内ユーザー全体の利用状況の統計情報などは文字ではなくグラフで送信したいですよね。

そこでSumo Logic APIを実行し、APIレスポンスをAWS Lambdaで整形し、Slackに送信してみたのでその手順について書きたいと思います。今回は特に、IDaaS製品としておなじみのOneLoginのユーザー利用状況グラフ(以下、OneLoginユーザーレポート)をSlackに送信するプログラムを作成しました。OneLoginに限らず、Sumo Logicとログ連携しているSaaSやサーバーであれば似たものは作成できますよ!

完成図

f:id:cmt_192cm:20210202125816p:plain
OneLoginユーザーレポート

今回作成したOneLoginユーザーレポートでは、

  • logged into onelogin:24時間以内のOneLoginへのログイン
  • logged into app:24時間以内のOneLoginから各アプリケーションへのログイン

の数を棒グラフにしています。

前提条件

必要なもの

  • Sumo Logic APIが使用できるプラン(下記のいずれか)への加入
    • Credits
      • Trial
      • Enterprise Operations
      • Enterprise Security
      • Enterprise Suite
    • CloudFlex
      • Enterprise
  • Slackアカウント

準備

まず、Sumo Logic APIを取得します。Sumo Logicにログインし、下記URLを開きます。

https://service.jp.sumologic.com/ui/#/preferences

そして、My Access Keysという項目の右端にある、「Add Access Key」という青いボタンを押します。

f:id:cmt_192cm:20210129133043p:plain
Sumo Logic API Access ID & Keyの追加
青いボタンを押すとポップアップが表示されます。Nameにわかりやすい名前を入力します。そして、右下のCreate Keyを押します。
f:id:cmt_192cm:20210129134040p:plain
Sumo Logic API Access ID & Keyの発行
Access IDとAccess Keyは後ほど用いるため、控えておきます。

※ Access Keyはここで表示された後は、2度とこちらの画面から値を参照できなくなるため、保管には気をつけてください。

f:id:cmt_192cm:20210129133636p:plain
Sumo Logic API Access ID & Keyの保存

次に、Slack API Tokenを取得します。 ブログ記事が冗長となってしまうため、ここではSlack側での操作の詳細は割愛させていただきます。以下のURLが参考になるかと思います。下記URLの通り、Bot Token Scopesにfiles:write権限を付与し、Bot User OAuth Access Tokenを控えておきます。 smdbanana.hatenablog.com

次に、Slackを開き、先程作成したBot Userを画像を送信する予定のチャンネルに招待します。 そして、画像を送信する予定のチャンネルIDを取得します。

以上で準備は完了です。

Sumo Logic 検索ジョブ APIの使い方

今回はPythonでOneLoginユーザーレポートを作成したため、Pythonスクリプトを用いて説明したいと思います。 まず、必要なモジュールをインストールします。

$ pip install requests matplotlib pandas sumologic-sdk

Sumo Logic検索ジョブAPIはSumo Logicクエリと、APIを実行するためのスクリプトを用意することで簡単に実行することができます。たとえば以下のクエリを実行したいとします。

_sourceCategory="Dev/OneLogin/JP" and _collector="Demo - OneLogin"
| json "event.event_type_id" as type_id
| where type_id in ("5", "8")
| timeslice 1h
| if(type_id = 5, "Logged Into OneLogin", "Logged Into App") as event
| count by _timeslice, event
| fillmissing timeslice(1h)
| where !isNull(event)
| transpose row _timeslice column event

上記をquery.sumoqlという名前で保存します。そして上記のクエリをPythonで実行するには、以下のコードを記述します。

import json
import datetime
import os
import time

import requests
from sumologic import SumoLogic

class OneLoginReport:
    def __init__(self):
        jst = datetime.timezone(datetime.timedelta(hours=9))
        dt_now = datetime.datetime.now(jst)
        self.dt_today = datetime.datetime(dt_now.year, dt_now.month, dt_now.day, tzinfo=jst)
        self.dt_yesterday = datetime.datetime(dt_now.year, dt_now.month, dt_now.day - 1, tzinfo=jst)

    def search_result_sumologic(self):
        access_id = os.environ['SUMOLOGIC_API_ACCESS_ID']
        access_key = os.environ['SUMOLOGIC_API_ACCESS_KEY']
        url_endpoint_jp = 'https://api.jp.sumologic.com/api'
        
        LIMIT = 42
        today_for_sumologic = str(self.dt_today).replace(' ', 'T')
        yesterday_for_sumologic = str(self.dt_yesterday).replace(' ', 'T')
        
        sumologic = SumoLogic(access_id, access_key, url_endpoint_jp)
        from_time = yesterday_for_sumologic
        to_time = today_for_sumologic
        time_zone = 'JST'
        by_receipt_time = False
        
        delay = 5
        with open('query.sumoql', 'r') as f:
            q = ' '.join(f.readlines())
        
        sj = sumologic.search_job(q, from_time, to_time, time_zone, by_receipt_time)
        
        status = sumologic.search_job_status(sj)
        while status['state'] != 'DONE GATHERING RESULTS':
            if status['state'] == 'CANCELLED':
                break
            time.sleep(delay)
            status = sumologic.search_job_status(sj)
        
        if status['state'] == 'DONE GATHERING RESULTS':
            count = status['recordCount']
            limit = count if count < LIMIT and count != 0 else LIMIT # compensate bad limit check
            r = sumologic.search_job_records(sj, limit=limit)
            print(r)
            return r

test = OneLoginReport()
test.search_result_sumologic()

上記をsumo_api.pyという名前でquery.sumoqlと同じディレクトリに保存します。次に、Sumo Logic API Access ID & Keyを環境変数に登録します。

$ export SUMOLOGIC_API_ACCESS_ID=***********
$ export SUMOLOGIC_API_ACCESS_KEY=*********************************

そして、sumo_api.pyを実行するとJSONレスポンスが返されます。上記のSumo Logic検索ジョブAPI実行のためのプログラムは下記URLを参考に作成しました。 github.com レスポンスは以下の形式のようになります。

{
    "fields": [
        {
            "name": "_timeslice",
            "fieldType": "long",
            "keyField": true
        },
        {
            "name": "logged into app",
            "fieldType": "int",
            "keyField": false
        },
        {
            "name": "logged into onelogin",
            "fieldType": "int",
            "keyField": false
        }
    ],
    "records": [
        {
            "map": {
                "_timeslice": "1612148400000",
                "logged into onelogin": "1",
                "logged into app": ""
            }
        },
        {
            "map": {
                "_timeslice": "1612159200000",
                "logged into onelogin": "1",
                "logged into app": "1"
            }
        }
    ]
}

フィールド名はfieldsブロックに格納され、値はrecordsブロックに格納されます。こちらのJSONレスポンスを解析し、描画ライブラリでグラフ化したものをSlackに送信することでレポート画像を見ることが可能となります。

描画の処理を含めた完成版のスクリプトは以下のとおりです。

import json
import datetime
import os
import time
import logging

import requests
import pandas as pd
import matplotlib.pyplot as plt
from sumologic import SumoLogic

class OneLoginReport:
    def __init__(self):
        jst = datetime.timezone(datetime.timedelta(hours=9))
        dt_now = datetime.datetime.now(jst)
        self.dt_today = datetime.datetime(dt_now.year, dt_now.month, dt_now.day, tzinfo=jst)
        self.dt_yesterday = datetime.datetime(dt_now.year, dt_now.month, dt_now.day - 1, tzinfo=jst)
        self.search_result = self.search_result_sumologic()
        self.record_list = self.process_record_list()
        self.field_list = [ field['name'] for field in self.search_result['fields'] ]; self.field_list[0] = 'timeslice'
        self.file_name = f'/tmp/onelogin_user_monitoring_{str(self.dt_yesterday)[:10]}.png'

        self.save_fig()
        self.post_slack()

    def search_result_sumologic(self):
        access_id = os.environ['SUMOLOGIC_API_ACCESS_ID']
        access_key = os.environ['SUMOLOGIC_API_ACCESS_KEY']
        url_endpoint_jp = 'https://api.jp.sumologic.com/api'
        
        LIMIT = 42
        today_for_sumologic = str(self.dt_today).replace(' ', 'T')
        yesterday_for_sumologic = str(self.dt_yesterday).replace(' ', 'T')
        
        sumologic = SumoLogic(access_id, access_key, url_endpoint_jp)
        from_time = yesterday_for_sumologic
        to_time = today_for_sumologic
        time_zone = 'JST'
        by_receipt_time = False
        
        delay = 5
        with open('query.sumoql', 'r') as f:
            q = ' '.join(f.readlines())
        
        sj = sumologic.search_job(q, from_time, to_time, time_zone, by_receipt_time)
        
        status = sumologic.search_job_status(sj)
        while status['state'] != 'DONE GATHERING RESULTS':
            if status['state'] == 'CANCELLED':
                break
            time.sleep(delay)
            status = sumologic.search_job_status(sj)
        
        if status['state'] == 'DONE GATHERING RESULTS':
            count = status['recordCount']
            limit = count if count < LIMIT and count != 0 else LIMIT # compensate bad limit check
            r = sumologic.search_job_records(sj, limit=limit)
            # print(json.dumps(r, indent=4))
            return r

    def process_record_list(self):
        exist_timeslice_list = [ int(int(record['map']['_timeslice']) / 1000) for record in self.search_result['records'] ]
        all_timeslice_list = [ self.dt_yesterday.timestamp() + 3600 * i for i in range(24) ]
    
        record_dic = {}
        for record in self.search_result['records']:
            timeslice = int(int(record['map']['_timeslice']) / 1000)
            login_app_count = 0 if record['map']['logged into app'] == '' else int(record['map']['logged into app'])
            login_onelogin_count = 0 if record['map']['logged into onelogin'] == '' else int(record['map']['logged into onelogin'])
            record_dic[timeslice] = {
                'logged into app': login_app_count,
                'logged into onelogin': login_onelogin_count
            }
        
        record_list = []
        for timeslice in all_timeslice_list:
            if timeslice not in exist_timeslice_list: record_list.append([timeslice, 0, 0])
            else:
                login_app_count = 0 if record_dic[timeslice]['logged into app'] == '' else int(record_dic[timeslice]['logged into app'])
                login_onelogin_count = 0 if record_dic[timeslice]['logged into onelogin'] == '' else int(record_dic[timeslice]['logged into onelogin'])
                record_list.append([timeslice, login_app_count, login_onelogin_count])
    
        return record_list
            
    
    def save_fig(self):
        pd_data = pd.DataFrame(self.record_list, columns=self.field_list)
        plt.figure()
        pd_data.plot.bar(y=self.field_list[1:], figsize=(12,6), stacked=True)
        plt.title(f'OneLogin User Monitoring Daily Report ( {str(self.dt_yesterday)[:10]} )', size=12)
        plt.xlabel('hour')
        plt.xticks(rotation=0)
        plt.savefig(self.file_name)
        plt.close('all')
    
    def post_slack(self):
        slack_token = os.environ['SLACK_DAILY_REPORT_TOKEN']
        channel_id = os.environ['SLACK_DAILY_REPORT_CHANNEL_ID']
        
        files = {'file': open(self.file_name, 'rb')}
        param = {
            'token': slack_token, 
            'channels': channel_id,
            'filename': 'filename',
            'initial_comment': 'OneLoginユーザーモニタリングレポートです。',
            'title': self.file_name
        }
        requests.post(url='https://slack.com/api/files.upload', params=param, files=files)

def lambda_handler(event, context):
    OneLoginReport()

lambda_handler(1, 1)

終わりに

以上で終了です。お疲れさまでした!

本記事ではSumo Logic 検索ジョブ APIのレスポンスをグラフ化してSlackに送信するということをやってみました。Sumo Logicとその他のSaaS連携についてのブログ記事はこれからどんどんアップしていきたいと思います。どなたかの参考となれば幸いです。

それでは〜