MQTTデータ取り込み時のキー変換処理仕様書

← 一覧へ戻る MQTTデータ取り込み時のキー変換処理仕様書

MQTT_IMPORT 変換マッパープラグイン設計書

作成日: 2026-04-21


1. この設計書の目的

この設計書は、MQTT受信から IOTDATAHEADER 保存までの全体像を説明したうえで、 今回新規作成する MQTT_IMPORT モジュール内の変換マッパープラグインタイプ について、 はじめて実装する開発者向けに一から説明するものです。

特に重要なポイントは次の3つです。

  1. MQTT_IMPORT モジュールの役割を明確にすること
  2. IOTDATAHEADER 保存用サービスから変換ロジックを分離すること
  3. UNITタイプごとに payload キー → IOTDATAHEADER カラムへの変換を切り替えられるようにすること

2. 背景と現在の問題

現在の構成では、mimamorirestapi モジュールの MimamoriUpdateIotdataService が IOTDATAHEADER 保存を行っています。

その中で、対象デバイスユーザーに設定された ACTION プラグイン (例: growplants)を読み込み、getIotDataProcessParams() で 変換ルールを取得して保存しています。

つまり今は、

  • 受信
  • 変換ルール取得
  • 値変換
  • IOTDATAHEADER 保存

が、MimamoriUpdateIotdataService に強く結びついています。

この方式は、初期段階では動きますが、今後以下の理由で苦しくなります。

  • MQTT topic が V1 / V2 に分かれる
  • V2 では unit_id によりユニット単位で処理したい
  • デバイスごとではなく UNITタイプごと に変換ルールを変えたい
  • growplants のような業務用 ACTION プラグインに、変換責務を持たせ続けたくない
  • thermometer / co2 / weather などのユニットが増える

そのため、変換責務を MQTT_IMPORT 側に戻す 設計へ切り替えます。


3. 目指す最終イメージ

今回の設計では、責務を次のように整理します。

  • MQTT_IMPORT
  • MQTT RAW を解釈する
  • V1 / V2 を判定する
  • UNITタイプを特定する
  • UNITタイプ用の変換マッパープラグインを呼ぶ
  • 正規化済みデータを IOTDATAHEADER 保存サービスへ渡す
  • IOTDATAHEADER 保存サービス
  • 受け取った正規化済みデータを保存する
  • 保存そのものに集中する
  • ACTION プラグイン
  • 保存後の業務処理や通知、分析などに集中する

つまり、どのキーをどの value 列に入れるか という責務は MQTT_IMPORT の新規マッパープラグインへ移します。


4. 全体像: MQTT受信から IOTDATAHEADER 保存まで

4.1 全体フロー

flowchart TD A["MQTT Broker"] --> B["ミラーリングテーブル<br/>swpf_mqtt_mirror_mqtt_raw_queue"] B --> C["FetchRawDataFromMqttQueue<br/>未処理RAW取得"] C --> D["SwpfMqttImportService"] D --> E["V1 / V2 判定"] E --> F["parent uid 解決"] F --> G["Queue item 作成"] G --> H["MQTT_IMPORT QueueWorker / 保存処理"] H --> I["UNITタイプ判定"] I --> J["MqttImportValueMapper プラグイン選択"] J --> K["payload → IOTDATAHEADER 保存用配列へ変換"] K --> L["MimamoriUpdateIotdataService または保存サービス"] L --> M["IOTDATAHEADER 保存"] M --> N["必要なら ACTION プラグイン実行"]

4.2 現在の IMPORT モジュールの重要な動き

swpfmqttimport の中心は SwpfMqttImportService です。 このサービスは主に次の責務を持っています。

現在の責務

  • 最終取り込み位置(sourcerowid)の取得
  • 未処理 RAW (processed = 0) の取得
  • topic の解析
  • 疑似MAC / topic から対象デバイス解決
  • QueueWorker に積むための queue item 作成
  • 処理済み / エラー更新

特に重要なメソッド

  • getLastImportedSourceRowId()
  • fetchLatestMqttRawData()
  • groupRawRowsByParentUid()
  • syncMqttDataByQueueWorker()
  • markRowsProcessed()
  • markRowsError()

