聊聊gost的generictaskpool

codecraft      2022-02-13     146

关键词:

本文主要研究一下gost的GenericTaskPool

GenericTaskPool

gost/sync/task_pool.go

// GenericTaskPool represents an generic task pool.
type GenericTaskPool interface {
    // AddTask wait idle worker add task
    AddTask(t task) bool
    // AddTaskAlways add task to queues or do it immediately
    AddTaskAlways(t task)
    // AddTaskBalance add task to idle queue
    AddTaskBalance(t task)
    // Close use to close the task pool
    Close()
    // IsClosed use to check pool status.
    IsClosed() bool
}
GenericTaskPool接口定义了AddTask、AddTaskAlways、AddTaskBalance、Close、IsClosed接口

TaskPool

gost/sync/task_pool.go

type TaskPool struct {
    TaskPoolOptions

    idx    uint32 // round robin index
    qArray []chan task
    wg     sync.WaitGroup

    once sync.Once
    done chan struct{}
}

// return false when the pool is stop
func (p *TaskPool) AddTask(t task) (ok bool) {
    idx := atomic.AddUint32(&p.idx, 1)
    id := idx % uint32(p.tQNumber)

    select {
    case <-p.done:
        return false
    default:
        p.qArray[id] <- t
        return true
    }
}

func (p *TaskPool) AddTaskAlways(t task) {
    id := atomic.AddUint32(&p.idx, 1) % uint32(p.tQNumber)

    select {
    case p.qArray[id] <- t:
        return
    default:
        goSafely(t)
    }
}

// do it immediately when no idle queue
func (p *TaskPool) AddTaskBalance(t task) {
    length := len(p.qArray)

    // try len/2 times to lookup idle queue
    for i := 0; i < length/2; i++ {
        select {
        case p.qArray[rand.Intn(length)] <- t:
            return
        default:
            continue
        }
    }

    goSafely(t)
}

// check whether the session has been closed.
func (p *TaskPool) IsClosed() bool {
    select {
    case <-p.done:
        return true

    default:
        return false
    }
}

func (p *TaskPool) Close() {
    p.stop()
    p.wg.Wait()
    for i := range p.qArray {
        close(p.qArray[i])
    }
}
TaskPool定义了TaskPoolOptions、idx、qArray、wg、once、done属性;它实现了GenericTaskPool接口;AddTask方法在pool是done的时候会返回false,其余情况会递增idx,然后根据tQNumber计算id,往qArray[id]写入task;AddTaskAlways方法会忽略pool的关闭信息;AddTaskBalance方法会尝试len/2次随机往qArray写入task,都写入不成功则goSafely执行;IsClosed主要是读取done的channel信息;Close方法执行stop及wg.Wait(),最后遍历qArray挨个执行close

NewTaskPool

gost/sync/task_pool.go

func NewTaskPool(opts ...TaskPoolOption) GenericTaskPool {
    var tOpts TaskPoolOptions
    for _, opt := range opts {
        opt(&tOpts)
    }

    tOpts.validate()

    p := &TaskPool{
        TaskPoolOptions: tOpts,
        qArray:          make([]chan task, tOpts.tQNumber),
        done:            make(chan struct{}),
    }

    for i := 0; i < p.tQNumber; i++ {
        p.qArray[i] = make(chan task, p.tQLen)
    }
    p.start()

    return p
}
NewTaskPool通过TaskPoolOptions来创建TaskPool

TaskPoolOption

gost/sync/options.go

const (
    defaultTaskQNumber = 10
    defaultTaskQLen    = 128
)

/////////////////////////////////////////
// Task Pool Options
/////////////////////////////////////////

// TaskPoolOptions is optional settings for task pool
type TaskPoolOptions struct {
    tQLen      int // task queue length. buffer size per queue
    tQNumber   int // task queue number. number of queue
    tQPoolSize int // task pool size. number of workers
}

