Concurrency

#Reading

#Books

#Articles

#Packages

#Talks

#Patterns, Examples, Usage, etc.

//  - atomic.go - 
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

/*
	Difference between a plain (non-atomic) counter and an atomic counter.
	Atomic operations.
*/

// main contrasts a plain int32 counter with one updated through sync/atomic.
//
// Each counter is incremented by 100000 goroutines. The atomic counter always
// ends at 100000; the plain counter is incremented without synchronization
// on purpose, so it may lose updates when goroutines run in parallel.
func main() {
	const iterations = 100000

	var simpleCounter, atomicCounter int32

	// finished counts the goroutines that have completed their increment.
	var finished int32

	atomicCounterFunc := func() { atomic.AddInt32(&atomicCounter, 1) }
	simpleCounterFunc := func() { simpleCounter++ } // intentionally racy

	for i := 0; i < iterations; i++ {
		go func() {
			atomicCounterFunc()
			atomic.AddInt32(&finished, 1)
		}()
		go func() {
			simpleCounterFunc()
			atomic.AddInt32(&finished, 1)
		}()
	}

	// Wait until every goroutine has reported completion; a fixed short sleep
	// does not guarantee that all 2*iterations goroutines have run.
	for atomic.LoadInt32(&finished) < 2*iterations {
		time.Sleep(time.Millisecond)
	}

	// Each label is printed next to the counter it actually describes.
	fmt.Println("Results: Atomic", atomic.LoadInt32(&atomicCounter))
	fmt.Println("Results: Simple", simpleCounter)
}
//  - cond.go - 
package main

import (
	"fmt"
	"sync"
)

// Example from the book - go concurrency patterns

// Button models a clickable element whose click event is published through a
// condition variable.
type Button struct {
	// Clicked is the condition variable that click handlers wait on and that
	// is broadcast when the button is clicked.
	Clicked *sync.Cond
}

// main registers three click handlers on a Button and then broadcasts a single
// click, waiting until every handler has run.
func main() {

	button := Button{
		Clicked: sync.NewCond(&sync.Mutex{}),
	}

	// subscribe starts a goroutine that waits on c and then runs fn. It returns
	// once that goroutine holds c.L, which guarantees the goroutine is parked
	// in c.Wait before anyone else can acquire c.L.
	subscribe := func(c *sync.Cond, fn func()) {
		var goroutineRunning sync.WaitGroup
		goroutineRunning.Add(1)
		go func() {
			c.L.Lock()
			defer c.L.Unlock()
			// Report readiness only while holding the lock: the lock is not
			// released until Wait has registered this goroutine as a waiter.
			goroutineRunning.Done()
			c.Wait()

			fn()
		}()
		goroutineRunning.Wait()
	}

	var clickRegistered sync.WaitGroup
	clickRegistered.Add(3)
	subscribe(button.Clicked, func() {
		fmt.Println("Maximizing window.")
		clickRegistered.Done()
	})

	subscribe(button.Clicked, func() {
		fmt.Println("Displaying annoying dialog box!")
		clickRegistered.Done()
	})
	subscribe(button.Clicked, func() {
		fmt.Println("Mouse clicked.")
		clickRegistered.Done()
	})

	// Broadcasting under the lock ensures every subscriber is already waiting;
	// without it the broadcast could be missed and the program would hang.
	button.Clicked.L.Lock()
	button.Clicked.Broadcast()
	button.Clicked.L.Unlock()

	clickRegistered.Wait()
}
//  - context_timeout.go - 
package main

import (
	"context"
	"log"
	"math/rand"
	"time"
)

// There is some chance that the function will succeed and some chance that it will not.

// main runs Runner under a context whose timeout is a random whole number of
// seconds in [0, 7), and logs the elapsed time when the context is released.
func main() {
	var (
		begin   = time.Now()
		timeout = time.Duration(rand.Int63n(7)) * time.Second
	)

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer func() {
		log.Printf("ctx cancel:  %-10s", time.Since(begin))
		cancel()
	}()

	Runner(ctx, begin)
}