4.3 MimamoriUpdateIotdataService の現在の動き

現在の MimamoriUpdateIotdataService は、受信データを保存する際に次のような流れで動いています。

  1. 対象 device user の fieldpluginid を取得
  2. plugin.manager.mimamori_action から ACTION プラグインを生成
  3. getIotDataProcessParams() でキー変換定義を取得
  4. payload の各キーを value1 などへ変換
  5. IotDataHeader を保存
  6. 保存後に executeJustAfterReceived() を呼ぶ

この構造だと、保存前変換が ACTION プラグインに依存しています。


5. なぜ ACTION プラグインではなく MQTT_IMPORT に置くのか

5.1 ACTION プラグインに置き続ける問題

現在の growplants 方式では、getIotDataProcessParams() が payload の変換ルールを持っています。

例:

  • temp -> value1
  • humi -> value2
  • light -> value3

これは初期構成としては分かりやすいですが、今後は問題になります。

問題点

  • growplants が業務処理と I/F 定義を両方持ってしまう
  • unit_type ごとに変換したいのに、device user 単位の plugin に縛られる
  • thermometer / co2 / weather などを growplants に寄せるのは不自然
  • MQTTIMPORT 側で unittype が分かっているのに、その情報を活かしきれない

5.2 MQTT_IMPORT に置く利点

変換マッパーを MQTT_IMPORT に置くと、責務がきれいに分かれます。

利点

  • topic / unitid / unittype に最も近い場所で変換できる
  • unit_type ごとのマッパーに分離できる
  • ACTION プラグインを業務処理に集中させられる
  • IOTDATAHEADER 保存サービスを汎用化できる
  • V2 と非常に相性がよい

6. 今回新規作成するものの概要

今回新規作成するのは、MQTT_IMPORT モジュール専用の変換マッパープラグインタイプ です。

仮称を以下とします。

  • プラグインタイプ名: MqttImportValueMapper
  • プラグイン管理サービス名: plugin.manager.mqttimportvalue_mapper
  • プラグイン格納場所:

swpfmqttimport/src/Plugin/MqttImportValueMapper/

役割

  • UNITタイプごとに payload を解釈する
  • payload のキーを IOTDATAHEADER カラムへ変換する
  • 必要に応じて function を使い値を変換する
  • extraFunction 的な派生計算も返せるようにする

7. どの情報をプラグインが受け取るか

新しいマッパープラグインは、最低限次の情報を受け取る想定です。

入力

  • unit_type
  • unit_id
  • topic_context
  • payload
  • device/user/context(必要なら)

出力

  • IOTDATAHEADER に保存すべきフィールド配列
  • 追加の計算結果
  • ログ用メタ情報

出力例

[
  'header_values' => [
    'value1' => 26.5,
    'value2' => 47.0,
    'message' => '...',
  ],
  'derived_values' => [
    'temp_var_rate' => 1.2,
    'effective_temp' => 28.1,
  ],
  'meta' => [
    'unit_type' => 'thermometer',
    'matched_keys' => ['temp', 'humi'],
  ],
]

8. 設計方針

8.1 基本方針

  • 変換ルールは unit_type ごとに持つ
  • payload キーのゆらぎは unit_type の MAP で吸収する
  • 型番は補助情報に留める
  • 摂氏 / 華氏のような単位差は function で吸収する
  • 保存サービスはマッピング済み値を受け取って保存するだけにする

8.2 unit_type 主体で設計する理由

たとえば温度系センサーは、メーカーや型番によって payload キーが次のように揺れる可能性があります。

  • temp
  • temperature
  • t
  • temp_c

これをセンサー型番ごとに完全分離するより、 まず thermometer という論理タイプでまとめる方が管理しやすいです。

つまり、メーカーごとのゆらぎは thermometer の MAP で吸収する という考え方です。


9. 新プラグインタイプの責務

新しい MqttImportValueMapper プラグインは次の責務を持ちます。

