Background

When streaming data over gRPC, switching topics often produces the following error:

rpc error: code = Internal desc = grpc: error while marshaling: marshaling SubscribeIoStateResponse: size mismatch (see https://github.com/golang/protobuf/issues/1609): calculated=6, measured=8

According to the linked issue, the most likely cause is that a Protobuf message or sub-message is shared between goroutines and modified concurrently. Protobuf does not allow a message to be modified while it is being encoded.
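
To see why a concurrent modification surfaces as a size mismatch, the following toy model may help. It assumes a marshaler that first calculates the encoded size and then encodes, comparing the two afterwards, which is what the calculated/measured wording of the error suggests. The state type and the size, encode, and marshal functions are hypothetical stand-ins, not Protobuf's actual encoder, and the mutate callback simulates a concurrent writer deterministically.

```go
package main

import "fmt"

// state is a hypothetical stand-in for a message with a single bytes field.
type state struct {
	name []byte
}

// size is the first pass: it reports how many bytes encode will write.
func size(s *state) int {
	return 1 + len(s.name) // one length-prefix byte plus the payload
}

// encode is the second pass: it writes the length prefix and the payload.
func encode(s *state) []byte {
	out := []byte{byte(len(s.name))}
	return append(out, s.name...)
}

// marshal mimics a size-then-encode marshaler. mutate stands in for a
// concurrent writer that happens to run between the two passes.
func marshal(s *state, mutate func()) ([]byte, error) {
	calculated := size(s)
	if mutate != nil {
		mutate()
	}
	buf := encode(s)
	if len(buf) != calculated {
		return nil, fmt.Errorf("size mismatch: calculated=%d, measured=%d", calculated, len(buf))
	}
	return buf, nil
}

func main() {
	s := &state{name: []byte("io-01")}
	_, err := marshal(s, func() { s.name = []byte("io-0001") })
	fmt.Println(err) // size mismatch: calculated=6, measured=8
}
```

The message is 6 bytes when its size is calculated and 8 bytes when it is encoded, because the field grew in between. Without the mutation, marshal returns the 6-byte encoding and no error.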

Root Cause Analysis

A minimal reproduction is as follows:

var IoStates []*pb.IoState

func (s *GRPCServer) SubscribeIo(req *SubscribeRequest, stream SubscriptionService_SubscribeIoServer) error {
	for {
		res := new(SubscribeIoStateResponse)
		res.Data = IoStates // Shares the global slice and its messages with the encoder
		if err := stream.Send(res); err != nil {
			return err
		}
	}
}

func init() {
	// Run the update loop in a goroutine so that init() can return.
	go func() {
		for {
			for _, item := range IoStates {
				// ... modify item
			}
		}
	}()
}

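As shown, SubscribeIo hands the global variable IoStates to gRPC, which reads it while marshaling each response. At the same time, an infinite loop keeps modifying the same messages. This is a data race: a message can change between the size calculation and the encoding, which produces the size mismatch error and terminates the stream.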
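
The sharing is easy to miss because assigning a slice copies only the slice header, not the messages it points to. A minimal sketch of that aliasing, using hypothetical ioState and response types in place of the generated Protobuf types:

```go
package main

import "fmt"

// ioState and response are hypothetical stand-ins for pb.IoState and
// SubscribeIoStateResponse.
type ioState struct{ Value int32 }

type response struct{ Data []*ioState }

var ioStates = []*ioState{{Value: 1}, {Value: 2}}

func main() {
	res := &response{Data: ioStates} // copies the slice header only

	// A later update through the global is visible through the response,
	// because both slices point at the same ioState values.
	ioStates[0].Value = 42

	fmt.Println(res.Data[0].Value)          // 42
	fmt.Println(res.Data[0] == ioStates[0]) // true
}
```

Here the update happens sequentially, so the result is well defined. In the server above, the same write happens from another goroutine while the response is being encoded.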

Solution

To avoid direct manipulation of shared variables and eliminate data races, we can use channels to pass data.

init() acts as the producer, updating the data and sending the modified IoStates to the consumer via a channel. SubscribeIo() acts as the consumer, receiving IoStates from the channel and sending them to the client.

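import "google.golang.org/protobuf/proto"

var IoStates []*pb.IoState
var IoStateChan = make(chan []*pb.IoState) // Channel for passing IoStates snapshots

func (s *GRPCServer) SubscribeIo(req *SubscribeRequest, stream SubscriptionService_SubscribeIoServer) error {
	for {
		res := new(SubscribeIoStateResponse)
		res.Data = <-IoStateChan // Receive the latest snapshot from the channel (blocking)
		if err := stream.Send(res); err != nil {
			return err
		}
	}
}

func init() {
	// Run the producer in a goroutine so that init() can return.
	go func() {
		for {
			for _, item := range IoStates {
				// ... modify item
			}
			// Send deep copies rather than IoStates itself. The slice holds
			// pointers, so sending it directly would still share the messages
			// with the consumer while the next iteration modifies them.
			snapshot := make([]*pb.IoState, len(IoStates))
			for i, item := range IoStates {
				snapshot[i] = proto.Clone(item).(*pb.IoState)
			}
			IoStateChan <- snapshot // Send the latest snapshot to the channel
		}
	}()
}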
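
The producer/consumer hand-off can also be tried without gRPC. The following is a minimal sketch, assuming a plain ioState struct in place of pb.IoState and a hypothetical snapshot helper that deep-copies the states before they are sent; with real Protobuf messages, proto.Clone would play that role. The producer is the only goroutine that touches the live states, and the consumer only ever sees copies.

```go
package main

import "fmt"

// ioState is a hypothetical stand-in for the generated pb.IoState message.
type ioState struct {
	ID    int32
	Value int32
}

// snapshot deep-copies the states so that the receiver owns its data.
func snapshot(states []*ioState) []*ioState {
	out := make([]*ioState, len(states))
	for i, s := range states {
		c := *s // copy the struct, not just the pointer
		out[i] = &c
	}
	return out
}

// produce is the only writer of states: it updates them, then publishes a copy.
// It stops after the given number of rounds and closes the channel.
func produce(states []*ioState, rounds int, out chan<- []*ioState) {
	defer close(out)
	for r := 0; r < rounds; r++ {
		for _, s := range states {
			s.Value++
		}
		out <- snapshot(states)
	}
}

func main() {
	states := []*ioState{{ID: 1}, {ID: 2}}
	ch := make(chan []*ioState)
	go produce(states, 3, ch)

	// The consumer reads snapshots only, so later updates cannot affect them.
	for snap := range ch {
		fmt.Println(snap[0].Value, snap[1].Value)
	}
}
```

Note that a channel delivers each value to exactly one receiver. If several clients subscribe at the same time, each of them receives only a share of the snapshots, so a setup with multiple subscribers would need something like one channel per subscriber.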