R2フィードバック巡回・Discordダイジェスト通知スキル

概要

Cloudflare R2の feedback/ ディレクトリを巡回し、未処理フィードバック(processed=false)を集約してDiscordにダイジェスト通知。その後Obsidianの週次ログに追記する。

前提条件

  • Cloudflare R2バケット設定済み(CF_R2_BUCKET in .env
  • feedback/ ディレクトリに OpenClaw が書き込んでいる
  • DISCORD_BOT_TOKEN と通知先チャンネルID設定済み
  • 05_Daily/03_Log/[今週のログファイル] が存在する

実行手順(呼び出しプロンプト)

「R2フィードバックをダイジェスト化してDiscordに報告して」

プロトタイプコード

"""
r2_digest.py — R2 feedback/ 巡回 → Discord ダイジェスト通知
"""
import json
import os
import boto3
from datetime import datetime
import discord
from pathlib import Path
 
R2_ENDPOINT = os.getenv("CF_R2_ENDPOINT")        # Cloudflare R2 S3互換エンドポイント
R2_ACCESS_KEY = os.getenv("CF_R2_ACCESS_KEY")
R2_SECRET_KEY = os.getenv("CF_R2_SECRET_KEY")
R2_BUCKET = os.getenv("CF_R2_BUCKET")
DISCORD_TOKEN = os.getenv("DISCORD_BOT_TOKEN")
DISCORD_CHANNEL_ID = int(os.getenv("DISCORD_LOG_CHANNEL_ID", "0"))
VAULT_LOG_BASE = Path(r"C:\ai_work\quartz-site\content\05_Daily\03_Log")
 
def get_r2_client():
    return boto3.client(
        "s3",
        endpoint_url=R2_ENDPOINT,
        aws_access_key_id=R2_ACCESS_KEY,
        aws_secret_access_key=R2_SECRET_KEY,
    )
 
def fetch_unprocessed_feedback(client):
    """feedback/内の未処理JSONを取得"""
    response = client.list_objects_v2(Bucket=R2_BUCKET, Prefix="feedback/")
    feedbacks = []
    for obj in response.get("Contents", []):
        raw = client.get_object(Bucket=R2_BUCKET, Key=obj["Key"])
        data = json.loads(raw["Body"].read())
        if not data.get("processed", False):
            feedbacks.append((obj["Key"], data))
    return sorted(feedbacks, key=lambda x: x[1].get("executed_at", ""))
 
def build_digest(feedbacks):
    """ダイジェストメッセージを構築"""
    success = [f for _, f in feedbacks if f["status"] == "success"]
    partial = [f for _, f in feedbacks if f["status"] == "partial"]
    failed  = [f for _, f in feedbacks if f["status"] == "failed"]
 
    lines = [f"📊 **エージェントダイジェスト** ({datetime.now().strftime('%m/%d %H:%M')})"]
    lines.append(f"✅ 成功: {len(success)}件 | ⚠️ 要確認: {len(partial)}件 | ❌ 失敗: {len(failed)}件")
    lines.append("─" * 30)
    for _, fb in feedbacks[:5]:  # 最大5件表示
        icon = {"success":"✅","partial":"⚠️","failed":"❌"}.get(fb["status"],"❓")
        lines.append(f"{icon} `{fb['task_id']}` — {fb['result']['summary']}")
        if fb.get("next_suggestion"):
            lines.append(f"   💡 {fb['next_suggestion']}")
    if len(feedbacks) > 5:
        lines.append(f"... 他 {len(feedbacks)-5}件")
    return "\n".join(lines)
 
def mark_processed(client, key, data):
    """フィードバックにprocessed=trueを付与して上書き"""
    data["processed"] = True
    client.put_object(Bucket=R2_BUCKET, Key=key, Body=json.dumps(data, ensure_ascii=False))
 
def append_to_obsidian_log(feedbacks):
    """週次ログに追記"""
    today = datetime.now()
    week_num = (today.day - 1) // 7 + 1
    log_file = VAULT_LOG_BASE / f"{today.year}年" / f"{today.year}{today.month}月第{week_num}週ログ.md"
    if not log_file.exists():
        return
    entry = [f"\n### {today.month}{today.day}日 — エージェントログ\n"]
    entry.append("| 時刻 | タスク | 結果 |")
    entry.append("|------|--------|------|")
    for _, fb in feedbacks:
        t = fb.get("executed_at","")[-14:-9]
        icon = {"success":"✅","partial":"⚠️","failed":"❌"}.get(fb["status"],"❓")
        entry.append(f"| {t} | {fb['task_id'][-6:]} | {icon} {fb['result']['summary']} |")
    with open(log_file, "a", encoding="utf-8") as f:
        f.write("\n".join(entry) + "\n")
 
async def send_digest():
    client = get_r2_client()
    feedbacks = fetch_unprocessed_feedback(client)
    if not feedbacks:
        print("未処理フィードバックなし")
        return
    digest = build_digest(feedbacks)
 
    # Discord送信
    intents = discord.Intents.default()
    bot = discord.Client(intents=intents)
    @bot.event
    async def on_ready():
        ch = bot.get_channel(DISCORD_CHANNEL_ID)
        await ch.send(digest)
        # Obsidianログ追記
        append_to_obsidian_log(feedbacks)
        # processed=trueに更新
        for key, data in feedbacks:
            mark_processed(client, key, data)
        await bot.close()
    await bot.start(DISCORD_TOKEN)
 
if __name__ == "__main__":
    import asyncio
    asyncio.run(send_digest())

期待される結果

  1. Discordのai-logチャンネルにダイジェストメッセージが投稿される
  2. 週次ログ(05_Daily/03_Log)にエージェントログ表が追記される
  3. R2の各フィードバックファイルにprocessed=trueが付与される

関連リンク

変更履歴

日付変更内容
2026-02-23初版登録(プロトタイプ)