Bi-Directional Streaming

//  - proto/bidirectional-streaming.proto - 
syntax = "proto3";

import "google/protobuf/wrappers.proto";

option go_package = "proto;bidirectionalstreaming_proto";

package ua.made.bidirectionalstreaming;

// Calculator is the service of this bidirectional-streaming example; it
// declares a single RPC.
service Calculator {
  // AvgLast10 is a bidirectional-streaming RPC: the client sends a stream of
  // unsigned 64-bit integers and the server answers with a stream of doubles.
  // NOTE(review): judging by the name and the accompanying Go server, each
  // reply is the average of (up to) the last 10 values received - confirm.
  rpc AvgLast10(stream google.protobuf.UInt64Value)
      returns (stream google.protobuf.DoubleValue);
}
//  - server.go - 
package main

import (
	"container/ring"
	"errors"
	"io"
	"log"
	"net"
	"sync"

	pb "github.com/butuzov/sandbox/grpc/bidirectional-streaming/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
	"google.golang.org/protobuf/types/known/wrapperspb"
)

// address is the TCP host:port the server listens on.
const address = "localhost:4772"

// server is the Calculator service implementation registered with the gRPC
// server in main. It carries no state of its own.
type server struct {
	// NOTE(review): embedding the generated Unsafe* type (instead of the
	// Unimplemented* one) presumably opts out of forward compatibility, so
	// adding an RPC to the proto breaks the build until it is implemented
	// here - confirm against the generated code.
	pb.UnsafeCalculatorServer
}

// AvgLast10 handles the Calculator.AvgLast10 bidirectional stream. For every
// value received from the client it sends back the average of the most recent
// values seen so far (at most the last 10).
//
// The work is split into three goroutines connected by unbuffered channels:
//
//	receiver --values--> worker --averages--> sender
//
// Each stage closes its output channel when its input ends, so the pipeline
// always drains and the handler returns once the client half-closes the stream
// (io.EOF) or a stream error occurs. Only the receiver calls Recv and only the
// sender calls Send, so the stream is never used by two goroutines in the same
// direction at once.
//
// It returns nil on a clean end of stream, otherwise the first Recv error or,
// failing that, the first Send error.
func (s *server) AvgLast10(stream pb.Calculator_AvgLast10Server) error {
	// window is the maximum number of recent values that are averaged.
	const window = 10

	// NOTE(review): an initial 1.0 is sent before any input is read. This is
	// kept so existing clients see the same message sequence - confirm whether
	// it is still wanted.
	if err := stream.Send(wrapperspb.Double(1.0)); err != nil {
		return err
	}

	var (
		wg       sync.WaitGroup
		values   = make(chan uint64)
		averages = make(chan float64)

		// Written by exactly one goroutine each and read only after
		// wg.Wait(), which provides the necessary happens-before edge.
		recvErr error
		sendErr error
	)
	wg.Add(3)

	// receiver: forwards every incoming number to the worker.
	go func() {
		defer wg.Done()
		defer close(values)

		for {
			req, err := stream.Recv()
			if errors.Is(err, io.EOF) {
				return
			}
			if err != nil {
				recvErr = err
				return
			}

			values <- req.GetValue()
		}
	}()

	// worker: sole owner of the ring buffer, so no locking is required.
	go func() {
		defer wg.Done()
		defer close(averages)

		var (
			data   = ring.New(window)
			filled int // number of slots holding a real value, capped at window
		)

		for v := range values {
			// data always points at the oldest slot: overwrite it and advance.
			data.Value = v
			data = data.Next()

			if filled < window {
				filled++
			}

			var sum float64
			data.Do(func(i interface{}) {
				// Slots that were never written hold nil and are skipped.
				if value, ok := i.(uint64); ok {
					sum += float64(value)
				}
			})

			averages <- sum / float64(filled)
		}
	}()

	// sender: writes the computed averages back to the client.
	go func() {
		defer wg.Done()

		for avg := range averages {
			if sendErr != nil {
				// Keep draining so the worker never blocks on a dead stream.
				continue
			}
			sendErr = stream.Send(wrapperspb.Double(avg))
		}
	}()

	wg.Wait()

	if recvErr != nil {
		return recvErr
	}

	return sendErr
}

// main starts the gRPC server: it listens on address, registers the
// Calculator service and the reflection service, and serves until Serve
// returns. A listen or serve error terminates the process via log.Fatal.
func main() {
	lis, err := net.Listen("tcp", address)
	if err != nil {
		log.Fatal(err)
	}
	defer lis.Close()

	log.Printf("SERVER Runs @ %s \n", lis.Addr().String())

	s := grpc.NewServer()
	pb.RegisterCalculatorServer(s, &server{})
	reflection.Register(s)

	// Fatalf (not Fatal) so that the %v verb is actually expanded.
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
//  - client.go - 
package main

import (
	"context"
	"errors"
	"io"
	"log"
	"math/rand"
	"sync"

	pb "github.com/butuzov/sandbox/grpc/bidirectional-streaming/proto"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/wrapperspb"
)

// address is the host:port of the Calculator server this client dials.
const address = "localhost:4772"

// main dials the Calculator server, opens an AvgLast10 stream, sends 19 random
// numbers over it and logs every value the server streams back. It exits once
// both the sending and the receiving goroutine are done; any stream error
// terminates the process via log.Fatalf.
func main() {
	conn, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("can not connect with server %v", err)
	}
	defer conn.Close()

	var (
		client = pb.NewCalculatorClient(conn)
		ctx    = context.Background()
	)

	stream, err := client.AvgLast10(ctx)
	if err != nil {
		log.Fatal(err)
	}

	var wg sync.WaitGroup
	wg.Add(2)

	// Sender: pushes 19 random uint64 values (each below 599) to the server
	// and then half-closes the stream to signal that no more input follows.
	go func() {
		defer wg.Done()

		for i := 0; i < 19; i++ {
			value := uint64(rand.Int63n(599))
			if err := stream.Send(wrapperspb.UInt64(value)); err != nil {
				log.Fatalf("client_err: %v", err)
			}
			log.Printf("sent: %v", value)
		}

		if err := stream.CloseSend(); err != nil {
			log.Fatalf("CloseSend: %v", err)
		}
	}()

	// Receiver: logs every response until the server ends the stream.
	go func() {
		defer wg.Done()

		for {
			resp, err := stream.Recv()
			if errors.Is(err, io.EOF) {
				return
			}
			if err != nil {
				// A failed stream keeps returning an error from Recv, so
				// stop here instead of looping on it forever.
				log.Fatalf("client_err: recv: %v", err)
			}

			log.Printf("client: avg %+v", resp.GetValue())
		}
	}()

	wg.Wait()
}