0
点赞
收藏
分享

微信扫一扫

go 流处理器思路

流水线编程,场景适合的话能够让我们的代码结构更加清晰


package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/tal-tech/go-zero/core/fx"
)

func main() {
    ch := make(chan int)

    go inputStream(ch)
    go outputStream(ch)

    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
    <-c
}

func inputStream(ch chan int) {
    count := 0
    for {
        ch <- count
        time.Sleep(time.Millisecond * 500)
        count++
    }
}

func outputStream(ch chan int) {
    fx.From(func(source chan<- interface{}) {  //生产资源
        for c := range ch {
            source <- c
        }
    }).Walk(func(item interface{}, pipe chan<- interface{}) {  //并发处理上一步生产的资源并写入pipe给下游继续加工(并发处理资源的work个数可以通过配置控制)
        count := item.(int)
        pipe <- count
    }).Filter(func(item interface{}) bool {  //串行的过滤资源
        itemInt := item.(int)
        if itemInt%2 == 0 {
            return true
        }
        return false
    }).ForEach(func(item interface{}) {  //串行的收集结果资源
        fmt.Println(item)
    })
}

举报

相关推荐

0 条评论