精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

在 .NET 中實現發件箱模式的實戰

開發 數據庫
本文介紹了發件箱模式在.NET中的實現,討論了該模式的利弊和擴容機制,在本文的基礎上也能比較方便的實現基于其他語言的發件箱模式。

我們在分布式系統中經常面臨保持數據庫和外部系統同步的挑戰。想象一下,將訂單保存在數據庫中,然后將消息發布到消息代理。任何一個操作失敗,系統都將處于不一致的狀態。

發件箱模式通過將消息發布視為數據庫事務的一部分來解決此問題。我們并不直接發布消息,而是將消息保存到數據庫中的發件箱表中,以確保原子操作,然后通過單獨進程可靠的發布消息。

本文將深入探討如何在 .NET 中實現這種模式。

為什么需要發件箱模式?

事務性發件箱模式修復了分布式系統中的一個常見問題。當我們需要同時做兩件事時,就會出現這個問題:保存數據并與外部組件通信。

考慮這樣的場景:發送訂單確認郵件,通知其他系統有關新客戶注冊的信息,或在下訂單后更新庫存水平。每一種操作都涉及本地數據變更以及外部數據通信或更新。

例如,想象某個微服務需要:

  • 在數據庫中保存新訂單
  • 告訴其他系統這個新訂單

如果其中某個步驟失敗,系統可能最終處于不一致狀態。也許訂單被保存了,但是沒有其他人知道。或者每個人都認為有新訂單,但數據庫里卻沒有。

下面是一個沒有發件箱模式的 CreateOrderCommandHandler:

public class CreateOrderCommandHandler(
    IOrderRepository orderRepository,
    IProductInventoryChecker inventoryChecker,
    IUnitOfWork unitOfWork,
    IEventBus eventBus) : IRequestHandler<CreateOrderCommand, OrderDto>
{
    public async Task<OrderDto> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
    {
        var order = new Order(request.CustomerId, request.ProductId, request.Quantity, inventoryChecker);

        await orderRepository.AddAsync(order);

        await unitOfWork.CommitAsync(cancellationToken);

        // 數據庫事務已完成

        await eventBus.Send(new OrderCreatedIntegrationEvent(order.Id));

        returnnew OrderDto { Id = order.Id, Total = order.Total };
    }
}

這段代碼有潛在的一致性問題。在提交數據庫事務之后,有兩件事可能出錯:

  • 應用程序可能在事務提交之后、事件發送之前崩潰。因此數據庫會創建訂單,但其他系統不會知道。
  • 當我們嘗試發送事件時,事件總線可能已關閉或無法訪問。這將導致在沒有通知其他系統的情況下創建訂單。

事務性發件箱模式通過確保將數據庫更新和事件發布視為單個原子操作來幫助解決此問題。

流程圖說明了發件箱模式如何解決一致性挑戰。我們沒有嘗試分別保存數據和發送消息,而是將訂單和發件箱消息保存在單個數據庫事務中。這是一個全部成功或全部失敗的操作,該操作不能以不一致的狀態結束。

單獨的發件箱進程處理實際的消息發送。它持續檢查發件箱表中未發送的消息,并將其發布到消息隊列中。進程在成功發布消息后將消息標記為已發送,從而避免重復發送。

需要注意,發件箱模式提供了至少一次的交付。發件箱消息將至少發送一次,但也可以多次發送,用于重試。這意味著必須實現冪等的消息消費。

發件箱模式實現

首先,創建發件箱表,將在其中存儲消息:

CREATE TABLE outbox_messages (
    idUUID PRIMARY KEY,
    typeVARCHAR(255) NOTNULL,
    content JSONB NOTNULL,
    occurred_on_utc TIMESTAMPWITHTIME ZONE NOTNULL,
    processed_on_utc TIMESTAMPWITHTIME ZONE NULL,
    errorTEXTNULL
);

-- 因為將會經常查詢未處理的消息,因此可以考慮添加索引
-- 它將數據行按照我們查詢需要的正確順序排序。

