Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: vcluster connect prefer background proxy #2192

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions pkg/cli/connect_helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ func (cmd *connectHelm) getVClusterKubeConfig(ctx context.Context, vcluster *fin

// check if the vcluster is exposed and set server
if vcluster.Name != "" && cmd.Server == "" && len(command) == 0 {
// check if local kubernetes / can be exposed
err = cmd.setServerIfExposed(ctx, vcluster, kubeConfig)
if err != nil {
return nil, err
Expand All @@ -338,14 +339,14 @@ func (cmd *connectHelm) getVClusterKubeConfig(ctx context.Context, vcluster *fin
if cmd.Server == "" && cmd.BackgroundProxy {
if localkubernetes.IsDockerInstalledAndUpAndRunning() {
// start background container
server, err := localkubernetes.CreateBackgroundProxyContainer(ctx, vcluster.Name, cmd.Namespace, cmd.kubeClientConfig, kubeConfig, cmd.LocalPort, cmd.Log)
cmd.Server, err = localkubernetes.CreateBackgroundProxyContainer(ctx, vcluster.Name, cmd.Namespace, cmd.kubeClientConfig, kubeConfig, cmd.LocalPort, cmd.Log)
if err != nil {
cmd.Log.Warnf("Error exposing local vcluster, will fallback to port-forwarding: %v", err)
cmd.BackgroundProxy = false
}
cmd.Server = server
} else {
cmd.Log.Debugf("Docker is not installed, so skip background proxy")
cmd.BackgroundProxy = false
}
}
}
Expand Down Expand Up @@ -445,16 +446,14 @@ func (cmd *connectHelm) setServerIfExposed(ctx context.Context, vcluster *find.V
}

// not a load balancer? Then don't wait
if service.Spec.Type == corev1.ServiceTypeNodePort {
server, err := localkubernetes.ExposeLocal(ctx, vcluster.Name, cmd.Namespace, &cmd.rawConfig, vClusterConfig, service, cmd.LocalPort, cmd.Log)
if service.Spec.Type != corev1.ServiceTypeLoadBalancer {
server, err := localkubernetes.ExposeLocal(ctx, &cmd.rawConfig, vClusterConfig, service)
if err != nil {
cmd.Log.Warnf("Error exposing local vcluster, will fallback to port-forwarding: %v", err)
}

cmd.Server = server
return true, nil
} else if service.Spec.Type != corev1.ServiceTypeLoadBalancer {
return true, nil
}

if len(service.Status.LoadBalancer.Ingress) == 0 {
Expand Down
5 changes: 0 additions & 5 deletions pkg/cli/delete_helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,6 @@ func (cmd *deleteHelm) prepare(vCluster *find.VCluster) error {
return err
}

err = localkubernetes.CleanupLocal(vCluster.Name, vCluster.Namespace, &rawConfig, cmd.log)
if err != nil {
cmd.log.Warnf("error cleaning up: %v", err)
}

// construct proxy name
proxyName := find.VClusterConnectBackgroundProxyName(vCluster.Name, vCluster.Namespace, rawConfig.CurrentContext)
_ = localkubernetes.CleanupBackgroundProxy(proxyName, cmd.log)
Expand Down
261 changes: 4 additions & 257 deletions pkg/cli/localkubernetes/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package localkubernetes
import (
"context"
"fmt"
"net/url"
"os"
"os/exec"
"strconv"
"strings"
"time"

"github.com/loft-sh/log"
Expand All @@ -25,13 +23,10 @@ import (
// LocalKubernetes reports whether c is one of the cluster types listed below,
// i.e. the types this package treats as a local Kubernetes distribution.
func (c ClusterType) LocalKubernetes() bool {
	switch c {
	case ClusterTypeDockerDesktop,
		ClusterTypeRancherDesktop,
		ClusterTypeKIND,
		ClusterTypeMinikube,
		ClusterTypeK3D,
		ClusterTypeOrbstack:
		return true
	default:
		return false
	}
}

func ExposeLocal(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service, localPort int, log log.Logger) (string, error) {
func ExposeLocal(ctx context.Context, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service) (string, error) {
// Timeout to wait for connection before falling back to port-forwarding
timeout := time.Second * 30
clusterType := DetectClusterType(rawConfig)
Expand All @@ -42,138 +37,12 @@ func ExposeLocal(ctx context.Context, vClusterName, vClusterNamespace string, ra
return directConnection(ctx, vRawConfig, service, timeout)
case ClusterTypeRancherDesktop:
return directConnection(ctx, vRawConfig, service, timeout)
case ClusterTypeKIND:
return kindProxy(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, localPort, timeout, log)
case ClusterTypeMinikube:
return minikubeProxy(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, localPort, timeout, log)
case ClusterTypeK3D:
return k3dProxy(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, localPort, timeout, log)
default:
}

return "", nil
}

// CleanupLocal stops the docker proxy container that was started for the given
// vcluster, if the detected cluster type uses one.
//
// It returns an error when rawConfig is nil. For KIND and K3D the proxy is
// always cleaned up; for Minikube only when a docker container named like the
// current context exists. All other cluster types are a no-op.
func CleanupLocal(vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, log log.Logger) error {
	if rawConfig == nil {
		return errors.New("nil rawConfig")
	}

	switch DetectClusterType(rawConfig) {
	case ClusterTypeKIND, ClusterTypeK3D:
		return cleanupProxy(vClusterName, vClusterNamespace, rawConfig, log)
	case ClusterTypeMinikube:
		// without a container named after the context there is nothing to clean up
		if !containerExists(rawConfig.CurrentContext) {
			return nil
		}
		return cleanupProxy(vClusterName, vClusterNamespace, rawConfig, log)
	}

	return nil
}

// k3dProxy returns a local server URL for a vcluster running in a k3d cluster.
//
// An already running proxy container is reused when
// getServerFromExistingProxyContainer finds one; otherwise a new proxy
// container is created via createProxyContainer. An error is returned when
// service is nil, has no ports, or one of the helpers fails.
func k3dProxy(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service, localPort int, timeout time.Duration, log log.Logger) (string, error) {
	switch {
	case service == nil:
		return "", errors.New("service is nil")
	case len(service.Spec.Ports) == 0:
		return "", fmt.Errorf("service has %d ports (expected 1 port)", len(service.Spec.Ports))
	}

	// prefer a proxy container that is already up
	existing, err := getServerFromExistingProxyContainer(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, log)
	if err != nil {
		return "", err
	}
	if existing != "" {
		return existing, nil
	}

	// strip the "k3d-" prefix from the context name; the remainder is used to
	// build both the docker network name and the backend container name
	clusterName := strings.TrimPrefix(rawConfig.CurrentContext, "k3d-")
	network := "k3d-" + clusterName
	backendHost := network + "-server-0"
	return createProxyContainer(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, localPort, timeout, backendHost, network, log)
}

// minikubeProxy returns a local server URL for a vcluster running in a
// minikube cluster, or "" when no local endpoint could be determined.
//
// When a docker container named like the current context exists, an existing
// proxy container is reused or a new one is created. Otherwise the host of the
// cluster's API server URL is combined with the first NodePort of service and
// that endpoint is polled until it answers or timeout elapses.
//
// On success of the direct path, vRawConfig is modified in place: the CA data
// of every cluster entry is dropped and TLS verification is disabled. The
// passed config is left untouched if the connection test fails.
//
// An error is returned when service is nil, has no ports, a proxy helper
// fails, or the direct connection test does not succeed in time.
func minikubeProxy(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service, localPort int, timeout time.Duration, log log.Logger) (string, error) {
	if service == nil {
		return "", errors.New("nil service")
	}
	if len(service.Spec.Ports) == 0 {
		return "", fmt.Errorf("service has %d ports (expected 1 port)", len(service.Spec.Ports))
	}

	// check if docker driver or vm
	minikubeName := rawConfig.CurrentContext
	if containerExists(minikubeName) {
		// see if we already have a proxy container running
		server, err := getServerFromExistingProxyContainer(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, log)
		if err != nil {
			return "", err
		} else if server != "" {
			return server, nil
		}

		// create proxy container if missing
		return createProxyContainer(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, localPort, timeout, minikubeName, minikubeName, log)
	}

	// in case other type of driver (e.g. VM on linux) is used
	// check if the service is reachable directly via the minikube IP
	c := rawConfig.Contexts[rawConfig.CurrentContext]
	if c == nil {
		return "", nil
	}
	s := rawConfig.Clusters[c.Cluster]
	if s == nil {
		return "", nil
	}
	u, err := url.Parse(s.Server)
	if err != nil {
		// an unparsable server URL yields "no endpoint" rather than an error
		return "", nil
	}

	// Hostname strips the port and the brackets of an IPv6 literal; splitting
	// u.Host on ":" would truncate IPv6 addresses. Re-add the brackets so the
	// resulting URL stays valid.
	host := u.Hostname()
	if strings.Contains(host, ":") {
		host = "[" + host + "]"
	}
	server := fmt.Sprintf("https://%s:%v", host, service.Spec.Ports[0].NodePort)

	// workaround for the fact that vcluster certificate is not made valid for the node IPs
	skipTLSVerify := func(cfg *clientcmdapi.Config) {
		for _, cluster := range cfg.Clusters {
			if cluster == nil {
				continue
			}

			cluster.CertificateAuthorityData = nil
			cluster.InsecureSkipTLSVerify = true
		}
	}

	// avoid modifying the passed config before the connection is tested
	testvConfig := vRawConfig.DeepCopy()
	skipTLSVerify(testvConfig)

	// test local connection; connErr keeps the last failure for the error message
	var connErr error
	waitErr := wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
		connErr = testConnectionWithServer(ctx, testvConfig, server)
		return connErr == nil, nil
	})
	if waitErr != nil {
		return "", fmt.Errorf("test connection: %w %w", waitErr, connErr)
	}

	// now it's safe to modify the vRawConfig struct that was passed in as a pointer
	skipTLSVerify(vRawConfig)

	return server, nil
}

func CleanupBackgroundProxy(proxyName string, log log.Logger) error {
// check if background proxy container already exists
if containerExists(proxyName) {
Expand All @@ -191,42 +60,6 @@ func CleanupBackgroundProxy(proxyName string, log log.Logger) error {
return nil
}

// cleanupProxy runs "docker stop" on the proxy container of the given vcluster.
//
// The container name is derived with find.VClusterContextName from the
// vcluster name, its namespace and the current context of rawConfig. Output and
// error of the docker command are discarded, so this function always returns nil.
func cleanupProxy(vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, log log.Logger) error {
	containerName := find.VClusterContextName(vClusterName, vClusterNamespace, rawConfig.CurrentContext)
	stop := exec.Command("docker", "stop", containerName)

	log.Infof("Stopping docker proxy...")

	// the result of the docker command is ignored
	_, _ = stop.Output()
	return nil
}

// kindProxy returns a local server URL for a vcluster running in a kind cluster.
//
// An already running proxy container is reused when
// getServerFromExistingProxyContainer finds one; otherwise a new proxy
// container is created via createProxyContainer on the "kind" docker network.
// An error is returned when service is nil, has no ports, or one of the
// helpers fails.
func kindProxy(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service, localPort int, timeout time.Duration, log log.Logger) (string, error) {
	switch {
	case service == nil:
		return "", errors.New("nil service")
	case len(service.Spec.Ports) == 0:
		return "", fmt.Errorf("service has %d ports (expected 1 port)", len(service.Spec.Ports))
	}

	// prefer a proxy container that is already up
	existing, err := getServerFromExistingProxyContainer(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, log)
	if err != nil {
		return "", err
	}
	if existing != "" {
		return existing, nil
	}

	// backend host: context name without the "kind-" prefix, plus "-control-plane"
	backendHost := strings.TrimPrefix(rawConfig.CurrentContext, "kind-") + "-control-plane"
	return createProxyContainer(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, localPort, timeout, backendHost, "kind", log)
}

func directServiceConnection(ctx context.Context, vRawConfig *clientcmdapi.Config, service *corev1.Service, timeout time.Duration) (string, error) {
if len(service.Spec.Ports) == 0 {
return "", fmt.Errorf("service has %d ports (expected 1 port)", len(service.Spec.Ports))
Expand All @@ -250,6 +83,9 @@ func directServiceConnection(ctx context.Context, vRawConfig *clientcmdapi.Confi
}

func directConnection(ctx context.Context, vRawConfig *clientcmdapi.Config, service *corev1.Service, timeout time.Duration) (string, error) {
if service.Spec.Type != corev1.ServiceTypeNodePort {
return "", nil
}
if len(service.Spec.Ports) == 0 {
return "", fmt.Errorf("service has %d ports (expected 1 port)", len(service.Spec.Ports))
}
Expand All @@ -271,49 +107,6 @@ func directConnection(ctx context.Context, vRawConfig *clientcmdapi.Config, serv
return server, nil
}

// createProxyContainer starts a detached ghcr.io/loft-sh/docker-tcp-proxy
// container that publishes localPort on the host and forwards to the first
// NodePort of service on backendHost inside the given docker network. It then
// polls https://127.0.0.1:<localPort> until the vcluster answers or timeout
// elapses.
//
// The container name is derived with find.VClusterContextName from the
// vcluster name, its namespace and the current context of rawConfig.
//
// It returns the local server URL on success. An error is returned when
// "docker run" fails (the error carries docker's combined stdout and stderr)
// or when the connection test does not succeed in time. Callers are expected
// to have checked that service has at least one port.
func createProxyContainer(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service, localPort int, timeout time.Duration, backendHost, network string, log log.Logger) (string, error) {
	// construct proxy name
	proxyName := find.VClusterContextName(vClusterName, vClusterNamespace, rawConfig.CurrentContext)
	nodePort := service.Spec.Ports[0].NodePort

	// in general, we need to run this statement to expose the correct port for this
	// docker run -d -p LOCAL_PORT:NODE_PORT --rm -e "BACKEND_HOST=NAME-control-plane" -e "BACKEND_PORT=NODE_PORT" --network=NETWORK ghcr.io/loft-sh/docker-tcp-proxy
	cmd := exec.Command(
		"docker",
		"run",
		"-d",
		"-p",
		fmt.Sprintf("%v:%v", localPort, nodePort),
		"--rm",
		fmt.Sprintf("--name=%s", proxyName),
		"-e",
		fmt.Sprintf("BACKEND_HOST=%s", backendHost),
		"-e",
		fmt.Sprintf("BACKEND_PORT=%v", nodePort),
		fmt.Sprintf("--network=%s", network),
		"ghcr.io/loft-sh/docker-tcp-proxy",
	)
	log.Infof("Starting proxy container...")

	// CombinedOutput instead of Output: docker writes its failure reason to
	// stderr, which Output would not include in the returned bytes
	out, err := cmd.CombinedOutput()
	if err != nil {
		return "", fmt.Errorf("error starting proxy container: %s %w", string(out), err)
	}

	server := fmt.Sprintf("https://127.0.0.1:%v", localPort)

	// connErr keeps the last connection failure for the error message
	var connErr error
	waitErr := wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
		connErr = testConnectionWithServer(ctx, vRawConfig, server)
		return connErr == nil, nil
	})
	if waitErr != nil {
		return "", fmt.Errorf("test connection: %w %w", waitErr, connErr)
	}

	return server, nil
}

func CreateBackgroundProxyContainer(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig clientcmd.ClientConfig, vRawConfig *clientcmdapi.Config, localPort int, log log.Logger) (string, error) {
rawConfigObj, err := rawConfig.RawConfig()
if err != nil {
Expand Down Expand Up @@ -428,52 +221,6 @@ func testConnectionWithServer(ctx context.Context, vRawConfig *clientcmdapi.Conf
return nil
}

// getServerFromExistingProxyContainer looks for an existing docker proxy
// container of the given vcluster and tries to reuse it.
//
// It runs "docker inspect" on the proxy name to read the host port that is
// bound to the first NodePort of service. If a non-zero port is found, the
// vcluster is polled on https://127.0.0.1:<port> for up to 5 seconds; on
// success that URL is returned, otherwise the connection error is returned.
//
// In every other case (inspect failed, port not parsable or zero) a container
// that still exists under the proxy name is stopped via cleanupProxy and
// ("", nil) is returned, unless cleanupProxy itself reports an error.
func getServerFromExistingProxyContainer(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service, log log.Logger) (string, error) {
// construct proxy name
proxyName := find.VClusterContextName(vClusterName, vClusterNamespace, rawConfig.CurrentContext)

// ask docker for the host port bound to "<NodePort>/tcp" of the proxy container
cmd := exec.Command(
"docker",
"inspect",
proxyName,
"-f",
fmt.Sprintf("{{ index (index (index .HostConfig.PortBindings \"%v/tcp\") 0) \"HostPort\" }}", service.Spec.Ports[0].NodePort),
)
out, err := cmd.Output()
if err == nil {
// note: this := shadows the outer err; the poll closure below assigns to
// this inner err, which is also the one wrapped on timeout
localPort, err := strconv.Atoi(strings.TrimSpace(string(out)))
if err == nil && localPort != 0 {
server := fmt.Sprintf("https://127.0.0.1:%v", localPort)
// poll once per second, starting immediately, for at most 5 seconds
waitErr := wait.PollUntilContextTimeout(ctx, time.Second, time.Second*5, true, func(ctx context.Context) (bool, error) {
err = testConnectionWithServer(ctx, vRawConfig, server)
if err != nil {
return false, nil
}

return true, nil
})
if waitErr != nil {
// return err here as waitErr is only timed out
// NOTE(review): if err is still nil at this point the wrap presumably
// yields a nil error — confirm against the errors package in use
return "", errors.Wrap(err, "test connection")
}

return server, nil
}
} else {
log.Debugf("Error running docker inspect with go template: %v", err)
}

// no usable port was found: stop a container that still exists under the proxy name
if containerExists(proxyName) {
err := cleanupProxy(vClusterName, vClusterNamespace, rawConfig, log)
if err != nil {
return "", err
}
}

return "", nil
}

func containerExists(containerName string) bool {
cmd := exec.Command(
"docker",
Expand Down
5 changes: 3 additions & 2 deletions test/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ func CreateFramework(ctx context.Context, scheme *runtime.Scheme) error {
Debug: true,
},
ConnectOptions: cli.ConnectOptions{
KubeConfig: vKubeconfigFile.Name(),
LocalPort: 14550, // choosing a port that usually should be unused
KubeConfig: vKubeconfigFile.Name(),
LocalPort: 14550, // choosing a port that usually should be unused
BackgroundProxy: true,
},
}
err = connectCmd.Run(ctx, []string{name})
Expand Down
Loading