Python でデータベースを監視し Slack に通知

はじめに

この記事では、新規に追加されたデータベースのレコードを監視し、特定の条件に一致した場合に Slack に通知を送る Python スクリプトの作成方法を紹介します。具体的には、MariaDB データベースのテーブルから新しいレコードを取得し、性別が ‘male’ である場合に Slack へ通知を送ります。また、通知の内容には、レコードの各カラムデータが含まれます。このスクリプトは cron を使って定期的に実行されることを想定しています。

必要なライブラリと環境変数の設定

このチュートリアルで作成する Python スクリプトを実行するためには、いくつかの外部ライブラリが必要です。また、スクリプト内で使用するデータベース接続情報や Slack API トークンなどの機密情報は、環境変数を通じて設定します。以下では、それぞれの手順を説明していきます。

ライブラリのインストール

このチュートリアルでは、以下のライブラリを使用します。

  1. mysql-connector-python: MariaDB データベースへ接続するためのライブラリ
  2. requests: HTTP リクエストを送信するためのライブラリ
  3. python-dotenv: .env ファイルから環境変数を読み込むためのライブラリ

これらのライブラリをインストールするには、以下のコマンドを実行します。

pip install mysql-connector-python requests python-dotenv

環境変数の設定

スクリプト内で使用する機密情報を環境変数として設定します。このチュートリアルでは、以下の環境変数が必要です。

  1. DB_HOST: データベースのホスト名
  2. DB_USER: データベースのユーザー名
  3. DB_PASSWORD: データベースのパスワード
  4. DB_NAME: データベース名
  5. SLACK_API_TOKEN: Slack API トークン
  6. SLACK_CHANNEL: Slack 通知を送信するチャンネル名

これらの環境変数を設定するために、プロジェクトのルートディレクトリに .env という名前のファイルを作成し、以下のように環境変数を記述します。

DB_HOST=your_database_host
DB_USER=your_database_user
DB_PASSWORD=your_database_password
DB_NAME=your_database_name
SLACK_API_TOKEN=your_slack_api_token
SLACK_CHANNEL=your_slack_channel

各変数の値は、実際のデータベース接続情報や Slack API トークンに置き換えてください。このファイルは機密情報を含むため、バージョン管理システムにはコミットしないように注意してください。

Slack Bot の作成と API トークンの取得

このセクションでは、Slack で Bot を作成し、API トークンを取得する手順について説明します。

Bot の作成

  1. Slack API のアプリケーション管理ページにアクセスします。
  2. 右上の「Create New App」ボタンをクリックします。
  3. 「App Name」に任意のアプリ名を入力し、「Development Slack Workspace」で対象となるワークスペースを選択して、「Create App」ボタンをクリックします。
  4. 左側のメニューで「OAuth & Permissions」を選択します。
  5. 「Scopes」の下の「Bot Token Scopes」の「Add an OAuth Scope」ボタンをクリックし、必要な権限を追加します。本例では、chat:write(メッセージの送信)を追加します。
  6. 左側のメニューで「App Home」を選択します。
  7. 「App Home」ページ内の「Your App’s Presence in Slack」の下にある「Edit」ボタンをクリックし、「Display Name」および「Default Username」に任意の名前を入力して保存します。
  8. 「OAuth & Permissions」ページに戻り、「Install to Workspace」をクリックし、アプリをインストールします。この時点で Bot Token が生成されます。

API トークンの取得

  1. 作成した Bot の設定ページにアクセスします。
  2. 「OAuth & Permissions」のページを開き、「Bot Token」欄に表示されている API トークンをコピーします。このトークンは、「xoxb-」で始まります。
  3. API トークンを環境変数の設定で使用するために、.env ファイルに以下の形式で追加します。
SLACK_API_TOKEN=your_bot_token_here

これで Slack 側で Bot が作成され、API トークンも取得できました。Python スクリプトで API トークンを使って通知を送ることができるようになります。

Python スクリプトの作成

このセクションでは、データベースから新しいレコードを取得し、特定の条件に一致した場合に Slack に通知を送る Python スクリプトの作成方法について説明します。

スクリプトの構造と主要な関数の説明

スクリプトは以下の主要な関数から構成されます。

  1. read_last_processed_id(file_path): 最後に処理されたレコードの ID をファイルから読み込む関数
  2. write_last_processed_id(file_path, last_id): 最後に処理されたレコードの ID をファイルに書き込む関数
  3. send_slack_notification(record, username): Slack に通知を送る関数
  4. main(): スクリプトのメイン関数。データベース接続、レコードの取得、通知の送信、最後に処理された ID の更新を行います。

スクリプトの全体サンプル

以下は、今回説明する Python スクリプトの全体サンプルです。このスクリプトはデータベースから新しいレコードを取得し、特定の条件に一致した場合に Slack に通知を送る機能を実現します。

