消息队列之rocketmq | go 技术论坛-380玩彩网官网入口

[toc]

文章介绍

本文来简单介绍一下消息队列 ,这里将什么是mq, 介绍rocketmq的安装,rocketmq的基本概念,消息类型,并使用go做各类消息的收发

什么是mq

1.什么是mq

消息队列是一种“先进先出”的数据结构

queue1.png

2.应用场景

其应用场景主要包含以下3个方面

  • 应用解耦

系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

解耦1.png
使用消息队列解耦合,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统回复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。

解耦2.png

  • 流量削峰

mq-5.png
应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。

mq-6.png

一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总不能下单体验要好。

处于经济考量目的:

业务系统正常时段的qps如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰

  • 数据分发

通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。

mq的优点和缺点

优点:解耦、削峰、数据分发mq-2.png

缺点包含以下几点:

  • 系统可用性降低
    系统引入的外部依赖越多,系统稳定性越差。一旦mq宕机,就会对业务造成影响。
    如何保证mq的高可用?
  • 系统复杂度提高
    mq的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过mq进行异步调用。
    如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
  • 一致性问题
    a系统处理完业务,通过mq给b、c、d三个系统发消息数据,如果b系统、c系统处理成功,d系统处理失败。
    如何保证消息数据处理的一致性?

rocketmq的安装

使用docker安装

rocketmq的基本概念

  • producer:消息的发送者;例如:发信人
  • consumer:消息接收者;例如:收信人
  • broker:暂存和传输消息;例如:邮局、中转站
  • nameserver:管理broker;例如:各个邮局的管理机构
  • topic:区分消息的种类;一个发送者可以发送消息给一个或者多个topic;一个消息的接收者可以订阅一个或者多个topic消息
  • message queue:相当于是topic的分区;用于并行发送和接收消息

消息类型

go实战

需要拉取

go get github.com/apache/rocketmq-client-go/v2
go get github.com/apache/rocketmq-client-go/v2/primitive
go get github.com/apache/rocketmq-client-go/v2/producer

这里我以实战的角度来介绍rocketmq的消息类型:

1. 普通消息

只是消息的收发,发送成功后接收者就直接可以收到消息

package main
import (
    "context"
    "fmt"
    "time"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "github.com/apache/rocketmq-client-go/v2/producer"
)
func main() {
    //初始化生产者
    q, err := rocketmq.newproducer(producer.withnameserver([]string{"101.1.12.202:9876"}))
    if err != nil {
        panic("生成q生产者失败")
    }
    if err := q.start(); err != nil {
        panic("启动q生产者失败")
    }
    msg := []byte("您好呀, 我是ice_moss")
    mq := primitive.newmessage("msg_test_hello", msg)  //msg_test_hello是为topic
    res, err := q.sendsync(context.background(), mq)
    if err != nil {
        fmt.printf("发送失败%s", err)
    }
    fmt.println("消息发送成功")
    fmt.println(res.string())
    err = q.shutdown()
    if err != nil {
        panic("shutdown fail err")
    }
}

这里需要注意的是如果我们需要在一个进程中启动多个rocketmq.newproducer()就必须将他的第二个参数配置上:producer.withgroupname("sendmsg")

q, err := rocketmq.newproducer(producer.withnameserver([]string{"101.1.12.202:9876"}), producer.withgroupname("sendmsg"))

不然就会报:生产者组已经被创建

原因:我们没有不设置withgroupname在调用时,会自动为我们创建一个默认名称的withgroupname,当第二次rocketmq.newproducer仍然是默认名,这时整个groupname就冲突了

好了已经将”普通消息”发送到队列中了,现在我们来接收

2. 消费消息

注意:两端的topic必须保持一直

package main
import (
    "context"
    "fmt"
    "os"
    "time"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
    c, _ := rocketmq.newpushconsumer(
        //接收者组
        consumer.withgroupname("msg_test"),
        consumer.withnsresolver(primitive.newpassthroughresolver([]string{"101.1.12.202:9876"})),
    )
    //订阅消息
    err := c.subscribe("msg_test_hello", consumer.messageselector{}, func(ctx context.context,
        msgs ...*primitive.messageext) (consumer.consumeresult, error) {
        for i := range msgs {
            fmt.printf("subscribe callback: %v \n", msgs[i])
        }
        return consumer.consumesuccess, nil
    })
    if err != nil {
        fmt.println(err.error())
    }
    // note: start after subscribe
    err = c.start()
    if err != nil {
        fmt.println(err.error())
        os.exit(-1)
    }
  //程序运行2分钟
    time.sleep(time.second * 120)
    err = c.shutdown()
    if err != nil {
        fmt.printf("shutdown consumer error: %s", err.error())
    }
}

