0
点赞
收藏
分享

微信扫一扫

Go——常见模式

杏花疏影1 2022-02-03 阅读 29

这里写目录标题

软件架构

  • 常用的架构模式,针对不同应用场景

pipe-filter

  • 模式特点
    1
  • 举个简单的例子
    2
    // filter.go
    package pipefilter
    
    // 过滤器的输入
    type Request interface{}
    
    // 过滤器的输出
    type Response interface{}
    
    type Filter interface{
      Process(data Request) (Response, error) // 方法:方法名,参数,返回值
    }
    
    // split_filter.go
    package pipefilter
    
    import "strings"
    import "errors"
    import "fmt"
    
    
    type SplitFilter struct {
      delimiter string  // 分隔符
    }
    
    var SplitFilterWrongFormatError = errors.New("input data is not string")
    
    func (sf *SplitFilter) Process(data Request) (Response, error) {
      str, ok := data.(string)    // 判断类型
      if !ok {
        return nil, SplitFilterWrongFormatError
      }
      parts := strings.Split(str, sf.delimiter)
      fmt.Println("after split: ", parts)
      return parts, nil
    }
    
    func NewSplitFilter(delimiter string) *SplitFilter {
      return &SplitFilter{delimiter}  // 初始化对象
    }
    
    // to_int_filter.go
    package pipefilter
    
    import "errors"
    import "strconv"
    import "fmt"
    
    var ToIntFilterWrongFormatError = errors.New("input data should be []string")
    
    type ToIntFilter struct {
    }
    
    func (toint *ToIntFilter) Process(data Request)  (Response, error) {
      strs, ok := data.([]string)    // 判断类型
      if !ok {
        return nil, ToIntFilterWrongFormatError
      }
      ret := []int{}
      for _, str := range strs {  // 第二个是元素值
        s, err := strconv.Atoi(str)
        if err != nil {
          return nil, err
        }
        ret = append(ret, s)
      }
      fmt.Println("after to int ",ret)
      return ret, nil // 返回int化切片
    }
    
    func NewToIntFilter() *ToIntFilter {
      return &ToIntFilter{}
    }
    
    // sum_filter.go
    package pipefilter
    
    import "errors"
    import "fmt"
    
    type SumFilter struct {
    }
    
    var SumFilterWrongFormatError = errors.New("sum is not right")
    
    func (toint *SumFilter) Process(data Request)  (Response, error) {
      i, ok := data.([]int)    // 判断类型
      if !ok {
        return nil, SumFilterWrongFormatError
      }
      ret := 0
      for _, elem := range i {  // _ 是索引
        ret += elem
      }
      fmt.Println("after sum: ",ret)
      return ret, nil // 返回求和
    }
    
    func NewSumFilter() *SumFilter {
      return &SumFilter{}
    }
    
    // stright_pipeline.go
    package pipefilter
    
    import "fmt"
    
    type StrightPipeline struct {
      Name string
      Filters *[]Filter
    }
    
    func NewStrightPipeline(name string, filters ...Filter) *StrightPipeline {
      return &StrightPipeline {
        Name: name, // pipeline 名称
        Filters: &filters,
      }
    }
    
    func (sp *StrightPipeline) Process(data Request)  (Response, error) {
      // var ret interface{}
      var err error
      // 逐个执行filter
      for _, filter := range *sp.Filters {	// 不定长参数,传指针
        ret, err := filter.Process(data)
        fmt.Println(ret)
        if err != nil {
          return nil, err
        }
        data = ret  // data,作为下个filter输入
      }
      return data, err // 返回求和
    }
    
    // filter_test.go
    package pipefilter
    
    import "testing"
    
    func TestPipelineFilter(t *testing.T) {
      spliter := NewSplitFilter(",")  // delimiter
      convert := NewToIntFilter()
      sum := NewSumFilter()
      sp := NewStrightPipeline("p1", spliter, convert, sum)
      ret, err := sp.Process("1,2,3,4,5")
      t.Logf("%T", ret)
      if err != nil {
        t.Fatal(err)  // 结束
      }
      if ret != 15 {
        t.Fatalf("Sum is not right! expected is 15, actual is %d", ret)
      }
    }
    
    • 从上面的例子就可以理解:只跟数据格式耦合

micro kernel

  • 特点:易于扩展,错误隔离,保持架构一致性
  • 代码给哥整懵了
// agent.go
package micro

type Event struct {
  name        string
  content     string
}

type EventReceiver interface {  // 事件收集器
  OnEvent(e Event)
}

type Collector interface {  // 要集成的插件 plugin
  Init(evt EventReceiver) error
  Start(agtCtx context.Context) error  // context 方便停止协程
  Stop()  error
  Destory() error
}

