
作者 | 崔皓
審校 | 重樓
開篇
對多數運維工程師而言,日常工作更像一場 “被動的消防演練”—— 緊盯著監控面板上跳動的 CPU 使用率、內存占用、磁盤容量與網絡流量,等數值觸達告警閾值時,再匆忙排查問題、擴容資源、處理故障。這種 “盯數據 - 等告警 - 忙救火” 的模式,看似能保障系統穩定,卻藏著難以規避的痛點:當業務高峰期突然來臨,CPU 使用率驟升導致服務卡頓;當磁盤空間在深夜悄然耗盡,核心業務中斷才觸發告警;當網絡流量突發峰值沖垮帶寬,用戶投訴已堆積成山……
我們總在事后補救,卻很少能提前回答:“1 小時后 CPU 會不會過載?”“3 天后磁盤空間是否夠用?”“下周這個時段網絡流量會不會突破閾值?” 并非運維工程師不愿主動預判,而是傳統運維中,既缺乏能捕捉系統指標時間規律的工具,也沒有低成本落地的預測方法 —— 直到 NeuralProphet 的出現,讓 “提前預測運維數據” 從復雜的 AI 課題,變成了像搭樂高積木一樣可落地的實踐。
想象一下用樂高積木搭建一個模型:每一塊積木都有特定的形狀和功能,有的負責搭建底座(對應數據的基礎趨勢),有的負責拼接循環結構(對應指標的周期性波動),有的負責填補細節(對應突發異常的修正),將這些積木按邏輯組合,就能從零散部件變成完整的、可復用的模型。NeuralProphet 處理運維時間序列數據(如 CPU 使用率、內存波動)的方式,與此幾乎完全一致。
它本質上是 Facebook 經典預測模型 Prophet 的 “升級版”——Prophet 曾因將復雜時間序列拆解為 “趨勢、周期、節假日” 等可解釋模塊而風靡運維圈,但在面對運維數據的 “短期高頻波動”(如每 10 分鐘一次的 CPU 驟升)時,常因缺乏 “局部上下文” 建模能力導致預測偏差;同時,其基于 Stan 的后端架構,也讓普通運維工程師難以根據實際場景調整參數。
而 NeuralProphet 的誕生,正是為了解決這些痛點。它完整保留了 Prophet “模塊化拆解數據” 的核心優勢 —— 比如將 CPU 使用率數據拆成 “長期增長趨勢(業務擴容導致的使用率緩步上升)”“日周期波動(早 9 晚 6 的辦公高峰)”“周周期波動(工作日與周末的負載差異)” 等獨立模塊,讓非 AI 背景的運維人員也能看懂預測邏輯;同時,它通過引入 “自回歸組件”(能捕捉近 1 小時內的短期波動)和 “PyTorch 后端”(支持靈活調參與輕量化部署),完美彌補了 Prophet 的短板,既能精準預測下 10 分鐘的 CPU 峰值,也能適配從邊緣服務器到云集群的不同運維場景。
上面說了這么多,其實就是一句話:NeuralProphet 非常好用,我們可以用NeuralProphet 模型來預測系統指標(CPU、內存、磁盤、網絡)。
那么如何用 NeuralProphet 這個模型呢, 我的思路也比較簡單粗暴, 如下圖所示。
為了大家理解方便,我們舉個簡單的例子,我們使用“ 8 月份的數據”(CPU 等)來預測“下一個小時的數據”,然后把“8 月份的數據”+“下一個小時的數據”預測“再下一個小時的數據”。依次類推,有點俄羅斯套娃的感覺,實際情況也是如此。CPU 的使用率數據會在系統中不斷產生,有了歷史數據可以幫助我們預測下個時間段(一小時)的數據,同時下個時間段的數據也會成為歷史數據,為后面數據的預測發光發熱。

好了,有了目標接下來就好辦了,整個實戰案例的思路如下圖所示。
首先,生成歷史數據,也就是真實的系統指標數據,模擬真實數據分為 CPU、內存、磁盤、網絡,每 10 分鐘生成一條數據。
接著,利用已經生成的歷史數據訓練模型,模型就用NeuralProphet,生成的模型保存備用。
然后,利用訓練好的模型預測下一個小時的系統指標數據,例如:現在時間是 9 月 1 日的 00:00:00, 我們要預測 從 00:00:00 到 01:00:00 的 CPU 使用率,由于預測數據也是 10 分鐘一條,所以會生成 6 條 CPU 使用率的數據。

最終的效果如下,藍色的線條為實際數據,橙色線條為預測值,紅色的文字為誤差比,也就是實際值與預測值之間存在的差距。

