MQTT V1 / V2 切り分け改修仕様書
作成日: 2026-04-20
1. この資料の目的
この資料は、SWPF の MQTT 取り込み処理に対して、V1 / V2 の切り分けと、 V2 の場合のみ DEVICEID + UNITID で親デバイスを検索する改修についてまとめたものです。
今回の改修方針は次のとおりです。
FetchRawDataFromMqttQueueは 基本そのままswpfmqttimportの import service に以下を追加- V1 / V2 判定
- lookup key 生成
- 親UID解決
- V1 は 現行処理を維持
- V2 は
DEVICEID + UNITIDで検索 - invalid topic は ログ出力してスキップ
2. 背景
現状の MQTT 取り込みでは、基本的に DEVICEID をキーとして親デバイスを解決しています。 しかし今後は、1台のマイコン配下に複数ユニットがぶら下がる構成を想定しているため、 新仕様では UNITID を topic に含めて、ユニット単位で識別できるようにする必要があります。
そのため、topic 仕様を V1 / V2 に分けて扱います。
3. 用語
| 用語 | 意味 |
|---|---|
| V1 | 旧 topic 仕様。UNIT_ID を持たない |
| V2 | 新 topic 仕様。UNIT_ID を持つ |
| DEVICE_ID | ESP32本体などのデバイス識別子 |
| UNIT_ID | デバイス配下のユニット識別子。例: thermometer1, camera1 |
| channel | topic 上でデータ種別を表す要素。例: sensor, image, boot, status, error, env |
| lookup key | 親UID解決のための検索キー |
| parent uid | SWPF内部での親デバイスまたは対象エンティティの UID |
4. topic 仕様
4.1 V1(旧仕様)
swpf/{user_id}/{device_id}/{channel}
または既存運用上、sensor/env や image/meta のように 第4層が V1 channel の起点になる形式を含みます。
例
swpf/1/esp32_C80CABF16E20/sensor/env
swpf/1/esp32_C80CABF16E20/image/meta
swpf/1/esp32_C80CABF16E20/boot
swpf/1/esp32_C80CABF16E20/status
swpf/1/esp32_C80CABF16E20/error
特徴
UNIT_IDを持たない- 親検索は
DEVICE_IDのみで行う - 現行処理を維持する
4.2 V2(新仕様)
swpf/{user_id}/{device_id}/{unit_id}/{channel}
例
swpf/1/esp32_C80CABF16E20/thermometer_1/env
swpf/1/esp32_C80CABF16E20/camera_1/image
swpf/1/esp32_C80CABF16E20/system_1/boot
swpf/1/esp32_C80CABF16E20/waterpump_1/status
特徴
UNIT_IDを持つ- 親検索は
DEVICEID + UNITIDで行う - 取り込み後の後続処理は現行フローを維持する
5. 判定ルール
5.1 基本ルール
第4層が V1 channel 一覧に含まれる場合は V1 と判定します。 それ以外は V2 と判定します。
5.2 V1 channel 一覧
今回の判定に使用する V1 channel 定数は以下です。
sensorimagebootstatuserror
5.3 判定理由
UNITID は今後表記揺れが発生する可能性があります。 一方、V1 の channel 側は固定で管理しやすいため、 UNITIDの妥当性で判定するより、第4層が既知channelかどうかで判定する方が安全です。
6. lookup key 仕様
| 種別 | lookup key |
|---|---|
| V1 | device_id |
| V2 | device_id + "__" + unit_id |
例
| 入力 | lookup key |
|---|---|
V1: esp32_AAA | esp32_AAA |
V2: esp32AAA + thermometer1 | esp32_AAA__thermometer_1 |
7. 今回の改修範囲
7.1 変更しないもの
FetchRawDataFromMqttQueue
このクラスは基本的に変更しません。 現状どおり以下を行います。
- MQTT RAW の取得
- context への格納
groupRawRowsByParentUid()の結果を使ったMQTT_RECEIVED発火
7.2 変更するもの
swpfmqttimport の import service
この service に以下の機能を追加します。
- topic 解析
- V1 / V2 判定
- lookup key 生成
- 親UID解決
- invalid topic のログ出力とスキップ
8. 現在のモジュール構成とロジック
8.1 現在の大まかな流れ
- ミラーリングテーブルから未処理の MQTT RAW を取得
- queue プラグインが raw records を context に格納
- import service が親UIDごとに raw rows をグルーピング
MQTT_RECEIVEDを発火- 後続の現行処理へ渡す
8.2 現在の問題点
現状は基本的に DEVICEID 前提の解決であるため、 UNITID を持つ V2 topic が来たときに、ユニット単位で親を引き分けることができません。
9. 改修後の設計方針
9.1 基本方針
- 入口の topic 解釈だけを拡張する
- 後続ロジックはできるだけ現行維持
- V1 は完全維持
- V2 は lookup key だけ拡張
- invalid は落とさずログしてスキップ
9.2 ねらい
この方式により、影響範囲を import service にほぼ限定しつつ、 V2 の取り込みを可能にします。
10. Mermaid付き詳細フロー図
10.1 全体フロー
10.2 V1 / V2 判定フロー
10.3 lookup key 生成フロー
10.4 親UID解決フロー
11. 実装コードレベル設計(PHP関数単位)
以下は 実装対象の関数設計案 です。 関数名は現実装に合わせて変更可ですが、責務分離はこの形を推奨します。
11.1 既存メソッド: groupRawRowsByParentUid(array $rows): array
役割
raw rows を親UIDごとにまとめる既存メソッド。
改修内容
- 各rowごとに
parseTopicContext()を呼ぶ buildLookupKey()を呼ぶresolveParentUidByLookupKey()を呼ぶ- invalid または parent 未解決はスキップ
- 最終的に
parent_uid => rows[]の形で返す
擬似コード
public function groupRawRowsByParentUid(array $rows): array {
$grouped = [];
foreach ($rows as $row) {
$topicContext = $this->parseTopicContext($row);
if (!$topicContext['valid']) {
$this->logger->warning('Invalid MQTT topic skipped.', [
'topic' => $row['topic'] ?? '',
]);
continue;
}
$lookupKey = $this->buildLookupKey(
$topicContext['device_id'],
$topicContext['unit_id'],
$topicContext['version']
);
$parentUid = $this->resolveParentUidByLookupKey(
$lookupKey,
$topicContext['version']
);
if (!$parentUid) {
$this->logger->warning('MQTT parent uid not resolved.', [
'topic' => $row['topic'] ?? '',
'version' => $topicContext['version'],
'lookup_key' => $lookupKey,
]);
continue;
}
$row['_mqtt_topic_context'] = $topicContext;
$row['_mqtt_lookup_key'] = $lookupKey;
$grouped[$parentUid][] = $row;
}
return $grouped;
}
11.2 新規メソッド: parseTopicContext(array $row): array
役割
topic から V1 / V2 / invalid を判定し、必要な情報を返す。
返却例
[
'valid' => true,
'version' => 'v2',
'user_id' => '1',
'device_id' => 'esp32_C80CABF16E20',
'unit_id' => 'thermometer_1',
'channel' => 'env',
'topic' => 'swpf/1/esp32_C80CABF16E20/thermometer_1/env',
]
擬似コード
public function parseTopicContext(array $row): array {
$topic = (string) ($row['topic'] ?? '');
$parts = array_values(array_filter(explode('/', trim($topic, '/')), 'strlen'));
if (count($parts) < 4) {
return ['valid' => false, 'version' => 'invalid', 'topic' => $topic];
}
$userId = $parts[1] ?? null;
$deviceId = $parts[2] ?? null;
$part4 = $parts[3] ?? null;
if (!$deviceId || !$part4) {
return ['valid' => false, 'version' => 'invalid', 'topic' => $topic];
}
if ($this->isV1Channel((string) $part4)) {
return [
'valid' => true,
'version' => 'v1',
'user_id' => $userId,
'device_id' => $deviceId,
'unit_id' => null,
'channel' => $part4,
'topic' => $topic,
];
}
$unitId = $part4;
$channel = $parts[4] ?? null;
if (!$channel) {
return ['valid' => false, 'version' => 'invalid', 'topic' => $topic];
}
return [
'valid' => true,
'version' => 'v2',
'user_id' => $userId,
'device_id' => $deviceId,
'unit_id' => $unitId,
'channel' => $channel,
'topic' => $topic,
];
}
11.3 新規メソッド: isV1Channel(string $part4): bool
役割
第4層が V1 channel 一覧に含まれるか判定する。
擬似コード
protected function isV1Channel(string $part4): bool {
$v1Channels = [
'sensor',
'image',
'boot',
'status',
'error',
];
return in_array($part4, $v1Channels, true);
}
11.4 新規メソッド: buildLookupKey(string $deviceId, ?string $unitId, string $version): string
役割
V1 / V2 に応じた lookup key を返す。
擬似コード
protected function buildLookupKey(string $deviceId, ?string $unitId, string $version): string {
if ($version === 'v2') {
return $deviceId . '__' . (string) $unitId;
}
return $deviceId;
}
11.5 新規または既存改修メソッド: resolveParentUidByLookupKey(string $lookupKey, string $version): ?int
役割
lookup key から親UIDを解決する。
方針
- V1 は既存の
device_id解決を使う - V2 は
device_id__unit_idを解決する検索へ分岐
擬似コード
protected function resolveParentUidByLookupKey(string $lookupKey, string $version): ?int {
if ($version === 'v2') {
// device_id + unit_id 用の検索
return $this->findParentUidByDeviceAndUnitKey($lookupKey);
}
// 既存の device_id 検索
return $this->findParentUidByDeviceId($lookupKey);
}
11.6 FetchRawDataFromMqttQueue::execute(array $context)
方針
原則無改修。
任意追加
検証用にログ追加のみ可。
例:
- fetched件数
- grouped親UID数
- V1件数
- V2件数
- invalid件数
12. 改修箇所一覧
| 区分 | 対象 | 内容 |
|---|---|---|
| 主改修 | SwpfMqttImportService::groupRawRowsByParentUid() | V1/V2切り分けと parent uid 解決を追加 |
| 新規追加 | parseTopicContext() | topic解析と V1/V2判定 |
| 新規追加 | isV1Channel() | 第4層の V1 channel 判定 |
| 新規追加 | buildLookupKey() | V1/V2別の検索キー生成 |
| 新規または改修 | resolveParentUidByLookupKey() | V1/V2別の親UID解決 |
| 任意 | FetchRawDataFromMqttQueue::execute() | ログ追加のみ |
13. invalid topic の扱い
方針
invalid topic は エラーで停止しない。 ログ出力してスキップする。
理由
- 既存運用への影響を最小にするため
- topic typo や不正データを吸収しやすくするため
- まずは V2 移行を優先するため
14. テスト手順書
14.1 テスト目的
以下を確認する。
- V1 が現行どおり動く
- V2 が
DEVICEID + UNITIDで解決される - invalid がログだけでスキップされる
FetchRawDataFromMqttQueueの現行フローが壊れない
14.2 テスト前提
- MQTT broker が利用可能
- Drupal 側で対象 device / unit に対応する親UIDが登録済み
- import service の改修版が反映済み
- queue 実行または対象プラグイン実行が可能
14.3 実機テスト観点
ケース1: V1 sensor
送信topic例
swpf/1/esp32_C80CABF16E20/sensor/env
期待結果
- V1判定
- lookup key =
esp32_C80CABF16E20 - 現行どおり親UID解決
MQTT_RECEIVED発火
ケース2: V1 image
送信topic例
swpf/1/esp32_C80CABF16E20/image/meta
期待結果
- V1判定
- lookup key =
esp32_C80CABF16E20 - 現行どおり処理
ケース3: V2 env
送信topic例
swpf/1/esp32_C80CABF16E20/thermometer_1/env
期待結果
- V2判定
- lookup key =
esp32_C80CABF16E20__thermometer_1 - V2用親UID解決
- 後続は現行どおり
ケース4: V2 image
送信topic例
swpf/1/esp32_C80CABF16E20/camera_1/image
期待結果
- V2判定
- lookup key =
esp32_C80CABF16E20__camera_1 - 親UID解決
- 後続処理継続
ケース5: invalid topic
送信topic例
swpf/1/esp32_C80CABF16E20
swpf/1/esp32_C80CABF16E20/unknown_only
期待結果
- invalid判定
- warning または error ログ
- スキップ
- システム停止なし
14.4 MQTTコマンドによるテスト
以下は mosquitto_pub を使う例です。 環境に応じて host / port / topic / payload を変更してください。
V1 sensor テスト
mosquitto_pub -h <MQTT_HOST> -p <MQTT_PORT> -t "swpf/1/esp32_C80CABF16E20/sensor/env" -m '{"temperature":26.5,"humidity":47.0}'
V1 image テスト
mosquitto_pub -h <MQTT_HOST> -p <MQTT_PORT> -t "swpf/1/esp32_C80CABF16E20/image/meta" -m '{"type":"meta","event_id":"test001"}'
V2 env テスト
mosquitto_pub -h <MQTT_HOST> -p <MQTT_PORT> -t "swpf/1/esp32_C80CABF16E20/thermometer_1/env" -m '{"temperature":26.5,"humidity":47.0}'
V2 image テスト
mosquitto_pub -h <MQTT_HOST> -p <MQTT_PORT> -t "swpf/1/esp32_C80CABF16E20/camera_1/image" -m '{"type":"meta","event_id":"cam001"}'
invalid テスト
mosquitto_pub -h <MQTT_HOST> -p <MQTT_PORT> -t "swpf/1/esp32_C80CABF16E20/unknown_only" -m '{"x":1}'
14.5 確認ポイント
ログ確認
確認したいログ例:
- fetched rows count
- V1 count
- V2 count
- invalid count
- parent uid resolved
- parent uid unresolved
- lookup key
DB / Queue / Context確認
- context に raw rows が入っているか
groupRawRowsByParentUid()の戻りが正しいかMQTT_RECEIVEDが期待 parent uid で発火しているか
15. テスト結果記録欄
| ケース | 期待 | 実結果 | OK/NG | 備考 |
|---|---|---|---|---|
| V1 sensor | V1処理 | |||
| V1 image | V1処理 | |||
| V2 env | V2処理 | |||
| V2 image | V2処理 | |||
| invalid | ログのみ |
16. 改修規模
今回の改修規模は 小〜中規模 です。
理由
- テーブル定義変更なし
FetchRawDataFromMqttQueueの責務変更なし- 主改修は import service のみ
- 後続の Automation / FIRE / 保存ロジックは現行維持
17. 今回はやらないこと
今回の最小改修では以下は対象外です。
ALL対応- bundle payload 分解
- V1をV2形式へ自動変換
- UI変更
- topic保存形式の変更
- mirroring table のカラム追加
18. 最終方針まとめ
今回の最小改修の最終方針は以下です。
FetchRawDataFromMqttQueueは基本そのままswpfmqttimportの import service に- V1/V2判定
- lookup key生成
- 親UID解決
を追加
- V1 は現行維持
- V2 は
DEVICEID + UNITIDで検索 - invalid はログしてスキップ
この方針により、影響範囲を限定しながら V2 を導入できます。