Claude Code のコストを最適化した。CLAUDE.md を整理し、コンテキストを削り、タスクに合ったモデルを選んだ。では次の不快な質問に答えられるだろうか。それが本当に機能していると、どうやって確認するのか。
これが可観測性の問題だ。最適化とは全く異なる問題である。最適化とは Claude Code の動作を変えること。可観測性とは Claude Code が今何をしているかを、問題の診断・結果の計測・異常のアラートに十分な粒度で見えるようにすることだ。
2026年現在、多くの Claude Code ユーザーはブラインドフライトをしている。月の請求額は把握している。しかしその請求の 40% を生み出したセッションはどれか、どのツール呼び出しがサイレントで失敗しているか、コンテキストがどの程度の頻度でコンパクションされているか(そして何が失われているか)、先週 CLAUDE.md を変更して本当に改善されたのか — これらは把握できていない。このガイドが埋めるのはそのギャップだ。
「Claude Code を監視する」とは何を意味するのか
実装に入る前に、何を観測したいかを整理しておく。Claude Code は複数の層でテレメトリを生成する。
セッションレベル: 入出力トークン合計、使用モデル、セッション時間、ターン数、コンパクション回数。
ターンレベル: メッセージごとのトークン数、実行されたツール呼び出し、ツールの成功・失敗、レイテンシ。
ツールレベル: 実行されたツール、渡された引数、出力サイズ、終了コード、エラーメッセージ。
コストレベル: トークン数と現在の料金表から導出。Anthropic は API でセッション単位のコストを公開していないため、クライアント側で計算する必要がある。
エラーレベル: ツール失敗、API エラー、パーミッション拒否、コンテキストオーバーフローイベント。
すべてをすぐに追跡する必要はない。まず着手すべき出発点として、セッションコスト・ツール失敗率・コンパクション頻度の 3 つを追跡することを勧める。この 3 指標だけで、修正する価値のある問題の 80% が浮かび上がる。
実装レイヤー 1: Hooks ベースのテレメトリ
Claude Code の Hooks システムは、テレメトリのインターセプトポイントとして最もクリーンな選択肢だ。Hooks は特定のライフサイクルイベントで発火し、構造化された JSON を stdin で受け取り、任意の宛先 — ローカル SQLite データベース、Postgres インスタンス、リモート API、あとで tail するフラットファイル — に書き込める。
監視のために使いたい Hook イベント:
| Hook イベント | 取得すべき内容 |
|---|---|
SessionStart | session_id、タイムスタンプ、モデル、作業ディレクトリ |
PreToolUse | ツール名、ツール入力(サニタイズ済み)、タイムスタンプ |
PostToolUse | ツール名、出力サイズ、所要時間、終了コード |
PostToolUseFailure | ツール名、エラーメッセージ、リトライ回数 |
Stop | セッションのトークン合計、ターン数、推定コスト |
PreCompact | コンパクション前のコンテキストサイズ、コンパクション理由 |
PostCompact | コンパクション後のコンテキストサイズ、削除されたトークン数 |
最小テレメトリ Hook の設定
ここで紹介するのは、セッションイベントをローカル SQLite データベースに記録する Python Hook だ。プロジェクトの .claude/hooks/ ディレクトリに配置する:
#!/usr/bin/env python3
# .claude/hooks/telemetry.py
"""
Claude Code 最小テレメトリ Hook。
~/.claude/telemetry.db にセッションイベントを記録する。
使い方: settings.json の該当 Hook イベントに設定する。
"""
from __future__ import annotations
import json
import sqlite3
import sys
import time
from datetime import datetime
from pathlib import Path
DB_PATH = Path.home() / ".claude" / "telemetry.db"
SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_type TEXT NOT NULL,
session_id TEXT,
timestamp TEXT NOT NULL,
tool_name TEXT,
tool_input_size INTEGER,
tool_output_size INTEGER,
duration_ms INTEGER,
exit_code INTEGER,
error_message TEXT,
tokens_input INTEGER,
tokens_output INTEGER,
model TEXT,
raw_json TEXT
);
"""
def get_db() -> sqlite3.Connection:
conn = sqlite3.connect(DB_PATH)
conn.execute(SCHEMA)
conn.commit()
return conn
def main() -> None:
raw = sys.stdin.read()
try:
data = json.loads(raw)
except json.JSONDecodeError:
sys.exit(0)
event_type = data.get("event", "unknown")
session_id = data.get("session_id")
timestamp = datetime.utcnow().isoformat()
record: dict = {
"event_type": event_type,
"session_id": session_id,
"timestamp": timestamp,
"raw_json": raw,
}
if event_type in ("PreToolUse", "PostToolUse", "PostToolUseFailure"):
tool = data.get("tool_use", {})
record["tool_name"] = tool.get("name")
inp = tool.get("input", {})
record["tool_input_size"] = len(json.dumps(inp))
if event_type == "PostToolUse":
result = data.get("tool_result", {})
output = result.get("output", "")
record["tool_output_size"] = len(str(output))
record["exit_code"] = result.get("exit_code", 0)
if event_type == "PostToolUseFailure":
record["error_message"] = data.get("error", "")
record["exit_code"] = data.get("exit_code", 1)
if event_type == "Stop":
usage = data.get("usage", {})
record["tokens_input"] = usage.get("input_tokens")
record["tokens_output"] = usage.get("output_tokens")
record["model"] = data.get("model")
conn = get_db()
cols = ", ".join(record.keys())
placeholders = ", ".join("?" for _ in record)
conn.execute(
f"INSERT INTO events ({cols}) VALUES ({placeholders})",
list(record.values()),
)
conn.commit()
conn.close()
# テレメトリのために実行をブロックしてはならない。常に exit 0
sys.exit(0)
if __name__ == "__main__":
main()
settings.json にこの Hook を登録する:
{
"hooks": {
"SessionStart": [
{ "type": "command", "command": "python3 .claude/hooks/telemetry.py" }
],
"PostToolUse": [
{ "type": "command", "command": "python3 .claude/hooks/telemetry.py" }
],
"PostToolUseFailure": [
{ "type": "command", "command": "python3 .claude/hooks/telemetry.py" }
],
"Stop": [
{ "type": "command", "command": "python3 .claude/hooks/telemetry.py" }
],
"PreCompact": [
{ "type": "command", "command": "python3 .claude/hooks/telemetry.py" }
],
"PostCompact": [
{ "type": "command", "command": "python3 .claude/hooks/telemetry.py" }
]
}
}
重要な点が 1 つある。Hook はブロックしてはならない。テレメトリデータベースが利用できない場合、Hook はサイレントに失敗して exit 0 すること。Claude Code セッションが監視システムを待ってハングすることは絶対に避けなければならない。
Stop Hook でのコスト推定
Claude Code は Stop イベントでドルコストを返さない — 自分で計算する。テレメトリ Hook に追加するヘルパー関数:
# 2026年5月時点の Claude API 料金 — Anthropic が変更したら更新する
PRICING = {
"claude-opus-4-6": {"input": 15.0, "output": 75.0}, # 100万トークンあたり USD
"claude-sonnet-4-6": {"input": 3.0, "output": 15.0},
"claude-haiku-4": {"input": 0.25, "output": 1.25},
}
def estimate_cost_usd(
model: str,
input_tokens: int,
output_tokens: int,
cached_input_tokens: int = 0,
) -> float:
"""
セッションコストを USD で推定する。
キャッシュされた入力トークンは標準入力レートの 10% で請求される。
"""
rates = PRICING.get(model, PRICING["claude-sonnet-4-6"])
cache_write_cost = (cached_input_tokens / 1_000_000) * rates["input"] * 0.25
cache_read_cost = (cached_input_tokens / 1_000_000) * rates["input"] * 0.10
fresh_input_cost = ((input_tokens - cached_input_tokens) / 1_000_000) * rates["input"]
output_cost = (output_tokens / 1_000_000) * rates["output"]
return cache_write_cost + cache_read_cost + fresh_input_cost + output_cost
キャッシュ読み取り割引について: プロンプトキャッシュから提供されたトークンは標準入力レートの約 10% で請求される。永続的な CLAUDE.md とシステムプロンプトを使用しているなら(使うべきだが)、実効入力コストは公表レートよりも意味のある分だけ安くなる。
実装レイヤー 2: JSONL トランスクリプト解析
Claude Code はすべてのセッションを JSONL トランスクリプトファイルに書き込む。デフォルトの保存先:
~/.claude/projects/<project-hash>/sessions/<session-id>.jsonl
各行はセッション内の 1 つのイベントを表す完全な JSON オブジェクトだ。これは事後分析のための宝の山だ。任意のセッションで何が起きたかを再現し、ツール呼び出しを数え、エラーパターンを抽出し、トークン合計を計算できる。
JSONL フォーマットの読み方
#!/usr/bin/env python3
# parse_transcripts.py
"""
分析のために Claude Code JSONL トランスクリプトを解析する。
使い方: python3 parse_transcripts.py [session_id]
session_id を省略すると最新セッションを分析する。
"""
from __future__ import annotations
import json
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
SESSIONS_BASE = Path.home() / ".claude" / "projects"
@dataclass
class SessionStats:
session_id: str
model: str = ""
turns: int = 0
tool_calls: int = 0
tool_failures: int = 0
compactions: int = 0
tokens_input: int = 0
tokens_output: int = 0
tokens_cache_read: int = 0
tokens_cache_write: int = 0
tool_call_counts: dict = field(default_factory=dict)
tool_failure_messages: list = field(default_factory=list)
@property
def tool_failure_rate(self) -> float:
if self.tool_calls == 0:
return 0.0
return self.tool_failures / self.tool_calls
@property
def estimated_cost_usd(self) -> float:
rates = {
"claude-opus-4-6": (15.0, 75.0),
"claude-sonnet-4-6": (3.0, 15.0),
"claude-haiku-4": (0.25, 1.25),
}
inp_rate, out_rate = rates.get(self.model, (3.0, 15.0))
fresh_input = self.tokens_input - self.tokens_cache_read
cost = (
(max(0, fresh_input) / 1_000_000) * inp_rate
+ (self.tokens_cache_read / 1_000_000) * inp_rate * 0.10
+ (self.tokens_cache_write / 1_000_000) * inp_rate * 0.25
+ (self.tokens_output / 1_000_000) * out_rate
)
return cost
def parse_session(path: Path) -> SessionStats:
stats = SessionStats(session_id=path.stem)
with path.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
except json.JSONDecodeError:
continue
etype = event.get("type", "")
if etype == "assistant":
stats.turns += 1
usage = event.get("usage", {})
stats.tokens_input += usage.get("input_tokens", 0)
stats.tokens_output += usage.get("output_tokens", 0)
stats.tokens_cache_read += usage.get("cache_read_input_tokens", 0)
stats.tokens_cache_write += usage.get("cache_creation_input_tokens", 0)
if not stats.model and event.get("model"):
stats.model = event["model"]
elif etype == "tool_use":
tool_name = event.get("name", "unknown")
stats.tool_calls += 1
stats.tool_call_counts[tool_name] = (
stats.tool_call_counts.get(tool_name, 0) + 1
)
elif etype == "tool_result" and event.get("is_error"):
stats.tool_failures += 1
content = event.get("content", "")
if isinstance(content, list):
msg = " ".join(
c.get("text", "") for c in content if isinstance(c, dict)
)
else:
msg = str(content)
stats.tool_failure_messages.append(msg[:200])
elif etype == "compaction":
stats.compactions += 1
return stats
実際のセッションに対して実行すると、次のような出力が得られる:
=== Session: 550e8400-e29b-41d4-a716-446655440000 ===
Model: claude-sonnet-4-6
Turns: 34
Tool calls: 89
Tool failures: 7 (7.9%)
Compactions: 2
Tokens — input: 284,193
Tokens — output: 18,442
Tokens — cache r: 241,764
Tokens — cache w: 42,429
Estimated cost: $0.1847
Tool call breakdown:
Bash 52
Read 23
Edit 11
Glob 3
ツール失敗率 7.9% と 1 セッションで 2 回のコンパクションは、調査する価値のあるシグナルだ。健全なセッションのツール失敗率は 3% 未満であるべきで、繰り返すコンパクションはコンテキスト管理に問題がある場合が多い。
実装レイヤー 3: LangSmith 連携
チームが既に LLM 可観測性に LangSmith を使用しているなら、Claude Code のテレメトリを直接ルーティングできる。これですべての LLM アプリケションにわたる統一されたトレースビューが得られる。
連携ポイントは Stop Hook だ。セッション終了後に LangSmith の Runs API にセッション統計をポストする:
#!/usr/bin/env python3
# .claude/hooks/langsmith_export.py
"""
Claude Code セッションデータを LangSmith にエクスポートする。
要件: LANGSMITH_API_KEY 環境変数
Hook イベント: Stop
"""
from __future__ import annotations
import json
import os
import sys
import uuid
from datetime import datetime, timezone
import urllib.request
import urllib.error
LANGSMITH_API_KEY = os.environ.get("LANGSMITH_API_KEY", "")
LANGSMITH_PROJECT = os.environ.get("LANGSMITH_PROJECT", "claude-code-monitoring")
LANGSMITH_BASE_URL = "https://api.smith.langchain.com"
def post_run(run_data: dict) -> None:
url = f"{LANGSMITH_BASE_URL}/runs"
payload = json.dumps(run_data).encode("utf-8")
req = urllib.request.Request(
url,
data=payload,
headers={
"Content-Type": "application/json",
"x-api-key": LANGSMITH_API_KEY,
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=5) as resp:
_ = resp.read()
except urllib.error.URLError:
pass # テレメトリの失敗でセッションをブロックしない
def main() -> None:
if not LANGSMITH_API_KEY:
sys.exit(0)
raw = sys.stdin.read()
try:
data = json.loads(raw)
except json.JSONDecodeError:
sys.exit(0)
usage = data.get("usage", {})
model = data.get("model", "claude-sonnet-4-6")
session_id = data.get("session_id", str(uuid.uuid4()))
run_data = {
"id": session_id,
"name": "claude-code-session",
"run_type": "llm",
"start_time": data.get("session_start_time", datetime.now(timezone.utc).isoformat()),
"end_time": datetime.now(timezone.utc).isoformat(),
"inputs": {"session_id": session_id},
"outputs": {
"turns": data.get("turns", 0),
"tool_calls": data.get("tool_calls", 0),
"tool_failures": data.get("tool_failures", 0),
},
"extra": {
"metadata": {"model": model},
},
"token_usage": {
"prompt_tokens": usage.get("input_tokens", 0),
"completion_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0),
},
"tags": ["claude-code"],
}
post_run(run_data)
sys.exit(0)
if __name__ == "__main__":
main()
実装レイヤー 4: Langfuse 連携
Langfuse は LangSmith の強力な代替で、セルフホストを希望する場合に特に適している:
#!/usr/bin/env python3
# .claude/hooks/langfuse_export.py
"""
Claude Code セッションデータを Langfuse にエクスポートする。
要件: LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY 環境変数
オプション: LANGFUSE_HOST(デフォルト: cloud.langfuse.com)
Hook イベント: Stop
"""
from __future__ import annotations
import base64
import json
import os
import sys
import uuid
from datetime import datetime, timezone
import urllib.request
import urllib.error
def get_auth_header() -> str:
pk = os.environ.get("LANGFUSE_PUBLIC_KEY", "")
sk = os.environ.get("LANGFUSE_SECRET_KEY", "")
encoded = base64.b64encode(f"{pk}:{sk}".encode()).decode()
return f"Basic {encoded}"
def main() -> None:
pk = os.environ.get("LANGFUSE_PUBLIC_KEY", "")
sk = os.environ.get("LANGFUSE_SECRET_KEY", "")
if not pk or not sk:
sys.exit(0)
host = os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
raw = sys.stdin.read()
try:
data = json.loads(raw)
except json.JSONDecodeError:
sys.exit(0)
usage = data.get("usage", {})
session_id = data.get("session_id", str(uuid.uuid4()))
trace = {
"id": session_id,
"name": "claude-code-session",
"userId": os.environ.get("USER", "unknown"),
"metadata": {"model": data.get("model")},
"tags": ["claude-code"],
"usage": {
"input": usage.get("input_tokens", 0),
"output": usage.get("output_tokens", 0),
"total": usage.get("input_tokens", 0) + usage.get("output_tokens", 0),
"unit": "TOKENS",
},
}
url = f"{host}/api/public/ingestion"
payload = json.dumps({
"batch": [{
"id": str(uuid.uuid4()),
"type": "trace-create",
"timestamp": datetime.now(timezone.utc).isoformat(),
"body": trace,
}]
}).encode("utf-8")
req = urllib.request.Request(
url,
data=payload,
headers={"Content-Type": "application/json", "Authorization": get_auth_header()},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=5):
pass
except urllib.error.URLError:
pass
sys.exit(0)
if __name__ == "__main__":
main()
LangSmith と Langfuse はどちらもコスト追跡、セッション比較、アラートをサポートしている。選択は主にマネージド SaaS(LangSmith)か完全なデータ所有権を持つセルフホスト(Langfuse)かによって決まる。
コストアラート
パッシブな監視も良いが、アクティブなアラートの方がより効果的だ。セッションがコスト閾値を超えた場合に通知(メールまたは Slack)を送る完全なコストアラートシステムを紹介する。
セッションコストアラート Hook
#!/usr/bin/env python3
# .claude/hooks/cost_alert.py
"""
セッションが閾値を超えたときにアラートを送信する。
環境変数:
CLAUDE_COST_ALERT_THRESHOLD — float, USD(デフォルト: 0.50)
SLACK_WEBHOOK_URL — Slack webhook(オプション)
CLAUDE_ALERT_LOG — アラートログのパス
"""
from __future__ import annotations
import json
import os
import sys
import urllib.request
import urllib.error
from datetime import datetime, timezone
from pathlib import Path
THRESHOLD = float(os.environ.get("CLAUDE_COST_ALERT_THRESHOLD", "0.50"))
SLACK_WEBHOOK = os.environ.get("SLACK_WEBHOOK_URL", "")
ALERT_LOG = Path(os.environ.get("CLAUDE_ALERT_LOG", Path.home() / ".claude" / "cost_alerts.log"))
PRICING = {
"claude-opus-4-6": (15.0, 75.0),
"claude-sonnet-4-6": (3.0, 15.0),
"claude-haiku-4": (0.25, 1.25),
}
def compute_cost(model: str, usage: dict) -> float:
inp_rate, out_rate = PRICING.get(model, (3.0, 15.0))
fresh = max(0, usage.get("input_tokens", 0) - usage.get("cache_read_input_tokens", 0))
return (
(fresh / 1_000_000) * inp_rate
+ (usage.get("cache_read_input_tokens", 0) / 1_000_000) * inp_rate * 0.10
+ (usage.get("cache_creation_input_tokens", 0) / 1_000_000) * inp_rate * 0.25
+ (usage.get("output_tokens", 0) / 1_000_000) * out_rate
)
def main() -> None:
raw = sys.stdin.read()
try:
data = json.loads(raw)
except json.JSONDecodeError:
sys.exit(0)
usage = data.get("usage", {})
model = data.get("model", "claude-sonnet-4-6")
session_id = data.get("session_id", "unknown")
cost = compute_cost(model, usage)
if cost < THRESHOLD:
sys.exit(0)
message = (
f"Claude Code コストアラート: セッション {session_id[:8]}... "
f"コスト ${cost:.4f}(閾値: ${THRESHOLD:.2f})。"
f"モデル: {model}, "
f"入力: {usage.get('input_tokens', 0):,} トークン, "
f"出力: {usage.get('output_tokens', 0):,} トークン。"
)
ALERT_LOG.parent.mkdir(parents=True, exist_ok=True)
with ALERT_LOG.open("a") as f:
f.write(f"{datetime.now(timezone.utc).isoformat()} {message}\n")
if SLACK_WEBHOOK:
payload = json.dumps({"text": message}).encode("utf-8")
req = urllib.request.Request(
SLACK_WEBHOOK,
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=5):
pass
except urllib.error.URLError:
pass
print(message, file=sys.stderr)
sys.exit(0)
if __name__ == "__main__":
main()
シェルプロファイルに閾値を設定する:
# ~/.zshrc または ~/.bashrc
export CLAUDE_COST_ALERT_THRESHOLD="0.50"
export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
日次コストサマリースクリプト
セッション単位のアラートに加え、日次サマリーでトレンドを把握できる:
#!/usr/bin/env python3
# daily_cost_summary.py
"""
Claude Code JSONL トランスクリプトから日次コストサマリーを生成する。
cron で実行: 0 9 * * * python3 /path/to/daily_cost_summary.py
"""
from __future__ import annotations
import json
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
SESSIONS_BASE = Path.home() / ".claude" / "projects"
PRICING = {
"claude-opus-4-6": (15.0, 75.0),
"claude-sonnet-4-6": (3.0, 15.0),
"claude-haiku-4": (0.25, 1.25),
}
def compute_cost(model: str, usage: dict) -> float:
inp_rate, out_rate = PRICING.get(model, (3.0, 15.0))
fresh = max(0, usage.get("input_tokens", 0) - usage.get("cache_read_input_tokens", 0))
return (
(fresh / 1_000_000) * inp_rate
+ (usage.get("cache_read_input_tokens", 0) / 1_000_000) * inp_rate * 0.10
+ (usage.get("cache_creation_input_tokens", 0) / 1_000_000) * inp_rate * 0.25
+ (usage.get("output_tokens", 0) / 1_000_000) * out_rate
)
def analyze_session_file(path: Path, since: datetime) -> Optional[dict]:
mtime = datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)
if mtime < since:
return None
total_input = total_output = total_cache_read = total_cache_write = 0
model = "claude-sonnet-4-6"
turns = tool_calls = tool_failures = 0
try:
with path.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
etype = event.get("type", "")
if etype == "assistant":
turns += 1
usage = event.get("usage", {})
total_input += usage.get("input_tokens", 0)
total_output += usage.get("output_tokens", 0)
total_cache_read += usage.get("cache_read_input_tokens", 0)
total_cache_write += usage.get("cache_creation_input_tokens", 0)
if event.get("model"):
model = event["model"]
elif etype == "tool_use":
tool_calls += 1
elif etype == "tool_result" and event.get("is_error"):
tool_failures += 1
except (json.JSONDecodeError, OSError):
return None
usage_dict = {
"input_tokens": total_input,
"output_tokens": total_output,
"cache_read_input_tokens": total_cache_read,
"cache_creation_input_tokens": total_cache_write,
}
return {
"session_id": path.stem,
"model": model,
"turns": turns,
"tool_calls": tool_calls,
"tool_failures": tool_failures,
"cost": compute_cost(model, usage_dict),
}
def main() -> None:
yesterday = datetime.now(timezone.utc) - timedelta(days=1)
since = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
sessions = []
for f in SESSIONS_BASE.rglob("*.jsonl"):
result = analyze_session_file(f, since)
if result:
sessions.append(result)
if not sessions:
print(f"{since.date()} のセッションが見つかりません")
return
total_cost = sum(s["cost"] for s in sessions)
total_tools = sum(s["tool_calls"] for s in sessions)
total_failures = sum(s["tool_failures"] for s in sessions)
failure_rate = total_failures / total_tools if total_tools > 0 else 0
print(f"\n=== Claude Code 日次サマリー: {since.date()} ===")
print(f"セッション数: {len(sessions)}")
print(f"推定合計コスト: ${total_cost:.4f}")
print(f"ツール呼び出し: {total_tools}")
print(f"ツール失敗: {total_failures} ({failure_rate:.1%})")
print(f"\nコスト上位セッション:")
for s in sorted(sessions, key=lambda x: -x["cost"])[:5]:
print(f" {s['session_id'][:12]}... ${s['cost']:.4f} {s['turns']} ターン {s['model']}")
if __name__ == "__main__":
main()
ダッシュボード例
オプション 1: ターミナルダッシュボード(依存関係なし)
多くの時間をターミナルで過ごすエンジニアには、Grafana よりもシンプルなターミナルダッシュボードの方が実用的だ:
#!/usr/bin/env python3
# claude_dashboard.py
"""
Claude Code メトリクスのターミナルダッシュボード。
telemetry.py Hook が作成した ~/.claude/telemetry.db を読む。
"""
from __future__ import annotations
import sqlite3
from datetime import datetime, timezone, timedelta
from pathlib import Path
DB_PATH = Path.home() / ".claude" / "telemetry.db"
PRICING = {
"claude-opus-4-6": (15.0, 75.0),
"claude-sonnet-4-6": (3.0, 15.0),
"claude-haiku-4": (0.25, 1.25),
}
def main() -> None:
if not DB_PATH.exists():
print("テレメトリデータベースが見つかりません。telemetry Hook をインストールしてください。")
return
conn = sqlite3.connect(DB_PATH)
days = 7
cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
# セッション数
sessions = conn.execute(
"SELECT COUNT(DISTINCT session_id) FROM events WHERE timestamp > ? AND event_type = 'SessionStart'",
(cutoff,),
).fetchone()[0]
# コスト計算
rows = conn.execute(
"SELECT tokens_input, tokens_output, model FROM events WHERE timestamp > ? AND event_type = 'Stop'",
(cutoff,),
).fetchall()
total_cost = 0.0
for inp, out, model in rows:
if inp and out and model:
r_in, r_out = PRICING.get(model, (3.0, 15.0))
total_cost += (inp / 1_000_000) * r_in + (out / 1_000_000) * r_out
# ツール失敗率
tool_rows = conn.execute(
"SELECT event_type FROM events WHERE timestamp > ? AND event_type IN ('PostToolUse', 'PostToolUseFailure')",
(cutoff,),
).fetchall()
total_tool = len(tool_rows)
failures = sum(1 for r in tool_rows if r[0] == "PostToolUseFailure")
failure_rate = failures / total_tool if total_tool > 0 else 0
# 失敗ツールランキング
fail_rows = conn.execute(
"""SELECT tool_name, COUNT(*) FROM events
WHERE timestamp > ? AND event_type = 'PostToolUseFailure'
GROUP BY tool_name ORDER BY COUNT(*) DESC LIMIT 5""",
(cutoff,),
).fetchall()
line = "-" * 50
print(f"\n{line}")
print(f" Claude Code メトリクス — 直近 {days} 日間")
print(f"{line}")
print(f" セッション数: {sessions}")
print(f" 推定コスト: ${total_cost:.4f}")
print(f" ツール失敗率: {failure_rate:.1%}")
if fail_rows:
print(f"\n 失敗ツールランキング:")
for name, count in fail_rows:
print(f" {(name or 'unknown'):<30} {count}")
print(f"{line}\n")
conn.close()
if __name__ == "__main__":
main()
オプション 2: Prometheus Push Gateway 経由の Grafana
既存の Grafana 環境があるチーム向けに、Stop Hook から Prometheus Push Gateway にメトリクスをプッシュする:
# Stop Hook にこの関数を追加する:
def push_to_prometheus(metrics: dict) -> None:
pushgateway = os.environ.get("PUSHGATEWAY_URL")
if not pushgateway:
return
lines = [
f'claude_code_session_cost_usd{{model="{metrics["model"]}"}} {metrics["cost"]:.6f}',
f'claude_code_tokens_total{{type="input",model="{metrics["model"]}"}} {metrics["input_tokens"]}',
f'claude_code_tokens_total{{type="output",model="{metrics["model"]}"}} {metrics["output_tokens"]}',
f'claude_code_tool_failure_rate {metrics["tool_failure_rate"]:.4f}',
f'claude_code_compactions_total {metrics["compactions"]}',
]
payload = "\n".join(lines).encode("utf-8")
url = f"{pushgateway}/metrics/job/claude_code/instance/{metrics['session_id'][:8]}"
req = urllib.request.Request(
url, data=payload,
headers={"Content-Type": "text/plain"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=5):
pass
except urllib.error.URLError:
pass
これで Grafana の以下のパネルを構築できる:
- コストトレンド — モデル別の日次推定コストの折れ線グラフ
- ツール失敗率 — 閾値カラーリング付きのスタットパネル(3% 未満=緑、3-7%=黄、7% 超=赤)
- コンパクション頻度 — コンテキストがリセットされる頻度を示す棒グラフ
- セッション時間 vs コスト — コスト超過セッションを特定する散布図
エラートレーシングパターン
集計メトリクスに加え、特定のエラーをセッション全体で追跡する必要がある場合がある。JSONL トランスクリプトがその際の信頼できる情報源だ。
ツールエラーの相関分析
最も一般的なパターン: ツールが失敗し、Claude が別の方法でリトライする。その失敗の連鎖を追跡したい場合:
#!/usr/bin/env python3
# trace_errors.py
"""
Claude Code セッションのツールエラーをトレースする。
各失敗の前後コンテキスト(先行するツール呼び出しと
それをトリガーしたアシスタントメッセージ)を表示する。
使い方: python3 trace_errors.py <session_id>
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
SESSIONS_BASE = Path.home() / ".claude" / "projects"
def main() -> None:
if len(sys.argv) < 2:
print("使い方: python3 trace_errors.py <session_id>")
sys.exit(1)
session_id = sys.argv[1]
matches = list(SESSIONS_BASE.rglob(f"{session_id}.jsonl"))
if not matches:
print(f"セッション {session_id} が見つかりません")
sys.exit(1)
path = matches[0]
events = []
with path.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
events.append(json.loads(line))
except json.JSONDecodeError:
continue
error_count = 0
for i, event in enumerate(events):
if event.get("type") != "tool_result" or not event.get("is_error"):
continue
error_count += 1
print(f"\n{'='*60}")
print(f"エラー #{error_count}(イベントインデックス {i})")
print(f"{'='*60}")
# 直前のツール呼び出しを探す
for j in range(i - 1, max(0, i - 5), -1):
if events[j].get("type") == "tool_use":
tu = events[j]
print(f"\nツール呼び出し: {tu.get('name', 'unknown')}")
inp = tu.get("input", {})
if "command" in inp:
print(f"コマンド: {inp['command']}")
elif "path" in inp:
print(f"パス: {inp['path']}")
break
content = event.get("content", "")
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
print(f"\nエラー内容: {item.get('text', '')[:400]}")
else:
print(f"\nエラー内容: {str(content)[:400]}")
if error_count == 0:
print("このセッションにツールエラーは見つかりませんでした。")
else:
print(f"\n\n合計エラー数: {error_count}")
if __name__ == "__main__":
main()
主要メトリクスリファレンス
追跡する価値のあるメトリクスと、正常範囲を外れた場合の対処をまとめる:
| メトリクス | 正常範囲 | 範囲外の場合の対処 |
|---|---|---|
| ツール失敗率 | 3% 未満 | 失敗トップのツールを調査。パーミッション、パスを確認 |
| セッションあたりのコンパクション | 0〜1 | CLAUDE.md のサイズを削減。ファイル読み込みを精選する |
| キャッシュヒット率 | 60% 超 | CLAUDE.md の配置を見直す。システムプロンプトが安定しているか確認 |
| セッションコスト(Sonnet) | $0.20 未満 | 過剰なファイル読み込み、大きなツール出力を確認 |
| セッションコスト(Opus) | $1.00 未満 | そのタスクに Opus が本当に必要か確認 |
| 出力トークン比率 | 入力の 15% 未満 | 正常。高い場合は Claude が冗長な出力を生成している可能性 |
キャッシュヒット率は特に注意が必要だ。60% を下回る場合、CLAUDE.md やシステムプロンプトの何かがターン間で変化しており、キャッシュの連続性を壊している。よくある原因: プロンプト内のタイムスタンプ、セッションごとに変わる動的コンテンツ、会話の先頭(末尾ではなく)へのコンテキスト追加。
よくある落とし穴
落とし穴 1: テレメトリの失敗でセッションをブロックする。 監視インフラはいつかダウンする。Hook が誤って非ゼロで終了したり、ネットワーク呼び出しでハングしたりすると、Claude Code セッションが中断される。テレメトリ Hook では何が起きても常に exit 0 する。テレメトリシステムの障害は別のエラーログに記録する。
落とし穴 2: 機密性の高いツール入力をログに記録する。
ツール入力にはファイルの内容、コマンド引数、場合によってはシークレットが含まれる。raw_json をログに記録する前に、機密フィールドを削除またはハッシュ化する。少なくとも長い文字列を切り詰め、ファイルコンテンツフィールドを除外する。
落とし穴 3: 推定コストを正確なコストとして扱う。 本ガイドのコスト推定はトークン数と公開料金から算出している。税金、企業向け割引交渉、丸め、そしてこの文章の作成後に Anthropic が行った料金変更は含まれない。相対的な比較やトレンド分析に使い、会計には使わない。
落とし穴 4: アラート閾値を低く設定しすぎる。 Sonnet セッションの閾値を $0.10 に設定するとノイズが多くなる。最初の 1 週間の監視データをベースラインとして閾値を調整する。合理的な出発点: セッションコスト中央値の 3 倍でアラート。
FAQ
セッション終了後ではなく、セッション中にトークン数を取得できますか?
はい。JSONL トランスクリプトの各 assistant イベントには、そのターンの累積トークン数を含む usage オブジェクトが含まれている。ファイルをリアルタイムで tail することで実行中の合計を取得できる。Stop Hook はセッション終了時の合計を取得するよりクリーンな方法だ。
監視で Claude Code に意味のあるレイテンシが追加されますか?
正しく実装されていれば、いいえ。Hook はほとんどの設定で非同期に実行される。テレメトリ専用の Hook の場合、Claude はフックのレスポンスを待たないため処理時間は無関係だ。ブロックする可能性があるのは、実行をブロックするために使用している PreToolUse の場合のみだ。
セッションではなくプロジェクト単位でコストを追跡できますか?
はい。セッション JSONL ファイルは ~/.claude/projects/<hash>/ 以下にプロジェクトハッシュ別に整理されている。バッチ分析スクリプトを特定のプロジェクトディレクトリに対して実行してプロジェクト単位の合計を得る。
Claude Code CLI ではなく API 経由で使用している場合はどうなりますか?
Hooks システムは Claude Code CLI 専用だ。API 利用の場合は、Anthropic API レスポンスの usage フィールドを使って直接 API 呼び出しを計装する。このフィールドはすべてのリクエストに対して入出力トークン数を返す。
本ガイドのスクリプトはあくまで出発点だ。実際の監視セットアップはチームの既存インフラ、Claude Code の利用規模、リアルタイムアラートが必要かどうかによって変わってくる。まず最小テレメトリ Hook と JSONL パーサーから始めることを勧める。この 2 つだけで、修正する価値のある問題の大半が浮かび上がる。