NeuralProphet 的核心理念
好了,說明了目的(系統數據預測)以及方法(利用 NeuralProphet 預測)之后,需要介紹主角NeuralProphet。
NeuralProphet 的核心理念非常直觀:最終的預測值為“獨立組件”預測值的總和。簡單來說,多個“獨立組件”等同于從多維度思考。把不同維度預測數據的模塊得到的結果加起來就是全面的預測。你可以想象有一個復雜問題,讓多個不同的專家一起思考提出方案,然后整合他們的方案得到最終方案。
但是這些預測模型或者說思考方案總得通過一個公式表示一下, 要不我們也不好描述,于是就有了下面的公式。:
?t = T(t) + S(t) + E(t) + F(t) + A(t) + L(t)
看到公式這么復雜,我是有點懵的,用一個列表表示,通過“說人話”對其進行解釋。
組件符號 | 簡單描述 |
T(t) | 趨勢 (Trend):數據隨時間變化的長期基礎走向(例如,增長或下降)。 |
S(t) | 季節性 (Seasonality):在固定周期內重復出現的模式(例如,每周、每年的周期)。 |
E(t) | 事件 (Events):特殊日期(如節假日)對數據產生的短期影響。 |
F(t) | 未來回歸量 (Future Regressors):其未來值已知的外部變量(例如,已計劃的營銷活動)。 |
A(t) | 自回歸 (Auto-Regression):近期歷史觀測值對未來值的直接影響。 |
L(t) | 滯后回歸量 (Lagged Regressors):其未來值未知的外部變量(例如,昨天的天氣)。 |
下面再花一點點篇幅對各個組件進行介紹,特別是自回歸。如果對這部分不感興趣或者已經有所了解的同學,可以自行跳到實戰環節,從“安裝依賴”開始看。
趨勢 (T(t)):整體走向
趨勢組件捕捉的是時間序列總體、長期的發展方向。為了使趨勢線能夠適應現實世界中的變化,NeuralProphet 引入了變化點 (changepoints) 的概念。假設你在開車,正在直線行駛,路上的一個轉彎,你的方向(或速度)就要根據這個轉彎發生變化,這個轉彎就是變化點 (changepoints) 。
NeuralProphet 將趨勢建模為一個“分段線性”序列。數據變化的趨勢基本就是一條直線,直線可以在變化點 (changepoints)改變方向。讓模型識別在數據中反復出現的變化模式,從而知道在變化點轉彎--改變方向。

線性