// Runner executes LongRunningFunction in a goroutine and waits for either its
// completion or the expiry/cancellation of ctx, logging which happened first
// together with the time elapsed since start.
func Runner(ctx context.Context, start time.Time) {
	// Buffer of one: if ctx ends first nobody receives from res, and an
	// unbuffered send would block the worker goroutine forever.
	res := make(chan struct{}, 1)
	go func() {
		LongRunningFunction(ctx, start)
		res <- struct{}{}
	}()

	select {
	case <-res:
		log.Printf("func done:   %-10s", time.Since(start))
	case <-ctx.Done():
		log.Printf("ctx timeout: %s", ctx.Err())
		log.Printf("ctx timeout: %-10s", time.Since(start))
	}
}

// LongRunningFunction simulates slow work by blocking for three seconds.
// It does not consult ctx or start.
func LongRunningFunction(ctx context.Context, start time.Time) {
	const workDuration = 3 * time.Second
	time.Sleep(workDuration)
}
//  - once.go - 
package main

import (
	"fmt"
	"runtime"
	"strconv"
	"sync"
	"time"
)

// main starts five goroutines that compete to run a sync.Once body; only the
// first one to get there records itself as the winner and prints the result.
func main() {
	runtime.GOMAXPROCS(0)

	const racers = 5

	// race bundles the Once guarding the finish line with the WaitGroup used
	// to wait for all racers.
	var race struct {
		sync.Once
		sync.WaitGroup
		Winner int
	}

	race.Add(racers)
	for n := 1; n <= racers; n++ {
		go func(id int) {
			defer race.Done()

			time.Sleep(35 * time.Microsecond)

			race.Do(func() {
				race.Winner = id
				fmt.Println("Only", strconv.Itoa(race.Winner)+"th", "came to the finish")
			})
		}(n)
	}

	race.Wait()
}
//  - promise.go - 
package main

import (
	"fmt"
	"time"
)

// Promise-like pattern: we return a channel that delivers the result of our goroutine.

// executor starts a background task and immediately returns an unbuffered
// channel that receives one value after the task has slept for a second and
// printed its completion message.
func executor() chan struct{} {
	done := make(chan struct{})

	go func() {
		time.Sleep(time.Second)
		fmt.Println("Execution done")
		done <- struct{}{}
	}()

	return done
}

// main blocks until the task started by executor signals completion.
func main() {
	<-executor()
}
//  - race_cond_mutex.go - 
package main

import (
	"fmt"
	"sync"
)

// main lets five goroutines increment entries of a shared map, serializing
// every map access with a mutex, and prints the map once all are done.
func main() {
	const limit = 5

	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	counters := make(map[int]int)

	// bump increments one counter while holding the mutex.
	bump := func(key int) {
		mu.Lock()
		counters[key]++
		mu.Unlock()
	}

	wg.Add(limit)
	for th := 0; th < limit; th++ {
		go func(th int) {
			defer wg.Done()
			base := th * 10
			for j := 0; j < limit; j++ {
				bump(base + j)
			}
		}(th)
	}

	wg.Wait()
	fmt.Println("counters result", counters)
}
//  - semaphores.go - 
package main

import (
	"context"
	"log"
	"math/rand"
	"sync/atomic"
	"time"

	"golang.org/x/sync/semaphore"
)

