Client Streaming
// - proto/clientstreaming.proto -
syntax = "proto3";
import "google/protobuf/wrappers.proto";
option go_package = "proto;clientstreaming_proto";
package ua.made.clientstreaming;
service Calculator {
rpc Avg(stream google.protobuf.UInt64Value)
returns (google.protobuf.DoubleValue);
}
// - server.go -
package main
import (
"errors"
"io"
"log"
"net"
pb "github.com/butuzov/sandbox/grpc/client-streaming/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/types/known/wrapperspb"
)
const (
address = "localhost:4772"
)
// gRPC Server
var _ pb.CalculatorServer = (*server)(nil)
type server struct {
pb.UnsafeCalculatorServer
}
func (s *server) Avg(stream pb.Calculator_AvgServer) error {
var (
sum uint64
cnt int
)
for {
res, err := stream.Recv()
if errors.Is(err, io.EOF) {
return stream.SendAndClose(&wrapperspb.DoubleValue{
Value: float64(sum) / float64(cnt),
})
}
log.Printf("server got: %v", res.GetValue())
sum = sum + res.GetValue()
cnt++
}
return nil
}
func main() {
conn, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
defer conn.Close()
log.Printf("SERVER Runs @ %s \n", conn.Addr().String())
s := grpc.NewServer()
pb.RegisterCalculatorServer(s, &server{})
reflection.Register(s)
log.Fatalf("failed to serve: %v", s.Serve(conn))
}
// - client.go -
package main
import (
"context"
"log"
"math/rand"
pb "github.com/butuzov/sandbox/grpc/client-streaming/proto"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"
)
const (
address = "localhost:4772"
)
func main() {
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("can not connect with server %v", err)
}
defer conn.Close()
var (
client = pb.NewCalculatorClient(conn)
ctx = context.Background()
)
if stream, err := client.Avg(ctx); err != nil {
log.Fatal(err)
} else {
for i := uint64(0); i < 10; i++ {
n := &wrapperspb.UInt64Value{Value: uint64(rand.Int63n(599))}
if err := stream.Send(n); err != nil {
log.Fatalf("Send: %v", err)
}
}
if resp, err := stream.CloseAndRecv(); err != nil {
log.Fatalf("client/Error/CloseAndRecv: %v\n", err)
} else {
log.Printf("client/Result/CloseAndRecv: %v\n", resp.GetValue())
}
}
}