每个 gRPC 请求都是一个 stream,Keepalive 通过定期发送 ping 来探测并维持底层连接,适合一些执行时间长的请求。Keepalive 支持在服务端和客户端配置;客户端的 ping 频率必须被服务端的 EnforcementPolicy 允许,否则连接会被服务端断开,因此只有服务端做了相应配置后,客户端的配置才会真正有效。
server 端实现
https://github.com/grpc/grpc-go/blob/master/examples/features/keepalive/server/main.go
keepalive配置参数是针对整个连接的
grpc-go基于http/2实现,可以多路复用。即:多个请求复用同一个连接,每个请求都是一个单独的stream
package main
import (
"context"
"flag"
"fmt"
"log"
"net"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
pb "google.golang.org/grpc/examples/features/proto/echo"
)
// Flag and keepalive settings used by the server in main.
var (
	// port is the TCP port number the server listens on (flag -port).
	port = flag.Int("port", 50052, "port number")

	// kaep is the keepalive enforcement policy applied to client pings.
	kaep = keepalive.EnforcementPolicy{
		// Terminate the connection if a client pings more than once every 5 seconds.
		MinTime: 5 * time.Second,
		// Accept pings even when the connection has no active streams.
		PermitWithoutStream: true,
	}

	// kasp holds the server-side keepalive parameters.
	kasp = keepalive.ServerParameters{
		// Send a GOAWAY to a client that has been idle for 15 seconds.
		MaxConnectionIdle: 15 * time.Second,
		// Send a GOAWAY on any connection that has been alive for more than 30 seconds.
		MaxConnectionAge: 30 * time.Second,
		// Give pending RPCs 5 seconds to finish before the connection is forcibly closed.
		MaxConnectionAgeGrace: 5 * time.Second,
		// Ping the client after 5 seconds of idleness to check the connection is still active.
		Time: 5 * time.Second,
		// Treat the connection as dead if the ping ack does not arrive within 1 second.
		Timeout: 1 * time.Second,
	}
)
// server implements EchoServer. Only UnaryEcho is defined here; the embedded
// pb.UnimplementedEchoServer presumably covers the remaining methods of the
// service (verify against the generated echo package).
type server struct {
	pb.UnimplementedEchoServer
}

// UnaryEcho answers a unary call with a response whose Message is copied
// from the request. It never returns an error.
func (s *server) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	reply := &pb.EchoResponse{Message: req.Message}
	return reply, nil
}
// main parses the command-line flags, listens on the configured TCP port and
// serves the Echo service with the keepalive enforcement policy (kaep) and
// server parameters (kasp) installed. Any listen or serve failure is fatal.
func main() {
	flag.Parse()

	listener, listenErr := net.Listen("tcp", fmt.Sprintf(":%v", *port))
	if listenErr != nil {
		log.Fatalf("failed to listen: %v", listenErr)
	}

	// Keepalive is configured per server and applies to every connection it accepts.
	opts := []grpc.ServerOption{
		grpc.KeepaliveEnforcementPolicy(kaep),
		grpc.KeepaliveParams(kasp),
	}
	srv := grpc.NewServer(opts...)
	pb.RegisterEchoServer(srv, &server{})

	serveErr := srv.Serve(listener)
	if serveErr != nil {
		log.Fatalf("failed to serve: %v", serveErr)
	}
}
执行
go run server/main.go
server 端 keepalive 的实现核心在于 keepalive.ServerParameters 和 keepalive.EnforcementPolicy。
首先是 keepalive.ServerParameters,包含以下几个属性:
- MaxConnectionIdle:最大空闲连接时间,默认为无限制。超出这段时间后,server 发送 GOAWAY,通知 client 优雅地断开连接
- MaxConnectionAge:最大连接时间,默认为无限制。连接存活时间超出这个值时,server 发送一个 GOAWAY
- MaxConnectionAgeGrace:超出 MaxConnectionAge 之后的宽限时长,默认无限制;宽限期结束后连接会被强制关闭
- Time:如果一段时间内没有看到客户端的任何活动,服务端发送一次 ping 请求,默认是 2 hour (最小为 1s)
- Timeout:服务端发送 ping 请求超时的时间,默认 20s
  即:在发送 ping 包之后,Timeout 时间内没有收到 ack 则视为超时