func (o *TaskPoolOptions) validate() {
    if o.tQPoolSize < 1 {
        panic(fmt.Sprintf("illegal pool size %d", o.tQPoolSize))
    }

    if o.tQLen < 1 {
        o.tQLen = defaultTaskQLen
    }

    if o.tQNumber < 1 {
        o.tQNumber = defaultTaskQNumber
    }

    if o.tQNumber > o.tQPoolSize {
        o.tQNumber = o.tQPoolSize
    }
}

type TaskPoolOption func(*TaskPoolOptions)

// WithTaskPoolTaskPoolSize set @size of the task queue pool size
func WithTaskPoolTaskPoolSize(size int) TaskPoolOption {
    return func(o *TaskPoolOptions) {
        o.tQPoolSize = size
    }
}

// WithTaskPoolTaskQueueLength set @length of the task queue length
func WithTaskPoolTaskQueueLength(length int) TaskPoolOption {
    return func(o *TaskPoolOptions) {
        o.tQLen = length
    }
}

// WithTaskPoolTaskQueueNumber set @number of the task queue number
func WithTaskPoolTaskQueueNumber(number int) TaskPoolOption {
    return func(o *TaskPoolOptions) {
        o.tQNumber = number
    }
}
TaskPoolOptions定义了tQLen、tQNumber、tQPoolSize属性,提供了WithTaskPoolTaskPoolSize、WithTaskPoolTaskQueueLength、WithTaskPoolTaskQueueNumber、validate方法

start

gost/sync/task_pool.go

func (p *TaskPool) start() {
    for i := 0; i < p.tQPoolSize; i++ {
        p.wg.Add(1)
        workerID := i
        q := p.qArray[workerID%p.tQNumber]
        p.safeRun(workerID, q)
    }
}

func (p *TaskPool) safeRun(workerID int, q chan task) {
    gxruntime.GoSafely(nil, false,
        func() {
            err := p.run(int(workerID), q)
            if err != nil {
                // log error to stderr
                log.Printf("gost/TaskPool.run error: %s", err.Error())
            }
        },
        nil,
    )
}
start方法根据tQPoolSize挨个执行safeRun;safeRun方法通过GoSafely执行p.run(int(workerID), q)

run

gost/sync/task_pool.go

// worker
func (p *TaskPool) run(id int, q chan task) error {
    defer p.wg.Done()

    var (
        ok bool
        t  task
    )

    for {
        select {
        case <-p.done:
            if 0 < len(q) {
                return fmt.Errorf("task worker %d exit now while its task buffer length %d is greater than 0",
                    id, len(q))
            }

            return nil

        case t, ok = <-q:
            if ok {
                func() {
                    defer func() {
                        if r := recover(); r != nil {
                            fmt.Fprintf(os.Stderr, "%s goroutine panic: %v\n%s\n",
                                time.Now(), r, string(debug.Stack()))
                        }
                    }()
                    t()
                }()
            }
        }
    }
}
run方法通过for循环进行select,若读取到p.done则退出循环;若是读取到task则执行task

taskPoolSimple

gost/sync/task_pool.go

type taskPoolSimple struct {
    work chan task     // task channel
    sem  chan struct{} // gr pool size

    wg sync.WaitGroup

    once sync.Once
    done chan struct{}
}
taskPoolSimple定义了work、sem、wg、once、done属性;它实现了GenericTaskPool接口;AddTask方法先判断done,之后写入work及sem;AddTaskAlways方法在select不到channel的时候会执行goSafely;AddTaskBalance方法实际执行的是AddTaskAlways;IsClosed方法读取done信息;Close方法执行stop及wg.Wait()

实例

gost/sync/task_pool_test.go

