精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Spring Boot + Redis Streams :構(gòu)建高效消息系統(tǒng)

數(shù)據(jù)庫 Redis
Redis Streams是一種日志數(shù)據(jù)結(jié)構(gòu),類似于Apache Kafka中的分區(qū)日志,提供了持久化、可回溯、消息分組等特性。它支持生產(chǎn)者消費(fèi)者模型,允許生產(chǎn)者將消息追加到流的末尾,消費(fèi)者從流中讀取消息進(jìn)行處理。

前言

在現(xiàn)代微服務(wù)架構(gòu)中,可靠的消息處理系統(tǒng)是保證系統(tǒng)高可用性和擴(kuò)展性的關(guān)鍵。Redis Streams作為Redis 5.0引入的強(qiáng)大功能,提供了一種日志數(shù)據(jù)結(jié)構(gòu),能夠高效地處理消息隊(duì)列和流數(shù)據(jù)。

簡介

圖片圖片

Redis Streams是一種日志數(shù)據(jù)結(jié)構(gòu),類似于Apache Kafka中的分區(qū)日志,提供了持久化、可回溯、消息分組等特性。它支持生產(chǎn)者消費(fèi)者模型,允許生產(chǎn)者將消息追加到流的末尾,消費(fèi)者從流中讀取消息進(jìn)行處理。

Redis Streams的主要特性包括:

  • 消息持久化:消息存儲(chǔ)在Redis內(nèi)存中,并可通過持久化策略(如 RDB、AOF)保證數(shù)據(jù)不丟失。
  • 消息分組:支持將消費(fèi)者劃分為不同的分組,每個(gè)分組可以獨(dú)立消費(fèi)消息,實(shí)現(xiàn)消息的并行處理。
  • 消息確認(rèn)機(jī)制:消費(fèi)者處理完消息后,可以向流發(fā)送確認(rèn)消息,確保消息不會(huì)被重復(fù)處理。
  • 消息回溯:可以從任意位置讀取消息,支持歷史消息的查詢和重放。

效果圖

圖片圖片

消息生產(chǎn)與消費(fèi)實(shí)踐

創(chuàng)建消息實(shí)體類

@Data
public class Message implements Serializable {
    private String id;
    private String content;
}

生產(chǎn)者服務(wù)

@Service
public class MessageProducer {
    private static final String STREAM_KEY = "message-stream";
    private final RedisTemplate<String, Object> redisTemplate;

    public MessageProducer(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public RecordId sendMessage(Message message) {
        StreamOperations<String, Object, Object> streamOps = redisTemplate.opsForStream();
        Map<String, Object> messageMap = new HashMap<>();
        messageMap.put("id", message.getId());
        messageMap.put("content", message.getContent());

        return streamOps.add(MapRecord.create(STREAM_KEY, messageMap));
    }
}

配置消費(fèi)者(組)

@Slf4j
@Service
public class MessageConsumer implements StreamListener<String, MapRecord<String, String, String>> {
    private static final String STREAM_KEY = "message-stream";
    private static final String GROUP_NAME = "message-group";
    private static final String CONSUMER_NAME = "consumer-1";
    private final RedisTemplate<String, Object> redisTemplate;
    private StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;

    public MessageConsumer(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @PostConstruct
    public void init() {
        String script = "if redis.call('EXISTS', KEYS[1]) == 0 then " +
                "  return 1 " +
                "else " +
                "  return 0 " +
                "end";
        RedisScript<Long> redisScript = RedisScript.of(script, Long.class);
        Long result = redisTemplate.execute(redisScript, Collections.singletonList(streamKey));

        if (result != null && result == 1) {
            redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.latest(), groupName);
            log.info("消費(fèi)者組 {} 創(chuàng)建成功", groupName);
        } else {
            log.info("消費(fèi)者組 {} 已存在", groupName);
        }

        // 配置消息監(jiān)聽容器
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String,
                MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(10)
                        .pollTimeout(Duration.ofMillis(100))
                        .build();

        container = StreamMessageListenerContainer.create(redisTemplate.getConnectionFactory(), options);

        /**
         *  ReadOffset.latest():指定組的起始位置為 “當(dāng)前最新消息”
         *  ReadOffset.lastConsumed():從消費(fèi)者組的最后確認(rèn)位置開始讀取。如果是新組,默認(rèn)從$(最新位置)開始,確保消息至少被消費(fèi)一次(At Least Once)
         *  ReadOffset.from(String id):從指定的消息 ID 開始讀取
         *   id="0-0":從流的起始位置(第一條消息)開始讀取所有歷史消息
         *   id="$":等價(jià)于ReadOffset.latest(),從尾部開始讀取新消息
         *   id="具體消息ID":從指定 ID 的下一條消息開始讀取
        **/
        container.receive(
                Consumer.from(GROUP_NAME, CONSUMER_NAME),
                StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),this);

