不可复用的简单实现

其实就是基于有缓冲channel的特性,每创建一个goroutine就向goroutine里面发送一个数据,goroutine任务完成之后就消费一条channel里面的数据,当channel满了之后就阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
// 最多50个goroutine
ch := make(chan struct{}, 100)
wg := sync.WaitGroup{}

for i := 0; i <= 1000; i++ {
ch <- struct{}{}
wg.Add(1)
go func(num int) {
defer func() {
wg.Done()
<-ch
}()
DoSomething(num)
}(i)
}
wg.Wait()
fmt.Println("Done")
}

func DoSomething(num int){
fmt.Printf("k....: %d \n", num)
time.Sleep(time.Second)
}

上面方案实现简单,但缺点是会不断创建和销毁goroutine,不可服用,性能较差。

可复用的pool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
func main(){
jobParams := make([]JobParams, 0)
jobParams = append(jobParams,
JobParams{ "张三", 18},
JobParams{ "李斯", 28},
JobParams{ "赵武", 8},
JobParams{ "王二", 1},
JobParams{ "利好", 7},
JobParams{ "六七", 2},
JobParams{ "问候", 23},
)
jobParamsCh := make(chan JobParams, 1000)

wg := sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
workPool(2, jobParamsCh)
}()

wg.Add(1)
go func() {
defer wg.Done()
for _, v := range jobParams {
jobParamsCh <- v
}
}()

wg.Wait()
fmt.Println("Done...")
}

func workPool(size int, jobParamCh chan JobParams) {
wg := sync.WaitGroup{}

for i := 1; i<= size; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case params := <-jobParamCh:
Job(params)
time.Sleep(time.Second)
default:
fmt.Println("wait...")
}
}
}()
}
wg.Wait()
}


type JobParams struct {
UserName string
Age int
}

func Job(params JobParams) {
// doSomethings for example:
fmt.Printf("Hello, I'm %s and %d years old. \n", params.UserName, params.Age)
}

其实就在在每个goroutine里面去获取需要处理的参数,然后调用对应的逻辑函数逐个处理。