CREATEINDEXIFNOTEXISTS idx_outbox_messages_unprocessed
ON outbox_messages (occurred_on_utc, processed_on_utc)
INCLUDE (id, type, content)
WHERE processed_on_utc ISNULL;

我用 PostgreSQL 作為示例數據庫。注意 content 列類型為 jsonb。如果將來需要,可以對 JSON 數據進行索引和查詢。

現在,我們創建一個表示發件箱條目的類:

public sealed class OutboxMessage
{
    public Guid Id { get; init; }
    public string Type { get; init; }
    public string Content { get; init; }
    public DateTime OccurredOnUtc { get; init; }
    public DateTime? ProcessedOnUtc { get; init; }
    public string? Error { get; init; }
}

下面將消息添加到發件箱:

public async Task AddToOutbox<T>(T message, NpgsqlDataSource dataSource)
{
    var outboxMessage = new OutboxMessage
    {
        Id = Guid.NewGuid(),
        OccurredOnUtc = DateTime.UtcNow,
        Type = typeof(T).FullName, // 通過這種方式實現反序列化
        Content = JsonSerializer.Serialize(message)
    };

    awaitusingvar connection = await dataSource.OpenConnectionAsync();
    await connection.ExecuteAsync(
        @"""
        INSERT INTO outbox_messages (id, occurred_on_utc, type, content)
        VALUES (@Id, @OccurredOnUtc, @Type, @Content::jsonb)
        """,
        outboxMessage);
}

一種優雅的實現方法是使用域事件來表示通知。當域中發生重大事件時,將觸發域事件。在完成事務之前,可以獲取所有事件并存儲為發件箱消息。我們可以通過工作單元或EF Core攔截器執行此操作。

發件箱進程

另一個組件是發件箱進程,可以是物理上獨立的進程,也可以是同一進程中的后臺工作線程。

我用 Quartz 來調度處理發件箱的后臺作業,這是一個健壯的庫,對調度循環作業提供了出色的支持。

我們來實現 OutboxProcessorJob:

[DisallowConcurrentExecution]
public class OutboxProcessorJob(
    NpgsqlDataSource dataSource,
    IPublishEndpoint publishEndpoint,
    Assembly integrationEventsAssembly) : IJob
{
    public async Task Execute(IJobExecutionContext context)
    {
        awaitusingvar connection = await dataSource.OpenConnectionAsync();
        awaitusingvar transaction = await connection.BeginTransactionAsync();

        // You can make the limit a parameter, to control the batch size.
        // We can also select just the id, type, and content columns.
        var messages = await connection.QueryAsync<OutboxMessage>(
            @"""
            SELECT id AS Id, type AS Type, content AS Content
            FROM outbox_messages
            WHERE processed_on_utc IS NULL
            ORDER BY occurred_on_utc LIMIT 100
            """,
            transaction: transaction);

        foreach (var message in messages)
        {
            try
            {
                var messageType = integrationEventsAssembly.GetType(message.Type);
                var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);

                // We should introduce retries here to improve reliability.
                await publishEndpoint.Publish(deserializedMessage);

                await connection.ExecuteAsync(
                    @"""
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, message.Id },
                    transaction: transaction);
            }
            catch (Exception ex)
            {
                // We can also introduce error logging here.

                await connection.ExecuteAsync(
                    @"""
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc, error = @Error
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString(), message.Id },
                    transaction: transaction);
            }
        }

        await transaction.CommitAsync();
    }
}

這種方法使用輪詢定期從數據庫獲取未處理的消息。因為需要頻繁查詢未處理消息,因此輪詢會增加數據庫負載。

處理發件箱消息的另一種方法是使用事務日志跟蹤,可以通過 Postgres邏輯復制來實現。數據庫把更改從預寫日志(Write-Ahead Log, WAL)流式傳輸到應用程序,然后處理這些消息并發布到消息代理。通過這種方式可以實現基于推送的發件箱處理進程。

