Concurrency
#Reading
#Books
#Articles
#Packages
#Talks
#Patterns, Examples, Usage, etc..
// - atomic.go -
package main
import (
"fmt"
"sync/atomic"
"time"
)
/*
Difference on blocking and non blocking counters.
Atomic operations.
*/
func main() {
var (
simpleCounter int32 = 0
atomicCounter int32 = 0
atomicCounterFunc = func() { atomic.AddInt32(&atomicCounter, 1) }
simpleCounterFunc = func() { simpleCounter++ }
)
for i := 0; i < 100000; i++ {
go atomicCounterFunc()
go simpleCounterFunc()
}
time.Sleep(2 * time.Millisecond)
fmt.Println("Results: Atomic", simpleCounter)
fmt.Println("Results: Simple", atomicCounter)
}
// - cond.go -
package main
import (
"fmt"
"sync"
)
// Example from the book - go concurrency patterns
type Button struct {
Clicked *sync.Cond
}
func main() {
button := Button{
Clicked: sync.NewCond(&sync.Mutex{}),
}
subscribe := func(c *sync.Cond, fn func()) {
var goroutineRunning sync.WaitGroup
goroutineRunning.Add(1)
go func() {
goroutineRunning.Done()
c.L.Lock()
defer c.L.Unlock()
c.Wait()
fn()
}()
goroutineRunning.Wait()
}
var clickRegistered sync.WaitGroup
clickRegistered.Add(3)
subscribe(button.Clicked, func() {
fmt.Println("Maximizing window.")
clickRegistered.Done()
})
subscribe(button.Clicked, func() {
fmt.Println("Displaying annoying dialog box!")
clickRegistered.Done()
})
subscribe(button.Clicked, func() {
fmt.Println("Mouse clicked.")
clickRegistered.Done()
})
button.Clicked.Broadcast()
clickRegistered.Wait()
}
// - context_timeout.go -
package main
import (
"context"
"log"
"math/rand"
"time"
)
// There are some chances that function will succeed and some changes that not.
func main() {
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(rand.Int63n(7))*time.Second)
defer func() {
log.Printf("ctx cancel: %-10s", time.Since(start))
cancel()
}()
Runner(ctx, start)
}
func Runner(ctx context.Context, start time.Time) {
var res = make(chan struct{})
go func() {
LongRunningFunction(ctx, start)
res <- struct{}{}
}()
select {
case <-res:
log.Printf("func done: %-10s", time.Since(start))
case <-ctx.Done():
log.Printf("ctx timeout: %s", ctx.Err())
log.Printf("ctx timeout: %-10s", time.Since(start))
}
}
func LongRunningFunction(ctx context.Context, start time.Time) {
time.Sleep(3 * time.Second)
}
// - once.go -
package main
import (
"fmt"
"runtime"
"strconv"
"sync"
"time"
)
func main() {
runtime.GOMAXPROCS(0)
var limit = 5
var t = struct {
sync.Once
sync.WaitGroup
Winner int
}{}
t.Add(limit)
for i := 1; i <= limit; i++ {
go func(n int) {
defer t.Done()
time.Sleep(35 * time.Microsecond)
t.Do(func() {
t.Winner = n
fmt.Println("Only", strconv.Itoa(t.Winner)+"th", "came to the finish")
})
}(i)
}
t.Wait()
}
// - promise.go -
package main
import (
"fmt"
"time"
)
// Promise like patterns, we returning a channel to get results of our goroutine.
func executor() chan struct{} {
var result = make(chan struct{})
go func(pipe chan<- struct{}) {
time.Sleep(1 * time.Second)
fmt.Println("Execution done")
pipe <- struct{}{}
}(result)
return result
}
func main() {
signal := executor()
<-signal
}
// - race_cond_mutex.go -
package main
import (
"fmt"
"sync"
)
func main() {
var (
counters = map[int]int{}
mutex = &sync.Mutex{}
limit = 5
waiter = &sync.WaitGroup{}
)
waiter.Add(limit)
for i := 0; i < limit; i++ {
go func(th int) {
defer waiter.Done()
for j := 0; j < limit; j++ {
mutex.Lock()
counters[th*10+j]++
mutex.Unlock()
}
}(i)
}
waiter.Wait()
fmt.Println("counters result", counters)
}
// - semaphores.go -
package main
import (
"context"
"log"
"math/rand"
"sync/atomic"
"time"
"golang.org/x/sync/semaphore"
)
func main() {
tot := int64(5)
sem := semaphore.NewWeighted(tot) // Total Semaphores available.
ctx := context.Background()
var val int32
for i := 0; i < 25; i++ {
if err := sem.Acquire(ctx, 1); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
break
}
go func(n int) {
defer func() {
sem.Release(1)
atomic.AddInt32(&val, -1)
}()
time.Sleep(time.Duration(rand.Int63n(1)) * time.Millisecond)
atomic.AddInt32(&val, 1)
log.Printf("%d) Current Counter is %d\n", n, atomic.LoadInt32(&val))
}(i)
}
// Check for released semaphores
if !sem.TryAcquire(tot) {
log.Printf("Still in progress")
}
// Check for released semaphores
if err := sem.Acquire(ctx, tot); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
} else {
log.Printf("We are done")
}
}
// - tick.go -
package main
import (
"fmt"
"time"
)
func main() {
ticker := time.NewTicker(time.Second)
i := 0
for tickTime := range ticker.C {
i++
fmt.Println("step", i, "time", tickTime)
if i >= 3 {
ticker.Stop()
break
}
}
fmt.Println("total", i)
}
// - timeouts_2.go -
package main
import (
"fmt"
"math/rand"
"time"
)
func times() []time.Duration {
var r = rand.New(rand.NewSource(time.Now().UnixNano()))
var to = []time.Duration{time.Second, 2 * time.Second, 3 * time.Second}
r.Shuffle(len(to), func(i, j int) { to[i], to[j] = to[j], to[i] })
return to
}
func slowpoke(t time.Duration) chan struct{} {
ch := make(chan struct{})
go func() {
time.AfterFunc(t, func() { ch <- struct{}{} })
}()
return ch
}
func main() {
var timeouts = times()
timer := time.NewTimer(timeouts[0])
select {
case <-timer.C:
fmt.Println("Timeout happend")
case <-time.After(timeouts[1]):
fmt.Println("time.After timeout happend")
case <-slowpoke(timeouts[2]):
if !timer.Stop() {
<-timer.C
}
fmt.Println("Slow func is done")
}
}
// - workerpool_2.go -
package main
import (
"fmt"
"log"
"time"
)
func scheduler(term <-chan struct{}, waiter chan<- struct{}, workers chan struct{}, manager chan int) {
for {
select {
case <-term:
log.Println("Terminating manager execution")
waiter <- struct{}{}
return
case i := <-manager:
fmt.Println("Workers working b4 one start", len(workers))
workers <- struct{}{}
go func(n int) {
fmt.Println("Workers working after job#", n, " done", len(workers))
<-workers
fmt.Println("Worker released")
}(i)
default:
time.Sleep(25 * time.Microsecond)
}
}
}
func main() {
var (
limit = 5
stoper = make(chan struct{})
waiter = make(chan struct{})
worker = make(chan struct{}, 3)
manager = make(chan int)
)
go scheduler(stoper, waiter, worker, manager)
for i := 0; i < limit; i++ {
manager <- i
}
<-waiter
}
// - chans.go -
package main
import "fmt"
func main() {
// buffered
cb := make(chan int, 1)
go func() { fmt.Println(<-cb) }()
cb <- 1
// unbuffered
cu := make(chan int)
go func() { fmt.Println(<-cu) }()
cu <- 2
}
// - context_cancel.go -
package main
import (
"context"
"fmt"
"log"
"time"
)
func main() {
// ------------------------------------------------------------------------
// this is why go need generics
nums := func(ctx context.Context) <-chan int {
var (
dst = make(chan int)
num = 1
)
go func() {
for {
select {
case <-ctx.Done():
log.Println("exit generator")
return
case dst <- num:
num++
}
}
}()
return dst
}
tick := func(ctx context.Context) <-chan time.Time {
var dst = make(chan time.Time)
go func() {
for {
select {
case dst <- time.Now():
case <-ctx.Done():
log.Println("exit timer")
return
}
}
}()
return dst
}
// ------------------------------------------------------------------------
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var (
intsChan = nums(ctx)
timeChan = tick(ctx)
start = time.Now()
)
// Onces cancel called for context with Cancel
// its proragated to ctx.Done for every context receiver
for {
select {
case n := <-intsChan:
fmt.Println(n)
if n >= 10 {
cancel()
time.Sleep(time.Second)
return
}
case t := <-timeChan:
fmt.Println(t.Sub(start))
}
}
}
// - generator.go -
package main
import (
"fmt"
)
func main() {
chTerm := make(chan struct{})
chData := make(chan int)
// Generator
go func(chTerm <-chan struct{}, dataCh chan<- int) {
var val = 0
for {
select {
case <-chTerm:
return
case dataCh <- val:
val++
}
}
}(chTerm, chData)
// Generator Consumer
for curVal := range chData {
fmt.Println("got", curVal)
if curVal >= 3 {
fmt.Println("stop it!")
chTerm <- struct{}{}
break
}
}
}
// - pool.go -
package main
import (
"fmt"
"sync"
)
// This example a kinda stupid, but lets assume i am lazy and there suppose to be
// heavy allocation of the instance "type" object, so we can get a ready one from
// the pool instead of creating new onces instead onces killed by gc.
func main() {
myPool := &sync.Pool{
New: func() interface{} {
fmt.Println("Creating new instance.")
return struct{}{}
},
}
myPool.Get()
instance := myPool.Get()
myPool.Put(instance)
myPool.Get()
}
// - race_cond_map.go -
package main
import (
"fmt"
"sync"
)
func main() {
var (
mapa = &sync.Map{}
limit = 5
waiter = &sync.WaitGroup{}
)
waiter.Add(limit)
for i := 0; i < limit; i++ {
go func(th int) {
defer waiter.Done()
for j := 0; j < limit; j++ {
mapa.LoadOrStore(th*10+j, 1)
}
}(i)
}
waiter.Wait()
var counters = map[int]int{}
mapa.Range(func(key, value interface{}) bool {
counters[key.(int)] = value.(int)
return true
})
fmt.Println("counters result", counters)
}
// - range_over_chan.go -
package main
import (
"fmt"
)
func main() {
in := make(chan int, 0)
go func(out chan<- int) {
for i := 0; i <= 10; i++ {
out <- i
}
close(out)
}(in)
for i := range in {
fmt.Println(i)
}
}
// - signal.go -
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
)
func main() {
var (
waiter = make(chan struct{})
notify = make(chan os.Signal)
)
signal.Notify(notify, syscall.SIGINT, syscall.SIGTERM)
go func() {
fmt.Println("To win the game you must kill me, John Romero")
fmt.Println(<-notify)
fmt.Println("Finish him!")
fmt.Println(<-notify)
waiter <- struct{}{}
}()
<-waiter
}
// - timeouts_1.go -
package main
import (
"fmt"
"math/rand"
"time"
)
func times() []time.Duration {
var r = rand.New(rand.NewSource(time.Now().UnixNano()))
var to = []time.Duration{time.Second, 2 * time.Second}
r.Shuffle(len(to), func(i, j int) { to[i], to[j] = to[j], to[i] })
return to
}
func main() {
var timeouts = times()
var (
start = time.Now()
ender = make(chan struct{})
timeout = time.After(timeouts[0])
timer = time.AfterFunc(timeouts[1], func() {
fmt.Printf("Hello World took %v to execute\n", time.Now().Sub(start))
ender <- struct{}{}
})
)
for {
select {
case <-ender:
fmt.Println("Execution terminated by signal routine")
return
case <-timeout:
fmt.Printf("Timeout trigerred? (%t)\n", timer.Stop())
return
default:
time.Sleep(25 * time.Microsecond)
}
}
time.Sleep(time.Second)
}
// - workerpool_1.go -
package main
import (
"fmt"
"time"
)
// Go by example
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "got job", j)
time.Sleep(25 * time.Microsecond)
fmt.Println("worker", id, "did job", j)
results <- j * 2
}
}
func main() {
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
for a := 1; a <= numJobs; a++ {
<-results
}
}