1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| package main
import ( "errors" "fmt" "sync" "sync/atomic" "time" )
// throttle is a token-bucket style limiter. The goroutine started by
// NewThrottle tops the Token channel back up to C entries every D, and Work
// spends one token for each job it launches.
//
// num is deliberately the first field: it is updated with the 64-bit
// sync/atomic functions, which on 32-bit platforms (ARM, 386, 32-bit MIPS)
// require 64-bit alignment, and only the first word of an allocated struct is
// guaranteed to be aligned that way.
type throttle struct {
	num    int64         // jobs currently running; updated via sync/atomic
	D      time.Duration // refill interval
	C      int64         // number of tokens the bucket is refilled to
	Mu     sync.Mutex    // held by the refill loop while it tops up Token
	Token  chan bool     // buffered channel holding the available tokens
	maxNum int64         // refills are skipped while num >= maxNum
}
// ErrApplyTimeout is the error ApplyToken returns when no token is received
// before its wait (twice the throttle's D interval) elapses.
var ErrApplyTimeout = errors.New("apply token time out")
func NewThrottle(D time.Duration, C, maxNum int64) *throttle { instance := &throttle{ D: D, C: C, Token: make(chan bool, C), maxNum: maxNum, } go instance.reset() return instance }
func (t *throttle) reset() { ticker := time.NewTicker(t.D) for _ = range ticker.C { if t.num >= t.maxNum { continue } t.Mu.Lock() supply := t.C - int64(len(t.Token)) fmt.Printf("reset token:%d\n", supply) for supply > 0 { t.Token <- true supply-- } t.Mu.Unlock() } }
func (t *throttle) ApplyToken() (bool, error) { select { case <-t.Token: return true, nil case <-time.After(t.D * 2): return false, ErrApplyTimeout } }
func (t *throttle) Work(job func()) { if ok, err := t.ApplyToken(); !ok { fmt.Println(err) return } go func() { atomic.AddInt64(&t.num, 1) defer atomic.AddInt64(&t.num, -1) job() }() } func main() { t := NewThrottle(time.Second, 10, 20) for { t.Work(doWork) } }
// doWork is the demo job: it prints the current time and then blocks for
// five seconds.
func doWork() {
	started := time.Now()
	fmt.Println(started)
	time.Sleep(5 * time.Second)
}
|