MQTT_V1V2改修仕様書

← 一覧へ戻る MQTT_V1V2改修仕様書

MQTT V1 / V2 切り分け改修仕様書

作成日: 2026-04-20


1. この資料の目的

この資料は、SWPF の MQTT 取り込み処理に対して、V1 / V2 の切り分けと、 V2 の場合のみ DEVICEID + UNITID で親デバイスを検索する改修についてまとめたものです。

今回の改修方針は次のとおりです。

  • FetchRawDataFromMqttQueue基本そのまま
  • swpfmqttimport の import service に以下を追加
  • V1 / V2 判定
  • lookup key 生成
  • 親UID解決
  • V1 は 現行処理を維持
  • V2 は DEVICEID + UNITID で検索
  • invalid topic は ログ出力してスキップ

2. 背景

現状の MQTT 取り込みでは、基本的に DEVICEID をキーとして親デバイスを解決しています。 しかし今後は、1台のマイコン配下に複数ユニットがぶら下がる構成を想定しているため、 新仕様では UNITID を topic に含めて、ユニット単位で識別できるようにする必要があります。

そのため、topic 仕様を V1 / V2 に分けて扱います。


3. 用語

用語意味
V1旧 topic 仕様。UNIT_ID を持たない
V2新 topic 仕様。UNIT_ID を持つ
DEVICE_IDESP32本体などのデバイス識別子
UNIT_IDデバイス配下のユニット識別子。例: thermometer1, camera1
channeltopic 上でデータ種別を表す要素。例: sensor, image, boot, status, error, env
lookup key親UID解決のための検索キー
parent uidSWPF内部での親デバイスまたは対象エンティティの UID

4. topic 仕様

4.1 V1(旧仕様)

swpf/{user_id}/{device_id}/{channel}

または既存運用上、sensor/envimage/meta のように 第4層が V1 channel の起点になる形式を含みます。

swpf/1/esp32_C80CABF16E20/sensor/env
swpf/1/esp32_C80CABF16E20/image/meta
swpf/1/esp32_C80CABF16E20/boot
swpf/1/esp32_C80CABF16E20/status
swpf/1/esp32_C80CABF16E20/error

特徴

  • UNIT_ID を持たない
  • 親検索は DEVICE_ID のみで行う
  • 現行処理を維持する

4.2 V2(新仕様)

swpf/{user_id}/{device_id}/{unit_id}/{channel}

swpf/1/esp32_C80CABF16E20/thermometer_1/env
swpf/1/esp32_C80CABF16E20/camera_1/image
swpf/1/esp32_C80CABF16E20/system_1/boot
swpf/1/esp32_C80CABF16E20/waterpump_1/status

特徴

  • UNIT_ID を持つ
  • 親検索は DEVICEID + UNITID で行う
  • 取り込み後の後続処理は現行フローを維持する

5. 判定ルール

5.1 基本ルール

第4層が V1 channel 一覧に含まれる場合は V1 と判定します。 それ以外は V2 と判定します。

5.2 V1 channel 一覧

今回の判定に使用する V1 channel 定数は以下です。

  • sensor
  • image
  • boot
  • status
  • error

5.3 判定理由

UNITID は今後表記揺れが発生する可能性があります。 一方、V1 の channel 側は固定で管理しやすいため、 UNITIDの妥当性で判定するより、第4層が既知channelかどうかで判定する方が安全です。


6. lookup key 仕様

種別lookup key
V1device_id
V2device_id + "__" + unit_id

入力lookup key
V1: esp32_AAAesp32_AAA
V2: esp32AAA + thermometer1esp32_AAA__thermometer_1

7. 今回の改修範囲

7.1 変更しないもの

FetchRawDataFromMqttQueue

このクラスは基本的に変更しません。 現状どおり以下を行います。

  • MQTT RAW の取得
  • context への格納
  • groupRawRowsByParentUid() の結果を使った MQTT_RECEIVED 発火

7.2 変更するもの

swpfmqttimport の import service

この service に以下の機能を追加します。

  • topic 解析
  • V1 / V2 判定
  • lookup key 生成
  • 親UID解決
  • invalid topic のログ出力とスキップ

8. 現在のモジュール構成とロジック