必須責務

  1. 自分がどの unit_type に対応するか宣言する
  2. payload キーを IOTDATAHEADER カラムへ変換する
  3. function による値変換を実行する
  4. 変換結果を統一形式で返す

任意責務

  1. 派生値(例: 温度変化率)を計算する
  2. メッセージ生成の補助を行う
  3. validation を行う

10. プラグインタイプの実装構成

以下の構成を推奨します。

swpf_mqtt_import/
  src/
    Annotation/
      MqttImportValueMapper.php
    MqttImportValueMapperInterface.php
    MqttImportValueMapperBase.php
    MqttImportValueMapperManager.php
    Plugin/
      MqttImportValueMapper/
        DefaultMapper.php
        ThermometerMapper.php
        Co2SensorMapper.php

11. アノテーション設計

11.1 アノテーションクラス

ファイル例: swpfmqttimport/src/Annotation/MqttImportValueMapper.php

役割

Drupal の annotated plugin として、各 mapper の定義情報を記述する。

推奨プロパティ

  • id
  • label
  • unit_type
  • description

サンプル

<?php

namespace Drupal\swpf_mqtt_import\Annotation;

use Drupal\Component\Annotation\Plugin;

/**
 * MQTT IMPORT value mapper plugin definition.
 *
 * @Annotation
 */
class MqttImportValueMapper extends Plugin {

  /**
   * The plugin ID.
   *
   * @var string
   */
  public $id;

  /**
   * Human readable label.
   *
   * @var \Drupal\Core\Annotation\Translation
   */
  public $label;

  /**
   * Target unit type.
   *
   * @var string
   */
  public $unit_type;

  /**
   * Description.
   *
   * @var string
   */
  public $description;

}

11.2 プラグイン定義の使用例

/**
 * @MqttImportValueMapper(
 *   id = "thermometer",
 *   label = @Translation("Thermometer Mapper"),
 *   unit_type = "thermometer",
 *   description = "Maps thermometer payload into IOTDATAHEADER values."
 * )
 */

12. インターフェース設計

ファイル例: swpfmqttimport/src/MqttImportValueMapperInterface.php

役割

すべての mapper が実装すべき共通契約を定義する。

推奨メソッド

<?php

namespace Drupal\swpf_mqtt_import;

interface MqttImportValueMapperInterface {

  /**
   * 対応 unit_type を返す。
   */
  public function getUnitType(): string;

  /**
   * payload を IOTDATAHEADER 用の配列に変換する。
   *
   * @param array $payload
   * @param array $context
   *
   * @return array
   *   [
   *     'header_values' => [],
   *     'derived_values' => [],
   *     'meta' => [],
   *   ]
   */
  public function mapPayload(array $payload, array $context = []): array;

}

13. ベースクラス設計

ファイル例: swpfmqttimport/src/MqttImportValueMapperBase.php

役割

共通機能を持つ抽象クラス。

持たせたい共通機能

  • アノテーションから unit_type を読む
  • 共通の helper を持つ
  • function 呼び出しの共通処理
  • payload から存在するキーだけ拾う補助処理

サンプル

<?php

namespace Drupal\swpf_mqtt_import;

use Drupal\Component\Plugin\PluginBase;

abstract class MqttImportValueMapperBase extends PluginBase implements MqttImportValueMapperInterface {

  public function getUnitType(): string {
    return (string) ($this->pluginDefinition['unit_type'] ?? '');
  }

  protected function getPayloadValue(array $payload, array $candidateKeys, mixed $default = null): mixed {
    foreach ($candidateKeys as $key) {
      if (array_key_exists($key, $payload)) {
        return $payload[$key];
      }
    }
    return $default;
  }

}

14. Plugin Manager 設計

ファイル例: swpfmqttimport/src/MqttImportValueMapperManager.php

役割

Plugin/MqttImportValueMapper 配下の annotated plugin を発見し、 plugin instance を生成する。

役割のポイント

  • unit_type から mapper を引けるようにする
  • 定義一覧の取得
  • キャッシュ

概略

MimamoriActionManager と同じ考え方で作成する。 違いは subdir と annotation class の名前だけです。


15. services.yml 設計

