0
点赞
收藏
分享

微信扫一扫

Asynq实现Go异步crontab定时任务

最近在用Go写运维平台, 需要在Go应用程序中非同步处理任务, go cron并不能满足我的需求,于是在github发现了Asynq库。 让我们来动手实验asynq的用法吧!

Asynq是一个Go库,用于将任务排队并与工作者异步处理它们。它由Redis支持,设计成可扩展且易于上手。

任务队列被用作跨多台机器分配工作的机制。

一个系统可以由多个工作服务器和代理组成,从而实现高可用性和水平扩展。


Highlevel overview of how Asynq works:

  • Client puts task on a queue
  • Server pulls task off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers

Asynq实现Go异步crontab定时任务_golang crontab


快速开始

首先,确保你在本地运行Redis服务器。

$ redis-server


安装Asynq库

go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynq/tools/asynqmon


创建项目

mkdir ziji && cd ziji
go mod init ziji
mkdir tasks
touch tasks/beta.go tasks/worker.go tasks/task.go

Asynq实现Go异步crontab定时任务_golang crontab_02


Redis 连接选项

Asynq 使用 Redis 作为消息代理, beta.go worker.go 都需要连接到 Redis 进行写入和读取。

我们将使用 RedisClientOpt 指定如何连接到本地 Redis 实例。

beta.go

package tasks

import (
"github.com/hibiken/asynq"
"log"
"pigs/common"
"pigs/models/cmdb"
)

func TaskBeta() {
c := common.CONFIG.Redis
// 周期性任务
scheduler := asynq.NewScheduler(
asynq.RedisClientOpt{
Addr: c.Host,
Username: c.UserName,
Password: c.PassWord,
DB: c.DB,
}, nil)

var account cmdb.CloudPlatform
common.DB.Table("cloud_platform").Where("enable != ? and type = ?", 0, "aliyun").Find(&account)
syncResource := NewAliCloudTask(&account)
// 每隔5分钟同步一次
entryID, err := scheduler.Register("*/5 * * * *", syncResource)

if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)

if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
}


  • NewScheduler

将运行调度程序,用于定期处理任务。调度器定期对任务排队,然后由集群中可用的工作服务器执行。

  • 时区

默认情况下,定期任务计划使用UTC时区,更改默认时区可以使用SchedulerOpts参数

scheduler.Register 接受三个参数,cron时间任务队列名asynq.Queue("cloud")

// Example of using America/Los_Angeles timezone instead of the default UTC timezone.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
panic(err)
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
Location: loc,
},
)


task.go

package tasks

import (
"context"
"encoding/json"
"github.com/hibiken/asynq"
"log"
"pigs/inner/cloud/cloudsync"
"pigs/inner/cloud/cloudvendor"
"pigs/models/cmdb"
)

const (
SyncAliYunCloud = "cmdb:aliyun"
SyncTencentCloud = "cmdb:tencent"
)

// NewAliCloudTask 同步阿里云资产同步任务
func NewAliCloudTask(conf *cmdb.CloudPlatform) *asynq.Task {
payload, err := json.Marshal(conf)
if err != nil {
panic(err)
}
return asynq.NewTask(SyncAliYunCloud, payload)
}

func HandleAliCloudTask(ctx context.Context, t *asynq.Task) error {

var a cmdb.CloudPlatform
if err := json.Unmarshal(t.Payload(), &a); err != nil {
return err
}

_, err := cloudvendor.GetVendorClient(&a)
if err != nil {
log.Fatalf("AccountVerify GetVendorClient failed,%v", err)
return err
}

cloudsync.SyncAliYunHost(&a)

log.Printf("Aliyun Cloud assets are successfully synchronized...")
return nil
}


Tasks 任务

在 asynq 中,工作单元被封装为 Task 类型。

其中有两个字段:“类型” 和 “有效载荷”。

// Task represents a task to be performed.
type Task struct {
// Type indicates the type of a task to be performed.
Type string

// Payload holds data needed to perform the task.
Payload Payload
}

Type 是一个简单的字符串值,指示给定任务的类型。Payload 保存执行任务所需的数据,您可以将其视为 map[string]interface{}。需要注意的重要一件事是有效负载值必须是可序列化的。


worker.go

package tasks

import (
"context"
"pigs/common"
"time"

"log"
"os"
"os/signal"

"github.com/hibiken/asynq"
"golang.org/x/sys/unix"
)

// loggingMiddleware 记录任务日志中间件
func loggingMiddleware(h asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
start := time.Now()
log.Printf("Start processing %q", t.Type())
err := h.ProcessTask(ctx, t)
if err != nil {
return err
}
log.Printf("Finished processing %q: Elapsed Time = %v", t.Type(), time.Since(start))
return nil
})
}