8.1 現在の大まかな流れ

  1. ミラーリングテーブルから未処理の MQTT RAW を取得
  2. queue プラグインが raw records を context に格納
  3. import service が親UIDごとに raw rows をグルーピング
  4. MQTT_RECEIVED を発火
  5. 後続の現行処理へ渡す

8.2 現在の問題点

現状は基本的に DEVICEID 前提の解決であるため、 UNITID を持つ V2 topic が来たときに、ユニット単位で親を引き分けることができません。


9. 改修後の設計方針

9.1 基本方針

  • 入口の topic 解釈だけを拡張する
  • 後続ロジックはできるだけ現行維持
  • V1 は完全維持
  • V2 は lookup key だけ拡張
  • invalid は落とさずログしてスキップ

9.2 ねらい

この方式により、影響範囲を import service にほぼ限定しつつ、 V2 の取り込みを可能にします。


10. Mermaid付き詳細フロー図

10.1 全体フロー

flowchart TD A["FetchRawDataFromMqttQueue<br/>未処理MQTT RAW取得"] --> B["input.raws.mqtt.records に格納"] B --> C["Import Service<br/>groupRawRowsByParentUid"] C --> D["各rowのtopic解析"] D --> E{{"V1 / V2 / invalid 判定"}} E -->|V1| F["V1 lookup key生成<br/>device_id"] E -->|V2| G["V2 lookup key生成<br/>device_id__unit_id"] E -->|invalid| H["ログ出力してスキップ"] F --> I["親UID解決"] G --> I I --> J["親UIDごとにグルーピング"] J --> K["FetchRawDataFromMqttQueue に返却"] K --> L["MQTT_RECEIVED を FIRE"]

10.2 V1 / V2 判定フロー

flowchart TD A["topicを / で分解"] --> B["第4層を取得"] B --> C{{"第4層がV1 channelか?"}} C -->|Yes| D["V1"] C -->|No| E["V2"]

10.3 lookup key 生成フロー

flowchart TD A["topic解析結果"] --> B{{"version"}} B -->|v1| C["lookup_key = device_id"] B -->|v2| D["lookup_key = device_id + '__' + unit_id"]

10.4 親UID解決フロー

flowchart TD A["lookup_key取得"] --> B{{"version"}} B -->|v1| C["device_id で既存解決"] B -->|v2| D["device_id__unit_id で解決"] C --> E{{"親UID取得できたか"}} D --> E E -->|Yes| F["グルーピング対象へ追加"] E -->|No| G["警告ログ出力"]

11. 実装コードレベル設計(PHP関数単位)

以下は 実装対象の関数設計案 です。 関数名は現実装に合わせて変更可ですが、責務分離はこの形を推奨します。

11.1 既存メソッド: groupRawRowsByParentUid(array $rows): array

役割

raw rows を親UIDごとにまとめる既存メソッド。

改修内容

  • 各rowごとに parseTopicContext() を呼ぶ
  • buildLookupKey() を呼ぶ
  • resolveParentUidByLookupKey() を呼ぶ
  • invalid または parent 未解決はスキップ
  • 最終的に parent_uid => rows[] の形で返す

擬似コード

public function groupRawRowsByParentUid(array $rows): array {
    $grouped = [];

    foreach ($rows as $row) {
        $topicContext = $this->parseTopicContext($row);

        if (!$topicContext['valid']) {
            $this->logger->warning('Invalid MQTT topic skipped.', [
                'topic' => $row['topic'] ?? '',
            ]);
            continue;
        }

        $lookupKey = $this->buildLookupKey(
            $topicContext['device_id'],
            $topicContext['unit_id'],
            $topicContext['version']
        );

        $parentUid = $this->resolveParentUidByLookupKey(
            $lookupKey,
            $topicContext['version']
        );

        if (!$parentUid) {
            $this->logger->warning('MQTT parent uid not resolved.', [
                'topic' => $row['topic'] ?? '',
                'version' => $topicContext['version'],
                'lookup_key' => $lookupKey,
            ]);
            continue;
        }

        $row['_mqtt_topic_context'] = $topicContext;
        $row['_mqtt_lookup_key'] = $lookupKey;
        $grouped[$parentUid][] = $row;
    }

    return $grouped;
}

11.2 新規メソッド: parseTopicContext(array $row): array

役割

topic から V1 / V2 / invalid を判定し、必要な情報を返す。

返却例