// main runs 25 jobs while a weighted semaphore limits how many run at once,
// logging the number of jobs in flight, and then waits for all of them.
func main() {

	tot := int64(5)
	sem := semaphore.NewWeighted(tot) // Total Semaphores available.
	ctx := context.Background()

	// val tracks the number of jobs currently running.
	var val int32

	for i := 0; i < 25; i++ {

		if err := sem.Acquire(ctx, 1); err != nil {
			log.Printf("Failed to acquire semaphore: %v", err)
			break
		}

		go func(n int) {
			defer func() {
				// Decrement before releasing the permit; otherwise the next
				// job can start and be counted while this one is still
				// counted, pushing the logged value above tot.
				atomic.AddInt32(&val, -1)
				sem.Release(1)
			}()

			// NOTE(review): rand.Int63n(1) always returns 0, so this sleep is
			// effectively a no-op; confirm the intended upper bound.
			time.Sleep(time.Duration(rand.Int63n(1)) * time.Millisecond)
			cur := atomic.AddInt32(&val, 1)
			log.Printf("%d) Current Counter is %d\n", n, cur)
		}(i)
	}

	// If every permit can be taken right away, all jobs have already finished.
	// A second Acquire would then block forever, because main would be
	// holding all the permits itself.
	if sem.TryAcquire(tot) {
		log.Printf("We are done")
		return
	}
	log.Printf("Still in progress")

	// Block until all permits are released, i.e. all jobs are done.
	if err := sem.Acquire(ctx, tot); err != nil {
		log.Printf("Failed to acquire semaphore: %v", err)
		return
	}
	log.Printf("We are done")
}
//  - tick.go - 
package main

import (
	"fmt"
	"time"
)

// main prints the first three ticks of a one-second ticker, stops the ticker,
// and reports how many ticks were seen.
func main() {
	const steps = 3

	ticker := time.NewTicker(time.Second)
	count := 0
	for count < steps {
		tickTime := <-ticker.C
		count++
		fmt.Println("step", count, "time", tickTime)
	}
	ticker.Stop()

	fmt.Println("total", count)
}
//  - timeouts_2.go - 
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// times returns the durations 1s, 2s and 3s in a random order, using a
// generator seeded from the current time.
func times() []time.Duration {
	durations := []time.Duration{time.Second, 2 * time.Second, 3 * time.Second}

	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	swap := func(a, b int) {
		durations[a], durations[b] = durations[b], durations[a]
	}
	rng.Shuffle(len(durations), swap)

	return durations
}

// slowpoke returns a channel that receives a single value after duration t.
//
// The channel has a buffer of one so the timer callback can always deliver its
// value and finish, even when the caller has stopped listening (for example
// because another select case won).
func slowpoke(t time.Duration) chan struct{} {
	ch := make(chan struct{}, 1)
	// AfterFunc already runs the callback in its own goroutine, so no extra
	// goroutine is needed to schedule it.
	time.AfterFunc(t, func() { ch <- struct{}{} })
	return ch
}

// main races three timeout mechanisms (a Timer, time.After and a slow
// function) with randomly assigned delays and reports which finished first.
func main() {
	delays := times()

	// The channels are created in the same order the select would evaluate them.
	var (
		timer = time.NewTimer(delays[0])
		after = time.After(delays[1])
		slow  = slowpoke(delays[2])
	)

	select {
	case <-timer.C:
		fmt.Println("Timeout happend")
	case <-after:
		fmt.Println("time.After timeout happend")
	case <-slow:
		// Stop the timer, draining its channel if it has already fired.
		if stopped := timer.Stop(); !stopped {
			<-timer.C
		}
		fmt.Println("Slow func is done")
	}
}
//  - workerpool_2.go - 
package main

import (
	"fmt"
	"log"
	"time"
)

// scheduler dispatches jobs received on manager to goroutines, using the
// workers channel as a slot counter. When a value arrives on term it logs the
// shutdown, signals waiter and returns. While idle it polls every 25µs.
func scheduler(term <-chan struct{}, waiter chan<- struct{}, workers chan struct{}, manager chan int) {
	// runJob reports the job and then frees the slot it occupied.
	runJob := func(n int) {
		fmt.Println("Workers working after job#", n, " done", len(workers))
		<-workers
		fmt.Println("Worker released")
	}

	for {
		select {
		case job := <-manager:
			fmt.Println("Workers working b4 one start", len(workers))
			workers <- struct{}{} // take a slot; blocks while the pool is full
			go runJob(job)

		case <-term:
			log.Println("Terminating manager execution")
			waiter <- struct{}{}
			return

		default:
			time.Sleep(25 * time.Microsecond)
		}
	}
}