func TestTaskPool(t *testing.T) {
    numCPU := runtime.NumCPU()
    //taskCnt := int64(numCPU * numCPU * 100)

    tp := NewTaskPool(
        WithTaskPoolTaskPoolSize(1),
        WithTaskPoolTaskQueueNumber(1),
        WithTaskPoolTaskQueueLength(1),
    )

    //task, cnt := newCountTask()
    task, _ := newCountTask()

    var wg sync.WaitGroup
    for i := 0; i < numCPU*numCPU; i++ {
        wg.Add(1)
        go func() {
            for j := 0; j < 100; j++ {
                ok := tp.AddTask(task)
                if !ok {
                    t.Log(j)
                }
            }
            wg.Done()
        }()
    }
    wg.Wait()
    tp.Close()

    //if taskCnt != atomic.LoadInt64(cnt) {
    //    //t.Error("want ", taskCnt, " got ", *cnt)
    //}
}

func TestTaskPoolSimple(t *testing.T) {
    numCPU := runtime.NumCPU()
    taskCnt := int64(numCPU * numCPU * 100)

    tp := NewTaskPoolSimple(1)

    task, cnt := newCountTask()

    var wg sync.WaitGroup
    for i := 0; i < numCPU*numCPU; i++ {
        wg.Add(1)
        go func() {
            for j := 0; j < 100; j++ {
                ok := tp.AddTask(task)
                if !ok {
                    t.Log(j)
                }
            }
            wg.Done()
        }()
    }
    wg.Wait()

    cntValue := atomic.LoadInt64(cnt)
    if taskCnt != cntValue {
        t.Error("want ", taskCnt, " got ", cntValue)
    }
}
TaskPoolSimple的创建比较简单,只需要提供size参数即可;TaskPool的创建需要提供TaskPoolOption,有WithTaskPoolTaskPoolSize、WithTaskPoolTaskQueueNumber、WithTaskPoolTaskQueueLength这些option

小结

gost的GenericTaskPool接口定义了AddTask、AddTaskAlways、AddTaskBalance、Close、IsClosed接口;这里有TaskPool、taskPoolSimple两个实现。

doc

  • gost

聊聊dbsync的schedulable

序本文主要研究一下dbsync的SchedulableSchedulable//SchedulablerepresentanabstractionthatcanbescheduletypeSchedulablestruct{URLstringIDstring*contract.SyncSchedule*contract.ScheduleStatusstringstatusuint32}//NewS 查看详情

聊聊storagetapper的cache

序本文主要研究一下storagetapper的cachecachestoragetapper/pipe/cache.gotypecacheEntrystruct{pipePipecfgconfig.PipeConfig}varcachemap[string]cacheEntryvarlocksync.Mutexcache是一个cacheEntry的map,cacheEntry定义了Pipe和con 查看详情

聊聊storagetapper的server