[
    'valid' => true,
    'version' => 'v2',
    'user_id' => '1',
    'device_id' => 'esp32_C80CABF16E20',
    'unit_id' => 'thermometer_1',
    'channel' => 'env',
    'topic' => 'swpf/1/esp32_C80CABF16E20/thermometer_1/env',
]

擬似コード

public function parseTopicContext(array $row): array {
    $topic = (string) ($row['topic'] ?? '');
    $parts = array_values(array_filter(explode('/', trim($topic, '/')), 'strlen'));

    if (count($parts) < 4) {
        return ['valid' => false, 'version' => 'invalid', 'topic' => $topic];
    }

    $userId = $parts[1] ?? null;
    $deviceId = $parts[2] ?? null;
    $part4 = $parts[3] ?? null;

    if (!$deviceId || !$part4) {
        return ['valid' => false, 'version' => 'invalid', 'topic' => $topic];
    }

    if ($this->isV1Channel((string) $part4)) {
        return [
            'valid' => true,
            'version' => 'v1',
            'user_id' => $userId,
            'device_id' => $deviceId,
            'unit_id' => null,
            'channel' => $part4,
            'topic' => $topic,
        ];
    }

    $unitId = $part4;
    $channel = $parts[4] ?? null;

    if (!$channel) {
        return ['valid' => false, 'version' => 'invalid', 'topic' => $topic];
    }

    return [
        'valid' => true,
        'version' => 'v2',
        'user_id' => $userId,
        'device_id' => $deviceId,
        'unit_id' => $unitId,
        'channel' => $channel,
        'topic' => $topic,
    ];
}

11.3 新規メソッド: isV1Channel(string $part4): bool

役割

第4層が V1 channel 一覧に含まれるか判定する。

擬似コード

protected function isV1Channel(string $part4): bool {
    $v1Channels = [
        'sensor',
        'image',
        'boot',
        'status',
        'error',
    ];

    return in_array($part4, $v1Channels, true);
}

11.4 新規メソッド: buildLookupKey(string $deviceId, ?string $unitId, string $version): string

役割

V1 / V2 に応じた lookup key を返す。

擬似コード

protected function buildLookupKey(string $deviceId, ?string $unitId, string $version): string {
    if ($version === 'v2') {
        return $deviceId . '__' . (string) $unitId;
    }
    return $deviceId;
}

11.5 新規または既存改修メソッド: resolveParentUidByLookupKey(string $lookupKey, string $version): ?int

役割

lookup key から親UIDを解決する。

方針

  • V1 は既存の device_id 解決を使う
  • V2 は device_id__unit_id を解決する検索へ分岐

擬似コード

protected function resolveParentUidByLookupKey(string $lookupKey, string $version): ?int {
    if ($version === 'v2') {
        // device_id + unit_id 用の検索
        return $this->findParentUidByDeviceAndUnitKey($lookupKey);
    }

    // 既存の device_id 検索
    return $this->findParentUidByDeviceId($lookupKey);
}

11.6 FetchRawDataFromMqttQueue::execute(array $context)

方針

原則無改修。

任意追加

検証用にログ追加のみ可。

例:

  • fetched件数
  • grouped親UID数
  • V1件数
  • V2件数
  • invalid件数

12. 改修箇所一覧

区分対象内容
主改修SwpfMqttImportService::groupRawRowsByParentUid()V1/V2切り分けと parent uid 解決を追加
新規追加parseTopicContext()topic解析と V1/V2判定
新規追加isV1Channel()第4層の V1 channel 判定
新規追加buildLookupKey()V1/V2別の検索キー生成
新規または改修resolveParentUidByLookupKey()V1/V2別の親UID解決
任意FetchRawDataFromMqttQueue::execute()ログ追加のみ

13. invalid topic の扱い

方針

invalid topic は エラーで停止しない。 ログ出力してスキップする。

理由

  • 既存運用への影響を最小にするため
  • topic typo や不正データを吸収しやすくするため
  • まずは V2 移行を優先するため

14. テスト手順書

14.1 テスト目的

以下を確認する。

  • V1 が現行どおり動く
  • V2 が DEVICEID + UNITID で解決される
  • invalid がログだけでスキップされる
  • FetchRawDataFromMqttQueue の現行フローが壊れない

14.2 テスト前提

  • MQTT broker が利用可能
  • Drupal 側で対象 device / unit に対応する親UIDが登録済み
  • import service の改修版が反映済み
  • queue 実行または対象プラグイン実行が可能