keepalive.EnforcementPolicy 为服务端强制执行策略,如果客户端违反则断开连接。它有两个属性:
- MinTime : 如果在指定时间内收到 pings 次数大于一次,强制断开连接,默认 5min
防止客户端在一段时间间隔内发送太频繁 ping 操作
- PermitWithoutStream:没有活动的 stream 也允许pings。默认关闭
是否允许没有流时发送 ping
注意:
若设置 MaxConnectionAge,而没设置 MaxConnectionAgeGrace,则达到 MaxConnectionAge 后服务端只会发送 GOAWAY (Drain),连接并不会被强制关闭,看起来就像没有生效。
原因见下面代码:
// grpc-go source file & location:
// internal/transport/defaults.go
const (
// The default value of flow control window size in HTTP2 spec.
defaultWindowSize = 65535
// The initial window size for flow control.
initialWindowSize = defaultWindowSize // for an RPC
// infinity is the largest representable time.Duration; the keepalive
// defaults below use it to mean "no limit".
infinity = time.Duration(math.MaxInt64)
defaultClientKeepaliveTime = infinity
defaultClientKeepaliveTimeout = 20 * time.Second
defaultMaxStreamsClient = 100
// The three connection idle/age defaults are all unlimited. They are the
// values NewServerTransport substitutes when the corresponding
// KeepaliveParams field is zero.
defaultMaxConnectionIdle = infinity
defaultMaxConnectionAge = infinity
defaultMaxConnectionAgeGrace = infinity
defaultServerKeepaliveTime = 2 * time.Hour
defaultServerKeepaliveTimeout = 20 * time.Second
defaultKeepalivePolicyMinTime = 5 * time.Minute
// max window limit set by HTTP2 Specs.
maxWindowSize = math.MaxInt32
// defaultWriteQuota is the default value for number of data
// bytes that each stream can schedule before some of it being
// flushed out.
defaultWriteQuota = 64 * 1024
// 16 << 20 bytes = 16 MiB for both header list size defaults.
defaultClientMaxHeaderListSize = uint32(16 << 20)
defaultServerMaxHeaderListSize = uint32(16 << 20)
)
// grpc-go source file & location:
// internal/transport/http2_server.go
//
// NewServerTransport (excerpt; omitted code is marked with "......").
// The part shown replaces zero-valued keepalive parameters with the package
// defaults, adds jitter to MaxConnectionAge, and starts the keepalive goroutine.
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
......
kp := config.KeepaliveParams
// Maximum idle time: zero means unset, so fall back to the default.
if kp.MaxConnectionIdle == 0 {
kp.MaxConnectionIdle = defaultMaxConnectionIdle
}
// Maximum connection age: zero means unset, so fall back to the default.
if kp.MaxConnectionAge == 0 {
kp.MaxConnectionAge = defaultMaxConnectionAge
}
// Add a jitter to MaxConnectionAge.
// The jitter value comes from getJitter (not shown in this excerpt).
kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
// Grace period after MaxConnectionAge: zero means unset, so fall back to
// the default, which is infinity (no limit) in defaults.go.
if kp.MaxConnectionAgeGrace == 0 {
kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
}
......
go t.keepalive()
return t, nil
}
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() {
// The same ping value is queued on controlBuf every time a keepalive ping is sent.
p := &ping{}
// True iff a ping has been sent, and no data has been received since then.
outstandingPing := false
// Amount of time remaining before which we should receive an ACK for the
// last sent ping.
kpTimeoutLeft := time.Duration(0)
// Records the last value of t.lastRead before we go block on the timer.
// This is required to check for read activity since then.
prevNano := time.Now().UnixNano()
// Initialize the different timers to their default values.
// One timer each for idleness, connection age and keepalive pings.
idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
kpTimer := time.NewTimer(t.kp.Time)
// Stop all timers when this goroutine returns.
defer func() {
// We need to drain the underlying channel in these timers after a call
// to Stop(), only if we are interested in resetting them. Clearly we
// are not interested in resetting them here.
idleTimer.Stop()
ageTimer.Stop()
kpTimer.Stop()
}()
for {
select {
case <-idleTimer.C: // idle timer fired
t.mu.Lock()
idle := t.idle
if idle.IsZero() { // The connection is non-idle.
t.mu.Unlock()
idleTimer.Reset(t.kp.MaxConnectionIdle)
continue
}
// Time left until the connection has been idle for MaxConnectionIdle.
val := t.kp.MaxConnectionIdle - time.Since(idle)
t.mu.Unlock()
if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t.Drain()
return
}
idleTimer.Reset(val)
case <-ageTimer.C: // max connection age timer fired
t.Drain()
// Reuse the timer for the MaxConnectionAgeGrace period.
// If MaxConnectionAge was set but MaxConnectionAgeGrace was not,
// MaxConnectionAgeGrace is defaultMaxConnectionAgeGrace = infinity.
// infinity is a huge duration, so the forced Close below is effectively
// never reached and reaching MaxConnectionAge appears to have no effect.
ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
select {
case <-ageTimer.C:
// Close the connection after grace period.
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to maximum connection age.")
}
t.Close()
case <-t.done:
}
return
case <-kpTimer.C: // keepalive timer fired: send a ping frame if no client activity was seen in the interval
lastRead := atomic.LoadInt64(&t.lastRead)
if lastRead > prevNano {
// There has been read activity since the last time we were
// here. Setup the timer to fire at kp.Time seconds from
// lastRead time and continue.
outstandingPing = false
kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
prevNano = lastRead
continue
}
// A ping is outstanding and its timeout budget is used up: give up on the connection.
if outstandingPing && kpTimeoutLeft <= 0 {
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to idleness.")
}
t.Close()
return
}
if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
t.controlBuf.put(p)
kpTimeoutLeft = t.kp.Timeout
outstandingPing = true
}
// The amount of time to sleep here is the minimum of kp.Time and
// timeoutLeft. This will ensure that we wait only for kp.Time
// before sending out the next ping (for cases where the ping is
// acked).
sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
kpTimeoutLeft -= sleepDuration
kpTimer.Reset(sleepDuration)
case <-t.done: // transport is already done; exit
return
}
}
}
client 端实现
https://github.com/grpc/grpc-go/blob/master/examples/features/keepalive/client/main.go
package main
import (
"context"
"flag"
"fmt"
"log"
"time"
"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/features/proto/echo"
"google.golang.org/grpc/keepalive"
)
// Flag and keepalive settings used by the client in main.
var (
	// addr is the server address to dial (flag -addr).
	addr = flag.String("addr", "localhost:50052", "the address to connect to")

	// kacp holds the client-side keepalive parameters.
	kacp = keepalive.ClientParameters{
		// Send a ping every 10 seconds when there is no activity.
		Time: 10 * time.Second,
		// Consider the connection dead if the ping ack takes longer than 1 second.
		Timeout: time.Second,
		// Keep sending pings even when there are no active streams.
		PermitWithoutStream: true,
	}
)
// main dials the server with the client keepalive parameters (kacp), performs
// a single UnaryEcho call, prints the response and then blocks forever so the
// keepalive traffic on the otherwise idle connection can be observed.
func main() {
	flag.Parse()

	dialOpts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(kacp),
	}
	conn, dialErr := grpc.Dial(*addr, dialOpts...)
	if dialErr != nil {
		log.Fatalf("did not connect: %v", dialErr)
	}
	defer conn.Close()

	client := pb.NewEchoClient(conn)

	// The RPC itself is bounded by a 3 minute deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()

	fmt.Println("Performing unary request")
	request := &pb.EchoRequest{Message: "keepalive demo"}
	res, callErr := client.UnaryEcho(ctx, request)
	if callErr != nil {
		log.Fatalf("unexpected error from UnaryEcho: %v", callErr)
	}
	fmt.Println("RPC response:", res)

	// Block forever; run with GODEBUG=http2debug=2 to observe ping frames and GOAWAYs due to idleness.
	select {}
}
执行
GODEBUG=http2debug=2 go run client/main.go
keepalive.ClientParameters 是在客户端使用的 keepalive 配置:
- Time :ping 请求间隔时间,默认无限制,最小为 10s
- Timeout :ping 超时时间,默认是 20s
即:在发送ping包之后,Timeout 时间内没有收到 ack 则视为超时
- PermitWithoutStream:没有活动的 stream 也允许pings。默认关闭
更多细节&设置,详见 https://github.com/grpc/grpc-go/tree/master/Documentation/keepalive.md
其他参考
- http://xingyys.tech/grpc2/#keepalive gRPC 进阶-keepalive
本文链接:https://ivansli.com/archives/321/
本文系原创作品,版权所有(禁止转载)