        container.start();
        // 驗(yàn)證容器是否啟動(dòng)成功
        if (container.isRunning()) {
            log.info("消息監(jiān)聽容器已啟動(dòng)");
        } else {
            log.warn("消息監(jiān)聽容器啟動(dòng)失敗");
        }
    }

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        try {
            Map<String, String> value = message.getValue();
            String id = value.get("id");
            String content = value.get("content");

            // 處理消息
            System.out.println("收到消息: ID=" + id + ", 內(nèi)容=" + content);

            // 業(yè)務(wù)處理邏輯...

            // 確認(rèn)消息處理完成
            redisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP_NAME, message.getId());
        } catch (Exception e) {
            // 處理異常,可以記錄日志或?qū)崿F(xiàn)重試邏輯
            System.err.println("消息處理失敗: " + e.getMessage());
        }
    }

    @PreDestroy
    public void destroy() {
        if (container != null) {
            container.stop();
        }
    }

    public void consumeMessages() {
        StreamOperations<String, String, String> streamOps = redisTemplate.opsForStream();
        List<MapRecord<String, String, String>> messages = streamOps.read(
                Consumer.from(GROUP_NAME, CONSUMER_NAME),
                StreamReadOptions.empty().count(10).block(Duration.ofSeconds(10)),
                StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed())
        );

        messages.forEach(message -> {
            Map<String, String> value = message.getValue();
            String id = value.get("id");
            String content = value.get("content");
            System.out.println("Received message: id=" + id + ", cnotallow=" + content);
            // 確認(rèn)消息已處理
            streamOps.acknowledge(STREAM_KEY, GROUP_NAME, message.getId());
        });
    }
}

檢查流和消費(fèi)者組狀態(tài)

# 查看流信息
XLEN message-stream

# 查看消費(fèi)者組信息
XINFO GROUPS message-stream

# 查看組消費(fèi)情況
XINFO CONSUMERS message-stream message-group

注意

  • 在創(chuàng)建Redis Streams消費(fèi)者組時(shí),不能使用ReadOffset.lastConsumed(),當(dāng)你創(chuàng)建一個(gè)新的消費(fèi)者組時(shí),Redis要求你明確指定組的初始讀取位置(即從哪個(gè)消息ID開始消費(fèi))

組的狀態(tài)尚未初始化:新組沒有任何消費(fèi)記錄,lastConsumed()無法確定起始位置

