Server Streaming
// - proto/serverstream.proto -
syntax = "proto3";
import "google/protobuf/wrappers.proto";
option go_package = "proto;serverstreaming_proto";
package ua.made.serverstreaming;
// Calculator is a gRPC service with a single server-streaming method.
service Calculator {
// Fibonacci takes one UInt64Value request and answers with a stream of
// UInt64Value messages. The Go server in this file treats the request value
// as the number of sequence members to generate.
rpc Fibonacci(google.protobuf.UInt64Value)
returns (stream google.protobuf.UInt64Value);
}
// - server.go -
// Package main implements a gRPC server for the Calculator service.
package main
import (
"log"
"net"
pb "github.com/butuzov/sandbox/grpc/server-streaming/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/types/known/wrapperspb"
)
const (
// address is the TCP host:port on which the server listens.
address = "localhost:4772"
)
// server implements pb.CalculatorServer.
type server struct {
// NOTE(review): the generated Unsafe* embed presumably opts this type out of
// forward compatibility with methods added to the service later; confirm
// against the generated code.
pb.UnsafeCalculatorServer
}
// Compile-time assertion that *server satisfies pb.CalculatorServer.
var _ pb.CalculatorServer = (*server)(nil)
// Fibonacci implements pb.CalculatorServer. It streams the first in.GetValue()
// members of the Fibonacci sequence to the client, one message per member.
//
// It returns the first error reported by Send, which ends the RPC with that
// error, and nil when the channel returned by fibonacciSwaps is closed.
func (s *server) Fibonacci(in *wrapperspb.UInt64Value, service pb.Calculator_FibonacciServer) error {
	log.Printf("We asked to generate first %d members of fibonacci sequence:\n", in.GetValue())

	for n := range fibonacciSwaps(in.GetValue()) {
		// A failed Send means the stream is no longer usable, so stop here
		// instead of pushing the remaining values into it.
		if err := service.Send(&wrapperspb.UInt64Value{Value: n}); err != nil {
			// NOTE(review): fibonacciSwaps has no cancellation, so its
			// goroutine stays blocked on its next send after this return.
			return err
		}
	}
	return nil
}
// fibonacciSwaps returns a channel that yields the first n members of the
// Fibonacci sequence (0, 1, 1, 2, 3, ...) in order and is then closed, so a
// caller can range over it. For n == 0 the channel is closed without yielding
// anything.
//
// Values are computed in uint64 arithmetic, so members past index 93 wrap
// around modulo 2^64. The channel is unbuffered: the producing goroutine
// blocks until each value is received, and a caller that stops receiving
// early leaves it blocked.
func fibonacciSwaps(n uint64) <-chan uint64 {
	out := make(chan uint64)
	go func() {
		// Close on exit so that receivers ranging over the channel terminate.
		defer close(out)

		var a, b uint64 = 0, 1
		for i := uint64(0); i < n; i++ {
			out <- a
			a, b = b, a+b
		}
	}()
	return out
}
// main listens on address, registers the Calculator service together with the
// gRPC reflection service, and serves requests until Serve returns.
func main() {
	lis, err := net.Listen("tcp", address)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	defer lis.Close()

	log.Printf("SERVER Runs @ %s \n", lis.Addr().String())

	s := grpc.NewServer()
	pb.RegisterCalculatorServer(s, &server{})
	reflection.Register(s)

	// Only a non-nil result is a failure. Calling log.Fatalf unconditionally
	// would print "failed to serve: <nil>" and exit non-zero even if Serve
	// returned cleanly.
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
// - client.go -
package main
import (
"context"
"errors"
"fmt"
"io"
"log"
pb "github.com/butuzov/sandbox/grpc/server-streaming/proto"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"
)
const (
// address is the host:port of the Calculator server that the client dials.
address = "localhost:4772"
)
func main() {
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("can not connect with server %v", err)
}
defer conn.Close()
var (
c = pb.NewCalculatorClient(conn)
in = &wrapperspb.UInt64Value{Value: uint64(40)}
ctx = context.Background()
)
if stream, err := c.Fibonacci(ctx, in); err != nil {
log.Fatal(err)
} else {
done := make(chan bool)
go func() {
var i int
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
close(done)
return
}
if err != nil {
close(done)
fmt.Errorf("cannot receive %v", err)
return
}
log.Printf("Got@%d < %d \n", i, resp.GetValue())
i++
}
}()
<-done
}
}