# ----------------------------------------------------------------------------
# Author: tomo
# Creation Date: 2023-04-12
#
# This script is free to use and modify at your own discretion.
# No warranty is provided for the script's functionality.
# Use this script at your own risk.
# ----------------------------------------------------------------------------
# -*- coding: utf-8 -*-
import mysql.connector
import os
import requests
import json  
from dotenv import load_dotenv

usernames = {
    "saki": "aaa",
    "akiya": "bbb",
}

def read_last_processed_id(file_path):
    if not os.path.exists(file_path):
        return 0
    with open(file_path, "r") as f:
        content = f.read().strip()
        if content:
            return int(content)
        else:
            return 0

def write_last_processed_id(file_path, last_id):
    with open(file_path, "w") as f:
        f.write(str(last_id))

def send_slack_notification(record, username):
    slack_token = os.getenv("SLACK_API_TOKEN")
    slack_channel = os.getenv("SLACK_CHANNEL")

    text = f"@{username} cc: @channel\n新しいレコードが追加されました:\n" \
           f"・Name: {record['name']}\n" \
           f"・Age: {record['age']}\n" \
           f"・Gender: {record['gender']}\n" \
           f"・Job Type: {record['job_type']}"

    response = requests.post(
        "https://slack.com/api/chat.postMessage",
        headers={
            "Authorization": f"Bearer {slack_token}",
            "Content-Type": "application/json; charset=utf-8"
        },
        data=json.dumps({
            "channel": slack_channel,
            "text": text,
            "link_names": True,
            "username": "MariaDB Notifier"
        })
    )

    if not response.json().get("ok"):
        print(f"Error: The request to the Slack API failed. (url: {response.url}                                                                                                             )")
        print(f"The server responded with: {response.json()}")

def main():
    load_dotenv()

    last_processed_id_file = "last_processed_id.txt"
    last_processed_id = read_last_processed_id(last_processed_id_file)

    conn = mysql.connector.connect(
        host=os.getenv("DB_HOST"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        database=os.getenv("DB_NAME")
    )

    cursor = conn.cursor()

    cursor.execute(f"SELECT * FROM sample_table WHERE id > {last_processed_id} A                                                                                                             ND gender = 'male' ORDER BY id ASC")
    records = cursor.fetchall()

    cursor.close()
    conn.close()

    for record_tuple in records:
        record = {
            'id': record_tuple[0],
            'name': record_tuple[1],
            'age': record_tuple[2],
            'gender': record_tuple[3],
            'job_type': record_tuple[4]
        }

        username = usernames[record['name']]
        send_slack_notification(record, username)

        last_processed_id = record['id']  

    write_last_processed_id(last_processed_id_file, last_processed_id)

if __name__ == "__main__":
    main()

データベース接続とレコードの取得

main 関数の中で、mysql.connector を使ってMariaDB データベースに接続し、cursor.execute() でSQL クエリを実行して新しいレコードを取得します。その際、gender = 'male' の条件を追加し、対象のレコードが男性に限定されるようにします。また、最後に処理された ID よりも大きい ID を持つレコードのみを取得します

Slack への通知の送信

send_slack_notification 関数では、requests.post() を使って Slack API に対して HTTP リクエストを送信し、通知を送ります。この際、Authorization ヘッダーに Slack API トークンを含めることが重要です。また、通知の内容にはレコードの各カラムデータを含めるようにしています。

スクリプトの実行と最後に処理された ID の保存

main 関数の最後で、最後に処理された ID を更新し、それを last_processed_id.txt ファイルに書き込みます。これにより、次回スクリプトが実行された際に、前回処理されたレコードよりも新しいレコードのみを取得できるようになります。

cron を使った定期実行の設定

cron を使った定期実行の設定

スクリプトが正常に動作することを確認したら、次に cron を使ってスクリプトを定期的に実行するように設定します。これにより、新しいレコードが追加されるたびに自動的に Slack 通知が送られるようになります。

crontab ファイルの編集

まず、ターミナルで以下のコマンドを実行して、crontab ファイルを開きます。

crontab -e

cron ジョブの追加

crontab ファイルが開いたら、以下のような行を追加して、スクリプトが1分ごとに実行されるように設定します。

* * * * * /usr/bin/python3 /path/to/your/script.py

ここで、/path/to/your/script.py は実際のスクリプトのファイルパスに置き換えてください。また、/usr/bin/python3 は Python インタープリタのパスですが、環境によって異なる場合があります。適切なパスに変更してください。

cron ジョブの保存と実行

追加した cron ジョブを保存し、crontab ファイルを閉じます。これで、指定した間隔でスクリプトが自動的に実行され、新しいレコードが追加された際に Slack 通知が送信されるようになります。

まとめ

このチュートリアルでは、MariaDB データベースに新しいレコードが追加された際に Slack で通知を送る Python スクリプトの作成方法を説明しました。また、cron を使ってスクリプトを定期的に実行する方法も紹介しました。このスクリプトをカスタマイズして、独自の通知システムを構築することができます。

以上