分段線性
季節性 (S(t)):周期性節律
季節性是指在固定時期內發生的可預測、重復的模式,重點是可預測和重復。比如:勞動節、兒童節、春節人們會更多出游購物,零售店的銷售額通常在周末達到高峰。冰淇淋的銷量在夏季會顯著增加,等等。
NeuralProphet 利用傅里葉項(本質上是正弦和余弦函數的組合)使模型能夠同時捕捉多種季節性,例如,一個模型可以同時識別出數據中的每日、每周和年度模式。
此外,季節性還具備如下特性:
? 加法性 (Additive) 季節性:一家冰淇淋店每到夏天,銷量總是在平日基礎上固定增加50份,這個增量不受公司規模變化的影響。
? 乘法性 (Multiplicative) 季節性:一家電商公司每逢節假日的銷售額,總是能達到當年平均水平的兩倍,因此公司規模越大,節假日帶來的銷量增長就越多。
自回歸 (A(t)):近期歷史的影響
自回歸 (Auto-Regression, AR) 在短期預測方面表現突出。在介紹自回歸之前先說說什么是回歸,回歸是通過外部變量(如促銷活動)與目標指標(CPU 使用率)的關系,量化外部因素對目標的影響,比如 “電商平臺的促銷能讓服務器的 CPU 負載提升 15%”;而自回歸是回歸的特殊形式,不依賴外部變量,僅通過目標指標自身的歷史數據(如過去 1 小時的 CPU 使用率)預測未來值,核心是捕捉 “歷史慣性”—— 比如 “上 10 分鐘 CPU 使用率超 80%,下 10 分鐘大概率維持高負載”,也就是 “同一變量過去影響未來” 。
所以,自回歸的核心思想:“最近發生的事情是對接下來會發生的事情產生影響?!?模型會回顧過去特定數量的數據點,這些數據點被稱為“滯后項 (lags)”。例如,如果我們使用 5 個滯后項,模型就會查看最近的 5 個觀測值來幫助預測下一個值。
好了到這里,可能有人感覺趨勢和自回歸有點像,都是通過歷史預測未來,所以我們停下來,給他們做一個小區分。趨勢是運維指標長期的宏觀走向(比如業務增長導致 CPU 使用率每月穩步上升 2%),自回歸則是指標短期的實時動態(比如上 10 分鐘 CPU 突升 15%,下 10 分鐘大概率維持高負載)。
NeuralProphet 的 AR 模塊基于一個名為 AR-Net 的架構,它可以配置為兩種模式,兼具簡單性和強大的功能:
? 線性 AR:這是一種簡單直接的方法。它假設每個過去的值都對預測有一個直接的、加權的線性影響。這種方式非常容易解釋,你可以清楚地看到每個滯后項對預測的貢獻大小。
? 深度 AR:這是一種更高級的方法,它使用一個小型神經網絡(即 AR-Net)來發現過去值與未來預測之間復雜的、非線性的關系。這通??梢蕴岣哳A測的準確性,但其內部工作機制不如線性 AR 那樣易于直接解讀。
回歸量與事件:外部影響因素
回歸量是幫助預測目標的外部變量,可以理解為“外援”。例如:“要預測冰淇淋銷量(目標),了解每日溫度(“外援”=回歸量)會非常有幫助。”
NeuralProphet 可以處理多種類型的外部影響因素,下表對它們進行了比較:
類型 | 關鍵特征 | 簡單示例 |
滯后回歸量 | 歷史值已知,未來值未知。功能上與自回歸模塊相同,但使用外部變量作為輸入。 | 利用昨天的天氣數據來預測今天的能源消耗。 |
未來回歸量 | 歷史和未來值都已知。 | 在銷售預測中包含已提前計劃好的市場營銷活動的日期。 |
事件與節假日 | 特殊的一次性或重復性日期。它們被建模為二元變量,并且可以自動包含特定國家的節假日。 | 對每年“黑色星期五”購物節期間出現的銷售高峰進行建模。 |
上面已經了解了所有的構建模塊,NeuralProphet模型就是將每個獨立組件生成的預測值(趨勢 + 季節性 + 自回歸 + 回歸量等)全部加在一起,得到最終的綜合預測值,可以理解為“匯總專家意見”。這種方法的最大好處是可解釋性 (explainability)。由于每個組件可以獨立建模,用戶可以單獨繪制每個組件的圖表,從而確切地了解是哪個因素在影響最終預測結果。
安裝依賴
好了理論的部分已經講完,如果沒有聽懂也不要緊,直接實操幫助理解,如果還是沒有聽懂,那就回到開頭再看一遍。接下來,我們需要安裝必要的環境,先確認已安裝conda,conda 是老演員了,安裝的鏈接放到這里,https://anaconda.org/anaconda/conda。
為了本次實踐創建虛擬環境,如下:
conda create -n forecast python=3.12創建完畢之后,啟用虛擬環境,如下:
conda activate forecast然后再虛擬環境中,安裝相關的依賴包,如下:
# 基于 PyTorch 的時間序列預測工具,用來訓練模型并進行預測
pip install neuralprophet
# 用于處理Excel的庫,演示用數據保存在Excel
pip install openpyxl
# 用于構建網頁應用界面
pip install streamlit生成“歷史數據”
安裝完了環境,我們開始造一些數據,方便后面測試,在造數據之前先回顧一下我們要做的事情,如下圖所示。