序本文主要研究一下storagetapper的serverserverstoragetapper/server/server.govarserver*http.Servervarmutex=sync.Mutex{}funcinit(){http.HandleFunc("/health",healthCheck)http.HandleFunc("/schema",schemaCmd)http.Han 查看详情

聊聊storagetapper的pool

序本文主要研究一下storagetapper的poolThreadstoragetapper/pool/pool.gotypeThreadinterface{Start(muint,ffunc())Adjust(muint)Terminate()boolNumProcs()uint}Thread接口定义了Start、Adjust、Terminate、NumProcs方法poolstoragetap 查看详情

聊聊vue中的数据代理

今天和大家聊聊Vue中的数据代理,什么是数据代理数据代理:通过一个对象代理对另外一个对象中属性的操作Object.defineProperty这个方法也是Vue数据双向绑定原理的常见面试题。今天和大家聊聊Vue中的数据代理,什么是数据代理数... 查看详情

聊聊hystrix的源码(代码片段)

聊聊Hystrix的源码今天我们说一下Hystrix的源码的内容@EnableCircuitBreaker注解需要使用Hystrix的时候,需要我们通过@EnableCircuitBreaker来开启断路器,那么我们看一下这个注解:@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Inheri... 查看详情

聊聊mybatis的事务模块(代码片段)

@[TOC]聊聊Mybatis的事务模块mybatis定义了自己的事务接口来实现事务,这里同样也使用了工厂模式工厂模式中的产品Transaction接口:publicinterfaceTransactionConnectiongetConnection()throwsSQLException;voidcommit()throwsSQLException;voidrollback()throwsSQ 查看详情

聊聊go.cqrs的dispatcher

序本文主要研究一下go.cqrs的DispatcherDispatchertypeDispatcherinterface{Dispatch(CommandMessage)errorRegisterHandler(CommandHandler,...interface{})error}Dispatcher接口定义了Dispatch、RegisterHandler方法InMemoryDispatche 查看详情

聊聊基于lucene的搜索引擎核心技术实践

最近公司用到了ES搜索引擎,由于ES是基于Lucene的企业搜索引擎,无意间在“聊聊架构”微信公众号里发现了这篇文章,分享给大家。请点击链接:聊聊基于Lucene的搜索引擎核心技术实践 查看详情

聊聊mybatis的basestatementhandler的三个子类(代码片段)

@[TOC]聊聊Mybatis的BaseStatementHandler的三个子类今天我们继续聊StatementHandler的实现类PreparedStatementHandlerPreparedStatementHandler处理的是包含?占位符的sql语句,所以它需要进行参数绑定:@Overridepublicvoidparameterize(Statementstatement)thro 查看详情

聊聊hystrix

聊聊HystrixHystrix也是SpringCloud框架中的重要组件,它的功能有跳闸机制,也就是当服务的错误率超过一定的阈值的时候,Hystrix在一段时间内停止请求这个服务,它还有资源隔离的功能,也就是每个方法都可以使用一个小型的线程... 查看详情

聊聊typescript中的类型保护(代码片段)

聊聊TypeScript中的类型保护在TypeScript中使用联合类型时,往往会碰到这种尴尬的情况:interfaceBird //独有方法fly(); //共有方法layEggs();interfaceFish //独有方法swim(); //共有方法layEggs();functiongetSmallPet():Fish|Bird//...letpet=getSmallPet() 查看详情

聊聊mybatis的总体流程(代码片段)

@[TOC]聊聊Mybatis的总体流程我们前几篇文章分析了各个模块,今天我们吧这几个模块串起来,看看这些模块是怎么被Mybatis使用的我们先看一下Mybatis是怎么使用的StringconfigName="mybatis_config.xml";Readerreader=Resources.getResourceAsReader(configNam... 查看详情

聊聊mybatis的数据源之工厂模式

@[TOC]聊聊Mybatis的数据源之工厂模式工厂模式是比较简单的设计模式,Mybatis的数据源的部分使用了工厂模式工厂模式的工厂DataSourceFactory是工厂角色的接口层publicinterfaceDataSourceFactoryvoidsetProperties(Propertiesprops);DataSourcegetDataSource();... 查看详情

“匿名聊聊”作者谈如何打造现象级爆款小程序

  前段时间小程序“匿名聊聊”刷爆了朋友圈,可惜后面被屏蔽了。作为第一款现象级呈现爆炸级传播的小程序它是如何做到的呢?我们就跟随“匿名聊聊”作者来聊聊如何打造现象级爆款小程序。  作为第一... 查看详情

个人经历|聊聊我的安全成长之路

...天,看到TSRC的小密圈里发起了一个“大神”活动,聊聊初入行时,你心目中/身边的大神,刚看到活动的时候我也去参与了。在这里,重新梳理一下,也借此机会聊聊零基础的我是如何踏入安全行业,总结一下自身的成长... 查看详情

聊聊ribbon源码解读(代码片段)

聊聊Ribbon源码解读要说当今最流行的组件当然是SpringCloud,要说框架中最流行的负载均衡组件,非Ribbon莫属了@LoadBalanced注解当我们使用Ribbon的时候,spring中注入RestTemplate,并在上边添加@LoadBalanced,这样使用RestTemplate发送请求的... 查看详情

聊聊mybatis缓存(代码片段)

聊聊MyBatis缓存你好,我是悟空。本文主要内容如下:一、MyBatis缓存中的常用概念MyBatis缓存:它用来优化SQL数据库查询的,但是可能会产生脏数据。SqlSession:代表和数据库的一次会话,向用户提供了操作数据库的方法。MappedState... 查看详情