type Agent struct { // micro kernel 会集成一些Collector
  collectors map[string]Collector
  evtBuff    chan Event
  cancel     context.CancelFunc
  ctx        context.Context
  state      int
}

func (agt *Agent) EventProcessGroutine() {
  var evtSeg [10]Event
  for {
    for i:=0; i<10; i++ {
      select {
      case evtSeg[i] = <-agt.evtBuff:  // chan
      case <-agt.ctx.Done():
        return
      }
    }
  }
}

func NewAgent(sizeEvtBuf int) *Agent {
  agt := Agent {
    collectors: map[string]Collector{},
    evtBuff:    make(chan Event, sizeEvtBuf),
    state:      Waiting,
  }
  return &agt // 取址,返回的是个指针
}

var WrongStateError = errors.New("wrong state, the collector was already reigstered")

func (agt *Agent) RegisterCollector(name string, collector Collector) error {
  if agt.state != Waiting {
    return WrongStateError
  }
  agt.collectors[name] = collector
  return collector.Init(agt)  // 初始化插件
}

func (agt *Agent) startCollectors() error {
  var err error
  var errs CollectorsError
  var mutex sync.Mutex
  for name, collector := range agt.collectors {
    go func(name string, collector Collector, ctx context.Context) {
      defer func() {
        mutex.Unlock()
      }() // 别忘了调用
      err = collector.Start(ctx)
      mutex.Lock()
      if err != nil {
        errs.CollectorErros = append(errs.CollectorErrors, errors.New(name + ":" + err.Error()))
      }
    }(name, collector, agt.ctx)
  }
  return errs
}

func (agt *Agent) stopCollectors() error {
  var err error
  var errs CollectorsError
  for name, collector := range agt.collectors {
    if err = collector.Stop(); err != nil {
      errs.CollectorsErrors = append(errs.CollectorErrors, errors.New(name + ":" + err.Error()))
    }
  }
  return errs
}

func (agt *Agent) destoryCollectors() error {
  var err error
  var errs CollectorsError
  for name, collector := range agt.collectors {
    if err = collector.Destory(); err != nil {
      errs.CollectorsErrors = append(errs.CollectorErrors, errors.New(name + ":" + err.Error()))
    }
  }
  return errs
}

func (agt *Agent) Start() error { // 只是从agent层面管理的,所以调用的还是collector自身的方法
  if agt.state != Waiting {
    return WrongStateError
  }
  agt.state = Running
  agt.ctx, agt.cancel = context.WithCancel(context.Background())
  go agt.EventProcessGroutine()
  return agt.startCollectors()
}

func (agt *Agent) Stop() error {
  if agt.state != Running {
    return WrongStateError
  }
  agt.state = Waiting
  agt.cancel()
  return agt.stopCollectors()
}

func (agt *Agent) Destory() error {
  if agt.state != Running {
    return WrongStateError
  }
  return agt.destoryCollectors()
}

func (agt *Agent) OnEvent(evt Event) error {  // 事件交给调度中心,准备处理?
  agt.evtBuff <- evt
}
// agent_test.go
package micro

import "errors"
import "context"
import "fmt"
import "testing"
import "time"

type DemoCollector struct {
  evtReceiver EventReceiver
  agtCtx      context.Context
  stopChan    chan struct{}
  name        string
  content     string
}

func Newcollector(name string, content string) *DemoCollector { // &
  return &DemoCollector{  // 直接初始化
    stopChan: make(chan struct{}), // 其他属性不用赋值
    name: name,
    content: content,
  }
}

func (c *DemoCollector) Start(agtCtx context.Context) error {
  fmt.Println("start collect", c.name)
  for {
    select {
    case <- agtCtx.Done():
      c.stopChan <- struct{}{}  // 初始化一个
      break
    default:
      time.Sleep(time.Millisecond * 50)
      c.evtReceiver.OnEvent(Event{c.name, c.content})
    }
  }
}

func (c *DemoCollector) Stop(agtCtx context.Context) error {
  fmt.Println("stop collect", c.name)
  select {
  case <- c.stopChan:
    return nil
  case <-time.Sleep(time.Millisecond * 1000): // 超时控制
    return errors.New("timeout when stop")
  }
}

func (c *DemoCollector) Destory() error {
  fmt.Println("release resource", c.name)
  return nil
}

func TestAgent(t *testing.T) {
  agt := NewAgent(100)
  c1 := NewCollect("c1", "1")
  c2 := NewCollect("c2", "2")
  agt.RegisterCollector("c1",c1)
  agt.RegisterCollector("c2",c2)
  agt.Start()
  fmt.Println(agt.Start())
  time.Sleep(time.Millisecond * 1000)
  agt.Stop()
  agt.Destory()
}
举报

相关推荐

0 条评论