swpfmqttimport.services.yml に manager をサービス登録します。

services:
  plugin.manager.mqtt_import_value_mapper:
    class: Drupal\swpf_mqtt_import\MqttImportValueMapperManager
    arguments: ['@container.namespaces', '@cache.discovery', '@module_handler']

必要なら補助サービスも追加できます。

  swpf_mqtt_import.value_mapper_resolver:
    class: Drupal\swpf_mqtt_import\Service\MqttImportValueMapperResolver
    arguments: ['@plugin.manager.mqtt_import_value_mapper']

16. サンプルプラグイン: ThermometerMapper

ファイル例: swpfmqttimport/src/Plugin/MqttImportValueMapper/ThermometerMapper.php

16.1 目的

温度系ユニットの payload を IOTDATAHEADER 用に変換する。

16.2 thermometer で吸収するキー揺れの例

  • 温度: temp, temperature, t, temp_c
  • 湿度: humi, humidity, hum, h

16.3 サンプルコード

<?php

namespace Drupal\swpf_mqtt_import\Plugin\MqttImportValueMapper;

use Drupal\swpf_mqtt_import\Annotation\MqttImportValueMapper;
use Drupal\swpf_mqtt_import\MqttImportValueMapperBase;

/**
 * @MqttImportValueMapper(
 *   id = "thermometer",
 *   label = @Translation("Thermometer Mapper"),
 *   unit_type = "thermometer",
 *   description = "Maps thermometer payload into IOTDATAHEADER values."
 * )
 */
class ThermometerMapper extends MqttImportValueMapperBase {

  public function mapPayload(array $payload, array $context = []): array {
    $headerValues = [];
    $derivedValues = [];

    $temp = $this->getPayloadValue($payload, ['temp', 'temperature', 't', 'temp_c']);
    $humi = $this->getPayloadValue($payload, ['humi', 'humidity', 'hum', 'h']);

    if ($temp !== null) {
      $headerValues['value1'] = (float) $temp;
    }

    if ($humi !== null) {
      $headerValues['value2'] = (float) $humi;
    }

    return [
      'header_values' => $headerValues,
      'derived_values' => $derivedValues,
      'meta' => [
        'unit_type' => $this->getUnitType(),
      ],
    ];
  }

}

17. サンプルプラグイン: Co2SensorMapper

17.1 目的

CO2 センサー系の payload を変換する。

17.2 重要な考え方

CO2 センサーにも temp を持たせてよい。 ただしその temp は thermometer の主温度と完全に同一とは限らない。

そのため、同じ temp というキー名でも、unit_type ごとに保存先を変えてよい

  • co2 -> value1
  • temp -> value2
  • humi -> value3

18. 摂氏・華氏変換の考え方

18.1 なぜ function が必要か

payload のキーゆらぎは MAP で吸収できますが、 単位差 は MAP だけでは吸収できません。

たとえば次の2つは意味が違います。

  • temp = 26.5 (摂氏)
  • temp_f = 79.7 (華氏)

どちらも最終的には value1 = 摂氏温度 に揃えたい場合、 function を使って値そのものを変換する必要があります。


18.2 仕様例

入力 payload 例1: 摂氏

{{
  "temp": 26.5,
  "humi": 47.0
}}

入力 payload 例2: 華氏

{{
  "temp_f": 79.7,
  "humidity": 47.0
}}

目標

どちらも最終的に

  • value1 = 摂氏温度
  • value2 = 湿度

へ揃える。


18.3 function を含む MAP の考え方

単純な "temp" => "value1" ではなく、 次のような定義を持つと柔軟になります。

[
  'temp' => [
    'column' => 'value1',
    'function' => null,
  ],
  'temp_f' => [
    'column' => 'value1',
    'function' => 'convertFahrenheitToCelsius',
  ],
  'humidity' => [
    'column' => 'value2',
    'function' => null,
  ],
]

18.4 function のサンプル実装

protected function convertFahrenheitToCelsius(float $value): float {
  return round(($value - 32) * 5 / 9, 2);
}

18.5 function 適用込みの例

