R2フィードバック巡回・Discordダイジェスト通知スキル
概要
Cloudflare R2の feedback/ ディレクトリを巡回し、未処理フィードバック(processed=false)を集約してDiscordにダイジェスト通知。その後Obsidianの週次ログに追記する。
前提条件
- Cloudflare R2バケット設定済み(
CF_R2_BUCKETin.env) feedback/ディレクトリに OpenClaw が書き込んでいるDISCORD_BOT_TOKENと通知先チャンネルID設定済み05_Daily/03_Log/[今週のログファイル]が存在する
実行手順(呼び出しプロンプト)
「R2フィードバックをダイジェスト化してDiscordに報告して」
プロトタイプコード
"""
r2_digest.py — R2 feedback/ 巡回 → Discord ダイジェスト通知
"""
import json
import os
import boto3
from datetime import datetime
import discord
from pathlib import Path
R2_ENDPOINT = os.getenv("CF_R2_ENDPOINT") # Cloudflare R2 S3互換エンドポイント
R2_ACCESS_KEY = os.getenv("CF_R2_ACCESS_KEY")
R2_SECRET_KEY = os.getenv("CF_R2_SECRET_KEY")
R2_BUCKET = os.getenv("CF_R2_BUCKET")
DISCORD_TOKEN = os.getenv("DISCORD_BOT_TOKEN")
DISCORD_CHANNEL_ID = int(os.getenv("DISCORD_LOG_CHANNEL_ID", "0"))
VAULT_LOG_BASE = Path(r"C:\ai_work\quartz-site\content\05_Daily\03_Log")
def get_r2_client():
return boto3.client(
"s3",
endpoint_url=R2_ENDPOINT,
aws_access_key_id=R2_ACCESS_KEY,
aws_secret_access_key=R2_SECRET_KEY,
)
def fetch_unprocessed_feedback(client):
"""feedback/内の未処理JSONを取得"""
response = client.list_objects_v2(Bucket=R2_BUCKET, Prefix="feedback/")
feedbacks = []
for obj in response.get("Contents", []):
raw = client.get_object(Bucket=R2_BUCKET, Key=obj["Key"])
data = json.loads(raw["Body"].read())
if not data.get("processed", False):
feedbacks.append((obj["Key"], data))
return sorted(feedbacks, key=lambda x: x[1].get("executed_at", ""))
def build_digest(feedbacks):
"""ダイジェストメッセージを構築"""
success = [f for _, f in feedbacks if f["status"] == "success"]
partial = [f for _, f in feedbacks if f["status"] == "partial"]
failed = [f for _, f in feedbacks if f["status"] == "failed"]
lines = [f"📊 **エージェントダイジェスト** ({datetime.now().strftime('%m/%d %H:%M')})"]
lines.append(f"✅ 成功: {len(success)}件 | ⚠️ 要確認: {len(partial)}件 | ❌ 失敗: {len(failed)}件")
lines.append("─" * 30)
for _, fb in feedbacks[:5]: # 最大5件表示
icon = {"success":"✅","partial":"⚠️","failed":"❌"}.get(fb["status"],"❓")
lines.append(f"{icon} `{fb['task_id']}` — {fb['result']['summary']}")
if fb.get("next_suggestion"):
lines.append(f" 💡 {fb['next_suggestion']}")
if len(feedbacks) > 5:
lines.append(f"... 他 {len(feedbacks)-5}件")
return "\n".join(lines)
def mark_processed(client, key, data):
"""フィードバックにprocessed=trueを付与して上書き"""
data["processed"] = True
client.put_object(Bucket=R2_BUCKET, Key=key, Body=json.dumps(data, ensure_ascii=False))
def append_to_obsidian_log(feedbacks):
"""週次ログに追記"""
today = datetime.now()
week_num = (today.day - 1) // 7 + 1
log_file = VAULT_LOG_BASE / f"{today.year}年" / f"{today.year}年{today.month}月第{week_num}週ログ.md"
if not log_file.exists():
return
entry = [f"\n### {today.month}月{today.day}日 — エージェントログ\n"]
entry.append("| 時刻 | タスク | 結果 |")
entry.append("|------|--------|------|")
for _, fb in feedbacks:
t = fb.get("executed_at","")[-14:-9]
icon = {"success":"✅","partial":"⚠️","failed":"❌"}.get(fb["status"],"❓")
entry.append(f"| {t} | {fb['task_id'][-6:]} | {icon} {fb['result']['summary']} |")
with open(log_file, "a", encoding="utf-8") as f:
f.write("\n".join(entry) + "\n")
async def send_digest():
client = get_r2_client()
feedbacks = fetch_unprocessed_feedback(client)
if not feedbacks:
print("未処理フィードバックなし")
return
digest = build_digest(feedbacks)
# Discord送信
intents = discord.Intents.default()
bot = discord.Client(intents=intents)
@bot.event
async def on_ready():
ch = bot.get_channel(DISCORD_CHANNEL_ID)
await ch.send(digest)
# Obsidianログ追記
append_to_obsidian_log(feedbacks)
# processed=trueに更新
for key, data in feedbacks:
mark_processed(client, key, data)
await bot.close()
await bot.start(DISCORD_TOKEN)
if __name__ == "__main__":
import asyncio
asyncio.run(send_digest())期待される結果
- Discordのai-logチャンネルにダイジェストメッセージが投稿される
- 週次ログ(05_Daily/03_Log)にエージェントログ表が追記される
- R2の各フィードバックファイルに
processed=trueが付与される
関連リンク
- プロトコル定義: 自律運用プロトコルv1
- Cloudflare仕様: [[03_Interface/Cloudflare/Cloudflare連携仕様]]
- OpenClaw仕様: [[03_Interface/OpenClaw/OpenClaw連携仕様]]
変更履歴
| 日付 | 変更内容 |
|---|---|
| 2026-02-23 | 初版登録(プロトタイプ) |