// main starts the scheduler, submits limit jobs to it, then asks it to stop
// and waits for its acknowledgement.
func main() {
	var (
		limit   = 5
		stoper  = make(chan struct{})
		waiter  = make(chan struct{})
		worker  = make(chan struct{}, 3)
		manager = make(chan int)
	)

	go scheduler(stoper, waiter, worker, manager)

	for i := 0; i < limit; i++ {
		manager <- i
	}

	// Tell the scheduler to terminate; without this signal it never writes to
	// waiter and the receive below would block forever.
	close(stoper)

	<-waiter
}
//  - chans.go - 
package main

import "fmt"

// main sends one value through a buffered and one through an unbuffered
// channel, each printed by its own goroutine, and waits for both printers.
func main() {
	// done lets main wait for the printers; otherwise the program could exit
	// before either value has been printed.
	done := make(chan struct{})

	// buffered
	cb := make(chan int, 1)
	go func() {
		fmt.Println(<-cb)
		done <- struct{}{}
	}()
	cb <- 1

	// unbuffered
	cu := make(chan int)
	go func() {
		fmt.Println(<-cu)
		done <- struct{}{}
	}()
	cu <- 2

	<-done
	<-done
}
//  - context_cancel.go - 
package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

// main consumes two context-aware generators (an integer sequence and a stream
// of timestamps) and cancels the shared context once the integer 10 is seen.
func main() {

	// ------------------------------------------------------------------------
	// The two generators below differ only in their element type, which is a
	// good argument for generics.

	// counter streams 1, 2, 3, ... until ctx is cancelled.
	counter := func(ctx context.Context) <-chan int {
		out := make(chan int)

		go func() {
			for next := 1; ; next++ {
				select {
				case out <- next:
				case <-ctx.Done():
					log.Println("exit generator")
					return
				}
			}
		}()

		return out
	}

	// clock streams the current time until ctx is cancelled.
	clock := func(ctx context.Context) <-chan time.Time {
		out := make(chan time.Time)

		go func() {
			for {
				select {
				case <-ctx.Done():
					log.Println("exit timer")
					return
				case out <- time.Now():
				}
			}
		}()

		return out
	}

	// ------------------------------------------------------------------------
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ints := counter(ctx)
	ticks := clock(ctx)
	began := time.Now()

	// Once cancel is called on a cancellable context, the cancellation is
	// propagated to ctx.Done for every receiver of that context.
	for {
		select {
		case t := <-ticks:
			fmt.Println(t.Sub(began))

		case n := <-ints:
			fmt.Println(n)
			if n < 10 {
				continue
			}
			cancel()
			// Give the generators a moment to log their exit.
			time.Sleep(time.Second)
			return
		}
	}
}
//  - generator.go - 
package main

import (
	"fmt"
)

// main reads values from a generator goroutine and tells it to stop once the
// value 3 has been received.
func main() {
	stop := make(chan struct{})
	data := make(chan int)

	// Generator: emits 0, 1, 2, ... until a stop signal arrives.
	go func(stop <-chan struct{}, out chan<- int) {
		for next := 0; ; next++ {
			select {
			case <-stop:
				return
			case out <- next:
			}
		}
	}(stop, data)

	// Generator consumer.
	for {
		v := <-data
		fmt.Println("got", v)
		if v < 3 {
			continue
		}
		fmt.Println("stop it!")
		stop <- struct{}{}
		return
	}
}
//  - pool.go - 
package main

import (
	"fmt"
	"sync"
)

// This example is kind of silly, but let's assume I am lazy and that allocating
// an instance of the "type" object is supposed to be expensive, so we take a ready
// one from the pool instead of creating new ones to replace those collected by the GC.

// main exercises a sync.Pool: it takes two instances, returns one of them and
// takes an instance again; the constructor prints whenever it is invoked.
func main() {
	construct := func() interface{} {
		fmt.Println("Creating new instance.")
		return struct{}{}
	}
	pool := &sync.Pool{New: construct}

	pool.Get()
	obj := pool.Get()
	pool.Put(obj)
	pool.Get()
}
//  - race_cond_map.go - 
package main

import (
	"fmt"
	"sync"
)