public function mapPayload(array $payload, array $context = []): array {
  $headerValues = [];

  if (isset($payload['temp'])) {
    $headerValues['value1'] = (float) $payload['temp'];
  }
  elseif (isset($payload['temp_f'])) {
    $headerValues['value1'] = $this->convertFahrenheitToCelsius((float) $payload['temp_f']);
  }

  if (isset($payload['humi'])) {
    $headerValues['value2'] = (float) $payload['humi'];
  }
  elseif (isset($payload['humidity'])) {
    $headerValues['value2'] = (float) $payload['humidity'];
  }

  return [
    'header_values' => $headerValues,
    'derived_values' => [],
    'meta' => [],
  ];
}

19. extraFunction 的な派生計算の扱い

既存の growplants では extraFunction を使い、

  • calcTempVarRate
  • calcEffectiveTemp

のような派生処理を行っています。

新設するマッパープラグインでも、同じ概念を持たせられます。

  • value1 に温度を入れる
  • その温度から tempvarrate を計算する
  • さらに effective_temp を計算する

この場合、返り値の derived_values に含める設計がおすすめです。

[
  'header_values' => [
    'value1' => 26.5,
  ],
  'derived_values' => [
    'temp_var_rate' => 1.4,
    'effective_temp' => 28.1,
  ],
]

20. IMPORT モジュール側の組み込み位置

20.1 最も重要な考え方

変換は 保存直前 に行うのが分かりやすいです。

つまり IMPORT の流れの中で

  1. topic 解析
  2. V1 / V2 判定
  3. parent uid 解決
  4. queue item 準備
  5. 保存直前に mapper plugin を選ぶ
  6. mapper が payload を正規化
  7. IOTDATAHEADER 保存

の順にします。


20.2 推奨メソッド分割

SwpfMqttImportService または QueueWorker 側に、以下の役割を追加します。

推奨メソッド

  • resolveUnitType(array $topicContext, array $payload): string
  • resolveMapperPluginId(string $unitType): string
  • mapPayloadForIotHeader(string $unitType, array $payload, array $context): array

20.3 resolveUnitType の考え方

V2 の場合、unitidthermometer1 なら、 unit_type = thermometer と切り出せます。

サンプル

protected function resolveUnitType(array $topicContext, array $payload): string {
  $unitId = (string) ($topicContext['unit_id'] ?? '');
  if ($unitId !== '' && str_contains($unitId, '_')) {
    return explode('_', $unitId, 2)[0];
  }
  return 'default';
}

21. V1 と V2 の扱い

21.1 V2 の場合

  • unit_id がある
  • そこから unit_type を取り出す
  • 対応する mapper plugin を呼ぶ

21.2 V1 の場合

V1 は unit_id がありません。

したがって V1 では次のいずれかが必要です。

選択肢

  1. device user の設定からデフォルト unit_type を決める
  2. plugin 設定から unit_type を決める
  3. 暫定的に default mapper を使う

今回のおすすめ

最初は default または既存互換 mapper を用意し、 V1 はそこへ流す。


22. 保存サービス側の設計変更方針

現在の MimamoriUpdateIotdataService は、 ACTION プラグインから getIotDataProcessParams() を取得して 自分で変換しています。

新設計では、保存サービスは可能な限り次の責務だけに絞ります。

  • 受け取った header 値を保存する
  • 必要な派生値を保存する
  • 保存後イベントを投げる

つまり、「何をどの value 列に入れるか」は IMPORT 側で終わらせる 方針です。


23. 既存 growplants からの移行方針

23.1 現在

  • growplants が getIotDataProcessParams() を持つ
  • 保存サービスがその定義を読む

23.2 移行後

  • thermometer / co2_sensor / weather などの mapper plugin を IMPORT に作る
  • 保存サービスは mapper 済み値を保存する
  • growplants は保存後の業務処理に集中する

23.3 段階移行

完全に一度で切り替えなくてもよいです。

段階案

  1. MQTT_IMPORT に mapper plugin を新設
  2. 新しい MQTT 経路だけ mapper を使う
  3. 既存 REST / growplants ルートは当面維持
  4. 安定後に保存サービスから旧変換ロジックを削減

