Bi-Directional Streaming
// - proto/bidirectional-streaming.proto -
syntax = "proto3";
import "google/protobuf/wrappers.proto";
option go_package = "proto;bidirectionalstreaming_proto";
package ua.made.bidirectionalstreaming;
// Calculator exposes a single bidirectional-streaming RPC.
service Calculator {
// AvgLast10 accepts a stream of UInt64Value messages from the client and
// replies with a stream of DoubleValue messages.
// NOTE(review): the name suggests each reply is the average of the ten most
// recent inputs — confirm against the server implementation.
rpc AvgLast10(stream google.protobuf.UInt64Value)
returns (stream google.protobuf.DoubleValue);
}
// - server.go -
package main
import (
"container/ring"
"errors"
"io"
"log"
"net"
"sync"
pb "github.com/butuzov/sandbox/grpc/bidirectional-streaming/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/types/known/wrapperspb"
)
const (
// address is the TCP host:port that main passes to net.Listen.
address = "localhost:4772"
)
// server is the type registered with the gRPC server in main; its AvgLast10
// method handles the Calculator stream.
type server struct {
// NOTE(review): presumably the generated embed that opts out of
// forward-compatible method stubs — confirm against the generated pb code.
pb.UnsafeCalculatorServer
}
// AvgLast10 reads unsigned integers from the client stream and, for every
// value received, sends back the arithmetic mean of the most recent ten values
// (or of all values seen so far while fewer than ten have arrived).
//
// The work is split into a three-stage pipeline:
//
//	receiver -> values -> worker -> averages -> sender
//
// Each stage closes its output channel when it finishes, so shutdown cascades
// down the pipeline once the client half-closes the stream. The first
// non-EOF stream error stops every stage and is returned to gRPC; a clean
// client half-close (io.EOF) yields a nil error.
func (s *server) AvgLast10(stream pb.Calculator_AvgLast10Server) error {
	const window = 10

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
		stop     = make(chan struct{})
		values   = make(chan uint64)
		averages = make(chan float64)
	)

	// fail records the first error and tells all stages to stop. firstErr is
	// only read after wg.Wait, which orders it after every call to fail.
	fail := func(err error) {
		once.Do(func() {
			firstErr = err
			close(stop)
		})
	}

	wg.Add(3)

	// receiver: the only goroutine that calls stream.Recv.
	go func() {
		defer wg.Done()
		defer close(values)

		for {
			msg, err := stream.Recv()
			if errors.Is(err, io.EOF) {
				return
			}
			if err != nil {
				fail(err)
				return
			}

			select {
			case values <- msg.GetValue():
			case <-stop:
				return
			}
		}
	}()

	// worker: sole owner of the ring buffer, so no locking is required.
	go func() {
		defer wg.Done()
		defer close(averages)

		var (
			buf   = ring.New(window)
			count int
		)

		for v := range values {
			// buf always points at the oldest slot; overwrite it and advance.
			buf.Value = v
			buf = buf.Next()
			if count < window {
				count++
			}

			var sum float64
			buf.Do(func(x interface{}) {
				// Slots that were never written hold nil and are skipped.
				if n, ok := x.(uint64); ok {
					sum += float64(n)
				}
			})

			select {
			case averages <- sum / float64(count):
			case <-stop:
				return
			}
		}
	}()

	// sender: the only goroutine that calls stream.Send.
	go func() {
		defer wg.Done()

		for avg := range averages {
			if err := stream.Send(wrapperspb.Double(avg)); err != nil {
				fail(err)
				return
			}
		}
	}()

	wg.Wait()

	return firstErr
}
// main listens on address, registers the Calculator service together with
// gRPC server reflection, and serves until Serve returns an error.
func main() {
	lis, err := net.Listen("tcp", address)
	if err != nil {
		log.Fatal(err)
	}

	log.Printf("SERVER Runs @ %s \n", lis.Addr().String())

	s := grpc.NewServer()
	pb.RegisterCalculatorServer(s, &server{})
	reflection.Register(s)

	// Serve closes the listener before returning, so no deferred Close is
	// needed (and a defer would not run before log.Fatalf exits anyway).
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
// - client.go -
package main
import (
"context"
"errors"
"io"
"log"
"math/rand"
"sync"
pb "github.com/butuzov/sandbox/grpc/bidirectional-streaming/proto"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"
)
// address is the server endpoint that main passes to grpc.Dial.
const address = "localhost:4772"
// main dials the Calculator server, opens the AvgLast10 stream, and runs a
// sender and a receiver concurrently: the sender pushes 19 random values and
// then half-closes the stream; the receiver logs every average until the
// server ends the stream.
func main() {
	conn, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("can not connect with server %v", err)
	}
	defer conn.Close()

	client := pb.NewCalculatorClient(conn)

	stream, err := client.AvgLast10(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	var wg sync.WaitGroup
	wg.Add(2)

	// Sender: the only goroutine that calls Send/CloseSend on the stream.
	go func() {
		defer wg.Done()

		// Sends some (19) random uint64 to the server.
		for i := 0; i < 19; i++ {
			value := uint64(rand.Int63n(599))
			if err := stream.Send(wrapperspb.UInt64(value)); err != nil {
				log.Fatalf("client_err: %v", err)
			}
			log.Printf("sent: %v", value)
		}

		// Half-close so the server sees io.EOF and can finish the RPC.
		if err := stream.CloseSend(); err != nil {
			log.Fatalf("CloseSend: %v", err)
		}
	}()

	// Receiver: the only goroutine that calls Recv on the stream.
	go func() {
		defer wg.Done()

		for {
			resp, err := stream.Recv()
			if errors.Is(err, io.EOF) {
				return
			}
			if err != nil {
				// After a non-EOF error the stream is dead; retrying Recv
				// would spin forever on the same error.
				log.Printf("client_err: recv: %v", err)
				return
			}
			log.Printf("client: avg %+v", resp.GetValue())
		}
	}()

	wg.Wait()
}