權衡利弊

發件箱模式雖然有效,但引入了額外復雜性和數據庫寫入。在高吞吐量系統中,很重要的一點是需要監控性能以確保其不會成為瓶頸。

建議在發件箱處理進程中實現重試機制,以提高可靠性。考慮對瞬態故障使用指數回退,對持久性問題使用斷路器,以防止系統在中斷期間過載。

非常重要的一點是需要實現消息的冪等消費。網絡問題或處理器重啟可能導致多次傳遞同一消息,因此使用者必須安全的處理重復消息。

隨著時間推移,發件箱表可能會顯著增長,從而影響數據庫性能。盡早實現存檔策略是很重要的一點,可以考慮將處理過的消息移動到冷存儲或在一段時間后刪除。

擴展發件箱處理進程

隨著系統增長,可能單個發件箱處理進程無法跟上消息數量的增長,從而導致發生錯誤以及增加處理延遲。

一種直接的方法是增加發件箱處理作業的頻率,考慮每隔幾秒鐘運行一次,可以顯著減少消息處理中的延遲。

另一種有效的策略是在獲取未處理消息時增加批處理大小。通過在每次運行中處理更多消息,可以提高吞吐量。但是,要小心不要使批處理太大,以免導致長時間運行的事務。

對于大容量系統,發件箱的并行處理可能非常有效。實現鎖定機制以聲明消息批次,從而允許多個處理進程同時工作而不發生沖突。可以 SELECT…FOR UPDATE SKIP LOCKED 聲明一批消息。這種方法可以顯著提高處理能力。

總結

發件箱模式是維護分布式系統數據一致性的強大工具。通過將數據庫操作與消息發布分離,發件箱模式可確保系統即使在出現故障時也保持可靠。

記住保持消費者冪等,實現適當的擴容策略,并管理好發件箱表的增長。

雖然增加了一些復雜性,但保證消息傳遞的好處使其成為許多場景中有價值的模式。

責任編輯:趙寧寧 來源: DeepNoMind
相關推薦

2023-01-07 10:17:06

微服務架構模式

2025-05-26 09:10:00

微服務系統發件箱模式

2012-05-24 13:39:11

Python

2024-09-29 09:58:57

2024-07-22 14:34:20

簡單工廠模式C#

2009-03-13 09:48:33

ASP.NETAjaxJQuery

2009-08-10 09:19:47

.NET反應性框架

2010-01-21 09:08:53

.NET設計模式

2011-08-30 14:18:00

2019-07-02 15:21:39

緩存NET單線程

2009-02-27 16:22:34

AjaxProAjax.NET

2009-07-27 18:13:19

ASP.NET工廠模式

2009-07-30 18:45:05

ASP.NET中防止頁

2024-09-30 09:48:41

RabbitMQ消息中間件

2009-06-26 10:48:45

職責鏈模式.NET

2018-02-10 09:44:19

2009-07-30 13:45:40

ASP.NET開發模式MVC模式

2009-06-25 15:54:18

設計模式EJB

2009-07-30 14:03:04

ASP.NET中的se

2021-09-13 07:00:01

C# .NET 緩存
點贊
收藏

51CTO技術棧公眾號