一般而言,我們在預測未來的系統指標的時候,需要利用歷史數據,未來演示的需要我們用代碼生成所謂的“歷史數據”。在真實的場景中,大家可以按照我后面說的數據格式,將系統指標(CPU、內存、磁盤、網絡)的信息填入到 XSL 中,為后面訓練預測模型做好準備。在forecast目錄中新建python腳本。
cd /forecast
# 直接編輯,在保存時會生成文件
vim history_data.py
import numpy as np
import pandas as pd
# ============================
# 參數配置
# ============================
start_time = pd.Timestamp("2025-08-01 00:00:00")
end_time = pd.Timestamp("2025-08-31 23:59:59")
rng = pd.date_range(start=start_time, end=end_time, freq="10min")
periods = len(rng)
np.random.seed(42) # 保證可復現
# ============================
# 輔助函數:日內波動模式
# ============================
def daily_pattern(minutes, amplitude=1.0, phase_shift=0):
radians = (minutes / (24 * 60)) * 2 * np.pi
return 0.5 + 0.5 * np.sin(radians - 0.3 + phase_shift)
minutes_from_start = (np.arange(periods) * 10) % (24 * 60)
weekday = rng.weekday # 0=周一
hours = rng.hour
# ============================
# CPU
# ============================
cpu_base = 10 + 5 * (np.random.rand(periods) - 0.5)
cpu_daily = 25 * daily_pattern(minutes_from_start)
cpu = cpu_base + cpu_daily * np.where(weekday < 5, 1.0, 0.7)
spike_mask = (np.random.rand(periods) < 0.005) # 偶發高峰
cpu[spike_mask] += np.random.uniform(20, 60, size=spike_mask.sum())
# ============================
# 內存
# ============================
mem_base = 50 + 10 * np.sin((np.arange(periods) / periods) * 2 * np.pi)
mem_noise = np.random.normal(0, 2.0, size=periods)
mem_weekday_effect = np.where(weekday < 5, 1.02, 0.99)
memory_used = mem_base * mem_weekday_effect + mem_noise
mem_spike_mask = (np.random.rand(periods) < 0.002)
memory_used[mem_spike_mask] += np.random.uniform(5, 20, size=mem_spike_mask.sum())
memory_used = np.clip(memory_used, 10, 99.5)
# ============================
# 磁盤使用率
# ============================
disk_start, disk_end = 60.0, 62.0
disk_trend = np.linspace(disk_start, disk_end, periods)
disk_noise = np.random.normal(0, 0.05, size=periods)
disk_used = np.clip(disk_trend + disk_noise, 20, 99.9)
# ============================
# 網絡流量
# ============================
net_in_base = 5 * daily_pattern(minutes_from_start) + 1.0
net_out_base = 3 * daily_pattern(minutes_from_start) + 0.5
lunch_mask = ((hours >= 11) & (hours <= 13))
net_in = net_in_base * (1.2 * np.where(weekday < 5, 1, 0.9))
net_out = net_out_base * (1.15 * np.where(weekday < 5, 1, 0.9))
net_in[lunch_mask] *= 1.3
net_out[lunch_mask] *= 1.2
burst_mask = (np.random.rand(periods) < 0.003)
net_in[burst_mask] += np.random.uniform(20, 200, size=burst_mask.sum())
net_out[burst_mask] += np.random.uniform(5, 80, size=burst_mask.sum())
# ============================
# 組合 DataFrame
# ============================
df_aug = pd.DataFrame({
"ds": rng,
"cpu": np.round(np.clip(cpu, 0, 100), 2),
"memory": np.round(memory_used, 2),
"disk": np.round(disk_used, 2),
"net_in": np.round(net_in, 3),
"net_out": np.round(net_out, 3)
})
# ============================
# 每小時統計(平均、最大、最小)
# ============================
df_hourly = df_aug.set_index("ds").resample("1h").agg(
{
"cpu": ["mean", "max", "min"],
"memory": ["mean", "max", "min"],
"disk": ["mean", "max", "min"],
"net_in": ["mean", "max", "min"],
"net_out": ["mean", "max", "min"],
}
)
df_hourly.columns = ["_".join(col).strip() for col in df_hourly.columns.values]
df_hourly.reset_index(inplace=True)
# ============================
# 為每個指標創建單獨的sheet(ds:日期,y:值)
# ============================
# CPU sheet
df_cpu = pd.DataFrame({
"ds": rng,
"y": cpu
})
# Memory sheet
df_memory = pd.DataFrame({
"ds": rng,
"y": memory_used
})
# Disk sheet
df_disk = pd.DataFrame({
"ds": rng,
"y": disk_used
})
# Network In sheet
df_net_in = pd.DataFrame({
"ds": rng,
"y": net_in
})
# Network Out sheet
df_net_out = pd.DataFrame({
"ds": rng,
"y": net_out
})
# ============================
# 導出到 Excel
# ============================
output_file = "server_metrics_2025_08.xlsx"
with pd.ExcelWriter(output_file, engine="openpyxl") as writer:
df_aug.to_excel(writer, index=False, sheet_name="10min_data")
df_hourly.to_excel(writer, index=False, sheet_name="hourly_stats")
# 添加各個指標的單獨sheet
df_cpu.to_excel(writer, index=False, sheet_name="cpu")
df_memory.to_excel(writer, index=False, sheet_name="memory")
df_disk.to_excel(writer, index=False, sheet_name="disk")
df_net_in.to_excel(writer, index=False, sheet_name="net_in")
df_net_out.to_excel(writer, index=False, sheet_name="net_out")
print(f"? 數據已生成并導出到 {output_file}")代碼詳細內容如下:
- 參數配置:設定 2025 年 8 月 1 日至 31 日的時間范圍,按 10 分鐘間隔生成時間序列,固定隨機種子確保數據可復現。
- 函數定義:編寫日內波動模式函數,通過三角函數計算,模擬指標一天內隨分鐘變化的周期性波動規律。
- 時間計算:算出每個時間點對應的起始分鐘數、星期幾和小時數,為后續指標模擬提供時間維度數據。
- CPU 生成:先確定 CPU 基礎值范圍,疊加日內波動,按工作日和周末調整幅度,再用隨機掩碼添加偶發高峰,限制值在 0-100%。
- 內存生成:以正弦曲線為內存基礎趨勢,疊加隨機噪聲,結合工作日差異調整,加偶發峰值后限制在 10%-99.5%。
- 磁盤生成:設定磁盤使用率從 60.0 到 62.0 的線性增長趨勢,疊加小噪聲,將值限制在 20%-99.9%。
- 網絡生成:為網絡流入、流出設定日內波動基礎值,按工作日和午餐時段調整,用隨機掩碼加突發流量,確保值非負。
- 組合數據:將所有指標數據與時間序列組合,生成含完整指標的 DataFrame,保留指定小數位數。
- 小時統計:對原始 10 分鐘數據按小時重采樣,計算各指標每小時的均值、最大值、最小值,整理成統計 DataFrame。
- 單表創建:為 CPU、內存等 5 個指標各建單獨 DataFrame,僅含 “時間(ds)” 和 “指標值(y)” 兩列。
- 導出 Excel:用 ExcelWriter 將所有數據導出到 “server_metrics_2025_08.xlsx”,包含多類數據表。
- 完成提示:打印數據已生成并導出到指定文件的提示信息。
執行如下命令,命令完成之后在python腳本所在目錄生成歷史數據的excel文件。
python history_data.py如下圖所示,會在 history_data.py 相同的目錄下生成“server_metrics_2025_08.xlsx”的文件。