// main lets five goroutines store keys into a sync.Map concurrently, then
// copies the contents into a regular map and prints it.
func main() {
	const limit = 5

	var (
		shared sync.Map
		wg     sync.WaitGroup
	)

	wg.Add(limit)
	for th := 0; th < limit; th++ {
		go func(th int) {
			defer wg.Done()
			base := th * 10
			for j := 0; j < limit; j++ {
				shared.LoadOrStore(base+j, 1)
			}
		}(th)
	}
	wg.Wait()

	// Copy into a plain map for printing.
	counters := make(map[int]int)
	shared.Range(func(k, v interface{}) bool {
		counters[k.(int)] = v.(int)
		return true
	})
	fmt.Println("counters result", counters)
}
//  - range_over_chan.go - 
package main

import (
	"fmt"
)

// main prints the integers 0 through 10 produced by a goroutine over an
// unbuffered channel, stopping when the producer closes the channel.
func main() {
	stream := make(chan int)

	go func(out chan<- int) {
		defer close(out)
		for i := 0; i <= 10; i++ {
			out <- i
		}
	}(stream)

	for {
		v, open := <-stream
		if !open {
			return
		}
		fmt.Println(v)
	}
}
//  - signal.go - 
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// main waits for two SIGINT/SIGTERM deliveries, printing each one, and exits
// after the second.
func main() {

	var (
		waiter = make(chan struct{})
		// signal.Notify never blocks when delivering: a signal that arrives
		// while the channel is full is dropped. Two slots are reserved because
		// two signals are expected.
		notify = make(chan os.Signal, 2)
	)
	signal.Notify(notify, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		fmt.Println("To win the game you must kill me, John Romero")
		fmt.Println(<-notify)
		fmt.Println("Finish him!")
		fmt.Println(<-notify)
		waiter <- struct{}{}
	}()

	<-waiter
}
//  - timeouts_1.go - 
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// times returns the durations 1s and 2s in a random order, using a generator
// seeded from the current time.
func times() []time.Duration {
	durations := []time.Duration{time.Second, 2 * time.Second}

	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	swap := func(a, b int) {
		durations[a], durations[b] = durations[b], durations[a]
	}
	rng.Shuffle(len(durations), swap)

	return durations
}

// main races a time.After timeout against a time.AfterFunc callback, with the
// two delays assigned randomly, and reports which one fired first.
func main() {

	var timeouts = times()

	var (
		start   = time.Now()
		ender   = make(chan struct{})
		timeout = time.After(timeouts[0])
		timer   = time.AfterFunc(timeouts[1], func() {
			fmt.Printf("Hello World took %v to execute\n", time.Since(start))
			ender <- struct{}{}
		})
	)

	// Both outcomes end the program, so a single blocking select is enough;
	// there is no need to poll in a loop.
	select {
	case <-ender:
		fmt.Println("Execution terminated by signal routine")
	case <-timeout:
		// Stop reports whether the pending callback was cancelled.
		fmt.Printf("Timeout trigerred? (%t)\n", timer.Stop())
	}
}
//  - workerpool_1.go - 
package main

import (
	"fmt"
	"time"
)

// Go by example

// worker consumes jobs until the jobs channel is closed. For every job it
// prints a start and a finish line, pauses briefly to simulate work, and
// sends twice the job value on results.
func worker(id int, jobs <-chan int, results chan<- int) {
	for {
		job, open := <-jobs
		if !open {
			return
		}

		fmt.Println("worker", id, "got job", job)
		time.Sleep(25 * time.Microsecond)
		fmt.Println("worker", id, "did job", job)

		results <- 2 * job
	}
}

// main starts three workers, queues five jobs for them, closes the job queue
// and then collects one result per job.
func main() {
	const (
		numJobs    = 5
		numWorkers = 3
	)

	var (
		jobs    = make(chan int, numJobs)
		results = make(chan int, numJobs)
	)

	for id := 1; id <= numWorkers; id++ {
		go worker(id, jobs, results)
	}

	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	// Drain exactly one result per submitted job.
	for received := 0; received < numJobs; received++ {
		<-results
	}
}