日韩欧美二区| 亚洲欧美小说色综合小说一区| 国内精品视频一区二区三区八戒| 欧美成aaa人片免费看| 国产一级免费片| 亚洲精品一区| 亚洲欧美日韩精品久久久久| 99在线看视频| 成人免费视频国产免费| 婷婷综合在线| 亚洲精品色婷婷福利天堂| 污色网站在线观看| 久草在线视频网站| 久久精品亚洲精品国产欧美 | 久久99精品久久久久久久久久久久 | 日韩精品人妻中文字幕有码| 高清成人在线| 亚洲成人免费在线观看| 亚洲人成77777| 天天插天天干天天操| 久久精品国产精品亚洲综合| 性色av一区二区三区| av资源在线免费观看| 欧美电影在线观看免费| 91精品国产综合久久国产大片 | 国产直播在线| 亚洲男人天堂av网| 日韩av图片| 五月婷婷六月激情| 国产精品一区二区男女羞羞无遮挡| 97高清免费视频| 三级影片在线看| 日韩精品看片| 国产亚洲xxx| 一二三不卡视频| av成人app永久免费| 欧美久久婷婷综合色| 一本久道中文无码字幕av| 亚洲无线看天堂av| 中文字幕在线观看不卡视频| 日本一区二区久久精品| 天天操天天射天天舔| 丁香婷婷综合五月| 亚洲综合国产精品| 国产精品久久久久久久久久久久久久久久久久| 噜噜噜躁狠狠躁狠狠精品视频 | 国产精品一站二站| 欧美三级资源在线| 污污的网站18| 91成人在线| 欧美亚洲一区三区| 日日摸日日碰夜夜爽av| caoporn视频在线| 亚洲精品成人少妇| 欧美日韩午夜爽爽| 午夜小视频福利在线观看| 亚洲美女在线一区| 亚洲精品二区| 日本在线天堂| 亚洲视频你懂的| 成人av在线播放观看| 中国av在线播放| 亚洲一区在线播放| 999一区二区三区| 黄色在线观看视频网站| 亚洲第一福利视频在线| 丰满少妇久久久| 亚洲欧洲自拍| 欧美午夜电影网| 日韩肉感妇bbwbbwbbw| 久久亚洲人体| 欧美一区二区网站| 91精品又粗又猛又爽| 九色丨蝌蚪丨成人| 亚洲老司机av| 无码人中文字幕| 中文字幕一区二区三三| 久久久久久久久中文字幕| 日本三级片在线观看| 亚洲欧美高清| 国产精品中文在线| 国产夫绿帽单男3p精品视频| 丰满白嫩尤物一区二区| 久久精品人成| 69视频在线| 亚洲一区二区三区视频在线| 99999精品视频| 欧美日韩视频免费看| 日韩丝袜情趣美女图片| 美女又爽又黄免费| 久久在线电影| 国语自产偷拍精品视频偷| 潘金莲一级淫片aaaaaa播放| 精品在线免费观看| 国产精品露出视频| 中文日本在线观看| 亚洲国产精品久久久久秋霞影院| 女人另类性混交zo| 精品视频在线播放一区二区三区| 亚洲电影免费观看高清完整版在线| 一级片手机在线观看| 欧美在线二区| 国产suv精品一区二区三区88区| 国产又大又黄又爽| 99riav久久精品riav| 中国成人在线视频| 欧美巨大丰满猛性社交| 欧美猛男超大videosgay| 中文在线一区二区三区| 成人区精品一区二区婷婷| 久久久久久国产精品久久| 青青国产在线视频| 波多野结衣中文一区| 午夜久久久久久久| 亚洲s码欧洲m码国产av| 日本不卡一区二区三区高清视频| 99国产盗摄| 永久av在线| 欧美性xxxx18| 亚洲女则毛耸耸bbw| 日韩久久综合| 欧洲中文字幕国产精品| 亚洲精品字幕在线| 国产精品五月天| 777精品久无码人妻蜜桃| 国产精品久久久久久久久久辛辛 | www欧美xxxx| 911精品产国品一二三产区| 人妻大战黑人白浆狂泄| 亚洲高清在线| 国产精品18毛片一区二区| 国产盗摄在线观看| 精品视频1区2区3区| 尤物视频最新网址| 亚洲一区成人| 国内精品久久久久久久果冻传媒| 丝袜综合欧美| 日韩欧美综合在线| 日本aⅴ在线观看| 毛片av一区二区| 日韩欧美亚洲日产国| 亚洲永久av| 日韩电影免费观看在线观看| 国产网站在线看| 国产91综合一区在线观看| 国产欧美综合一区| 成人在线视频www| 久久精品国产视频| 国产又粗又猛又爽又黄的| 欧美激情在线一区二区| 日本男人操女人| 精品久久视频| 国产精品丝袜高跟| 五月婷婷在线视频| 欧美浪妇xxxx高跟鞋交| 美女视频久久久| 韩国女主播成人在线观看| 亚洲精品中文综合第一页| 日韩精品一级毛片在线播放| 日韩在线观看视频免费| 国产又粗又猛又爽| 洋洋av久久久久久久一区| 欧美熟妇精品一区二区| 亚洲福利久久| 免费成人av网站| 美女网站视频一区| 最新国产精品拍自在线播放| 一区二区三区亚洲视频| 亚洲男女一区二区三区| 又大又长粗又爽又黄少妇视频| 亚洲午夜极品| 免费精品视频一区| 全球最大av网站久久| 日日狠狠久久偷偷四色综合免费 | 久久久久久久久久一级| 国产精品欧美经典| 中国特级黄色片| 国产精品一国产精品k频道56| 欧美成人免费在线| 欧美性aaa| 久久久久久久久久久免费 | 日韩手机在线观看视频| 日韩理论电影院| www久久99| 欧美第一视频| 久久久精品在线| 亚洲 美腿 欧美 偷拍| 欧美午夜寂寞影院| 精品在线视频观看| 国产日韩欧美高清在线| 亚洲综合123| 香蕉成人久久| 欧美一级黄色录像片| 欧美1区二区| 成人免费观看网址| 小早川怜子影音先锋在线观看| 中文国产成人精品久久一| 亚洲精品一区二区三区区别| 91黄视频在线观看| 久久综合加勒比| 国产日本欧洲亚洲| av电影在线播放| 久久精品国产一区二区三区免费看| avav在线播放| 久久中文视频| 另类欧美小说| 亚洲超碰在线观看| 国产精品自拍小视频| a'aaa级片在线观看| 久久九九精品99国产精品| 天堂在线一二区| 欧美一区二区黄色| 国模私拍一区二区| 欧美视频在线观看 亚洲欧| 亚洲一级生活片| 亚洲国产精品高清| 日韩精品一区二区三区高清免费| 国产中文字幕精品| 天天操天天爱天天爽| 亚洲经典自拍| 亚洲爆乳无码精品aaa片蜜桃| 久久综合88| 日本婷婷久久久久久久久一区二区 | 色婷婷一区二区| 日韩精品国产一区二区| 亚洲免费av观看| 韩国一级黄色录像| 国产清纯白嫩初高生在线观看91| 免费的av网站| gogo大胆日本视频一区| 免费黄色a级片| 国产精品自拍av| 999在线精品视频| 麻豆精品一二三| 九九热精品在线播放| 久久一本综合频道| 人妻熟女一二三区夜夜爱| 在线日韩视频| 久艹视频在线免费观看| 欧美午夜影院| 久久久天堂国产精品| 亚洲美女视频| 色爽爽爽爽爽爽爽爽| 亚洲最大av| 午夜久久久久久久久久久| 亚洲精品网址| 女人床在线观看| 欧美日韩蜜桃| 国产91沈先生在线播放| 国内久久精品| 青青青青草视频| 亚洲伦伦在线| 欧美精品色婷婷五月综合| 亚洲一区区二区| 热久久精品国产| 男女男精品视频| 视频在线观看免费高清| 激情综合色综合久久综合| www.日本久久| 国产99精品国产| 一级国产黄色片| 久久久精品一品道一区| 一级片久久久久| 国产精品福利电影一区二区三区四区 | 99精品中文字幕在线不卡 | 亚洲成人1区2区| 精品欧美一区二区三区免费观看| 欧美日韩精品在线播放| 无码人妻久久一区二区三区| 欧美日韩在线观看一区二区| 国产熟女一区二区三区四区| 日韩亚洲电影在线| 色欲久久久天天天综合网| 日韩精品在线播放| 97视频在线观看网站| 欧美久久精品午夜青青大伊人 | 国产精品久久久久久久久久小说 | 亚洲第一区在线| 欧美高清电影在线| 色哟哟网站入口亚洲精品| av中文字幕在线观看| 欧美激情精品久久久久久| 中文在线8资源库| 91久久久久久久久| 欧美大胆a级| 一区二区三区三区在线| 伊人激情综合| 潘金莲激情呻吟欲求不满视频| 国产激情91久久精品导航| 法国伦理少妇愉情| 亚洲色图制服丝袜| av资源免费观看| 欧美精品自拍偷拍| 天天操天天干天天爱| 日韩在线视频免费观看| 韩国成人免费视频| 国产精品久久久久久久av电影| 久久爱www.| 日韩欧美视频第二区| 黄色欧美日韩| 九九热99视频| 91偷拍与自偷拍精品| 黄色录像免费观看| 色综合久久久久综合99| 性生交生活影碟片| 综合国产在线视频| 女厕盗摄一区二区三区| 91欧美精品成人综合在线观看| 日韩av网站在线免费观看| 桥本有菜av在线| 日韩国产欧美一区二区三区| 中文在线字幕观看| 亚洲天堂2014| 中文字幕av久久爽| 亚洲精品天天看| a级片免费在线观看| 91在线免费视频| 欧美日韩精品在线一区| 内射国产内射夫妻免费频道| 国产毛片精品一区| 欧日韩不卡视频| 色综合久久久久综合体| 天天摸天天干天天操| 欧美尺度大的性做爰视频| 欧美黄色成人| 手机成人在线| 久久久久久婷| 一区二区视频观看| 亚洲第一福利视频在线| 狠狠躁日日躁夜夜躁av| 久久视频在线观看免费| 日韩五码电影| 在线观看福利一区| 美女精品一区二区| 性欧美一区二区| 在线精品视频小说1| 青青操视频在线| 欧美综合一区第一页| 欧洲亚洲一区二区三区| 国产a级片网站| 91小视频免费看| 在线能看的av| 日韩精品黄色网| 亚洲天堂电影| 欧美一区免费视频| 日韩精品一二三区| 韩国三级hd中文字幕| 在线观看日韩毛片| 69xxxx欧美| 成人性生交大片免费看小说| 香蕉视频国产精品| 爱情岛论坛亚洲自拍| 亚洲欧美成aⅴ人在线观看| 精品国产av 无码一区二区三区| 久久精品久久久久久| 欧美二区观看| 精品少妇在线视频| 91亚洲精品乱码久久久久久蜜桃| 欧美特黄aaaaaa| 亚洲欧美视频在线| 日韩精品免费观看视频| 亚洲欧美国产不卡| 精品伊人久久久久7777人| 成人免费黄色小视频| 欧美成人精品1314www| 阿v视频在线| 欧美一区三区二区在线观看| 美腿丝袜亚洲综合| 精品国产乱码久久久久久鸭王1| 精品国产露脸精彩对白| 成人影院在线播放| 欧美精品一区二区视频 | 日本xxxx免费| 偷偷要91色婷婷| 国产免费视频在线| 95av在线视频| 国产精品入口66mio| 久久久久久久久久久久久久久| 欧美三级日韩三级国产三级| 高清全集视频免费在线| 国产尤物91| 免费看欧美美女黄的网站| 国产中文av在线| 亚洲成人久久一区| 日韩国产网站| 黄色一级片黄色| 久久精品综合网| 国产黄a三级三级看三级| 97av在线视频免费播放| 爽成人777777婷婷| 91精品又粗又猛又爽| 在线免费一区三区| 美洲精品一卡2卡三卡4卡四卡| 欧美福利一区二区三区| 国产综合色在线| www毛片com| 欧美福利视频在线观看| 国产不卡av一区二区| 特种兵之深入敌后| 欧美性猛交99久久久久99按摩|