14.3 実機テスト観点

ケース1: V1 sensor

送信topic例

swpf/1/esp32_C80CABF16E20/sensor/env

期待結果

  • V1判定
  • lookup key = esp32_C80CABF16E20
  • 現行どおり親UID解決
  • MQTT_RECEIVED 発火

ケース2: V1 image

送信topic例

swpf/1/esp32_C80CABF16E20/image/meta

期待結果

  • V1判定
  • lookup key = esp32_C80CABF16E20
  • 現行どおり処理

ケース3: V2 env

送信topic例

swpf/1/esp32_C80CABF16E20/thermometer_1/env

期待結果

  • V2判定
  • lookup key = esp32_C80CABF16E20__thermometer_1
  • V2用親UID解決
  • 後続は現行どおり

ケース4: V2 image

送信topic例

swpf/1/esp32_C80CABF16E20/camera_1/image

期待結果

  • V2判定
  • lookup key = esp32_C80CABF16E20__camera_1
  • 親UID解決
  • 後続処理継続

ケース5: invalid topic

送信topic例

swpf/1/esp32_C80CABF16E20
swpf/1/esp32_C80CABF16E20/unknown_only

期待結果

  • invalid判定
  • warning または error ログ
  • スキップ
  • システム停止なし

14.4 MQTTコマンドによるテスト

以下は mosquitto_pub を使う例です。 環境に応じて host / port / topic / payload を変更してください。

V1 sensor テスト

mosquitto_pub -h <MQTT_HOST> -p <MQTT_PORT> -t "swpf/1/esp32_C80CABF16E20/sensor/env" -m '{"temperature":26.5,"humidity":47.0}'

V1 image テスト

mosquitto_pub -h <MQTT_HOST> -p <MQTT_PORT> -t "swpf/1/esp32_C80CABF16E20/image/meta" -m '{"type":"meta","event_id":"test001"}'

V2 env テスト

mosquitto_pub -h <MQTT_HOST> -p <MQTT_PORT> -t "swpf/1/esp32_C80CABF16E20/thermometer_1/env" -m '{"temperature":26.5,"humidity":47.0}'

V2 image テスト

mosquitto_pub -h <MQTT_HOST> -p <MQTT_PORT> -t "swpf/1/esp32_C80CABF16E20/camera_1/image" -m '{"type":"meta","event_id":"cam001"}'

invalid テスト

mosquitto_pub -h <MQTT_HOST> -p <MQTT_PORT> -t "swpf/1/esp32_C80CABF16E20/unknown_only" -m '{"x":1}'

14.5 確認ポイント

ログ確認

確認したいログ例:

  • fetched rows count
  • V1 count
  • V2 count
  • invalid count
  • parent uid resolved
  • parent uid unresolved
  • lookup key

DB / Queue / Context確認

  • context に raw rows が入っているか
  • groupRawRowsByParentUid() の戻りが正しいか
  • MQTT_RECEIVED が期待 parent uid で発火しているか

15. テスト結果記録欄

ケース期待実結果OK/NG備考
V1 sensorV1処理
V1 imageV1処理
V2 envV2処理
V2 imageV2処理
invalidログのみ

16. 改修規模

今回の改修規模は 小〜中規模 です。

理由

  • テーブル定義変更なし
  • FetchRawDataFromMqttQueue の責務変更なし
  • 主改修は import service のみ
  • 後続の Automation / FIRE / 保存ロジックは現行維持

17. 今回はやらないこと

今回の最小改修では以下は対象外です。

  • ALL 対応
  • bundle payload 分解
  • V1をV2形式へ自動変換
  • UI変更
  • topic保存形式の変更
  • mirroring table のカラム追加

18. 最終方針まとめ

今回の最小改修の最終方針は以下です。

  • FetchRawDataFromMqttQueue は基本そのまま
  • swpfmqttimport の import service に
  • V1/V2判定
  • lookup key生成
  • 親UID解決

を追加

  • V1 は現行維持
  • V2 は DEVICEID + UNITID で検索
  • invalid はログしてスキップ

この方針により、影響範囲を限定しながら V2 を導入できます。


 

当サイトまたはIoTカスタムモジュール、開発支援に関するお問い合わせはこちらのメールフォームからお気軽にお問い合わせください。