從 0 到 1 解決超賣問題(從原理到分布式高并發方案)
引言
在電商秒殺、促銷活動等高頻并發場景中,“超賣” 是最棘手的問題之一。比如某商品庫存僅100件,最終卻賣出120件,不僅會引發用戶投訴,還會造成商家信譽損失。
為什么會出現超賣?
復現超賣
數據庫表:product_stock(存儲商品庫存)
CREATE TABLE `product_stock` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`product_id` bigint NOT NULL COMMENT '商品ID',
`stock` int NOT NULL DEFAULT '0' COMMENT '庫存數量',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_product_id` (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品庫存表';
-- 初始化數據:商品ID=1001,庫存=100
INSERT INTO `product_stock` (`product_id`, `stock`) VALUES (1001, 100);錯誤的庫存扣減邏輯
// Entity實體
@Data
@TableName("product_stock")
public class ProductStock {
@TableId(type = IdType.AUTO)
private Long id;
private Long productId;
private Integer stock;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}
// Mapper接口
public interface ProductStockMapper extends BaseMapper<ProductStock> {
// 按商品ID查詢庫存
@Select("SELECT * FROM product_stock WHERE product_id = #{productId}")
ProductStock getByProductId(@Param("productId") Long productId);
// 扣減庫存(僅更新,無判斷)
@Update("UPDATE product_stock SET stock = stock - 1 WHERE product_id = #{productId}")
int decreaseStock(@Param("productId") Long productId);
}
// Service實現
@Service
public class StockService {
@Autowired
private ProductStockMapper stockMapper;
// 錯誤的扣減邏輯:先查庫存,再扣減(非原子)
@Transactional
public boolean decreaseStockWrong(Long productId) {
// 1. 查詢當前庫存
ProductStock stock = stockMapper.getByProductId(productId);
if (stock == null || stock.getStock() <= 0) {
// 庫存不足,返回失敗
returnfalse;
}
// 2. 扣減庫存(此時可能已有其他線程修改了庫存)
int rows = stockMapper.decreaseStock(productId);
return rows > 0;
}
}
// Controller接口(供壓測調用)
@RestController
@RequestMapping("/stock")
public class StockController {
@Autowired
private StockService stockService;
@PostMapping("/decrease/{productId}")
public Result<?> decrease(@PathVariable Long productId) {
boolean success = stockService.decreaseStockWrong(productId);
return success ? Result.ok("扣減成功") : Result.error("庫存不足");
}
}解決方案
悲觀鎖
悲觀鎖的核心思路:讀取庫存時直接鎖定行數據,禁止其他線程修改,直到當前事務結束才釋放鎖,確保查 - 扣過程獨占。
// 1. Mapper新增“鎖定查詢”方法(Select For Update)
@Select("SELECT * FROM product_stock WHERE product_id = #{productId} FOR UPDATE")
ProductStock getByProductIdForUpdate(@Param("productId") Long productId);
// 2. Service修改為“鎖定查詢+扣減”(事務必須生效)
@Service
public class StockService {
@Autowired
private ProductStockMapper stockMapper;
// 悲觀鎖方案:事務+行鎖
@Transactional(isolation = Isolation.REPEATABLE_READ) // 事務隔離級別設為可重復讀
public boolean decreaseStockPessimistic(Long productId) {
// 1. 鎖定查詢:此時其他線程無法修改該商品的庫存行
ProductStock stock = stockMapper.getByProductIdForUpdate(productId);
if (stock == null || stock.getStock() <= 0) {
returnfalse;
}
// 2. 扣減庫存(同一事務內,鎖未釋放,其他線程無法介入)
int rows = stockMapper.decreaseStock(productId);
return rows > 0;
}
}關鍵注意點:
- 鎖范圍控制:
SELECT ... FOR UPDATE默認是行鎖,但需確保product_id是索引(本文中是唯一索引),否則會升級為表鎖,導致性能驟降。 - 事務隔離級別:需設置為
REPEATABLE_READ(MySQL默認級別),避免不可重復讀導致的庫存判斷偏差。 - 性能瓶頸:悲觀鎖是串行化處理請求,并發量過高時會出現線程阻塞,適合秒殺初期庫存充足、后期并發降低的場景。
樂觀鎖
樂觀鎖的核心思路:不主動鎖定數據,而是在扣減庫存時通過版本號或庫存當前值判斷數據是否被修改,若被修改則重試,適合中低并發場景(QPS≤3000)。
// 1. Entity新增version字段(@Version注解是MyBatis-Plus的樂觀鎖標識)
@Data
@TableName("product_stock")
public class ProductStock {
// 原有字段...
@Version // 樂觀鎖版本號字段
private Integer version; // 初始值為0
}
// 2. 配置MyBatis-Plus樂觀鎖插件
@Configuration
public class MyBatisPlusConfig {
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
// 添加樂觀鎖插件
interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
return interceptor;
}
}
// 3. Mapper修改扣減SQL(加入version判斷)
@Update("UPDATE product_stock SET stock = stock - 1, version = version + 1 " +
"WHERE product_id = #{productId} AND stock > 0 AND version = #{version}")
int decreaseStockWithVersion(@Param("productId") Long productId, @Param("version") Integer version);
// 4. Service實現(加入重試機制,應對版本沖突)
@Service
public class StockService {
@Autowired
private ProductStockMapper stockMapper;
// 樂觀鎖方案:版本號+重試
public boolean decreaseStockOptimistic(Long productId) {
int retryCount = 3; // 最大重試次數(避免無限循環)
while (retryCount > 0) {
// 1. 查詢當前庫存與版本號
ProductStock stock = stockMapper.getByProductId(productId);
if (stock == null || stock.getStock() <= 0) {
returnfalse;
}
// 2. 扣減庫存(僅當version匹配時才生效)
int rows = stockMapper.decreaseStockWithVersion(productId, stock.getVersion());
if (rows > 0) {
// 扣減成功,返回
returntrue;
}
// 版本沖突,重試(重試前可加短暫延遲,減少CPU占用)
retryCount--;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 重試3次仍失敗,返回false
returnfalse;
}
}Redis 分布式鎖
- 鎖的獲取:通過
Redis的SET NX EX命令(不存在則設置值,同時設過期時間),確保同一時間只有一個線程獲取鎖。 - 鎖的釋放:執行完庫存扣減后,刪除
Redis中的鎖鍵(需校驗鎖的持有者,避免釋放別人的鎖)。 - 庫存預熱:提前將數據庫中的庫存同步到
Redis(如秒殺開始前,通過定時任務或接口加載),減少數據庫訪問。
@Service
public class StockService {
@Autowired
private RedissonClient redissonClient;
@Autowired
private ProductStockMapper stockMapper;
// Redis分布式鎖方案
public boolean decreaseStockRedis(Long productId) {
// 1. 定義Redis鎖鍵(按商品ID區分,避免鎖沖突)
String lockKey = "lock:stock:" + productId;
RLock lock = redissonClient.getLock(lockKey);
try {
// 2. 獲取鎖(等待時間0秒,鎖過期時間30秒,避免死鎖)
// 等待時間0:獲取不到鎖直接返回,不阻塞;過期時間30秒:防止線程異常導致鎖無法釋放
boolean locked = lock.tryLock(0, 30, TimeUnit.SECONDS);
if (!locked) {
// 未獲取到鎖,返回“活動太火爆”
returnfalse;
}
// 3. 從Redis查詢庫存(原子操作)
RAtomicLong redisStock = redissonClient.getAtomicLong("stock:" + productId);
long currentStock = redisStock.get();
if (currentStock <= 0) {
// 庫存不足,返回失敗
returnfalse;
}
// 4. 扣減Redis庫存(原子操作:decrementAndGet() = 先減1再返回)
long newStock = redisStock.decrementAndGet();
System.out.println("Redis庫存扣減后:" + newStock);
// 5. 同步扣減數據庫庫存(最終一致性,可異步處理,此處為簡化用同步)
// 注意:若Redis扣減成功但數據庫扣減失敗,需有補償機制(如定時任務對賬)
stockMapper.decreaseStock(productId);
returntrue;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
returnfalse;
} finally {
// 6. 釋放鎖(僅當當前線程持有鎖時才釋放,避免釋放別人的鎖)
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}關鍵優化點:
- 鎖粒度控制:鎖鍵按
lock:stock:{productId}定義,僅鎖定單個商品的庫存操作,避免一把鎖鎖所有商品導致的性能瓶頸。 - 最終一致性:優先扣減
Redis庫存(高性能),再同步數據庫庫存。若同步失敗,可通過定時任務對賬(對比Redis與數據庫庫存差異,補全扣減),確保數據最終一致。 - 鎖續期:
Redisson的tryLock會自動為鎖續期(默認每10秒續期一次,續期到30秒),避免業務執行時間超過鎖過期時間導致的鎖提前釋放。
消息隊列異步削峰
- 請求入隊:用戶請求先發送到消息隊列,而非直接扣減庫存,隊列暫存請求,避免瞬時高并發打垮業務系統。
- 異步消費:消費者線程從隊列中拉取請求,按順序執行庫存扣減(
Redis+ 數據庫),實現削峰填谷。 - 可靠性保障:通過消息持久化、確認機制、死信隊列,確保請求不丟失、不重復處理。
聲明隊列、交換機與綁定
@Configuration
public class RabbitMQConfig {
// 隊列名稱(庫存扣減隊列)
public static final String STOCK_DECREASE_QUEUE = "stock.decrease.queue";
// 交換機名稱
public static final String STOCK_EXCHANGE = "stock.exchange";
// 路由鍵
public static final String STOCK_DECREASE_ROUTING_KEY = "stock.decrease.key";
// 聲明隊列(持久化,避免消息丟失)
@Bean
public Queue stockDecreaseQueue() {
return QueueBuilder.durable(STOCK_DECREASE_QUEUE)
.deadLetterExchange("stock.dlx.exchange") // 死信交換機(處理失敗消息)
.deadLetterRoutingKey("stock.dlx.key")
.build();
}
// 聲明交換機(topic類型,支持通配符路由)
@Bean
public TopicExchange stockExchange() {
return ExchangeBuilder.topicExchange(STOCK_EXCHANGE).durable(true).build();
}
// 綁定隊列與交換機
@Bean
public Binding stockDecreaseBinding() {
return BindingBuilder.bind(stockDecreaseQueue())
.to(stockExchange())
.with(STOCK_DECREASE_ROUTING_KEY);
}
// 聲明死信隊列與交換機(處理消費失敗的消息,如庫存不足、數據庫異常)
@Bean
public Queue stockDlxQueue() {
return QueueBuilder.durable("stock.dlx.queue").build();
}
@Bean
public TopicExchange stockDlxExchange() {
return ExchangeBuilder.topicExchange("stock.dlx.exchange").durable(true).build();
}
@Bean
public Binding stockDlxBinding() {
return BindingBuilder.bind(stockDlxQueue())
.to(stockDlxExchange())
.with("stock.dlx.key");
}
}生產者
@RestController
@RequestMapping("/stock")
public class StockController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedissonClient redissonClient;
// 消息隊列方案:請求先入隊
@PostMapping("/decrease/mq/{productId}")
public Result<?> decreaseWithMQ(@PathVariable Long productId) {
try {
// 1. 先判斷Redis庫存(快速失敗,避免無效消息入隊)
RAtomicLong redisStock = redissonClient.getAtomicLong("stock:" + productId);
if (redisStock.get() <= 0) {
return Result.error("庫存不足");
}
// 2. 生成唯一消息ID(避免重復消費,如用戶重復提交)
String messageId = UUID.randomUUID().toString();
// 3. 構建消息體(包含商品ID、消息ID)
Map<String, Object> message = new HashMap<>();
message.put("productId", productId);
message.put("messageId", messageId);
// 4. 發送消息到隊列
rabbitTemplate.convertAndSend(
RabbitMQConfig.STOCK_EXCHANGE,
RabbitMQConfig.STOCK_DECREASE_ROUTING_KEY,
message,
msg -> {
// 設置消息持久化+消息ID
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
msg.getMessageProperties().setMessageId(messageId);
return msg;
}
);
return Result.ok("請求已接收,正在處理");
} catch (Exception e) {
return Result.error("請求失敗,請稍后再試");
}
}
}消費者
@Component
public class StockDecreaseConsumer {
@Autowired
private RedissonClient redissonClient;
@Autowired
private ProductStockMapper stockMapper;
// 用于記錄已處理的消息ID(避免重復消費,可存Redis或本地緩存)
private final Set<String> processedMessageIds = Collections.synchronizedSet(new HashSet<>());
@RabbitListener(queues = RabbitMQConfig.STOCK_DECREASE_QUEUE)
public void consume(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
Map<String, Object> body = (Map<String, Object>) message.getPayload();
Long productId = (Long) body.get("productId");
try {
// 1. 防重復消費:判斷消息是否已處理
if (processedMessageIds.contains(messageId)) {
// 已處理,手動確認消息
channel.basicAck(deliveryTag, false);
return;
}
// 2. Redis分布式鎖(確保單商品庫存扣減串行化)
String lockKey = "lock:stock:" + productId;
RLock lock = redissonClient.getLock(lockKey);
boolean locked = lock.tryLock(0, 30, TimeUnit.SECONDS);
if (!locked) {
// 未獲取到鎖,拒絕確認(消息會重新入隊,等待下次消費)
channel.basicNack(deliveryTag, false, true);
return;
}
try {
// 3. 扣減Redis庫存
RAtomicLong redisStock = redissonClient.getAtomicLong("stock:" + productId);
long currentStock = redisStock.get();
if (currentStock <= 0) {
// 庫存不足,確認消息(避免重復處理)
channel.basicAck(deliveryTag, false);
return;
}
redisStock.decrementAndGet();
// 4. 扣減數據庫庫存
stockMapper.decreaseStock(productId);
// 5. 標記消息已處理
processedMessageIds.add(messageId);
// 6. 確認消息(消息從隊列刪除)
channel.basicAck(deliveryTag, false);
System.out.println("異步扣減成功:商品" + productId + ",消息ID" + messageId);
} finally {
// 釋放鎖
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
} catch (Exception e) {
// 消費失敗,拒絕確認并將消息送入死信隊列(避免無限重試)
channel.basicNack(deliveryTag, false, false);
System.err.println("異步扣減失敗:商品" + productId + ",錯誤:" + e.getMessage());
}
}
}關鍵保障:
- 防重復消費:通過
messageId+本地緩存(或Redis)記錄已處理消息,避免用戶重復提交導致的重復扣減。 - 消息不丟失:
隊列與消息均設置為持久化。
生產者開啟publisher-confirm(確認消息已到達交換機)。
消費者手動確認消息(basicAck),消費成功才刪除消息。
- 失敗處理:消費失敗的消息送入死信隊列,后續可人工排查或通過定時任務重試。
方案選型
超賣問題的本質是并發下操作非原子化,解決思路從保證原子性逐步升級到異步削峰:
- 數據庫方案是基礎,通過悲觀鎖 / 樂觀鎖確保單庫操作原子性,適合低并發。
Redis分布式鎖解決了分布式場景的原子性問題,性能提升顯著,是中高并發的首選。- 消息隊列 +
Redis鎖則通過異步化應對超高并發,適合大型秒殺等極端場景。
場景 | 推薦方案 | 優點 | 缺點 |
單機 / 低并發 | 數據庫樂觀鎖 | 實現簡單,無需額外組件 | 分布式場景不生效 |
分布式 / 中高并發 | Redis 分布式鎖 | 性能高,支持分布式 | 需維護 Redis,需處理一致性 |
分布式 / 超高并發 | 消息隊列 + Redis 分布式鎖 | 抗瞬時高并發,削峰填谷 | 實現復雜,需維護消息隊列 |
建議
- 庫存預熱:秒殺活動前
10分鐘,將庫存同步到Redis,避免活動開始時大量請求查詢數據庫。 - 接口限流:通過
Spring Cloud Gateway或Sentinel對秒殺接口限流(如單IP每分鐘最多請求5次),過濾無效請求。 - 降級熔斷:當
Redis或數據庫異常時,快速返回活動暫時不可用,避免系統雪崩。 - 對賬機制:定時(如每
5分鐘)對比Redis與數據庫庫存,若存在差異,以數據庫為準同步Redis(確保最終一致性)。

