如下圖所示,在生成的excel中的 “ds”會顯示采集時間,我們按照 10 分鐘一次采集數據,同時在“y”列顯示的是 CPU 的使用率。

在上圖的下部的“sheet”中,列出了不同維度的數據,按從左到右的順序分別是:
- 10min_data:包含所有指標的數據
- hourly_stats:所有指標按小時統計數據
- cpu:cpu使用率
- memory:內存使用率
- disk:磁盤使用率
- net_in:網絡入流量
- net_out:網絡出流量
訓練模型
我們通過代碼的方式生成了系統指標的歷史數據,接著會根據這些數據訓練模型。
在forecast目錄中新建python腳本,如下:
cd /forecast
# 直接編輯,在保存時會生成文件
vim train_model.py下面代碼用來訓練歷史數據,這里只選擇 cpu 進行訓練, 其他的 sheet 中存放了內存、磁盤和網絡信息,在代碼中已經 remark 上了,有興趣的同學可以自行訓練。
from neuralprophet import NeuralProphet
import pandas as pd
import warnings
from neuralprophet import save
# 忽略警告
warnings.filterwarnings('ignore')
# 1. 讀取Excel數據
excel_path = 'server_metrics_2025_08.xlsx'
sheet_name = 'cpu'
# sheet_name = 'memory'
# sheet_name = 'disk'
# sheet_name = 'net_in'
# sheet_name = 'net_out'
df = pd.read_excel(excel_path, sheet_name=sheet_name)
# 只保留 ds 和 y 列
df = df[['ds', 'y']]
# 2. 初始化并訓練模型
m = NeuralProphet(
changepoints_range=0.8, # 只在前80%歷史數據中尋找變點,避免對最新數據過擬合
trend_reg=1, # 適度正則化,防止趨勢過度擬合
seasonality_reg=0.5, # 適度正則化,防止季節性過擬合
n_lags=144, # n_lags越大,可用訓練樣本越少
ar_reg=0.7, # 輕微正則化自回歸系數,防止過擬合
n_forecasts=6, # 預測的步數,10分數間隔的數據生成6條
collect_metrics={ "MAE": "MeanAbsoluteError", "MAPE": "MeanAbsolutePercentageError" },
)
# 3. 分割訓練集和驗證集
df_train, df_val = m.split_df(df, valid_p=0.2)
# 4. 訓練模型
m.fit(df_train, validation_df=df_val, freq="10min")
# 5. 保存模型到本地目錄
save(m, sheet_name + ".np")這里稍微對代碼部分做解釋;
- 導入工具:引入 NeuralProphet 模型、pandas 數據處理庫、警告忽略工具及模型保存函數。
- 忽略警告:關閉程序運行中的警告提示,使輸出更簡潔。
- 讀取數據:從 “server_metrics_2025_08.xlsx”Excel 文件中讀取指定工作表(默認 cpu,可切換為 memory、disk 等)的數據。
- 數據篩選:僅保留數據中的 “ds”(時間)和 “y”(指標值)兩列,符合 NeuralProphet 模型要求的輸入格式。
- 模型初始化:創建 NeuralProphet 模型實例,配置核心參數 —— 在前 80% 數據中尋找趨勢變點,對趨勢和季節性特征進行適度正則化,設置 144 個歷史滯后項捕捉自回歸效應,輕微正則化自回歸系數,指定預測 6 個時間步(10 分鐘間隔),并收集 MAE 和 MAPE 評估指標。
- 數據分割:將數據集按 8:2 比例拆分為訓練集(df_train)和驗證集(df_val),用于模型訓練和性能驗證。
- 模型訓練:使用訓練集訓練模型,同時用驗證集評估效果,指定數據時間間隔為 10 分鐘。
- 保存模型:將訓練好的模型以 “指標名稱.np”(如 cpu.np)的格式保存到本地,便于后續加載使用。
預測系統指標
上面的代碼會生成一個 np 為后綴的文件,.np 文件是一個模型檢查點文件,它是一個包含了模型完整狀態的快照,其核心內容是模型在訓練后學到的所有參數(權重),以及恢復訓練所必需的模型配置和優化器狀態。前面通過命令pip install neuralprophet 安裝neuralprophet 的基礎模型,此時只需要通過歷史數據訓練成.np 的模型檢查點文件,再將這個文件與之前安裝的neuralprophet 基礎模型進行合并,就是完整的模型了,再用這個完整的模型預測外來數據。只不過,開發者看到的是.np 的文件,而不用關心模型是如何合并以及預測的,這一系列操作都是neuralprophet 框架幫我完成了。接下來,我們來生成一個 UI 界面,對外來數據進行預測,并且展示對應內容。
在forecast目錄中新建python 代碼,代碼中的 UI 界面 streamlit 生成。
cd /forecast
# 直接編輯,在保存時會生成文件
vim app.py由于篇幅關系,我們只展示部分核心代碼,如下:
import streamlit as st
import pandas as pd
import plotly.graph_objects as go
from neuralprophet import load
import warnings
import numpy as np
from datetime import datetime, timedelta
import sys
def generate_forecast(df, sheet_name):
"""
生成預測的獨立函數
按步數遍歷,步數1取yhat1,步數2取yhat2...
所有預測值都放到yhat1列中
"""
try:
# 在預測函數中僅保留'ds'和'y'列用于模型訓練
model_df = df[['ds', 'y']].copy()
# 加載預訓練模型
model_path = f"{sheet_name}.np"
model = load(model_path)
# 生成預測
future_df = model.make_future_dataframe(model_df, n_historic_predictions=False)
forecast_result = model.predict(future_df, decompose=False)
print(forecast_result)
# 獲取最后一個數據點
last_data_point = model_df['ds'].max()
# 處理預測結果
forecast_long = []
last_actual_idx = forecast_result['y'].last_valid_index() if 'y' in forecast_result.columns else -1
start_idx = last_actual_idx + 1 if last_actual_idx + 1 < len(forecast_result) else 0
# 生成預測步驟,按步數取對應yhat,統一放到yhat1列
for step in range(1, FORECAST_STEPS + 1):
yhat_col = f'yhat{step}' # 步數1取yhat1,步數2取yhat2...
current_idx = start_idx + (step - 1)
if current_idx >= len(forecast_result):
current_idx = len(forecast_result) - 1
st.warning(f"預測數據不足,使用最后一條數據預測第{step}步")
if current_idx < len(forecast_result):
date = forecast_result.loc[current_idx, 'ds']
if pd.notna(date) and yhat_col in forecast_result.columns and pd.notna(forecast_result.loc[current_idx, yhat_col]):
forecast_time = last_data_point + step * FORCE_TIME_INTERVAL
# 所有預測值都放到yhat1列
forecast_entry = {
'ds': forecast_time,
'yhat1': forecast_result.loc[current_idx, yhat_col],
'step': step
}
forecast_long.append(forecast_entry)
else:
st.warning(f"第{step}步預測值缺失,使用歷史平均值")
forecast_time = last_data_point + step * FORCE_TIME_INTERVAL
avg_value = model_df['y'].mean()
forecast_entry = {
'ds': forecast_time,
'yhat1': avg_value,
'step': step
}
forecast_long.append(forecast_entry)
else:
forecast_time = last_data_point + step * FORCE_TIME_INTERVAL
avg_value = model_df['y'].mean()
forecast_entry = {
'ds': forecast_time,
'yhat1': avg_value,
'step': step
}
forecast_long.append(forecast_entry)
# 轉換為DataFrame
forecast_long = pd.DataFrame(forecast_long)
forecast_merged = forecast_long.merge(df[['ds', 'y']], on='ds', how='left')
return forecast_merged
except Exception as e:
st.error(f"預測出錯: {str(e)}")
st.info("可能的原因:模型文件不存在、數據格式不正確或模型訓練不完整")
return None代碼的主要內容如下:
- 數據篩選:從輸入的歷史數據中,僅保留 “ds(時間列)” 和 “y(指標值列)” 并復制,形成模型可識別的輸入數據(model_df)。
- 加載模型:cpu.np,通過 NeuralProphet 的 load 函數加載本地預訓練好的模型。
- 生成預測數據:調用模型的 make_future_dataframe 方法,基于歷史數據創建未來預測所需的時間框架(關閉 “包含歷史預測” 功能,僅生成未來數據);再用 predict 方法生成預測結果,關閉 “成分分解” 功能(不拆分趨勢、季節性等組件),并打印預測結果供調試查看。
- 確定時間基準:提取歷史數據中最新的時間點(last_data_point),作為后續計算 “未來預測時間” 的基準。
- 初始化存儲列表:創建空列表 forecast_long,用于暫存每一步的預測結果;同時找到歷史數據在預測結果中的最后有效索引(last_actual_idx),確定未來預測的起始位置(start_idx)。
- 循環生成預測:按設定的預測步數(FORECAST_STEPS)循環,每一步對應一個預測列(如第 1 步取 yhat1、第 2 步取 yhat2):
a. 計算當前步在預測結果中的索引(current_idx),若索引超出預測結果范圍,自動調整為最后一條數據的索引,并通過 Streamlit 給出 “預測數據不足” 的警告;
b. 若索引有效,先獲取對應時間,若 “時間非空、預測列存在、預測值非空”,則以 “歷史最新時間 + 當前步數 × 時間間隔(FORCE_TIME_INTERVAL)” 為預測時間,將 “時間、該步預測值(存入 yhat1)、步數” 組成字典存入列表;
c. 若預測值缺失或時間無效,通過 Streamlit 給出 “預測值缺失” 的警告,用歷史指標的平均值作為預測值,同樣按上述格式存入列表;
d. 若索引無效(極端情況),直接用歷史平均值作為預測值,生成對應時間和數據存入列表。
然后,通過如下命令執行應用查看結果。
streamlit run app.py如下圖所示,在彈出的頁面中選擇“cpu”,以及展示的開始和結束時間,然后點擊“生成預測”。在右邊的界面中會展示預測的曲線(橙色),這就是我們利用 8 月份 CPU 使用率的模型預測出來,9 月 1 日 00:00 到 02:50 的數據,每 10 分鐘產生一個數據點。