Redis API設(shè)計(jì):創(chuàng)建組的命令(XGROUP CREATE)必須包含一個(gè)固定的偏移量參數(shù)(如0-0$

Redis Streams 和 Redis Pub-Sub 之間的主要區(qū)別

特性

Redis Streams

Redis Pub-Sub

消息持久性

支持

不支持

投遞保證

即使消費(fèi)者離線也能投遞

無(未被消費(fèi)的消息會(huì)丟失)

重放能力

支持(可通過 ID 讀取歷史消息)

不支持(僅支持實(shí)時(shí)消息)

消息有序性

有保證(基于消息 ID)

無保證

消費(fèi)者協(xié)調(diào)

支持(通過消費(fèi)者組實(shí)現(xiàn))

不支持

多消費(fèi)者支持

支持(通過消費(fèi)者組實(shí)現(xiàn)并發(fā)消費(fèi))

支持(消息廣播至所有訂閱者)


責(zé)任編輯:武曉燕 來源: 一安未來
相關(guān)推薦

2025-06-05 08:00:00

Go事件驅(qū)動(dòng)系統(tǒng)編程

2025-05-16 07:24:41

Springkafka腳手架

2023-11-07 10:01:34

2020-01-14 15:08:44

Redis5Streams數(shù)據(jù)庫

2024-10-25 08:41:18

消息隊(duì)列RedisList

2023-10-11 14:37:21

工具開發(fā)

2018-12-05 09:00:00

RedisRedis Strea數(shù)據(jù)庫

2021-09-03 06:46:34

Spring 6pring Boot 項(xiàng)目

2021-09-15 09:02:20

Spring 6Spring BootJava

2023-09-01 08:46:44

2025-03-31 08:39:55

2021-01-12 08:43:29

Redis ListStreams

2023-07-10 08:26:19

2025-05-29 01:33:00

微服務(wù)架構(gòu)系統(tǒng)

2025-05-13 07:13:25

2019-01-15 11:40:14

開發(fā)技能代碼

2022-10-10 08:00:00

微服務(wù)Spring Boo容器

2020-11-12 07:51:05

DockerSpring Boot應(yīng)用

2018-11-02 15:45:41

Spring BootRedis數(shù)據(jù)庫

2020-07-14 11:00:12

Spring BootRedisJava
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

精品久久久久久久久久久| 国产一区二区三区黄视频| 亚洲欧洲激情在线| 四季av一区二区三区| 欧美家庭影院| 国产亚洲人成网站| 91成人免费在线观看| 西西44rtwww国产精品| 色一区二区三区四区| 亚洲成人网在线| 亚洲一区在线不卡| 啊啊啊久久久| 中文字幕日韩一区| 精品视频一区二区| 国产美女免费视频| 久久电影一区| 欧美日韩成人网| 午夜在线观看一区| 91精品国产自产精品男人的天堂| 91久久线看在观草草青青| 久久躁狠狠躁夜夜爽| 一区二区免费在线观看视频| 欧美天堂一区二区| 香蕉乱码成人久久天堂爱免费| 日韩av在线电影观看| 午夜精品久久久久久久99老熟妇| 视频一区欧美日韩| 久久久久久久久久国产精品| 久久久国产一级片| 欧美日韩123| 精品福利av导航| 国内自拍第二页| 99久久精品一区二区成人| 性感美女极品91精品| 一二三四中文字幕| 日本在线免费网| 国产日韩v精品一区二区| 国产精品自拍首页| 性生活黄色大片| 久久99国产精品久久99果冻传媒| 国产精品成人v| 欧美三级一区二区三区| 一区精品久久| 久精品免费视频| 免费在线观看一级片| 久久亚洲国产| 爽爽爽爽爽爽爽成人免费观看| 一卡二卡三卡四卡| 一区三区在线欧| 亚洲欧美精品一区| 中文字幕高清视频| 欧美日韩爱爱| 亚洲欧美在线播放| 中文字幕av网址| 国产91精品对白在线播放| 精品视频在线播放| 成年人在线观看av| 欧美男gay| 一区二区国产精品视频| 天天摸日日摸狠狠添| 日韩成人精品一区| 中文字幕一区二区精品| 美国黄色特级片| 久久国产成人精品| 啊v视频在线一区二区三区| 亚洲女人久久久| 欧美成人嫩草网站| 久久久久久国产精品久久| 久久这里只有精品国产| 亚洲日产国产精品| 青草青草久热精品视频在线观看| 久久久久在线视频| 青青草国产成人av片免费| 国产精品一二三在线| 国产又粗又猛视频免费| 国内不卡的二区三区中文字幕| 91在线视频九色| 国模私拍视频在线| 久久综合九色综合97婷婷| 色噜噜一区二区| 成人短视频在线观看| 亚洲综合激情网| 激情综合在线观看| 国产精品第一| 欧美一区二区三区免费视频 | 夜夜春很很躁夜夜躁| 青青草国产免费一区二区下载| 日韩亚洲综合在线| 久久视频免费在线观看| 日韩黄色免费网站| 97久草视频| 日本韩国一区| 亚洲视频狠狠干| 日本三级免费网站| 亚洲网站三级| 日韩精品久久久久| 欧洲美女女同性互添| 亚洲人妖在线| 成人激情春色网| 在线播放第一页| 亚洲精品网站在线| 99久久婷婷国产综合| 伊人成人网在线看| 国产精品女人久久久久久| 亚洲国产成人在线观看| 国产亚洲精品aa午夜观看| 国内自拍中文字幕| 88xx成人永久免费观看| 欧美电影免费观看完整版| 免费看污片网站| 影音先锋一区| 91精品免费看| 欧美孕妇孕交xxⅹ孕妇交| 亚洲特黄一级片| 无码少妇一区二区三区芒果| aaa国产精品视频| 在线播放日韩欧美| 一级成人黄色片| 成人午夜私人影院| 一本二本三本亚洲码| 免费成人直播| 亚洲黄页视频免费观看| 男人操女人的视频网站| 日韩电影在线免费看| 国产伦精品一区二区三区视频孕妇| 91最新在线| 日韩欧美在线中文字幕| 任你躁av一区二区三区| 欧美影院一区| 91久久在线视频| 2017亚洲天堂1024| 一本大道av伊人久久综合| 日本道中文字幕| 黄色成人在线网址| av成人综合网| bt在线麻豆视频| 欧美日韩国产综合一区二区三区| 国产在线观看h| 午夜在线视频观看日韩17c| 国产精品久久久久久久久久直播| 91麻豆一二三四在线| 在线成人小视频| 手机在线中文字幕| 美女视频黄免费的久久 | 人妻91麻豆一区二区三区| 中文字幕综合网| 日本77777| 亚洲精品2区| 超碰国产精品久久国产精品99| 超鹏97在线| 日韩视频一区二区| 国产精品老熟女一区二区| 国精产品一区一区三区mba桃花 | 中文一区二区| 久久综合精品一区| 黄色亚洲网站| 中文字幕欧美精品日韩中文字幕| 国产精华7777777| 中文字幕av不卡| 亚洲精品永久视频| 你懂的国产精品| 国产精品免费看一区二区三区| 欧美大胆的人体xxxx| 亚洲第一区第二区| 国产精品男女视频| 国产午夜精品在线观看| 日韩在线不卡一区| 一区二区三区四区日韩| 肥熟一91porny丨九色丨| 蜜臀久久精品| 亚洲人精选亚洲人成在线| 夜夜爽8888| 亚洲一区二区在线观看视频| 亚洲精品乱码久久久久久久| 肉色丝袜一区二区| 丰满女人性猛交| 理论片一区二区在线| 国产999精品视频| 国产午夜精品久久久久免费视| 精品日韩欧美在线| 成人在线免费看视频| 国产精品免费网站在线观看| 欧美又黄又嫩大片a级| 91久久久久| 日韩福利视频| 99a精品视频在线观看| 欧美最猛性xxxx| 米奇777四色精品人人爽| 精品91自产拍在线观看一区| 日日噜噜噜噜人人爽亚洲精品| 中文字幕一区二区三| 艳妇乳肉豪妇荡乳xxx| 免费在线观看视频一区| 国产xxxx振车| 日本激情一区| 国产精品一区二区三区免费观看| 成人毛片免费| 91国产精品视频在线| 日本激情视频在线观看| 精品国产乱码久久久久久久 | 欧美精品一区二区三区在线四季| 人人精品久久| 欧美孕妇性xx| 国产91足控脚交在线观看| 在线播放日韩欧美| 天堂网在线中文| 欧美一区二区视频在线观看2020 | 丝袜亚洲另类欧美综合| 欧美做暖暖视频| 欧美hentaied在线观看| 久久久久欧美| 超碰精品在线观看| 91色在线观看| av成人在线观看| 91高清在线免费观看| 色呦呦在线播放| 日韩在线小视频| 色哟哟国产精品色哟哟| 日韩一区二区三区视频| 亚洲中文字幕在线观看| 欧美视频免费在线观看| 久久这里只有精品国产| 亚洲美女一区二区三区| 久久久精品成人| 久久久www成人免费无遮挡大片| 精品一区二区三区四区五区六区| 国产一区二区三区美女| 久热在线视频观看| 免费国产亚洲视频| 妓院一钑片免看黄大片| 久久最新视频| 女性隐私黄www网站视频| 99在线热播精品免费99热| 免费高清一区二区三区| 欧美国产高潮xxxx1819| 精品一区二区成人免费视频| 日韩精品1区| 亚洲成色最大综合在线| 国产亚洲一区二区三区啪| 欧美激情导航| 久久最新网址| 日韩经典在线视频| 日本一区二区免费高清| 日韩在线电影一区| 日韩中文在线电影| 亚洲综合网中心| 欧美肥老太太性生活| 经典三级在线视频| 欧美91精品| 韩国无码av片在线观看网站| 欧美黄色精品| 国内精品在线观看视频| 亚洲免费观看| 精品久久久久久久免费人妻| 老司机久久99久久精品播放免费| 热久久精品免费视频| 另类综合日韩欧美亚洲| 精品国产午夜福利在线观看| 国产原创一区二区三区| 国产成人精品一区二区在线小狼| 盗摄精品av一区二区三区| 亚洲熟女乱综合一区二区三区| 久久品道一品道久久精品| 波多野结衣片子| 国产精品国产三级国产aⅴ原创| 欧美性生交大片| 一区二区免费视频| 午夜影院在线看| 欧美性大战久久久久久久| 国产精品欧美亚洲| 精品国产乱码久久久久久浪潮| 视频午夜在线| 中文字幕精品av| 丝袜在线观看| 欧美中文字幕在线播放| 99riav视频一区二区| 99视频日韩| 国产成人短视频在线观看| 中文字幕欧美人与畜| 精品99视频| 亚洲天堂av线| 高清av一区二区| 无码人妻aⅴ一区二区三区69岛| 蜜桃av在线| 欧美一区在线直播| 香蕉久久一区| 精品亚洲欧美日韩| 日韩一区二区三区免费播放| 伊人网在线免费| 久久国产精品99国产| 一个色综合久久| 91在线观看一区二区| 免费黄色国产视频| 天天影视网天天综合色在线播放| 真实新婚偷拍xxxxx| 日韩精品一区二区三区视频在线观看 | 亚洲色图偷窥自拍| 老司机午夜在线视频| 91精品国产91久久久久久| 欧美日韩免费电影| 久久精品ww人人做人人爽| 亚洲欧洲美洲一区二区三区| 丰满爆乳一区二区三区| 紧缚奴在线一区二区三区| av网站有哪些| 亚洲一区在线视频| 中文字幕一区二区人妻电影| 日韩亚洲国产中文字幕欧美| yiren22综合网成人| 国内精品国产三级国产在线专| www.国产精品| 欧美激情www| 亚洲日本黄色| 性感美女一区二区三区| 中文字幕一区二区日韩精品绯色| 天堂网一区二区| 亚洲成色www8888| 人人澡人人添人人爽一区二区| 国产精品偷伦一区二区| 欧美男gay| 干日本少妇首页| 99久久精品国产毛片| 欧美丰满艳妇bbwbbw| 欧美日产国产精品| 3p视频在线观看| 国产精品久久久久久av福利| 啪啪国产精品| 怡红院av亚洲一区二区三区h| 国产大片一区二区| 午夜国产福利一区二区| 这里只有精品电影| 一本一道波多野毛片中文在线| 国产精品成av人在线视午夜片| 奇米亚洲欧美| 国产三区在线视频| 久久久久久一级片| 中文字幕亚洲乱码熟女1区2区| 精品国产百合女同互慰| 青草青在线视频| 亚洲一区二区中文| 欧美成人久久| 俄罗斯黄色录像| 亚洲国产精品天堂| 欧美一级淫片免费视频魅影视频| 欧美不卡视频一区发布| 亚洲一区二区三区四区电影| 国产一级大片免费看| 国产不卡一区视频| 久久精品无码人妻| 日韩激情av在线免费观看| 涩涩视频在线播放| 日本一区二区精品视频| 日韩福利视频网| www.99re6| 日韩美女一区二区三区四区| 欧美人与牲禽动交com| 国产免费一区二区| 亚洲欧美高清| 中文字幕网站在线观看| 欧美三级日韩三级| 国产在线观看免费麻豆| 97久久精品午夜一区二区| 在线国产欧美| 亚洲黄色小说视频| 欧美日韩免费观看一区二区三区 | 久久久久久久久电影| 婷婷精品在线观看| 日本特黄a级片| 一区二区三区在线不卡| 午夜视频免费看| 日韩免费观看av| 婷婷色综合网| 中文字幕在线播放视频| 91国模大尺度私拍在线视频| 色欧美激情视频在线| 91久久偷偷做嫩草影院| 亚洲少妇一区| 911国产在线| 亚洲大胆人体视频| 456成人影院在线观看| 四虎精品欧美一区二区免费| 91在线视频在线| 一级做a爰片久久毛片16| 欧美高清视频免费观看| 国产精品一国产精品| 丰满少妇中文字幕| 色综合久久88色综合天天免费| 免费在线看黄网站| 精品日产一区2区三区黄免费| 日韩av中文字幕一区二区三区| 亚洲av无码一区二区三区在线| 亚洲精品久久久一区二区三区| 日韩欧美精品电影| 欧美一区二区激情| 国产女同互慰高潮91漫画| 亚洲黄色一级大片| 国产精品日韩电影| 在线综合欧美| 青娱乐免费在线视频| 中文字幕国产精品|