输出:

subscribe callback: [message=[topic=msg_test_hello, body=您好呀, 我是ice_moss, flag=0, properties=map[consume_start_time:1668255347270 max_offset:2 min_offset:0 uniq_k251664a6e000000003cf040100001], transactionid=], msgid=0a0251664a6e000000003cf040100001, offsetmsgid=010eb4ca00002a9f000000000004bc14,queueid=1, storesize=174, queueoffset=0, sysflag=0, borntimestamp=1668254378888, bornhost=112.21.20.248:43010, storetimestamp=1668254379066, storehost=101.1.12.202:10911, commitlogoffset=310292, bodycrc=1573027761, reconsumetimes=0, preparedtransactionoffset=0] 

3. 延时消息

延时消息,指我们将我们需要发送的发送消息延迟多少时间后接收方才能收到,其中一个应用场景就是分布式电商系统的下单——>支付, 例如:12306380玩彩网官网入口官网买车票,当我们购买一张车票后,后台会做车票库存扣减,但是如果我们只下单,不支付这就很要命,该买票的人买不到票,该卖出去的票没有卖出去;其实仔细一点就会发现,12306购买下单后,在规定时间没有完成支付,就会取消相应的订单, 然后做库存归还。

现在来看一下延迟消息怎么发送:

package main
import (
    "context"
    "fmt"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "github.com/apache/rocketmq-client-go/v2/producer"
)
//sendmessage 生成消息,延迟消息
func sendmessage(q rocketmq.producer) {
    if err := q.start(); err != nil {
        panic("启动q生产者失败")
    }
    msg := primitive.newmessage("msg_test_hello", []byte("这是一个延迟消息"))
    //延迟时间级别
    //messagedelaylevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    msg.withdelaytimelevel(3)  //10s
    res, err := q.sendsync(context.background(), msg)
    if err != nil {
        fmt.printf("发送失败%s", err)
    }
    err = q.shutdown()
    if err != nil {
        fmt.printf("shutdown consumer error: %s", err.error())
    }
    fmt.println(res.string())
}
func main() {
    //初始化生产者
    q, err := rocketmq.newproducer(producer.withnameserver([]string{"101.1.12.202:9876"}))
    if err != nil {
        panic("生成q生产者失败")
    }
    sendmessage(q)
}

10秒后:

subscribe callback: [message=[topic=msg_test_hello, body=这是一个延迟消息, flag=0, properties=map[consume_start_time:1668256662984 delay:3 max_offset:5 min_offset:0 reareal_topic:msg_test_hello uniq_key:0a0251664be9000000003d12f2e00001]………

4.事务消息

什么是事务

事务是指是程序中一系列严密的逻辑操作,而且所有操作必须全部成功完成,否则在每个操作中所作的所有更改都会被撤消。可以通俗理解为:就是把多件事情当做一件事情来处理,好比大家同在一条船上,要活一起活,要完一起完

事物的四个特性(acid)

● 原子性(atomicity)操作这些指令时,要么全部执行成功,要么全部不执行。只要其中一个指令执行失败,所有的指令都执行失败,数据进行回滚,回到执行指令前的数据状态。

eg:拿转账来说,假设用户a和用户b两者的钱加起来一共是20000,那么不管a和b之间如何转账,转几次账,事务结束后两个用户的钱相加起来应该还得是20000,这就是事务的一致性。

● 一致性(consistency)事务的执行使数据从一个状态转换为另一个状态,但是对于整个数据的完整性保持稳定。

● 隔离性(isolation)隔离性是当多个用户并发访问数据库时,比如操作同一张表时,数据库为每一个用户开启的事务,不能被其他事务的操作所干扰,多个并发事务之间要相互隔离,可以使用锁机制来实现隔离,其实就是将并发场景下对数据操作的部分对并发请求进行串行化。

● 持久性(durability)当事务正确完成后,它对于数据的改变是永久性的。

eg: 例如我们在使用jdbc操作数据库时,在提交事务方法后,提示用户事务操作完成,当我们程序执行完成直到看到提示后,就可以认定事务以及正确提交,即使这时候数据库出现了问题,也必须要将我们的事务完全执行完成,否则就会造成我们看到提示事务处理完毕,但是数据库因为故障而没有执行事务的重大错误。

mq的事务消息

这里的事务消息实现接口:

type transactionlistener interface {
    //  when send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
    executelocaltransaction(*message) localtransactionstate
    // when no response to prepare(half) message. broker will send check message to check the transaction status, and this
    // method will be invoked to get local transaction status.
    checklocaltransaction(*messageext) localtransactionstate
}

我们的业务代码需要放在executelocaltransaction(*message) localtransactionstate方法中执行,对应返回相应的状态

const (
    commitmessagestate localtransactionstate = iota  1   //返回状态:事务执行成功发现消息
    rollbackmessagestate                                                          //返回状态:进行事务回查
    unknowstate                                                                                        //仍然会回查
)

我们回查机制业务需要在checklocaltransaction(*messageext) localtransactionstate方法中完成

下面我们来实现该接口(创建订单场景下):

package main
import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "github.com/apache/rocketmq-client-go/v2/producer"
    "google.golang.org/grpc/codes"
)
//order 模拟订单
type order struct {
    ordersrvid string
    userid     int32
    goodsid    int32
    totalprice float32
    post       string
    address    string
    mobile     string
}
//orderlister 接口实现者,事务可以将一下配置\信息写入该结构体中
type orderlister struct {
    code codes.code      //返回状态码
    ctx  context.context //上下文数据
    id   int32           //订单id
}
//executelocaltransaction  when send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
func (o *orderlister) executelocaltransaction(msg *primitive.message) primitive.localtransactionstate {
    //执行本地业务逻辑
    fmt.println("开始执行本地逻辑")
    time.sleep(time.second * 3)
    orderinfo := order{}
    err := json.unmarshal(msg.body, &orderinfo)
    if err != nil {
        o.code = codes.unavailable
        log.fatal("解析失败:", err)
        //调用回查逻辑
        return primitive.rollbackmessagestate
    }
    fmt.println("订单信息:", orderinfo)
    fmt.println("本地逻辑执行成功")
    //commitmessagestate 提交信息至mq
    //commitmessagestate/rollbackmessagestate都不会回查
    return primitive.commitmessagestate
}
//checklocaltransaction when no response to prepare(half) message. broker will send check message to check the transaction status, and this method will be invoked to get local transaction status.
func (o *orderlister) checklocaltransaction(*primitive.messageext) primitive.localtransactionstate {
    //回查
    fmt.println("事务未通过,开始回查")
    return primitive.rollbackmessagestate
}
func (o *order) createorder(q rocketmq.transactionproducer) {
    order, err := json.marshal(o)
    if err != nil {
        panic("marshal fail")
    }
    msg := primitive.newmessage("msg_test_order", order)
    res, err := q.sendmessageintransaction(context.background(), msg)
    if err != nil {
        fmt.printf("发送失败%s", err)
    } else {
        fmt.println("发送成功", res.string())
    }
    time.sleep(time.hour)
    err = q.shutdown()
    if err != nil {
        panic("shutdown fail err")
    }
}
func main() {
    //初始化事务对象
    orderlister := &orderlister{}
    q, err := rocketmq.newtransactionproducer(orderlister,
        producer.withnameserver([]string{"101.1.12.202:9876"}), producer.withgroupname("msg_test"))
    if err != nil {
        panic("生成q生产者失败")
    }
    if err = q.start(); err != nil {
        panic("启动q生产者失败")
    }
    orderinfo := &order{
        ordersrvid: "343435",
        userid:     21,
        goodsid:    214,
        totalprice: 150.5,
        post:       "请尽快发货",
        address:    "无锡市",
        mobile:     "18389202834",
    }
    orderinfo.createorder(q)
}

执行输出:

开始执行本地逻辑
订单信息: {343435 21 214 150.5 请尽快发货 无锡市 18389202834}
本地逻辑执行成功
发送成功 sendresult [sendstatus=0, msgids=0a0266db4e24000000003da94f100001, offsetmsgid=010eb4ca00002a9f000000000004c28f, queueoffset=364, messagequeue=messagequeue [tomsg_test_order, brokername=broker-a, queueid=1]]

接收者接收到:

subscribe callback: [message=[topic=msg_test_order, body={"ordersrvid":"343435","userid":21,"goodsid":214,"totalprice":150.5,"post":"请尽快发货","address":"无锡市","mobile":"18389202834"}, flag=0, properties=map[consume_start_time:1668266524665 max_offset:1 min_offset:0 pgroup:msg_test real_qid:1 real_topic:msg_test_order tran_msg:true uniq_key:0a0266db4e24000000003da94f100001], transactionid=0a0266db4e24000000003da94f100001], msgid=0a0266db4e24000000003da94f100001…………
本作品采用《cc 协议》,转载必须注明作者和本文链接
刻意学习
本帖由系统于 2年前 自动加精
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
网站地图