如果把鼠標放到任何一個數據點上,可以看到如下圖所示的紅色字體“誤差比”,描述了預測值與實際值之間的誤差情況。

接入LLM進行對話
好,到這里,數據預測的功能完成了,接著來到最后一步,我們可以與歷史數據進行“對話”,讓大模型基于數據給出運維存在的風險,并提出建議。
接下來,還是開始實戰操作,先安裝一些依賴。
# python-dotenv:可以使用load_dotenv從同目錄的.env文件中讀取配置
pip install python-dotenv openai由于要使用 DeepSeek 模型,所以要創建.env文件,用來保存 DeepSeek API Key。
DEEPSEEK_API_KEY=sk-*********新建config.py文件,用于管理配置參數,包括:項目根目錄(使用 Path 獲取當前文件所在目錄)、Excel數據文件的路徑(指向服務器指標數據文件)、需要監控的服務器資源類型列表(CPU、內存、磁盤、網絡入流量和出流量),以及這些資源類型的中文顯示映射(便于用戶界面展示),同時還定義了日期時間的標準格式化模式。
import os
from pathlib import Path
# 項目根目錄
ROOT_DIR = Path(__file__).parent
# Excel文件路徑
EXCEL_PATH = os.path.join(ROOT_DIR, 'server_metrics_2025_08.xlsx')
# 資源類型列表
RESOURCE_TYPES = ['cpu', 'memory', 'disk', 'net_in', 'net_out']
# 資源類型中文映射
RESOURCE_TYPE_MAP = {
'cpu': 'CPU使用率',
'memory': '內存使用率',
'disk': '磁盤使用率',
'net_in': '網絡入流量',
'net_out': '網絡出流量'
}
# 時間格式
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
DATE_FORMAT = '%Y-%m-%d'新建logger.py文件,這是一個日志配置模塊,它使用 Python 的 logging 庫來設置應用程序的日志記錄功能。該模塊首先在項目目錄下創建 logs 文件夾,然后配置日志記錄的格式(包含時間戳、日志級別和消息內容)和日期格式。它會根據當前日期創建日志文件(格式如 chat_app_20250919.log),并設置日志記錄同時輸出到文件和控制臺。日志級別被設置為 INFO,這意味著它會記錄信息性消息、警告和錯誤。最后創建了一個名為 logger 的日志記錄器實例,供其他模塊使用來記錄應用程序的運行狀態和調試信息。
import logging
import os
from datetime import datetime
from pathlib import Path
# 創建logs目錄
log_dir = Path(__file__).parent / 'logs'
log_dir.mkdir(exist_ok=True)
# 配置日志格式
log_format = '%(asctime)s - %(levelname)s - %(message)s'
date_format = '%Y-%m-%d %H:%M:%S'
# 創建日志文件名(包含日期)
log_file = log_dir / f'chat_app_{datetime.now().strftime("%Y%m%d")}.log'
# 配置日志
logging.basicConfig(
level=logging.INFO,
format=log_format,
datefmt=date_format,
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler()
]
)
# 創建logger實例
logger = logging.getLogger(__name__)新建chat_app.py文件如下,代碼中會引用config.py和logger.py。 由于篇幅原因我們展示核心函數如下:
def initialize_openai_client():
"""初始化OpenAI客戶端
功能:
- 從環境變量獲取API密鑰
- 初始化DeepSeek API客戶端
- 配置API基礎URL
返回:
- OpenAI客戶端實例 或 None(如果初始化失?。? """
# 從環境變量獲取API密鑰
api_key = os.getenv('DEEPSEEK_API_KEY')
if not api_key:
st.error("未找到DEEPSEEK_API_KEY環境變量")
return None
# 創建并返回配置好的客戶端實例
return OpenAI(
api_key=api_key,
base_url="https://api.deepseek.com/v1" # DeepSeek API的基礎URL
)
def generate_prompt(start_dt, end_dt, resource_type=None):
"""生成提示詞:支持時間點或時間范圍。
- 當 start_dt == end_dt 時,生成“在 X 時 ...”的時間點提示
- 當不相等時,生成“在 X 至 Y 這段時間 ...”的時間范圍提示
"""
if not start_dt or not end_dt:
return ""
start_str = start_dt.strftime("%Y-%m-%d %H:%M:%S")
end_str = end_dt.strftime("%Y-%m-%d %H:%M:%S")
resource_name = RESOURCE_TYPE_MAP.get(resource_type, resource_type) if resource_type else None
if start_dt == end_dt:
if resource_name:
return f"在{start_str} 時服務器的{resource_name}情況如何?請進行分析,指出異常與風險。"
return f"在{start_str} 時服務器的整體運行情況如何?請進行分析,指出異常與風險。"
else:
if resource_name:
return f"在{start_str} 至 {end_str} 這段時間內,服務器的{resource_name}情況如何?請進行分析,指出異常與風險。"
return f"在{start_str} 至 {end_str} 這段時間內,服務器的整體運行情況如何?請進行分析,指出異常與風險。"
def filter_data(df, date, time=None, resource_type=None, end_date=None, end_time=None):
"""根據選擇的日期時間范圍和資源類型篩選數據"""
if df is None:
logger.warning("輸入的DataFrame為空")
return None
logger.info(f"開始篩選數據,起始日期:{date}, 起始時間:{time or '00:00:00'}")
logger.info(f"結束日期:{end_date or date}, 結束時間:{end_time or '23:59:59'}")
logger.info(f"資源類型:{resource_type or '所有資源'}")
# 構建開始日期時間
start_datetime = pd.Timestamp.combine(date, time) if time else pd.Timestamp.combine(date, pd.Timestamp.min.time())
# 構建結束日期時間
if end_date and end_time:
end_datetime = pd.Timestamp.combine(end_date, end_time)
elif end_date:
end_datetime = pd.Timestamp.combine(end_date, pd.Timestamp.max.time())
else:
end_datetime = pd.Timestamp.combine(date, pd.Timestamp.max.time())
logger.info(f"篩選時間范圍:{start_datetime} 至 {end_datetime}")
# 篩選時間范圍內的數據
filtered_df = df[(df['ds'] >= start_datetime) & (df['ds'] <= end_datetime)]
# 記錄篩選結果
logger.info(f"時間范圍內的數據條數:{len(filtered_df)}")
# 如果選擇了資源類型,只返回相關列
if resource_type and resource_type in df.columns:
filtered_df = filtered_df[['ds', resource_type]]
logger.info(f"已篩選資源類型:{resource_type}")
# 記錄最終結果
logger.info(f"最終篩選結果數據條數:{len(filtered_df)}")
if not filtered_df.empty:
logger.info(f"數據時間范圍:{filtered_df['ds'].min()} 至 {filtered_df['ds'].max()}")
if resource_type:
stats = filtered_df[resource_type].describe()
logger.info(f"資源統計信息:\n{stats}")
else:
logger.warning("篩選結果為空")
return filtered_df其中initialize_openai_client函數連接系統與 DeepSeek 大模型的 “橋梁”,創建大模型調用客戶端。從環境變量中讀取DEEPSEEK_API_KEY,配置 DeepSeek API 的 URL(https://api.deepseek.com/v1),生成并返回可直接用于調用的客戶端實例。
執行如下命令,運行對話應用。
streamlit run chat_app.py如下圖所示,可以選擇一段時間以及對應的系統指標(CPU 使用率),此時會自動填寫提示詞,讓 DeepSeek 協助分析異常和風險,然后點擊“開始分析”按鈕。在右側的對話框中就可以看到“CPU 使用率分析報告”的詳細信息了。

作者介紹
崔皓,51CTO社區編輯,資深架構師,擁有18年的軟件開發和架構經驗,10年分布式架構經驗。


























