UC-FEED-005Data FeedsLv3
JSON / NDJSON / CSV での定期供給
各種フォーマットで定期データ配信を受け、社内 DWH と連動させる。
Data Feeds
KPI 例
- 配信成功率
- SLA 遵守率
- 件数変動率
単発で取れたデータをそのまま本番運用に持ち込むと、更新頻度、保存形式、再処理の設計で詰まりやすくなります。このユースケースでは、Data Feeds を使って JSON / NDJSON / CSV で定期供給する前提に切り替え、下流の DWH や分析基盤が扱いやすい受け渡しを設計します。
誰の課題か
- 取得自体はできているが、下流への受け渡しが都度実装になっているデータエンジニア
- DWH への投入形式を標準化したい BI チーム
- 単発スクレイピングから継続運用へ移行したいプロダクトチーム
ここでの課題は「もっと多く取ること」ではなく、「同じ形式で、同じ粒度で、同じ更新サイクルで渡し続けること」です。分析側は収集ロジックより供給契約の安定性を必要とします。
推奨製品セット
| 製品 | 役割 | このユースケースでの位置づけ |
|---|---|---|
| Data Feeds | 継続供給 | JSON / NDJSON / CSV の受け渡し基盤にする |
| Scrapers または Marketplace | 上流のデータソース | どのデータを供給するかを決める |
| API Access | 認証 | ジョブ起動や結果取得の入口にする |
- 上流が Scrapers なのか Marketplace なのかを先に分けておくと、データ更新の考え方が整理しやすくなります。
- 出力形式は「見やすさ」ではなく、下流システムの取り込みやすさで選びます。
最小実装イメージ
最小構成では、まず非同期ジョブを起動し、結果を snapshot_id ベースで受け取る流れを確認します。ここでは「1 回の取得を、継続供給に転用できる形で扱う」ことが重要です。
curl -X POST https://api.brightdata.com/datasets/v3/trigger \
-H "Authorization: Bearer $BRIGHTDATA_API_KEY" \
-H "Content-Type: application/json" \
-d '[
{"url":"https://example.com/items/1"},
{"url":"https://example.com/items/2"}
]'結果取得側は snapshot_id を入力にして処理を分けます。下の例では JSON 配列を受け取る前提で、NDJSON や CSV へ変換する処理を後段に分離しています。
import os
import requests
API_KEY = os.getenv("BRIGHTDATA_API_KEY")
SNAPSHOT_ID = os.getenv("BRIGHTDATA_SNAPSHOT_ID")
resp = requests.get(
f"https://api.brightdata.com/v1/snapshots/{SNAPSHOT_ID}/results",
headers={"Authorization": f"Bearer {API_KEY}"},
timeout=60,
)
resp.raise_for_status()
rows = resp.json()
print(len(rows) if isinstance(rows, list) else type(rows))- JSON はアプリで扱いやすく、NDJSON はストリーム処理や差分処理に向きます。
- CSV は BI や表計算系への受け渡しが簡単ですが、ネスト構造には向きません。
- どの形式を正とするかを 1 つ決め、他形式は派生物として生成するほうが運用しやすくなります。
運用ポイント
- 形式選定は下流起点で決めます。分析基盤が NDJSON を前提にしているのに、見やすさだけで CSV を正本にすると後で詰まります。
snapshot_id、取得日時、データセット識別子をセットで保存すると、再処理時の追跡がしやすくなります。- API キーは環境ごとに分け、検証と本番で同じキーを共有しないようにします。
- 全件再処理が必要な形式か、差分取り込みで足りる形式かを先に決めると、DWH 側の負荷設計が安定します。
- 配信遅延、欠損率、件数の急変を監視対象に入れると、単なる HTTP 成功率より実務に近い異常検知ができます。