MQTT_IMPORT 変換マッパープラグイン設計書
作成日: 2026-04-21
1. この設計書の目的
この設計書は、MQTT受信から IOTDATAHEADER 保存までの全体像を説明したうえで、 今回新規作成する MQTT_IMPORT モジュール内の変換マッパープラグインタイプ について、 はじめて実装する開発者向けに一から説明するものです。
特に重要なポイントは次の3つです。
- MQTT_IMPORT モジュールの役割を明確にすること
- IOTDATAHEADER 保存用サービスから変換ロジックを分離すること
- UNITタイプごとに payload キー → IOTDATAHEADER カラムへの変換を切り替えられるようにすること
2. 背景と現在の問題
現在の構成では、mimamorirestapi モジュールの MimamoriUpdateIotdataService が IOTDATAHEADER 保存を行っています。
その中で、対象デバイスユーザーに設定された ACTION プラグイン (例: growplants)を読み込み、getIotDataProcessParams() で 変換ルールを取得して保存しています。
つまり今は、
- 受信
- 変換ルール取得
- 値変換
- IOTDATAHEADER 保存
が、MimamoriUpdateIotdataService に強く結びついています。
この方式は、初期段階では動きますが、今後以下の理由で苦しくなります。
- MQTT topic が V1 / V2 に分かれる
- V2 では
unit_idによりユニット単位で処理したい - デバイスごとではなく UNITタイプごと に変換ルールを変えたい
growplantsのような業務用 ACTION プラグインに、変換責務を持たせ続けたくない- thermometer / co2 / weather などのユニットが増える
そのため、変換責務を MQTT_IMPORT 側に戻す 設計へ切り替えます。
3. 目指す最終イメージ
今回の設計では、責務を次のように整理します。
- MQTT_IMPORT
- MQTT RAW を解釈する
- V1 / V2 を判定する
- UNITタイプを特定する
- UNITタイプ用の変換マッパープラグインを呼ぶ
- 正規化済みデータを IOTDATAHEADER 保存サービスへ渡す
- IOTDATAHEADER 保存サービス
- 受け取った正規化済みデータを保存する
- 保存そのものに集中する
- ACTION プラグイン
- 保存後の業務処理や通知、分析などに集中する
つまり、どのキーをどの value 列に入れるか という責務は MQTT_IMPORT の新規マッパープラグインへ移します。
4. 全体像: MQTT受信から IOTDATAHEADER 保存まで
4.1 全体フロー
4.2 現在の IMPORT モジュールの重要な動き
swpfmqttimport の中心は SwpfMqttImportService です。 このサービスは主に次の責務を持っています。
現在の責務
- 最終取り込み位置(
sourcerowid)の取得 - 未処理 RAW (
processed = 0) の取得 - topic の解析
- 疑似MAC / topic から対象デバイス解決
- QueueWorker に積むための queue item 作成
- 処理済み / エラー更新
特に重要なメソッド
getLastImportedSourceRowId()fetchLatestMqttRawData()groupRawRowsByParentUid()syncMqttDataByQueueWorker()markRowsProcessed()markRowsError()
4.3 MimamoriUpdateIotdataService の現在の動き
現在の MimamoriUpdateIotdataService は、受信データを保存する際に次のような流れで動いています。
- 対象 device user の
fieldpluginidを取得 plugin.manager.mimamori_actionから ACTION プラグインを生成getIotDataProcessParams()でキー変換定義を取得- payload の各キーを
value1などへ変換 IotDataHeaderを保存- 保存後に
executeJustAfterReceived()を呼ぶ
この構造だと、保存前変換が ACTION プラグインに依存しています。
5. なぜ ACTION プラグインではなく MQTT_IMPORT に置くのか
5.1 ACTION プラグインに置き続ける問題
現在の growplants 方式では、getIotDataProcessParams() が payload の変換ルールを持っています。
例:
temp -> value1humi -> value2light -> value3
これは初期構成としては分かりやすいですが、今後は問題になります。
問題点
growplantsが業務処理と I/F 定義を両方持ってしまう- unit_type ごとに変換したいのに、device user 単位の plugin に縛られる
- thermometer / co2 / weather などを growplants に寄せるのは不自然
- MQTTIMPORT 側で unittype が分かっているのに、その情報を活かしきれない
5.2 MQTT_IMPORT に置く利点
変換マッパーを MQTT_IMPORT に置くと、責務がきれいに分かれます。
利点
- topic / unitid / unittype に最も近い場所で変換できる
- unit_type ごとのマッパーに分離できる
- ACTION プラグインを業務処理に集中させられる
- IOTDATAHEADER 保存サービスを汎用化できる
- V2 と非常に相性がよい
6. 今回新規作成するものの概要
今回新規作成するのは、MQTT_IMPORT モジュール専用の変換マッパープラグインタイプ です。
仮称を以下とします。
- プラグインタイプ名:
MqttImportValueMapper - プラグイン管理サービス名:
plugin.manager.mqttimportvalue_mapper - プラグイン格納場所:
swpfmqttimport/src/Plugin/MqttImportValueMapper/
役割
- UNITタイプごとに payload を解釈する
- payload のキーを IOTDATAHEADER カラムへ変換する
- 必要に応じて function を使い値を変換する
- extraFunction 的な派生計算も返せるようにする
7. どの情報をプラグインが受け取るか
新しいマッパープラグインは、最低限次の情報を受け取る想定です。
入力
unit_typeunit_idtopic_contextpayloaddevice/user/context(必要なら)
出力
- IOTDATAHEADER に保存すべきフィールド配列
- 追加の計算結果
- ログ用メタ情報
出力例
[
'header_values' => [
'value1' => 26.5,
'value2' => 47.0,
'message' => '...',
],
'derived_values' => [
'temp_var_rate' => 1.2,
'effective_temp' => 28.1,
],
'meta' => [
'unit_type' => 'thermometer',
'matched_keys' => ['temp', 'humi'],
],
]
8. 設計方針
8.1 基本方針
- 変換ルールは unit_type ごとに持つ
- payload キーのゆらぎは unit_type の MAP で吸収する
- 型番は補助情報に留める
- 摂氏 / 華氏のような単位差は function で吸収する
- 保存サービスはマッピング済み値を受け取って保存するだけにする
8.2 unit_type 主体で設計する理由
たとえば温度系センサーは、メーカーや型番によって payload キーが次のように揺れる可能性があります。
temptemperaturettemp_c
これをセンサー型番ごとに完全分離するより、 まず thermometer という論理タイプでまとめる方が管理しやすいです。
つまり、メーカーごとのゆらぎは thermometer の MAP で吸収する という考え方です。
9. 新プラグインタイプの責務
新しい MqttImportValueMapper プラグインは次の責務を持ちます。
必須責務
- 自分がどの
unit_typeに対応するか宣言する - payload キーを IOTDATAHEADER カラムへ変換する
- function による値変換を実行する
- 変換結果を統一形式で返す
任意責務
- 派生値(例: 温度変化率)を計算する
- メッセージ生成の補助を行う
- validation を行う
10. プラグインタイプの実装構成
以下の構成を推奨します。
swpf_mqtt_import/
src/
Annotation/
MqttImportValueMapper.php
MqttImportValueMapperInterface.php
MqttImportValueMapperBase.php
MqttImportValueMapperManager.php
Plugin/
MqttImportValueMapper/
DefaultMapper.php
ThermometerMapper.php
Co2SensorMapper.php
11. アノテーション設計
11.1 アノテーションクラス
ファイル例: swpfmqttimport/src/Annotation/MqttImportValueMapper.php
役割
Drupal の annotated plugin として、各 mapper の定義情報を記述する。
推奨プロパティ
idlabelunit_typedescription
サンプル
<?php
namespace Drupal\swpf_mqtt_import\Annotation;
use Drupal\Component\Annotation\Plugin;
/**
* MQTT IMPORT value mapper plugin definition.
*
* @Annotation
*/
class MqttImportValueMapper extends Plugin {
/**
* The plugin ID.
*
* @var string
*/
public $id;
/**
* Human readable label.
*
* @var \Drupal\Core\Annotation\Translation
*/
public $label;
/**
* Target unit type.
*
* @var string
*/
public $unit_type;
/**
* Description.
*
* @var string
*/
public $description;
}
11.2 プラグイン定義の使用例
/**
* @MqttImportValueMapper(
* id = "thermometer",
* label = @Translation("Thermometer Mapper"),
* unit_type = "thermometer",
* description = "Maps thermometer payload into IOTDATAHEADER values."
* )
*/
12. インターフェース設計
ファイル例: swpfmqttimport/src/MqttImportValueMapperInterface.php
役割
すべての mapper が実装すべき共通契約を定義する。
推奨メソッド
<?php
namespace Drupal\swpf_mqtt_import;
interface MqttImportValueMapperInterface {
/**
* 対応 unit_type を返す。
*/
public function getUnitType(): string;
/**
* payload を IOTDATAHEADER 用の配列に変換する。
*
* @param array $payload
* @param array $context
*
* @return array
* [
* 'header_values' => [],
* 'derived_values' => [],
* 'meta' => [],
* ]
*/
public function mapPayload(array $payload, array $context = []): array;
}
13. ベースクラス設計
ファイル例: swpfmqttimport/src/MqttImportValueMapperBase.php
役割
共通機能を持つ抽象クラス。
持たせたい共通機能
- アノテーションから
unit_typeを読む - 共通の helper を持つ
- function 呼び出しの共通処理
- payload から存在するキーだけ拾う補助処理
サンプル
<?php
namespace Drupal\swpf_mqtt_import;
use Drupal\Component\Plugin\PluginBase;
abstract class MqttImportValueMapperBase extends PluginBase implements MqttImportValueMapperInterface {
public function getUnitType(): string {
return (string) ($this->pluginDefinition['unit_type'] ?? '');
}
protected function getPayloadValue(array $payload, array $candidateKeys, mixed $default = null): mixed {
foreach ($candidateKeys as $key) {
if (array_key_exists($key, $payload)) {
return $payload[$key];
}
}
return $default;
}
}
14. Plugin Manager 設計
ファイル例: swpfmqttimport/src/MqttImportValueMapperManager.php
役割
Plugin/MqttImportValueMapper 配下の annotated plugin を発見し、 plugin instance を生成する。
役割のポイント
unit_typeから mapper を引けるようにする- 定義一覧の取得
- キャッシュ
概略
MimamoriActionManager と同じ考え方で作成する。 違いは subdir と annotation class の名前だけです。
15. services.yml 設計
swpfmqttimport.services.yml に manager をサービス登録します。
例
services:
plugin.manager.mqtt_import_value_mapper:
class: Drupal\swpf_mqtt_import\MqttImportValueMapperManager
arguments: ['@container.namespaces', '@cache.discovery', '@module_handler']
必要なら補助サービスも追加できます。
swpf_mqtt_import.value_mapper_resolver:
class: Drupal\swpf_mqtt_import\Service\MqttImportValueMapperResolver
arguments: ['@plugin.manager.mqtt_import_value_mapper']
16. サンプルプラグイン: ThermometerMapper
ファイル例: swpfmqttimport/src/Plugin/MqttImportValueMapper/ThermometerMapper.php
16.1 目的
温度系ユニットの payload を IOTDATAHEADER 用に変換する。
16.2 thermometer で吸収するキー揺れの例
- 温度:
temp,temperature,t,temp_c - 湿度:
humi,humidity,hum,h
16.3 サンプルコード
<?php
namespace Drupal\swpf_mqtt_import\Plugin\MqttImportValueMapper;
use Drupal\swpf_mqtt_import\Annotation\MqttImportValueMapper;
use Drupal\swpf_mqtt_import\MqttImportValueMapperBase;
/**
* @MqttImportValueMapper(
* id = "thermometer",
* label = @Translation("Thermometer Mapper"),
* unit_type = "thermometer",
* description = "Maps thermometer payload into IOTDATAHEADER values."
* )
*/
class ThermometerMapper extends MqttImportValueMapperBase {
public function mapPayload(array $payload, array $context = []): array {
$headerValues = [];
$derivedValues = [];
$temp = $this->getPayloadValue($payload, ['temp', 'temperature', 't', 'temp_c']);
$humi = $this->getPayloadValue($payload, ['humi', 'humidity', 'hum', 'h']);
if ($temp !== null) {
$headerValues['value1'] = (float) $temp;
}
if ($humi !== null) {
$headerValues['value2'] = (float) $humi;
}
return [
'header_values' => $headerValues,
'derived_values' => $derivedValues,
'meta' => [
'unit_type' => $this->getUnitType(),
],
];
}
}
17. サンプルプラグイン: Co2SensorMapper
17.1 目的
CO2 センサー系の payload を変換する。
17.2 重要な考え方
CO2 センサーにも temp を持たせてよい。 ただしその temp は thermometer の主温度と完全に同一とは限らない。
そのため、同じ temp というキー名でも、unit_type ごとに保存先を変えてよい。
例
co2->value1temp->value2humi->value3
18. 摂氏・華氏変換の考え方
18.1 なぜ function が必要か
payload のキーゆらぎは MAP で吸収できますが、 単位差 は MAP だけでは吸収できません。
たとえば次の2つは意味が違います。
temp = 26.5(摂氏)temp_f = 79.7(華氏)
どちらも最終的には value1 = 摂氏温度 に揃えたい場合、 function を使って値そのものを変換する必要があります。
18.2 仕様例
入力 payload 例1: 摂氏
{{
"temp": 26.5,
"humi": 47.0
}}
入力 payload 例2: 華氏
{{
"temp_f": 79.7,
"humidity": 47.0
}}
目標
どちらも最終的に
value1= 摂氏温度value2= 湿度
へ揃える。
18.3 function を含む MAP の考え方
単純な "temp" => "value1" ではなく、 次のような定義を持つと柔軟になります。
[
'temp' => [
'column' => 'value1',
'function' => null,
],
'temp_f' => [
'column' => 'value1',
'function' => 'convertFahrenheitToCelsius',
],
'humidity' => [
'column' => 'value2',
'function' => null,
],
]
18.4 function のサンプル実装
protected function convertFahrenheitToCelsius(float $value): float {
return round(($value - 32) * 5 / 9, 2);
}
18.5 function 適用込みの例
public function mapPayload(array $payload, array $context = []): array {
$headerValues = [];
if (isset($payload['temp'])) {
$headerValues['value1'] = (float) $payload['temp'];
}
elseif (isset($payload['temp_f'])) {
$headerValues['value1'] = $this->convertFahrenheitToCelsius((float) $payload['temp_f']);
}
if (isset($payload['humi'])) {
$headerValues['value2'] = (float) $payload['humi'];
}
elseif (isset($payload['humidity'])) {
$headerValues['value2'] = (float) $payload['humidity'];
}
return [
'header_values' => $headerValues,
'derived_values' => [],
'meta' => [],
];
}
19. extraFunction 的な派生計算の扱い
既存の growplants では extraFunction を使い、
calcTempVarRatecalcEffectiveTemp
のような派生処理を行っています。
新設するマッパープラグインでも、同じ概念を持たせられます。
例
value1に温度を入れる- その温度から
tempvarrateを計算する - さらに
effective_tempを計算する
この場合、返り値の derived_values に含める設計がおすすめです。
[
'header_values' => [
'value1' => 26.5,
],
'derived_values' => [
'temp_var_rate' => 1.4,
'effective_temp' => 28.1,
],
]
20. IMPORT モジュール側の組み込み位置
20.1 最も重要な考え方
変換は 保存直前 に行うのが分かりやすいです。
つまり IMPORT の流れの中で
- topic 解析
- V1 / V2 判定
- parent uid 解決
- queue item 準備
- 保存直前に mapper plugin を選ぶ
- mapper が payload を正規化
- IOTDATAHEADER 保存
の順にします。
20.2 推奨メソッド分割
SwpfMqttImportService または QueueWorker 側に、以下の役割を追加します。
推奨メソッド
resolveUnitType(array $topicContext, array $payload): stringresolveMapperPluginId(string $unitType): stringmapPayloadForIotHeader(string $unitType, array $payload, array $context): array
20.3 resolveUnitType の考え方
V2 の場合、unitid が thermometer1 なら、 unit_type = thermometer と切り出せます。
サンプル
protected function resolveUnitType(array $topicContext, array $payload): string {
$unitId = (string) ($topicContext['unit_id'] ?? '');
if ($unitId !== '' && str_contains($unitId, '_')) {
return explode('_', $unitId, 2)[0];
}
return 'default';
}
21. V1 と V2 の扱い
21.1 V2 の場合
unit_idがある- そこから
unit_typeを取り出す - 対応する mapper plugin を呼ぶ
21.2 V1 の場合
V1 は unit_id がありません。
したがって V1 では次のいずれかが必要です。
選択肢
- device user の設定からデフォルト
unit_typeを決める - plugin 設定から
unit_typeを決める - 暫定的に
defaultmapper を使う
今回のおすすめ
最初は default または既存互換 mapper を用意し、 V1 はそこへ流す。
22. 保存サービス側の設計変更方針
現在の MimamoriUpdateIotdataService は、 ACTION プラグインから getIotDataProcessParams() を取得して 自分で変換しています。
新設計では、保存サービスは可能な限り次の責務だけに絞ります。
- 受け取った header 値を保存する
- 必要な派生値を保存する
- 保存後イベントを投げる
つまり、「何をどの value 列に入れるか」は IMPORT 側で終わらせる 方針です。
23. 既存 growplants からの移行方針
23.1 現在
- growplants が
getIotDataProcessParams()を持つ - 保存サービスがその定義を読む
23.2 移行後
- thermometer / co2_sensor / weather などの mapper plugin を IMPORT に作る
- 保存サービスは mapper 済み値を保存する
- growplants は保存後の業務処理に集中する
23.3 段階移行
完全に一度で切り替えなくてもよいです。
段階案
- MQTT_IMPORT に mapper plugin を新設
- 新しい MQTT 経路だけ mapper を使う
- 既存 REST / growplants ルートは当面維持
- 安定後に保存サービスから旧変換ロジックを削減
24. 実装手順(推奨順)
Phase 1: 基盤作成
- Annotation クラス作成
- Interface 作成
- Base クラス作成
- Plugin Manager 作成
- services.yml 登録
Phase 2: 最初の mapper 作成
DefaultMapper作成ThermometerMapper作成Co2SensorMapper作成
Phase 3: IMPORT への組み込み
- unit_type 解決処理追加
- mapper 選択処理追加
- payload 正規化処理追加
- 保存サービスへ渡す配列を変更
Phase 4: 移行
- 既存 growplants 変換との整合確認
- V2 経路で優先使用
- 必要に応じて V1 fallback 実装
25. 開発者向けの重要注意点
25.1 このプラグインは「保存前変換」である
保存後の通知や判定ではありません。 あくまで payload を IOTDATAHEADER 用に正規化するためのプラグイン です。
25.2 unit_type と model は分ける
unittype: thermometer, co2sensor などmodel: SHT30, SCD41 など
まずは unit_type で大枠を決め、 必要な場合だけ model 差分を追加します。
25.3 payload キーゆらぎは mapper が吸収する
temp, temperature, t などの違いは mapper で吸収するのが基本です。
25.4 単位差は function で吸収する
キーが違うだけでなく値の意味が違う場合は、MAP ではなく function を使います。
26. テスト観点
26.1 discovery テスト
- プラグインが正しく見つかるか
plugin.manager.mqttimportvalue_mapperから取得できるか
26.2 thermometer テスト
temp->value1temperature->value1humi->value2humidity->value2
26.3 Fahrenheit テスト
temp_f = 79.7がvalue1 = 26.5になるか
26.4 V2 組み込みテスト
unitid = thermometer1からunit_type = thermometerを引けるか- mapper を選択できるか
- 保存結果が正しいか
26.5 V1 fallback テスト
- V1 topic でも default mapper で最低限処理できるか
27. 最小構成のサンプル一覧
最初に作るなら最低でもこの3つを推奨します。
DefaultMapperThermometerMapperCo2SensorMapper
理由:
- V1 fallback が必要
- 温度系は最初に使う可能性が高い
- CO2 系は温度補助値も扱えるため設計確認に向いている
28. この設計の最終的なメリット
この新設計により、次の効果が得られます。
- IMPORT モジュールの責務が明確になる
- unit_type ごとの変換を追加しやすい
- growplants 依存を弱められる
- V2 topic 構造を活かせる
- Fahrenheit / Celsius のような現実的な差分に対応できる
- はじめて実装する開発者でも、どこに何を書くかが分かりやすい
29. 実装時の補足メモ
FetchRawDataFromMqttQueueは今回の新プラグイン追加では原則変更しないSwpfMqttImportServiceまたは QueueWorker が mapper を呼ぶ責務を持つ- 既存の
MimamoriUpdateIotdataServiceは、段階移行の間だけ旧変換ロジックを残してもよい - 最終的には「IMPORT で正規化」「保存サービスは保存のみ」に寄せる
30. 最終まとめ
今回新規作成するのは、MQTT_IMPORT モジュール専用の変換マッパープラグインタイプ です。
このプラグインは、
- MQTT payload を
- unit_type ごとに
- IOTDATAHEADER 用へ変換する
ためのものです。
設計の要点は次のとおりです。
- 変換責務は ACTION ではなく IMPORT に置く
- unit_type 主体でマッピングする
- payload キーのゆらぎは MAP で吸収する
- 単位差は function で吸収する
- 保存サービスは保存に専念させる
この設計により、今後の V2 / unitid / unittype 中心の世界へ、 無理なく移行できます。