redis 作队列使用时,假设需要完成以下逻辑:
1.一端入,另一端出。
2.每一次读取队列内容长度10。
3. 读取完之后从redis队列里移除读取过的内容。
先说结论:
官方客户端命令的顺序应该是:
1.rpush test 1 2 3 4 5 6 7 8 9 10 11 12
// 压入test队列1-12 个数字
2.lrange test 0 9
// 获取队列的系数0-9,即1到10 十个数字,但是并没有移除!
3.ltrim test 10 -1
// 截取队列,从第10个开始到最后一个保留,其它的移除
以上,完成了redis消息队列的基本逻辑
疑问1: 为什么不直接lrange test 0 -1 获取全部队列,读完后 del test
假设先获取了全部队列,在删除该队列以前,如果同时有新的消息入列了,此消息将丢失,因为发生在获取全部队列以后,又随着删除而删除。
疑问2: 为什么不用lpush,而用rpush
一个队列,只有做到一端写入,一端读出,才能安全,如果在一端同时写读,那么在读后如果有新的消息入列了,执行截取时会把入列的消息移除.
疑问3: 执行lrange和itrim的系数如果远远大于length,会抛错吗?
不会,可以写lrange test 1000,-1和itrim 1000,-1,前者超容时,拿到长度为0的空[],后者保留空[]
疑问4: 如何保证处理队列后删除不会影响期间入列的数据?
在处理队列时,先获取当前队列长度,再根据长度拿出队列,根据长度移除,因为保证了一端插入一段拿出,并且lrange拿出时不会影响原队列长度,所以删除的时候,很干净地删除了指定长度的数据。
golang 执行同步队列操作
代码不补全了,缺少的model和入库信息自己脑补,实现的功能是把每日的redis内的访问日志迁移进数据库
go func() {
var total int
c := redistool.RedisPool.Get()
defer c.Close()
total, er := redis.Int(c.Do("LLEN", "test"))
if er != nil {
errorGarbage.Garbage.Add(errorx.Wrap(er))
return
}
if total == 0 {
return
}
var count int
var wg sync.WaitGroup
for {
if total > MILLION {
count = MILLION
} else {
count = total
}
wg.Add(1)
var l = sync.Mutex{}
// 迁移 count 条数据
go func(count int, wg *sync.WaitGroup, lk *sync.Mutex) {
defer wg.Done()
l.Lock()
defer l.UnLock()
// 获取 redis 内 request 队列数据
rs, er := c.Do("LRANGE", "test", 0, count-1)
if er != nil {
errorGarbage.Garbage.Add(errorx.Wrap(er))
return
}
res := rs.([]interface{})
var request middlewares.RequestInfo
txn, err := models.PQDB.Begin()
if err != nil {
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}
// 批量插入
stmt, err := txn.Prepare(pq.CopyIn("request_log", "header", "body", "method", "request_at", "url"))
if err != nil {
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}
for i := 0; i < len(res)-1; i++ {
err = json.Unmarshal(res[i].(json.RawMessage), &request)
if err != nil {
txn.Rollback()
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}
_, err = stmt.Exec(request.Header, request.Body, request.Method, request.RequestAt, request.Url)
if err != nil {
txn.Rollback()
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}
}
_, err = stmt.Exec()
if err != nil {
txn.Rollback()
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}
err = stmt.Close()
if err != nil {
txn.Rollback()
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}
err = txn.Commit()
if err != nil {
txn.Rollback()
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}
// 清除已存储的队列数据
_, err = c.Do("LTRIM", "test", count, -1)
if err != nil {
errorGarbage.Garbage.Add(errorx.Wrap(err))
return
}
}(count, &wg, l)
total = total - count
if total <= 0 {
break
}
}
wg.Wait()
}()