0%

关于rockteMq消息并发消费写db的小技巧

关于rocketMq消息消费,一般使用的是并发消费模式。

消息消费时要考虑的问题主要有以下几个

  1. 消息处理耗时 如果耗时过长,会导致消息积压。
  2. 幂等性,一般rocketMq消息不会重复,但是还是存在这种可能性。 所以需要对重复消息进行幂等处理
    1. 幂等的处理逻辑: 需要保证重复消息只处理一次,或者处理效果等效。 一般的处理方案是丢弃重复消息。
    2. 消息消费是记录消息的唯一id,每次消费消息时检查以下是否已经消费过,是的话直接标记消息已消费。
  3. 并发问题的控制。
    1. 一般方案:通过加锁,消息处理中唯一键要保证唯一性。
    2. 进行线程安全控制。
    3. 通过 insert on duplicate key update 方式: 此方式有个缺陷: db表的自增主键会在每次唯一键冲突时 增加。
    4. 例子: 通过消费支付消息,统计用户月度累计消费金额。
      1. 更新数据,基于用户id和月份,如果更新结果是0,表示记录不存在
      2. 进行数据插入操作。 如果插入失败,则说明由于并发其他线程已经插入数据。直接返回失败,待后续消息重新消费。
      3. 如果插入成功则直接继续后续流程。

最佳实践:

​ 新增&更新记录防并发策略:

  1. 先更新
    1. 成功直接返回
    2. 失败,则走 insert on duplicate key update 方式。(改方式存在db表自增主键不连续问题)

参考资料:

https://www.sevenyuan.cn/2020/11/18/jms/2020-11-18-RocketMQ-Learning-Client/

Welcome to my other publishing channels