24. 実装手順(推奨順)

Phase 1: 基盤作成

  1. Annotation クラス作成
  2. Interface 作成
  3. Base クラス作成
  4. Plugin Manager 作成
  5. services.yml 登録

Phase 2: 最初の mapper 作成

  1. DefaultMapper 作成
  2. ThermometerMapper 作成
  3. Co2SensorMapper 作成

Phase 3: IMPORT への組み込み

  1. unit_type 解決処理追加
  2. mapper 選択処理追加
  3. payload 正規化処理追加
  4. 保存サービスへ渡す配列を変更

Phase 4: 移行

  1. 既存 growplants 変換との整合確認
  2. V2 経路で優先使用
  3. 必要に応じて V1 fallback 実装

25. 開発者向けの重要注意点

25.1 このプラグインは「保存前変換」である

保存後の通知や判定ではありません。 あくまで payload を IOTDATAHEADER 用に正規化するためのプラグイン です。

25.2 unit_type と model は分ける

  • unittype: thermometer, co2sensor など
  • model: SHT30, SCD41 など

まずは unit_type で大枠を決め、 必要な場合だけ model 差分を追加します。

25.3 payload キーゆらぎは mapper が吸収する

temp, temperature, t などの違いは mapper で吸収するのが基本です。

25.4 単位差は function で吸収する

キーが違うだけでなく値の意味が違う場合は、MAP ではなく function を使います。


26. テスト観点

26.1 discovery テスト

  • プラグインが正しく見つかるか
  • plugin.manager.mqttimportvalue_mapper から取得できるか

26.2 thermometer テスト

  • temp -> value1
  • temperature -> value1
  • humi -> value2
  • humidity -> value2

26.3 Fahrenheit テスト

  • temp_f = 79.7value1 = 26.5 になるか

26.4 V2 組み込みテスト

  • unitid = thermometer1 から unit_type = thermometer を引けるか
  • mapper を選択できるか
  • 保存結果が正しいか

26.5 V1 fallback テスト

  • V1 topic でも default mapper で最低限処理できるか

27. 最小構成のサンプル一覧

最初に作るなら最低でもこの3つを推奨します。

  • DefaultMapper
  • ThermometerMapper
  • Co2SensorMapper

理由:

  • V1 fallback が必要
  • 温度系は最初に使う可能性が高い
  • CO2 系は温度補助値も扱えるため設計確認に向いている

28. この設計の最終的なメリット

この新設計により、次の効果が得られます。

  • IMPORT モジュールの責務が明確になる
  • unit_type ごとの変換を追加しやすい
  • growplants 依存を弱められる
  • V2 topic 構造を活かせる
  • Fahrenheit / Celsius のような現実的な差分に対応できる
  • はじめて実装する開発者でも、どこに何を書くかが分かりやすい

29. 実装時の補足メモ

  • FetchRawDataFromMqttQueue は今回の新プラグイン追加では原則変更しない
  • SwpfMqttImportService または QueueWorker が mapper を呼ぶ責務を持つ
  • 既存の MimamoriUpdateIotdataService は、段階移行の間だけ旧変換ロジックを残してもよい
  • 最終的には「IMPORT で正規化」「保存サービスは保存のみ」に寄せる

30. 最終まとめ

今回新規作成するのは、MQTT_IMPORT モジュール専用の変換マッパープラグインタイプ です。

このプラグインは、

  • MQTT payload を
  • unit_type ごとに
  • IOTDATAHEADER 用へ変換する

ためのものです。

設計の要点は次のとおりです。

  • 変換責務は ACTION ではなく IMPORT に置く
  • unit_type 主体でマッピングする
  • payload キーのゆらぎは MAP で吸収する
  • 単位差は function で吸収する
  • 保存サービスは保存に専念させる

この設計により、今後の V2 / unitid / unittype 中心の世界へ、 無理なく移行できます。

 

当サイトまたはIoTカスタムモジュール、開発支援に関するお問い合わせはこちらのメールフォームからお気軽にお問い合わせください。