func TaskWorker() {
c := common.CONFIG.Redis
srv := asynq.NewServer(
asynq.RedisClientOpt{
Addr: c.Host,
Username: c.UserName,
Password: c.PassWord,
DB: c.DB,
},
asynq.Config{Concurrency: 20},
)

mux := asynq.NewServeMux()
mux.Use(loggingMiddleware)
// 任务执行时的handle
mux.HandleFunc(SyncAliYunCloud, HandleAliCloudTask)

// start server
if err := srv.Start(mux); err != nil {
log.Fatalf("could not start server: %v", err)
}

// Wait for termination signal.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
for {
s := <-sigs
if s == unix.SIGTSTP {
srv.Shutdown()
continue
}
break
}

// Stop worker server.
srv.Stop()
}


示例

建立任务使用NewTask方法,并为任务传递类型和有效负载。 可以通过Client.Schedule传入任务和需要处理的时间来计划任务

func main() {
client := asynq.NewClient(redis)

// 创建任务,声明任务类型,有效负载
t1 := asynq.NewTask("send_register_email", map[string]interface{}{"userName": "zhangsan"})
t2 := asynq.NewTask("send_forget_email", map[string]interface{}{"userName": "zhangsan"})

// 立即处理任务
err := client.Enqueue(t1, time.Now())
if err != nil {
log.Fatal(err)
}

// 2小时后处理任务, 延迟任务
err := client.Enqueue(t2, asynq.ProcessIn(time.Now().Add(2 * time.Hour)))
if err != nil {
log.Fatal(err)
}
}


asynq.Client 支持三种调度任务的方法:EnqueueEnqueueInEnqueueAt

使用 client.Enqueue 将任务立即加入队列。

使用 client.EnqueueInclient.EnqueueAt 来安排将来要处理的任务。 EnqueueAt支持 2021-11-11 15:10:00 时间格式定时执行任务

// 定时任务
package tasks

import (
"fmt"
"github.com/hibiken/asynq"
"log"
"pigs/common"
"pigs/models/cmdb"
"time"
)

type EmailTaskPayloadTest struct {
UserID int64
Msg string
}

func TaskBeta() {
client := asynq.NewClient(
asynq.RedisClientOpt{
Addr: ":6379",
Password: "",
})

payload, err := json.Marshal(EmailTaskPayloadTest{
UserID: 100,
Msg: "test",
})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("task:oneTask", payload)

// 定时执行任务时间
setDate := "2021-11-11 15:10:00"
dateFormats := "2006-01-02 15:04:05"

// 获取时区
loc, _ := time.LoadLocation("Local")

// 指定日期 转 当地 日期对象 类型为 time.Time
timeObj, err := time.ParseInLocation(dateFormats, setDate, loc)
if err != nil {
fmt.Println("parse time failed err :", err)
return
}

info, err := client.Enqueue(t1, asynq.ProcessAt(timeObj), asynq.Queue("test"))
if err != nil {
log.Fatal(err)
}

log.Printf(" [*] Successfully enqueued task: %+v", info)


启动任务

go run beta.go
go run worker.go

asynq: pid=1274467 2021/11/17 04:27:34.960443 INFO: Starting processing

2021/11/17 12:27:34 /home/risk/code/pigs/tasks/beat.go:25

2021/11/17 12:27:34 registered an entry: “fe209263-1561-4b94-8b72-98b29bab6efe”

asynq: pid=1274467 2021/11/17 04:27:34.962365 INFO: Scheduler starting

asynq: pid=1274467 2021/11/17 04:27:34.962371 INFO: Scheduler timezone is set to UTC

asynq: pid=1274467 2021/11/17 04:27:34.962379 INFO: Send signal TERM or INT to stop the scheduler


2021/11/17 12:45:13 Aliyun Cloud assets are successfully synchronized…

2021/11/17 12:45:13 Finished processing “cmdb:aliyun”: Elapsed Time = 12.471785932s


Web UI

​​Asynqmon​​​是一个基于web的工具,用于监视和管理Asynq队列和任务。有关详细信息,请参阅工具的​​README​​。

Asynq实现Go异步crontab定时任务_go定时任务_03



Asynq实现Go异步crontab定时任务_go定时任务_04



参考文档:

​​https://github.com/hibiken/asynq/wiki/Getting-Started​​

标签: ​​go asynq异步定时任务​​​, ​​go 异步任务​​​, ​​golang crontab​​​, ​​go一次性任务​​​, ​​go定时任务​​


扫码查看更多运维开发教程

Asynq实现Go异步crontab定时任务_golang crontab_05

举报

相关推荐

0 条评论