0
点赞
收藏
分享

微信扫一扫

redis消息队列的处理逻辑

书写经典 2022-11-22 阅读 98


redis 作队列使用时,假设需要完成以下逻辑:

1.一端入,另一端出。
2.每一次读取队列内容长度10。
3. 读取完之后从redis队列里移除读取过的内容。

先说结论:

官方客户端命令的顺序应该是:
1.​​​rpush test 1 2 3 4 5 6 7 8 9 10 11 12​​​ // 压入test队列1-12 个数字
2.​​​lrange test 0 9​​​ // 获取队列的系数0-9,即1到10 十个数字,但是并没有移除!
3.​​​ltrim test 10 -1​​​ // 截取队列,从第10个开始到最后一个保留,其它的移除
以上,完成了redis消息队列的基本逻辑

疑问1: 为什么不直接lrange test 0 -1 获取全部队列,读完后 del test

假设先获取了全部队列,在删除该队列以前,如果同时有新的消息入列了,此消息将丢失,因为发生在获取全部队列以后,又随着删除而删除。

疑问2: 为什么不用lpush,而用rpush

一个队列,只有做到一端写入,一端读出,才能安全,如果在一端同时写读,那么在读后如果有新的消息入列了,执行截取时会把入列的消息移除.

疑问3: 执行lrange和itrim的系数如果远远大于length,会抛错吗?

不会,可以写lrange test 1000,-1和itrim 1000,-1,前者超容时,拿到长度为0的空[],后者保留空[]

疑问4: 如何保证处理队列后删除不会影响期间入列的数据?

在处理队列时,先获取当前队列长度,再根据长度拿出队列,根据长度移除,因为保证了一端插入一段拿出,并且lrange拿出时不会影响原队列长度,所以删除的时候,很干净地删除了指定长度的数据。

golang 执行同步队列操作

代码不补全了,缺少的model和入库信息自己脑补,实现的功能是把每日的redis内的访问日志迁移进数据库

go func() {
var total int
c := redistool.RedisPool.Get()
defer c.Close()
total, er := redis.Int(c.Do("LLEN", "test"))
if er != nil {
errorGarbage.Garbage.Add(errorx.Wrap(er))
return
}
if total == 0 {
return
}
var count int
var wg sync.WaitGroup
for {
if total > MILLION {
count = MILLION
} else {
count = total
}
wg.Add(1)
var l = sync.Mutex{}
// 迁移 count 条数据
go func(count int, wg *sync.WaitGroup, lk *sync.Mutex) {
defer wg.Done()
l.Lock()
defer l.UnLock()
// 获取 redis 内 request 队列数据
rs, er := c.Do("LRANGE", "test", 0, count-1)
if er != nil {
errorGarbage.Garbage.Add(errorx.Wrap(er))
return
}
res := rs.([]interface{})
var request middlewares.RequestInfo

txn, err := models.PQDB.Begin()
if err != nil {
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}

// 批量插入
stmt, err := txn.Prepare(pq.CopyIn("request_log", "header", "body", "method", "request_at", "url"))
if err != nil {
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}
for i := 0; i < len(res)-1; i++ {
err = json.Unmarshal(res[i].(json.RawMessage), &request)
if err != nil {
txn.Rollback()
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}
_, err = stmt.Exec(request.Header, request.Body, request.Method, request.RequestAt, request.Url)
if err != nil {
txn.Rollback()
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}
}

_, err = stmt.Exec()
if err != nil {
txn.Rollback()
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}

err = stmt.Close()
if err != nil {
txn.Rollback()
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}
err = txn.Commit()
if err != nil {
txn.Rollback()
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}

// 清除已存储的队列数据
_, err = c.Do("LTRIM", "test", count, -1)
if err != nil {
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}
}(count, &wg, l)

total = total - count
if total <= 0 {
break
}
}
wg.Wait()
}()


举报

相关推荐

0 条评论