基于 MongoDB 的智能家居數據中臺:從存儲到分析的全鏈路實現
作者:一安
在物聯網技術滲透家居場景的今天,溫度、濕度、空氣質量等高頻異構數據的處理成為智能家居系統的核心挑戰。MongoDB作為文檔型NoSQL數據庫,憑借動態schema、高吞吐、分布式擴展等特性,成為解決這類問題的優選方案。
引言
在物聯網技術滲透家居場景的今天,溫度、濕度、空氣質量等高頻異構數據的處理成為智能家居系統的核心挑戰。
MongoDB作為文檔型NoSQL數據庫,憑借動態schema、高吞吐、分布式擴展等特性,成為解決這類問題的優選方案。
架構設計:智能家居數據中臺的核心鏈路
圖片
智能家居數據中臺的核心目標是實現數據采集 - 存儲 - 分析 - 應用的閉環,整體架構分為四層,各層職責清晰且協同聯動:
數據接入層:多源設備的統一入口
- 支持溫濕度傳感器、光照檢測儀、空氣質量監測設備等多類型終端接入,通過 HTTP 接口接收異構數據。
- 提供數據格式校驗與補全機制,自動填充采集時間戳,確保數據完整性。
- 適配高頻數據上報場景,支持每秒萬級數據的并發接收與轉發。
數據存儲層:MongoDB 的優化配置
- 采用集合拆分策略,將環境數據與設備配置分離存儲,分別對應
environment_data和device_config集合。 - 針對查詢場景創建專項索引:為
collectTime字段配置TTL索引實現數據自動過期,為deviceId、room字段創建普通索引提升查詢效率。 - 支持單機部署與分片集群部署無縫切換,滿足從家庭到社區級的規模擴展需求。
數據處理層:智能分析與業務聯動
- 實現閾值校驗引擎,基于設備配置自動判斷環境指標是否超標,觸發多級報警機制。
- 提供數據聚合分析能力,支持按時間維度(小時 / 日 / 周)、空間維度(房間 / 設備)統計均值、極值等指標。
- 內置數據清洗邏輯,過濾異常值與重復數據,保障分析結果準確性。
應用服務層:面向多場景的接口封裝
- 提供數據上報、查詢、配置管理三類核心接口,支持前端可視化、設備聯動等上層應用調用。
- 接口設計遵循
RESTful規范,支持時間范圍、設備ID、房間名稱等多條件組合查詢。 - 統一返回格式與錯誤處理機制,降低前后端協作成本。
技術實現
連接池配置
spring:
data:
mongodb:
uri: mongodb://localhost:27017/smart_home_db
database: smart_home_db
connection-timeout: 30000ms
read-timeout: 10000ms
write-timeout: 10000ms
max-connection-pool-size: 50
min-connection-pool-size: 10數據模型
@Data
@Document(collection = "environment_data")
public class EnvironmentData {
@Id
private String id;
@NotBlank(message = "設備ID不能為空")
private String deviceId;
@NotBlank(message = "房間名稱不能為空")
private String room;
@NotNull(message = "環境指標不能為空")
private Map<String, Double> metrics; // 支持temperature、humidity、pm25、co2、illumination等
@Indexed(expireAfterSeconds = 2592000) // 30天自動過期
private Instant collectTime;
private String dataStatus; // 數據狀態:NORMAL(正常)、ABNORMAL(異常)、PENDING(待校驗)
}業務邏輯
@Service
@Slf4j
public class EnvironmentDataServiceImpl implements EnvironmentDataService {
@Autowired
private EnvironmentDataRepository dataRepository;
@Autowired
private DeviceConfigService configService;
@Autowired
private AlertService alertService;
// 時間格式化器(用于圖表數據)
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter
.ofPattern("yyyy-MM-dd HH:mm")
.withZone(ZoneId.systemDefault());
@Override
@Transactional
public void processAndSaveData(EnvironmentData data) {
// 1. 補全采集時間(未傳則用當前時間)
if (data.getCollectTime() == null) {
data.setCollectTime(Instant.now());
}
// 2. 校驗指標并標記數據狀態
String dataStatus = getMetricsStatus(data);
data.setDataStatus(dataStatus);
// 3. 保存數據
dataRepository.save(data);
log.info("環境數據保存成功:設備ID={}, 房間={}, 狀態={}",
data.getDeviceId(), data.getRoom(), dataStatus);
// 4. 異常數據觸發報警
if ("ABNORMAL".equals(dataStatus)) {
Optional<DeviceConfig> configOpt = configService.getConfigByDeviceId(data.getDeviceId());
configOpt.ifPresent(config -> alertService.triggerAlert(data, config));
}
}
@Override
public List<EnvironmentData> queryAbnormalData(String deviceId, Instant startTime, Instant endTime) {
// 設備ID為空則查詢所有設備的異常數據
if (StringUtils.isEmpty(deviceId)) {
// 此處簡化實現:查詢所有房間的異常數據(實際可擴展MongoRepository方法)
return dataRepository.findByRoomAndDataStatusAndCollectTimeBetween(
"", "ABNORMAL", startTime, endTime);
}
// 按設備ID+時間范圍+異常狀態查詢
return dataRepository.findByDeviceIdAndCollectTimeBetween(deviceId, startTime, endTime).stream()
.filter(data -> "ABNORMAL".equals(data.getDataStatus()))
.collect(Collectors.toList());
}
@Override
public List<ChartDataDTO> getChartData(String room, String metricType, Instant startTime, Instant endTime) {
// 1. 查詢指定房間、時間范圍的數據
List<EnvironmentData> dataList = dataRepository.findByRoomAndDataStatusAndCollectTimeBetween(
room, "NORMAL", startTime, endTime);
// 2. 轉換為圖表所需格式(時間字符串+指標值)
return dataList.stream()
.map(data -> {
ChartDataDTO dto = new ChartDataDTO();
// 格式化時間
dto.setTime(DATE_TIME_FORMATTER.format(data.getCollectTime()));
// 獲取指定指標值(無則設為0.0)
Double metricValue = data.getMetrics().getOrDefault(metricType, 0.0);
dto.setValue(metricValue);
return dto;
})
.collect(Collectors.toList());
}
/**
* 校驗指標是否超標,返回數據狀態
*/
private String getMetricsStatus(EnvironmentData data) {
Optional<DeviceConfig> configOpt = configService.getConfigByDeviceId(data.getDeviceId());
// 無配置則設為“待校驗”
if (!configOpt.isPresent()) {
return"PENDING";
}
DeviceConfig config = configOpt.get();
Map<String, Double> thresholds = config.getThresholdConfig();
Map<String, Double> metrics = data.getMetrics();
// 遍歷指標,判斷是否有超標項
for (Map.Entry<String, Double> metricEntry : metrics.entrySet()) {
String metricKey = metricEntry.getKey();
Double metricValue = metricEntry.getValue();
Double threshold = thresholds.get(metricKey);
if (threshold != null && metricValue > threshold) {
log.warn("指標超標:設備ID={}, 指標={}, 當前值={}, 閾值={}",
data.getDeviceId(), metricKey, metricValue, threshold);
return"ABNORMAL";
}
}
return"NORMAL";
}
}聚合分析工具類
@Component
public class DataAggregationUtil {
@Autowired
private MongoTemplate mongoTemplate;
// 按時間區間統計房間指標均值
public Map<String, Object> calculateRoomMetricAvg(String room, String metricType, Instant startTime, Instant endTime) {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("room").is(room)
.and("collectTime").gte(startTime).lte(endTime)),
Aggregation.group("room")
.avg("metrics." + metricType).as("averageValue"),
Aggregation.project("averageValue").and("room").previousOperation()
);
AggregationResults<MetricAvgDTO> results = mongoTemplate.aggregate(
aggregation, "environment_data", MetricAvgDTO.class);
MetricAvgDTO result = results.getUniqueMappedResult();
return result != null ? Map.of("room", result.getRoom(), "averageValue", result.getAverageValue()) : Map.of();
}
}
@Data
class MetricAvgDTO {
private String room;
private Double averageValue;
}數據查詢接口
@RestController
@RequestMapping("/api/smarthome/data")
@Validated
@Slf4j
public class DataController {
@Autowired
private EnvironmentDataService dataService;
@Autowired
private DataAggregationUtil aggregationUtil;
// 上報環境數據
@PostMapping("/report")
public ResponseEntity<String> reportEnvironmentData(@Valid @RequestBody EnvironmentData data) {
dataService.processAndSaveData(data);
return ResponseEntity.ok("數據上報成功");
}
// 查詢異常數據
@GetMapping("/abnormal")
public ResponseEntity<List<EnvironmentData>> getAbnormalData(
@RequestParam(required = false) String deviceId,
@RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Instant startTime,
@RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Instant endTime) {
List<EnvironmentData> abnormalData = dataService.queryAbnormalData(deviceId, startTime, endTime);
return ResponseEntity.ok(abnormalData);
}
// 統計房間指標平均值
@GetMapping("/room/metric/avg")
public ResponseEntity<Map<String, Object>> getRoomMetricAvg(
@RequestParam @NotBlank String room,
@RequestParam @NotBlank String metricType,
@RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Instant startTime,
@RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Instant endTime) {
Map<String, Object> avgData = aggregationUtil.calculateRoomMetricAvg(room, metricType, startTime, endTime);
return ResponseEntity.ok(avgData);
}
// 獲取圖表可視化數據
@GetMapping("/chart")
public ResponseEntity<List<ChartDataDTO>> getChartData(
@RequestParam @NotBlank String room,
@RequestParam @NotBlank String metricType,
@RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Instant startTime,
@RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Instant endTime) {
List<ChartDataDTO> chartData = dataService.getChartData(room, metricType, startTime, endTime);
return ResponseEntity.ok(chartData);
}
}異常處理機制
@RestControllerAdvice
@Slf4j
public class GlobalExceptionHandler {
@ExceptionHandler(MongoException.class)
public ResponseEntity<String> handleMongoException(MongoException e) {
log.error("MongoDB操作異常:", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("數據處理失敗:" + e.getMessage());
}
@ExceptionHandler(ConstraintViolationException.class)
public ResponseEntity<String> handleValidationException(ConstraintViolationException e) {
String errorMsg = e.getConstraintViolations().stream()
.map(ConstraintViolation::getMessage)
.collect(Collectors.joining(";"));
return ResponseEntity.badRequest().body("參數校驗失敗:" + errorMsg);
}
}多渠道報警實現
@Service
public class AlertService {
@Autowired
private JavaMailSender mailSender;
// 郵件報警
public void sendEmailAlert(EnvironmentData data, DeviceConfig config) {
SimpleMailMessage message = new SimpleMailMessage();
message.setTo("user@example.com");
message.setSubject("智能家居環境異常報警");
message.setText(String.format("設備%s(%s)%s指標超標:當前值=%.2f,閾值=%.2f",
data.getDeviceId(), data.getRoom(),
getOverThresholdMetric(data.getMetrics(), config.getThresholdConfig()),
getOverThresholdValue(data.getMetrics(), config.getThresholdConfig()),
getThreshold(data.getMetrics(), config.getThresholdConfig())));
mailSender.send(message);
}
// APP推送報警(對接極光推送等第三方服務)
public void sendAppAlert(EnvironmentData data, DeviceConfig config) {
// 第三方推送API調用邏輯
}
}驗證
圖片

責任編輯:武曉燕
來源:
一安未來





























