Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for functions to become ready #6125

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 24 additions & 21 deletions internal/xfn/function_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,28 @@ const (
errFmtDialFunction = "cannot gRPC dial target %q from status.endpoint of active FunctionRevision %q"
)

// TODO(negz): Should any of these be configurable?
const (
// lbRoundRobin is a gRPC service config, supplied to the client as its
// default service config, that selects round robin load balancing. This
// means that if the Function Deployment has more than one Pod, and the
// Function Service is headless, requests will be spread across each Pod.
// See https://github.com/grpc/grpc/blob/v1.58.0/doc/load-balancing.md#load-balancing-policies
lbRoundRobin = `{"loadBalancingConfig":[{"round_robin":{}}]}`

// dialFunctionTimeout bounds the context that is only used for setting up
// the gRPC connection to a Function.
dialFunctionTimeout = 10 * time.Second
// runFunctionTimeout bounds the context that is used for actually making
// the RunFunction request.
runFunctionTimeout = 10 * time.Second
)
// svcConfig is the default gRPC service config given to the client
// connections created for Functions.
//
// It configures a gRPC client to use round robin load balancing. This means
// that if the Function Deployment has more than one Pod, and the Function
// Service is headless, requests will be spread across each Pod.
// See https://github.com/grpc/grpc/blob/v1.58.0/doc/load-balancing.md#load-balancing-policies
//
// It also configures the gRPC client to wait for the server to be ready before
// sending RPCs. Notably this gives Functions time to start before we make a
// request. See https://grpc.io/docs/guides/wait-for-ready/
//
// NOTE(review): the empty object in methodConfig's "name" list is presumably
// what makes waitForReady the default for every method rather than for one
// named service or method - confirm against the gRPC service config docs.
const svcConfig = `
{
"loadBalancingConfig": [
{
"round_robin":{}
}
],
"methodConfig": [
{
"name": [{}],
"waitForReady": true
}
]
}`

// A PackagedFunctionRunner runs a Function by making a gRPC call to a Function
// package's runtime. It creates a gRPC client connection for each Function. The
Expand Down Expand Up @@ -136,10 +147,6 @@ func (r *PackagedFunctionRunner) RunFunction(ctx context.Context, name string, r
return nil, errors.Wrapf(err, errFmtGetClientConn, name)
}

// This context is used for actually making the request.
ctx, cancel := context.WithTimeout(ctx, runFunctionTimeout)
defer cancel()

rsp, err := NewBetaFallBackFunctionRunnerServiceClient(conn).RunFunction(ctx, req)
return rsp, errors.Wrapf(err, errFmtRunFunction, name)
}
Expand Down Expand Up @@ -220,18 +227,14 @@ func (r *PackagedFunctionRunner) getClientConn(ctx context.Context, name string)
delete(r.conns, name)
}

// This context is only used for setting up the connection.
ctx, cancel := context.WithTimeout(ctx, dialFunctionTimeout)
defer cancel()

is := make([]grpc.UnaryClientInterceptor, len(r.interceptors))
for i := range r.interceptors {
is[i] = r.interceptors[i].CreateInterceptor(name, active.Spec.Package)
}

conn, err := grpc.DialContext(ctx, active.Status.Endpoint, //nolint:staticcheck // Figure out how to replace deprecated grpc.DialContext with grpc.NewClient and still pass the dialFunctionTimeout.
conn, err := grpc.NewClient(active.Status.Endpoint,
grpc.WithTransportCredentials(r.creds),
grpc.WithDefaultServiceConfig(lbRoundRobin),
grpc.WithDefaultServiceConfig(svcConfig),
grpc.WithChainUnaryInterceptor(is...))
if err != nil {
return nil, errors.Wrapf(err, errFmtDialFunction, active.Status.Endpoint, active.GetName())
Expand Down
Loading