这里写目录标题
软件架构
- 常用的架构模式,针对不同应用场景
pipe-filter
- 模式特点
- 举个简单的例子
// filter.go package pipefilter // 过滤器的输入 type Request interface{} // 过滤器的输出 type Response interface{} type Filter interface{ Process(data Request) (Response, error) // 方法:方法名,参数,返回值 }
// split_filter.go package pipefilter import "strings" import "errors" import "fmt" type SplitFilter struct { delimiter string // 分隔符 } var SplitFilterWrongFormatError = errors.New("input data is not string") func (sf *SplitFilter) Process(data Request) (Response, error) { str, ok := data.(string) // 判断类型 if !ok { return nil, SplitFilterWrongFormatError } parts := strings.Split(str, sf.delimiter) fmt.Println("after split: ", parts) return parts, nil } func NewSplitFilter(delimiter string) *SplitFilter { return &SplitFilter{delimiter} // 初始化对象 }
// to_int_filter.go package pipefilter import "errors" import "strconv" import "fmt" var ToIntFilterWrongFormatError = errors.New("input data should be []string") type ToIntFilter struct { } func (toint *ToIntFilter) Process(data Request) (Response, error) { strs, ok := data.([]string) // 判断类型 if !ok { return nil, ToIntFilterWrongFormatError } ret := []int{} for _, str := range strs { // 第二个是元素值 s, err := strconv.Atoi(str) if err != nil { return nil, err } ret = append(ret, s) } fmt.Println("after to int ",ret) return ret, nil // 返回int化切片 } func NewToIntFilter() *ToIntFilter { return &ToIntFilter{} }
// sum_filter.go package pipefilter import "errors" import "fmt" type SumFilter struct { } var SumFilterWrongFormatError = errors.New("sum is not right") func (toint *SumFilter) Process(data Request) (Response, error) { i, ok := data.([]int) // 判断类型 if !ok { return nil, SumFilterWrongFormatError } ret := 0 for _, elem := range i { // _ 是索引 ret += elem } fmt.Println("after sum: ",ret) return ret, nil // 返回求和 } func NewSumFilter() *SumFilter { return &SumFilter{} }
// stright_pipeline.go package pipefilter import "fmt" type StrightPipeline struct { Name string Filters *[]Filter } func NewStrightPipeline(name string, filters ...Filter) *StrightPipeline { return &StrightPipeline { Name: name, // pipeline 名称 Filters: &filters, } } func (sp *StrightPipeline) Process(data Request) (Response, error) { // var ret interface{} var err error // 逐个执行filter for _, filter := range *sp.Filters { // 不定长参数,传指针 ret, err := filter.Process(data) fmt.Println(ret) if err != nil { return nil, err } data = ret // data,作为下个filter输入 } return data, err // 返回求和 }
// filter_test.go package pipefilter import "testing" func TestPipelineFilter(t *testing.T) { spliter := NewSplitFilter(",") // delimiter convert := NewToIntFilter() sum := NewSumFilter() sp := NewStrightPipeline("p1", spliter, convert, sum) ret, err := sp.Process("1,2,3,4,5") t.Logf("%T", ret) if err != nil { t.Fatal(err) // 结束 } if ret != 15 { t.Fatalf("Sum is not right! expected is 15, actual is %d", ret) } }
- 从上面的例子就可以理解:只跟数据格式耦合
micro kernel
- 特点:易于扩展,错误隔离,保持架构一致性
- 代码给哥整懵了
// agent.go
package micro
type Event struct {
name string
content string
}
type EventReceiver interface { // 事件收集器
OnEvent(e Event)
}
type Collector interface { // 要集成的插件 plugin
Init(evt EventReceiver) error
Start(agtCtx context.Context) error // context 方便停止协程
Stop() error
Destory() error
}
type Agent struct { // micro kernel 会集成一些Collector
collectors map[string]Collector
evtBuff chan Event
cancel context.CancelFunc
ctx context.Context
state int
}
func (agt *Agent) EventProcessGroutine() {
var evtSeg [10]Event
for {
for i:=0; i<10; i++ {
select {
case evtSeg[i] = <-agt.evtBuff: // chan
case <-agt.ctx.Done():
return
}
}
}
}
func NewAgent(sizeEvtBuf int) *Agent {
agt := Agent {
collectors: map[string]Collector{},
evtBuff: make(chan Event, sizeEvtBuf),
state: Waiting,
}
return &agt // 取址,返回的是个指针
}
var WrongStateError = errors.New("wrong state, the collector was already reigstered")
func (agt *Agent) RegisterCollector(name string, collector Collector) error {
if agt.state != Waiting {
return WrongStateError
}
agt.collectors[name] = collector
return collector.Init(agt) // 初始化插件
}
func (agt *Agent) startCollectors() error {
var err error
var errs CollectorsError
var mutex sync.Mutex
for name, collector := range agt.collectors {
go func(name string, collector Collector, ctx context.Context) {
defer func() {
mutex.Unlock()
}() // 别忘了调用
err = collector.Start(ctx)
mutex.Lock()
if err != nil {
errs.CollectorErros = append(errs.CollectorErrors, errors.New(name + ":" + err.Error()))
}
}(name, collector, agt.ctx)
}
return errs
}
func (agt *Agent) stopCollectors() error {
var err error
var errs CollectorsError
for name, collector := range agt.collectors {
if err = collector.Stop(); err != nil {
errs.CollectorsErrors = append(errs.CollectorErrors, errors.New(name + ":" + err.Error()))
}
}
return errs
}
func (agt *Agent) destoryCollectors() error {
var err error
var errs CollectorsError
for name, collector := range agt.collectors {
if err = collector.Destory(); err != nil {
errs.CollectorsErrors = append(errs.CollectorErrors, errors.New(name + ":" + err.Error()))
}
}
return errs
}
func (agt *Agent) Start() error { // 只是从agent层面管理的,所以调用的还是collector自身的方法
if agt.state != Waiting {
return WrongStateError
}
agt.state = Running
agt.ctx, agt.cancel = context.WithCancel(context.Background())
go agt.EventProcessGroutine()
return agt.startCollectors()
}
func (agt *Agent) Stop() error {
if agt.state != Running {
return WrongStateError
}
agt.state = Waiting
agt.cancel()
return agt.stopCollectors()
}
func (agt *Agent) Destory() error {
if agt.state != Running {
return WrongStateError
}
return agt.destoryCollectors()
}
func (agt *Agent) OnEvent(evt Event) error { // 事件交给调度中心,准备处理?
agt.evtBuff <- evt
}
// agent_test.go
package micro
import "errors"
import "context"
import "fmt"
import "testing"
import "time"
type DemoCollector struct {
evtReceiver EventReceiver
agtCtx context.Context
stopChan chan struct{}
name string
content string
}
func Newcollector(name string, content string) *DemoCollector { // &
return &DemoCollector{ // 直接初始化
stopChan: make(chan struct{}), // 其他属性不用赋值
name: name,
content: content,
}
}
func (c *DemoCollector) Start(agtCtx context.Context) error {
fmt.Println("start collect", c.name)
for {
select {
case <- agtCtx.Done():
c.stopChan <- struct{}{} // 初始化一个
break
default:
time.Sleep(time.Millisecond * 50)
c.evtReceiver.OnEvent(Event{c.name, c.content})
}
}
}
func (c *DemoCollector) Stop(agtCtx context.Context) error {
fmt.Println("stop collect", c.name)
select {
case <- c.stopChan:
return nil
case <-time.Sleep(time.Millisecond * 1000): // 超时控制
return errors.New("timeout when stop")
}
}
func (c *DemoCollector) Destory() error {
fmt.Println("release resource", c.name)
return nil
}
func TestAgent(t *testing.T) {
agt := NewAgent(100)
c1 := NewCollect("c1", "1")
c2 := NewCollect("c2", "2")
agt.RegisterCollector("c1",c1)
agt.RegisterCollector("c2",c2)
agt.Start()
fmt.Println(agt.Start())
time.Sleep(time.Millisecond * 1000)
agt.Stop()
agt.Destory()
}