diff --git a/controllers/agentaction_controller_test.go b/controllers/agentaction_controller_test.go index cddb80ba..4f7ab4e5 100644 --- a/controllers/agentaction_controller_test.go +++ b/controllers/agentaction_controller_test.go @@ -26,7 +26,7 @@ func TestPorterResourceStatus_ApplyAgentAction(t *testing.T) { tests := []struct { name string action *porterv1.AgentAction - resource porterResource + resource PorterResource wantStatus porterv1.PorterResourceStatus }{ { diff --git a/controllers/porter_resource.go b/controllers/porter_resource.go index acc69c18..c331e3f4 100644 --- a/controllers/porter_resource.go +++ b/controllers/porter_resource.go @@ -19,7 +19,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" ) -type porterResource interface { +type PorterResource interface { client.Object GetStatus() porterv1.PorterResourceStatus SetStatus(value porterv1.PorterResourceStatus) @@ -57,7 +57,7 @@ func PatchObjectWithRetry(ctx context.Context, log logr.Logger, clnt client.Clie } } -func applyAgentAction(log logr.Logger, resource porterResource, action *porterv1.AgentAction) { +func applyAgentAction(log logr.Logger, resource PorterResource, action *porterv1.AgentAction) { log.V(Log5Trace).Info(fmt.Sprintf("Syncing AgentAction status with %s", resource.GetObjectKind().GroupVersionKind().Kind)) status := resource.GetStatus() status.ObservedGeneration = resource.GetGeneration() @@ -88,18 +88,18 @@ func applyAgentAction(log logr.Logger, resource porterResource, action *porterv1 } // this is our kubectl delete check -func isDeleted(resource porterResource) bool { +func isDeleted(resource PorterResource) bool { timestamp := resource.GetDeletionTimestamp() return timestamp != nil && !timestamp.IsZero() } // ensure delete action is completed before delete -func isDeleteProcessed(resource porterResource) bool { +func isDeleteProcessed(resource PorterResource) bool { status := resource.GetStatus() return isDeleted(resource) && apimeta.IsStatusConditionTrue(status.Conditions, string(porterv1.ConditionComplete)) } -func isFinalizerSet(resource porterResource) bool { +func isFinalizerSet(resource PorterResource) bool { for _, finalizer := range resource.GetFinalizers() { if finalizer == porterv1.FinalizerName { return true @@ -109,7 +109,7 @@ func isFinalizerSet(resource porterResource) bool { } // ensureFinalizerSet sets a finalizer on the resource and saves it, if necessary. 
-func ensureFinalizerSet(ctx context.Context, log logr.Logger, client client.Client, resource porterResource) (updated bool, err error) { +func ensureFinalizerSet(ctx context.Context, log logr.Logger, client client.Client, resource PorterResource) (updated bool, err error) { // Ensure all resources have a finalizer to we can react when they are deleted if !isDeleted(resource) { // The object is not being deleted, so if it does not have our finalizer, diff --git a/magefiles/magefile.go b/magefiles/magefile.go index 4c611621..536213a6 100644 --- a/magefiles/magefile.go +++ b/magefiles/magefile.go @@ -54,16 +54,18 @@ const ( registryContainer = "registry" // Porter home for running commands - porterVersion = "v1.0.0-alpha.19" + porterVersion = "v1.0.0-alpha.20" ) var srcDirs = []string{"api", "config", "controllers", "installer", "installer-olm"} var binDir = "bin" -//TODO: sort out getting k8s plugin into porter-agent -var localAgentImgRepository = "localhost:5000/porter-agent-kubernetes" -var localAgentImgVersion = "canary-dev" -var localAgentImgName = fmt.Sprintf("%s:%s", localAgentImgRepository, localAgentImgVersion) +// Porter agent that has k8s plugin included +var porterAgentImgRepository = "ghcr.io/getporter/dev/porter-agent-kubernetes" +var porterAgentImgVersion = "v1.0.0-alpha.20" + +// Local porter agent image name to use for local testing +var localAgentImgName = "localhost:5000/porter-agent:canary-dev" // Build a command that stops the build on if the command fails var must = shx.CommandBuilder{StopOnError: true} @@ -236,9 +238,9 @@ func Publish() { // Push the porter-operator bundle to a registry. Defaults to the local test registry. func PublishBundle() { mg.SerialDeps(PublishImages, BuildBundle) - buildPorterCmd("publish", "--registry", Env.Registry, "-f=porter.yaml").In("installer").Must().RunV() - meta := releases.LoadMetadata() + buildPorterCmd("publish", "--registry", Env.Registry, "-f=porter.yaml", "--tag", meta.Version).In("installer").Must().RunV() + buildPorterCmd("publish", "--registry", Env.Registry, "-f=porter.yaml", "--tag", meta.Permalink).In("installer").Must().RunV() } @@ -318,13 +320,13 @@ func TestIntegration() { kubectl("delete", "deployment", "porter-operator-controller-manager", "-n=porter-operator-system").RunV() if os.Getenv("PORTER_AGENT_REPOSITORY") != "" && os.Getenv("PORTER_AGENT_VERSION") != "" { - localAgentImgRepository = os.Getenv("PORTER_AGENT_REPOSITORY") - localAgentImgVersion = os.Getenv("PORTER_AGENT_VERSION") + porterAgentImgRepository = os.Getenv("PORTER_AGENT_REPOSITORY") + porterAgentImgVersion = os.Getenv("PORTER_AGENT_VERSION") } //"-p", "-nodes", "4", must.Command("ginkgo").Args("-v", "-tags=integration", "./tests/integration/...", "-coverprofile=coverage-integration.out"). 
- Env(fmt.Sprintf("PORTER_AGENT_REPOSITORY=%s", localAgentImgRepository), - fmt.Sprintf("PORTER_AGENT_VERSION=%s", localAgentImgVersion), + Env(fmt.Sprintf("PORTER_AGENT_REPOSITORY=%s", porterAgentImgRepository), + fmt.Sprintf("PORTER_AGENT_VERSION=%s", porterAgentImgVersion), "ACK_GINKGO_DEPRECATIONS=1.16.5", "ACK_GINKGO_RC=true", fmt.Sprintf("KUBECONFIG=%s/kind.config", pwd())).RunV() @@ -361,7 +363,6 @@ func Deploy() { } meta := releases.LoadMetadata() if rebuild { - PublishLocalPorterAgent() PublishBundle() buildPorterCmd("credentials", "apply", "hack/creds.yaml", "-n=operator").Must().RunV() } @@ -470,7 +471,6 @@ func SetupNamespace(name string) { // Only specify the parameter set we have the env vars set // It would be neat if Porter could handle this for us - PublishLocalPorterAgent() buildPorterCmd("parameters", "apply", "./hack/params.yaml", "-n=operator").RunV() ps := "" if os.Getenv("PORTER_AGENT_REPOSITORY") != "" && os.Getenv("PORTER_AGENT_VERSION") != "" { @@ -634,20 +634,22 @@ func pwd() string { return wd } +func ensurePorterAt() { + porter.EnsurePorterAt(porterVersion) +} + // Run porter using the local storage, not the in-cluster storage func buildPorterCmd(args ...string) shx.PreparedCommand { - mg.SerialDeps(porter.UseBinForPorterHome, porter.EnsurePorter) + mg.SerialDeps(porter.UseBinForPorterHome, ensurePorterAt) return must.Command(filepath.Join(pwd(), "bin/porter")).Args(args...). Env("PORTER_DEFAULT_STORAGE=", "PORTER_DEFAULT_STORAGE_PLUGIN=mongodb-docker", fmt.Sprintf("PORTER_HOME=%s", filepath.Join(pwd(), "bin"))) - //mg.Deps(EnsureLocalPorter) - // return shx.Command(filepath.Join(GetPorterHome(), "porter")).Args(args...). - // Env("PORTER_DEFAULT_STORAGE=", "PORTER_DEFAULT_STORAGE_PLUGIN=mongodb-docker") } func BuildLocalPorterAgent() { - mg.SerialDeps(porter.UseBinForPorterHome, porter.EnsurePorter, getPlugins, getMixins) + mg.SerialDeps(porter.UseBinForPorterHome, ensurePorterAt) + mg.SerialDeps(getPlugins, getMixins) porterRegistry := "ghcr.io/getporter" buildImage := func(img string) error { _, err := shx.Output("docker", "build", "-t", img, diff --git a/tests/integration/credset_test.go b/tests/integration/credset_test.go index defd17ba..44ed2a86 100644 --- a/tests/integration/credset_test.go +++ b/tests/integration/credset_test.go @@ -3,26 +3,16 @@ package integration_test import ( - "bytes" "context" "fmt" - "io" - "strings" - "time" "get.porter.sh/operator/controllers" - "github.com/carolynvs/magex/shx" "github.com/go-logr/logr" . 
"github.com/onsi/ginkgo" - "github.com/pkg/errors" "github.com/tidwall/gjson" - batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apitypes "k8s.io/apimachinery/pkg/types" - cl "k8s.io/client-go/kubernetes" "sigs.k8s.io/controller-runtime/pkg/client" porterv1 "get.porter.sh/operator/api/v1" @@ -51,18 +41,18 @@ var _ = Describe("CredentialSet create", func() { cs.Spec.Namespace = ns Expect(k8sClient.Create(ctx, cs)).Should(Succeed()) - Expect(waitForPorterCS(ctx, cs, "waiting for credential set to apply")).Should(Succeed()) - validateCredSetConditions(cs) + Expect(waitForPorter(ctx, cs, 1, "waiting for credential set to apply")).Should(Succeed()) + validateResourceConditions(cs) Log("install porter-test-me bundle with new credset") inst := NewTestInstallation("cs-with-secret") inst.ObjectMeta.Namespace = ns inst.Spec.Namespace = ns inst.Spec.CredentialSets = append(inst.Spec.CredentialSets, name) - inst.Spec.SchemaVersion = "1.0.0" + inst.Spec.SchemaVersion = "1.0.1" Expect(k8sClient.Create(ctx, inst)).Should(Succeed()) - Expect(waitForPorter(ctx, inst, "waiting for porter-test-me to install")).Should(Succeed()) - validateInstallationConditions(inst) + Expect(waitForPorter(ctx, inst, 1, "waiting for porter-test-me to install")).Should(Succeed()) + validateResourceConditions(inst) // Validate that the correct credential set was used by the installation jsonOut := runAgentAction(ctx, "show-outputs", ns, []string{"installation", "outputs", "list", "-n", ns, "-i", inst.Spec.Name, "-o", "json"}) @@ -92,8 +82,8 @@ var _ = Describe("CredentialSet secret does not exist", func() { cs.Spec.Namespace = ns Expect(k8sClient.Create(ctx, cs)).Should(Succeed()) - Expect(waitForPorterCS(ctx, cs, "waiting for credential set to apply")).Should(Succeed()) - validateCredSetConditions(cs) + Expect(waitForPorter(ctx, cs, 1, "waiting for credential set to apply")).Should(Succeed()) + validateResourceConditions(cs) }) By("failing the installation install", func() { @@ -102,11 +92,11 @@ var _ = Describe("CredentialSet secret does not exist", func() { inst.ObjectMeta.Namespace = ns inst.Spec.Namespace = ns inst.Spec.CredentialSets = append(inst.Spec.CredentialSets, name) - inst.Spec.SchemaVersion = "1.0.0" + inst.Spec.SchemaVersion = "1.0.1" Expect(k8sClient.Create(ctx, inst)).Should(Succeed()) - err := waitForPorter(ctx, inst, "waiting for porter-test-me to install") + err := waitForPorter(ctx, inst, 1, "waiting for porter-test-me to install") Expect(err).Should(HaveOccurred()) - validateInstallationConditions(inst) + validateResourceConditions(inst) Expect(inst.Status.Phase).To(Equal(porterv1.PhaseFailed)) }) }) @@ -136,8 +126,8 @@ var _ = Describe("CredentialSet update", func() { cs.Spec.Namespace = ns Expect(k8sClient.Create(ctx, cs)).Should(Succeed()) - Expect(waitForPorterCS(ctx, cs, "waiting for credential set to apply")).Should(Succeed()) - validateCredSetConditions(cs) + Expect(waitForPorter(ctx, cs, 1, "waiting for credential set to apply")).Should(Succeed()) + validateResourceConditions(cs) Log("verify it's created") jsonOut := runAgentAction(ctx, "create-check-credentials-list", ns, []string{"credentials", "list", "-n", ns, "-o", "json"}) @@ -165,7 +155,7 @@ var _ = Describe("CredentialSet update", func() { }) } patchCS(cs) - Expect(waitForPorterCS(ctx, cs, "waiting for credential update to apply")).Should(Succeed()) + 
Expect(waitForPorter(ctx, cs, 2, "waiting for credential update to apply")).Should(Succeed()) Log("verify it's updated") jsonOut = runAgentAction(ctx, "update-check-credentials-list", ns, []string{"credentials", "list", "-n", ns, "-o", "json"}) updatedFirstName := gjson.Get(jsonOut, "0.name").String() @@ -211,8 +201,8 @@ var _ = Describe("CredentialSet delete", func() { cs.Spec.Namespace = ns Expect(k8sClient.Create(ctx, cs)).Should(Succeed()) - Expect(waitForPorterCS(ctx, cs, "waiting for credential set to apply")).Should(Succeed()) - validateCredSetConditions(cs) + Expect(waitForPorter(ctx, cs, 1, "waiting for credential set to apply")).Should(Succeed()) + validateResourceConditions(cs) Log("verify it's created") jsonOut := runAgentAction(ctx, "create-check-credentials-list", ns, []string{"credentials", "list", "-n", ns, "-o", "json"}) @@ -225,7 +215,7 @@ var _ = Describe("CredentialSet delete", func() { Log("delete a credential set") Expect(k8sClient.Delete(ctx, cs)).Should(Succeed()) - Expect(waitForCredSetDeleted(ctx, cs)).Should(Succeed()) + Expect(waitForResourceDeleted(ctx, cs)).Should(Succeed()) Log("verify credential set is gone from porter data store") delJsonOut := runAgentAction(ctx, "delete-check-credentials-list", ns, []string{"credentials", "list", "-n", ns, "-o", "json"}) @@ -311,207 +301,3 @@ func newAgentAction(namespace string, name string, cmd []string) *porterv1.Agent }, } } - -func waitForAgentAction(ctx context.Context, aa *porterv1.AgentAction, msg string) error { - Log("%s: %s/%s", msg, aa.Namespace, aa.Name) - key := client.ObjectKey{Namespace: aa.Namespace, Name: aa.Name} - ctx, cancel := context.WithTimeout(ctx, getWaitTimeout()) - defer cancel() - for { - select { - case <-ctx.Done(): - Fail(errors.Wrapf(ctx.Err(), "timeout %s", msg).Error()) - default: - err := k8sClient.Get(ctx, key, aa) - if err != nil { - // There is lag between creating and being able to retrieve, I don't understand why - if apierrors.IsNotFound(err) { - time.Sleep(time.Second) - continue - } - return err - } - - // Check if the latest change has been processed - if aa.Generation == aa.Status.ObservedGeneration { - if apimeta.IsStatusConditionTrue(aa.Status.Conditions, string(porterv1.ConditionComplete)) { - return nil - } - - if apimeta.IsStatusConditionTrue(aa.Status.Conditions, string(porterv1.ConditionFailed)) { - // Grab some extra info to help with debugging - //debugFailedCSCreate(ctx, aa) - return errors.New("porter did not run successfully") - } - } - - time.Sleep(time.Second) - continue - } - } - -} - -func waitForPorterCS(ctx context.Context, cs *porterv1.CredentialSet, msg string) error { - Log("%s: %s/%s", msg, cs.Namespace, cs.Name) - key := client.ObjectKey{Namespace: cs.Namespace, Name: cs.Name} - ctx, cancel := context.WithTimeout(ctx, getWaitTimeout()) - defer cancel() - for { - select { - case <-ctx.Done(): - Fail(errors.Wrapf(ctx.Err(), "timeout %s", msg).Error()) - default: - err := k8sClient.Get(ctx, key, cs) - if err != nil { - // There is lag between creating and being able to retrieve, I don't understand why - if apierrors.IsNotFound(err) { - time.Sleep(time.Second) - continue - } - return err - } - - // Check if the latest change has been processed - if cs.Generation == cs.Status.ObservedGeneration { - if apimeta.IsStatusConditionTrue(cs.Status.Conditions, string(porterv1.ConditionComplete)) { - time.Sleep(time.Second) - return nil - } - - if apimeta.IsStatusConditionTrue(cs.Status.Conditions, string(porterv1.ConditionFailed)) { - // Grab some extra info to help 
with debugging - debugFailedCSCreate(ctx, cs) - return errors.New("porter did not run successfully") - } - } - - time.Sleep(time.Second) - continue - } - } -} - -func waitForCredSetDeleted(ctx context.Context, cs *porterv1.CredentialSet) error { - Log("Waiting for CredentialSet to finish deleting: %s/%s", cs.Namespace, cs.Name) - key := client.ObjectKey{Namespace: cs.Namespace, Name: cs.Name} - waitCtx, cancel := context.WithTimeout(ctx, getWaitTimeout()) - defer cancel() - for { - select { - case <-waitCtx.Done(): - Fail(errors.Wrap(waitCtx.Err(), "timeout waiting for CredentialSet to delete").Error()) - default: - err := k8sClient.Get(ctx, key, cs) - if err != nil { - if apierrors.IsNotFound(err) { - return nil - } - return err - } - - time.Sleep(time.Second) - continue - } - } -} - -func debugFailedCSCreate(ctx context.Context, cs *porterv1.CredentialSet) { - Log("DEBUG: ----------------------------------------------------") - actionKey := client.ObjectKey{Name: cs.Status.Action.Name, Namespace: cs.Namespace} - action := &porterv1.AgentAction{} - if err := k8sClient.Get(ctx, actionKey, action); err != nil { - Log(errors.Wrap(err, "could not retrieve the CredentialSet's AgentAction to troubleshoot").Error()) - return - } - - jobKey := client.ObjectKey{Name: action.Status.Job.Name, Namespace: action.Namespace} - job := &batchv1.Job{} - if err := k8sClient.Get(ctx, jobKey, job); err != nil { - Log(errors.Wrap(err, "could not retrieve the CredentialSet's Job to troubleshoot").Error()) - return - } - - shx.Command("kubectl", "logs", "-n="+job.Namespace, "job/"+job.Name). - Env("KUBECONFIG=" + "../../kind.config").RunV() - Log("DEBUG: ----------------------------------------------------") -} - -func validateCredSetConditions(cs *porterv1.CredentialSet) { - // Checks that all expected conditions are set - Expect(apimeta.IsStatusConditionTrue(cs.Status.Conditions, string(porterv1.ConditionScheduled))) - Expect(apimeta.IsStatusConditionTrue(cs.Status.Conditions, string(porterv1.ConditionStarted))) - Expect(apimeta.IsStatusConditionTrue(cs.Status.Conditions, string(porterv1.ConditionComplete))) -} - -// Get the pod logs associated to the job created by the agent action -func getAgentActionJobOutput(ctx context.Context, agentActionName string, namespace string) (string, error) { - actionKey := client.ObjectKey{Name: agentActionName, Namespace: namespace} - action := &porterv1.AgentAction{} - if err := k8sClient.Get(ctx, actionKey, action); err != nil { - Log(errors.Wrap(err, "could not retrieve the CredentialSet's AgentAction to troubleshoot").Error()) - return "", err - } - // Find the job associated with the agent action - jobKey := client.ObjectKey{Name: action.Status.Job.Name, Namespace: action.Namespace} - job := &batchv1.Job{} - if err := k8sClient.Get(ctx, jobKey, job); err != nil { - Log(errors.Wrap(err, "could not retrieve the Job to troubleshoot").Error()) - return "", err - } - // Create a new k8s client that's use for fetching pod logs. This is not implemented on the controller-runtime client - c, err := cl.NewForConfig(testEnv.Config) - if err != nil { - Log(err.Error()) - return "", err - } - selector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector) - if err != nil { - Log(errors.Wrap(err, "could not retrieve label selector for job").Error()) - return "", err - } - // Get the pod associated with the job. 
There should only be 1 - pods, err := c.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector.String()}) - if err != nil { - Log(errors.Wrap(err, "could not retrive pod list for job").Error()) - return "", err - } - if len(pods.Items) != 1 { - Log(fmt.Sprintf("too many pods associated to agent action job. Expected 1 found %v", len(pods.Items))) - return "", err - } - podLogOpts := corev1.PodLogOptions{} - // Fetch the pod logs - req := c.CoreV1().Pods(namespace).GetLogs(pods.Items[0].Name, &podLogOpts) - podLogs, err := req.Stream(ctx) - if err != nil { - Log(errors.Wrap(err, "could not stream pod logs").Error()) - return "", err - } - defer podLogs.Close() - buf := new(bytes.Buffer) - _, err = io.Copy(buf, podLogs) - if err != nil { - Log(errors.Wrap(err, "could not copy pod logs to byte buffer").Error()) - return "", err - } - outputLog := buf.String() - return outputLog, nil -} - -func getAgentActionCmdOut(action *porterv1.AgentAction, aaOut string) string { - return strings.SplitAfterN(strings.Replace(aaOut, "\n", "", -1), strings.Join(action.Spec.Args, " "), 2)[1] -} - -/* Fully execute an agent action and return the associated result of the command executed. For example an agent action -that does "porter credentials list" will return just the result of the porter command from the job logs. This can be -used to run porter commands inside the cluster to validate porter state -*/ -func runAgentAction(ctx context.Context, actionName string, namespace string, cmd []string) string { - aa := newAgentAction(namespace, actionName, cmd) - Expect(k8sClient.Create(ctx, aa)).Should(Succeed()) - Expect(waitForAgentAction(ctx, aa, fmt.Sprintf("waiting for action %s to run", actionName))).Should(Succeed()) - aaOut, err := getAgentActionJobOutput(ctx, aa.Name, namespace) - Expect(err).Error().ShouldNot(HaveOccurred()) - return getAgentActionCmdOut(aa, aaOut) -} diff --git a/tests/integration/installation_test.go b/tests/integration/installation_test.go index 3cbf97fc..47f4199f 100644 --- a/tests/integration/installation_test.go +++ b/tests/integration/installation_test.go @@ -4,21 +4,13 @@ package integration_test import ( "context" - "fmt" - "os" "time" porterv1 "get.porter.sh/operator/api/v1" "get.porter.sh/operator/controllers" - "github.com/carolynvs/magex/shx" "github.com/go-logr/logr" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" - "github.com/pkg/errors" - "github.com/tidwall/pretty" - batchv1 "k8s.io/api/batch/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -45,7 +37,7 @@ var _ = Describe("Installation Lifecycle", func() { Namespace: ns, }, Spec: porterv1.InstallationSpec{ - SchemaVersion: "1.0.0", + SchemaVersion: "1.0.1", Name: "hello", Namespace: "operator-tests", Bundle: porterv1.OCIReferenceParts{ @@ -55,8 +47,8 @@ var _ = Describe("Installation Lifecycle", func() { }, } Expect(k8sClient.Create(ctx, inst)).Should(Succeed()) - Expect(waitForPorter(ctx, inst, "waiting for the bundle to install")).Should(Succeed()) - validateInstallationConditions(inst) + Expect(waitForPorter(ctx, inst, 1, "waiting for the bundle to install")).Should(Succeed()) + validateResourceConditions(inst) patchInstallation := func(inst *porterv1.Installation) { controllers.PatchObjectWithRetry(ctx, logr.Discard(), k8sClient, k8sClient.Patch, inst, func() client.Object { @@ -67,131 +59,18 @@ var _ = Describe("Installation Lifecycle", func() { Log("upgrade the installation") inst.Spec.Parameters = runtime.RawExtension{Raw: []byte(`{"name": "operator"}`)} patchInstallation(inst) - Expect(waitForPorter(ctx, inst, "waiting for the bundle to upgrade")).Should(Succeed()) - validateInstallationConditions(inst) + Expect(waitForPorter(ctx, inst, 2, "waiting for the bundle to upgrade")).Should(Succeed()) + validateResourceConditions(inst) Log("uninstall the installation") inst.Spec.Uninstalled = true patchInstallation(inst) - Expect(waitForPorter(ctx, inst, "waiting for the bundle to uninstall")).Should(Succeed()) - validateInstallationConditions(inst) + Expect(waitForPorter(ctx, inst, 3, "waiting for the bundle to uninstall")).Should(Succeed()) + validateResourceConditions(inst) Log("delete the installation") Expect(k8sClient.Delete(ctx, inst)).Should(Succeed()) - Expect(waitForInstallationDeleted(ctx, inst)).Should(Succeed()) + Expect(waitForResourceDeleted(ctx, inst)).Should(Succeed()) }) }) }) - -func waitForPorter(ctx context.Context, inst *porterv1.Installation, msg string) error { - Log("%s: %s/%s", msg, inst.Namespace, inst.Name) - key := client.ObjectKey{Namespace: inst.Namespace, Name: inst.Name} - ctx, cancel := context.WithTimeout(ctx, getWaitTimeout()) - defer cancel() - for { - select { - case <-ctx.Done(): - Fail(errors.Wrapf(ctx.Err(), "timeout %s", msg).Error()) - default: - err := k8sClient.Get(ctx, key, inst) - if err != nil { - // There is lag between creating and being able to retrieve, I don't understand why - if apierrors.IsNotFound(err) { - time.Sleep(time.Second) - continue - } - return err - } - - // Check if the latest change has been processed - if inst.Generation == inst.Status.ObservedGeneration { - if apimeta.IsStatusConditionTrue(inst.Status.Conditions, string(porterv1.ConditionComplete)) { - return nil - } - - if apimeta.IsStatusConditionTrue(inst.Status.Conditions, string(porterv1.ConditionFailed)) { - // Grab some extra info to help with debugging - debugFailedInstallation(ctx, inst) - return errors.New("porter did not run successfully") - } - } - - time.Sleep(time.Second) - continue - } - } -} - -func debugFailedInstallation(ctx context.Context, inst *porterv1.Installation) { - Log("DEBUG: ----------------------------------------------------") - actionKey := client.ObjectKey{Name: 
inst.Status.Action.Name, Namespace: inst.Namespace} - action := &porterv1.AgentAction{} - if err := k8sClient.Get(ctx, actionKey, action); err != nil { - Log(errors.Wrap(err, "could not retrieve the Installation's AgentAction to troubleshoot").Error()) - return - } - - jobKey := client.ObjectKey{Name: action.Status.Job.Name, Namespace: action.Namespace} - job := &batchv1.Job{} - if err := k8sClient.Get(ctx, jobKey, job); err != nil { - Log(errors.Wrap(err, "could not retrieve the Installation's Job to troubleshoot").Error()) - return - } - - shx.Command("kubectl", "logs", "-n="+job.Namespace, "job/"+job.Name). - Env("KUBECONFIG=" + "../../kind.config").RunV() - Log("DEBUG: ----------------------------------------------------") -} - -// Get the amount of time that we should wait for a test action to be processed. -func getWaitTimeout() time.Duration { - if value := os.Getenv("PORTER_TEST_WAIT_TIMEOUT"); value != "" { - timeout, err := time.ParseDuration(value) - if err != nil { - fmt.Printf("WARNING: An invalid value, %q, was set for PORTER_TEST_WAIT_TIMEOUT environment variable. The format should be a Go time duration such as 30s or 1m. Ignoring and using the default instead", value) - return defaultWaitTimeout - } - - return timeout - } - return defaultWaitTimeout -} - -func waitForInstallationDeleted(ctx context.Context, inst *porterv1.Installation) error { - Log("Waiting for installation to finish deleting: %s/%s", inst.Namespace, inst.Name) - key := client.ObjectKey{Namespace: inst.Namespace, Name: inst.Name} - waitCtx, cancel := context.WithTimeout(ctx, getWaitTimeout()) - defer cancel() - for { - select { - case <-waitCtx.Done(): - Fail(errors.Wrap(waitCtx.Err(), "timeout waiting for installation to delete").Error()) - default: - err := k8sClient.Get(ctx, key, inst) - if err != nil { - if apierrors.IsNotFound(err) { - return nil - } - return err - } - - time.Sleep(time.Second) - continue - } - } -} - -func validateInstallationConditions(inst *porterv1.Installation) { - // Checks that all expected conditions are set - Expect(apimeta.IsStatusConditionTrue(inst.Status.Conditions, string(porterv1.ConditionScheduled))) - Expect(apimeta.IsStatusConditionTrue(inst.Status.Conditions, string(porterv1.ConditionStarted))) - Expect(apimeta.IsStatusConditionTrue(inst.Status.Conditions, string(porterv1.ConditionComplete))) -} - -func Log(value string, args ...interface{}) { - GinkgoWriter.Write([]byte(fmt.Sprintf(value+"\n", args...))) -} - -func LogJson(value string) { - GinkgoWriter.Write(pretty.Pretty([]byte(value + "\n"))) -} diff --git a/tests/integration/paramset_test.go b/tests/integration/paramset_test.go index cdd00ccb..595b6b10 100644 --- a/tests/integration/paramset_test.go +++ b/tests/integration/paramset_test.go @@ -5,17 +5,11 @@ package integration_test import ( "context" "fmt" - "time" "get.porter.sh/operator/controllers" - "github.com/carolynvs/magex/shx" "github.com/go-logr/logr" . 
"github.com/onsi/ginkgo" - "github.com/pkg/errors" "github.com/tidwall/gjson" - batchv1 "k8s.io/api/batch/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apitypes "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -49,8 +43,8 @@ var _ = Describe("ParameterSet lifecycle", func() { ps.Spec.Namespace = ns Expect(k8sClient.Create(ctx, ps)).Should(Succeed()) - Expect(waitForPorterPS(ctx, ps, "waiting for parameter set to apply")).Should(Succeed()) - validateParamSetConditions(ps) + Expect(waitForPorter(ctx, ps, 1, "waiting for parameter set to apply")).Should(Succeed()) + validateResourceConditions(ps) Log("verify it's created") jsonOut := runAgentAction(ctx, "create-check-parameters-list", ns, []string{"parameters", "list", "-n", ns, "-o", "json"}) @@ -68,10 +62,10 @@ var _ = Describe("ParameterSet lifecycle", func() { inst.ObjectMeta.Namespace = ns inst.Spec.Namespace = ns inst.Spec.ParameterSets = append(inst.Spec.ParameterSets, pSetName) - inst.Spec.SchemaVersion = "1.0.0" + inst.Spec.SchemaVersion = "1.0.1" Expect(k8sClient.Create(ctx, inst)).Should(Succeed()) - Expect(waitForPorter(ctx, inst, "waiting for porter-test-me to install")).Should(Succeed()) - validateInstallationConditions(inst) + Expect(waitForPorter(ctx, inst, 1, "waiting for porter-test-me to install")).Should(Succeed()) + validateResourceConditions(inst) // Validate that the correct parameter set was used by the installation instJsonOut := runAgentAction(ctx, "show-outputs", ns, []string{"installation", "outputs", "list", "-n", ns, "-i", inst.Spec.Name, "-o", "json"}) @@ -92,9 +86,10 @@ var _ = Describe("ParameterSet lifecycle", func() { controllers.PatchObjectWithRetry(ctx, logr.Discard(), k8sClient, k8sClient.Patch, ps, func() client.Object { return &porterv1.ParameterSet{} }) + // Wait for the patch to apply, this can cause race conditions } patchPS(ps) - Expect(waitForPorterPS(ctx, ps, "waiting for parameters update to apply")).Should(Succeed()) + Expect(waitForPorter(ctx, ps, 2, "waiting for parameters update to apply")).Should(Succeed()) Log("verify it's updated") jsonOut = runAgentAction(ctx, "update-check-parameters-list", ns, []string{"parameters", "list", "-n", ns, "-o", "json"}) updatedFirstName := gjson.Get(jsonOut, "0.name").String() @@ -110,7 +105,7 @@ var _ = Describe("ParameterSet lifecycle", func() { Log("delete a parameter set") Expect(k8sClient.Delete(ctx, ps)).Should(Succeed()) - Expect(waitForParamSetDeleted(ctx, ps)).Should(Succeed()) + Expect(waitForResourceDeleted(ctx, ps)).Should(Succeed()) Log("verify parameter set is gone from porter data store") delJsonOut := runAgentAction(ctx, "delete-check-parameters-list", ns, []string{"parameters", "list", "-n", ns, "-o", "json"}) @@ -145,95 +140,3 @@ func NewTestParamSet(psName string) *porterv1.ParameterSet { } return ps } - -func waitForPorterPS(ctx context.Context, ps *porterv1.ParameterSet, msg string) error { - Log("%s: %s/%s", msg, ps.Namespace, ps.Name) - key := client.ObjectKey{Namespace: ps.Namespace, Name: ps.Name} - ctx, cancel := context.WithTimeout(ctx, getWaitTimeout()) - defer cancel() - for { - select { - case <-ctx.Done(): - Fail(errors.Wrapf(ctx.Err(), "timeout %s", msg).Error()) - default: - err := k8sClient.Get(ctx, key, ps) - if err != nil { - // There is lag between creating and being able to retrieve, I don't understand why - if apierrors.IsNotFound(err) { - time.Sleep(time.Second) - 
continue - } - return err - } - - // Check if the latest change has been processed - if ps.Generation == ps.Status.ObservedGeneration { - if apimeta.IsStatusConditionTrue(ps.Status.Conditions, string(porterv1.ConditionComplete)) { - time.Sleep(time.Second) - return nil - } - - if apimeta.IsStatusConditionTrue(ps.Status.Conditions, string(porterv1.ConditionFailed)) { - // Grab some extra info to help with debugging - debugFailedPSCreate(ctx, ps) - return errors.New("porter did not run successfully") - } - } - - time.Sleep(time.Second) - continue - } - } -} - -func waitForParamSetDeleted(ctx context.Context, ps *porterv1.ParameterSet) error { - Log("Waiting for ParameterSet to finish deleting: %s/%s", ps.Namespace, ps.Name) - key := client.ObjectKey{Namespace: ps.Namespace, Name: ps.Name} - waitCtx, cancel := context.WithTimeout(ctx, getWaitTimeout()) - defer cancel() - for { - select { - case <-waitCtx.Done(): - Fail(errors.Wrap(waitCtx.Err(), "timeout waiting for ParameterSet to delete").Error()) - default: - err := k8sClient.Get(ctx, key, ps) - if err != nil { - if apierrors.IsNotFound(err) { - return nil - } - return err - } - - time.Sleep(time.Second) - continue - } - } -} - -func debugFailedPSCreate(ctx context.Context, ps *porterv1.ParameterSet) { - Log("DEBUG: ----------------------------------------------------") - actionKey := client.ObjectKey{Name: ps.Status.Action.Name, Namespace: ps.Namespace} - action := &porterv1.AgentAction{} - if err := k8sClient.Get(ctx, actionKey, action); err != nil { - Log(errors.Wrap(err, "could not retrieve the ParameterSet's AgentAction to troubleshoot").Error()) - return - } - - jobKey := client.ObjectKey{Name: action.Status.Job.Name, Namespace: action.Namespace} - job := &batchv1.Job{} - if err := k8sClient.Get(ctx, jobKey, job); err != nil { - Log(errors.Wrap(err, "could not retrieve the ParameterSet's Job to troubleshoot").Error()) - return - } - - shx.Command("kubectl", "logs", "-n="+job.Namespace, "job/"+job.Name). - Env("KUBECONFIG=" + "../../kind.config").RunV() - Log("DEBUG: ----------------------------------------------------") -} - -func validateParamSetConditions(ps *porterv1.ParameterSet) { - // Checks that all expected conditions are set - Expect(apimeta.IsStatusConditionTrue(ps.Status.Conditions, string(porterv1.ConditionScheduled))) - Expect(apimeta.IsStatusConditionTrue(ps.Status.Conditions, string(porterv1.ConditionStarted))) - Expect(apimeta.IsStatusConditionTrue(ps.Status.Conditions, string(porterv1.ConditionComplete))) -} diff --git a/tests/integration/utils_test.go b/tests/integration/utils_test.go new file mode 100644 index 00000000..32f441ce --- /dev/null +++ b/tests/integration/utils_test.go @@ -0,0 +1,300 @@ +//go:build integration + +package integration_test + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "strings" + "time" + + porterv1 "get.porter.sh/operator/api/v1" + "get.porter.sh/operator/controllers" + "github.com/carolynvs/magex/shx" + "github.com/mitchellh/mapstructure" + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" + "github.com/pkg/errors" + "github.com/tidwall/pretty" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + cl "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// Used to lookup Resource for CRD based off of Kind +var resourceTypeMap = map[string]string{ + "ParameterSet": "parametersets", + "CredentialSet": "credentialsets", + "Installation": "installations", + "AgentAction": "agentactions", +} + +var gvrVersion = "v1" +var gvrGroup = "porter.sh" + +// Get the amount of time that we should wait for a test action to be processed. +func getWaitTimeout() time.Duration { + if value := os.Getenv("PORTER_TEST_WAIT_TIMEOUT"); value != "" { + timeout, err := time.ParseDuration(value) + if err != nil { + fmt.Printf("WARNING: An invalid value, %q, was set for PORTER_TEST_WAIT_TIMEOUT environment variable. The format should be a Go time duration such as 30s or 1m. Ignoring and using the default instead", value) + return defaultWaitTimeout + } + + return timeout + } + return defaultWaitTimeout +} + +func Log(value string, args ...interface{}) { + GinkgoWriter.Write([]byte(fmt.Sprintf(value+"\n", args...))) +} + +func LogJson(value string) { + GinkgoWriter.Write(pretty.Pretty([]byte(value + "\n"))) +} + +// This will wait for the porter action to complete and returns error if the action failed. This will take in any +// porter resource as well as the expected generation of that resource. It uses dynamic client to inspect resource +// status because not all porter resources use the same status type. The expected generation is needed from the +// caller to deal with race conditions when waiting for resources that get updated +func waitForPorter(ctx context.Context, resource client.Object, expGeneration int64, msg string) error { + namespace := resource.GetNamespace() + name := resource.GetName() + // Use dynamic client so that porter resource and agent actions can all be + // handled + dynamicClient, err := dynamic.NewForConfig(testEnv.Config) + if err != nil { + return err + } + Log("%s: %s/%s", msg, namespace, name) + key := client.ObjectKey{Namespace: namespace, Name: name} + ctx, cancel := context.WithTimeout(ctx, getWaitTimeout()) + defer cancel() + for { + select { + case <-ctx.Done(): + Fail(errors.Wrapf(ctx.Err(), "timeout %s", msg).Error()) + default: + // There's multiple retry checks that need to wait so just do initial wait + time.Sleep(time.Second) + // Update the resource to get controller applied values that are needed for + // dynamic client. This also serves to update the resource state for the + // calling method + err := k8sClient.Get(ctx, key, resource) + if err != nil { + // There is lag between creating and being able to retrieve, I don't understand why + if apierrors.IsNotFound(err) { + continue + } + return err + } + // This is populated by the controller and isn't immediately available on + // the resource. 
If it isn't set, wait and check again + rKind := resource.GetObjectKind().GroupVersionKind().Kind + if rKind == "" { + continue + } + // Create a GVR for the resource type + gvr := schema.GroupVersionResource{ + Group: gvrGroup, + Version: gvrVersion, + Resource: resourceTypeMap[rKind], + } + resourceClient := dynamicClient.Resource(gvr).Namespace(namespace) + // If the resource isn't yet available to fetch, try again + porterResource, err := resourceClient.Get(ctx, name, metav1.GetOptions{}) + if err != nil { + continue + } + // The observed generation is set by the controller, so it might not be + // immediately available + observedGen, err := getObservedGeneration(porterResource) + if err != nil { + continue + } + // When updating an existing porter resource, this check can happen before + // the controller has incremented the generation, so make sure the + // generation matches the expected generation before continuing + generation := resource.GetGeneration() + if generation != expGeneration { + continue + } + if generation == observedGen { + conditions, err := getConditions(porterResource) + // Conditions may not yet be set, try again + if err != nil { + continue + } + if apimeta.IsStatusConditionTrue(conditions, string(porterv1.ConditionComplete)) { + return nil + } + if apimeta.IsStatusConditionTrue(conditions, string(porterv1.ConditionFailed)) { + debugFailedResource(ctx, namespace, name) + return errors.New("porter did not run successfully") + } + } + } + continue + } +} + +func getObservedGeneration(obj *unstructured.Unstructured) (int64, error) { + observedGeneration, found, err := unstructured.NestedInt64(obj.Object, "status", "observedGeneration") + if err != nil { + return 0, err + } + if found { + return observedGeneration, nil + } + return 0, errors.New("unable to find observed generation") +} + +func getConditions(obj *unstructured.Unstructured) ([]metav1.Condition, error) { + conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions") + if err != nil { + return []metav1.Condition{}, err + } + if !found { + return []metav1.Condition{}, errors.New("unable to find resource status") + } + c := []metav1.Condition{} + mapstructure.Decode(conditions, &c) + return c, nil +} + +func waitForResourceDeleted(ctx context.Context, resource client.Object) error { + namespace := resource.GetNamespace() + name := resource.GetName() + Log("Waiting for resource to finish deleting: %s/%s", namespace, name) + key := client.ObjectKey{Namespace: namespace, Name: name} + waitCtx, cancel := context.WithTimeout(ctx, getWaitTimeout()) + defer cancel() + for { + select { + case <-waitCtx.Done(): + Fail(errors.Wrap(waitCtx.Err(), "timeout waiting for resource to delete").Error()) + default: + err := k8sClient.Get(ctx, key, resource) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + time.Sleep(time.Second) + continue + } + } +} + +func debugFailedResource(ctx context.Context, namespace, name string) { + Log("DEBUG: ----------------------------------------------------") + actionKey := client.ObjectKey{Name: name, Namespace: namespace} + action := &porterv1.AgentAction{} + if err := k8sClient.Get(ctx, actionKey, action); err != nil { + Log(errors.Wrap(err, "could not retrieve the Resource's AgentAction to troubleshoot").Error()) + return + } + + jobKey := client.ObjectKey{Name: action.Status.Job.Name, Namespace: action.Namespace} + job := &batchv1.Job{} + if err := k8sClient.Get(ctx, jobKey, job); err != nil { +
Log(errors.Wrap(err, "could not retrieve the Resources's Job to troubleshoot").Error()) + return + } + + shx.Command("kubectl", "logs", "-n="+job.Namespace, "job/"+job.Name). + Env("KUBECONFIG=" + "../../kind.config").RunV() + Log("DEBUG: ----------------------------------------------------") +} + +// Checks that all expected conditions are set +func validateResourceConditions(resource controllers.PorterResource) { + status := resource.GetStatus() + Expect(apimeta.IsStatusConditionTrue(status.Conditions, string(porterv1.ConditionScheduled))) + Expect(apimeta.IsStatusConditionTrue(status.Conditions, string(porterv1.ConditionStarted))) + Expect(apimeta.IsStatusConditionTrue(status.Conditions, string(porterv1.ConditionComplete))) +} + +// Get the pod logs associated to the job created by the agent action +func getAgentActionJobOutput(ctx context.Context, agentActionName string, namespace string) (string, error) { + actionKey := client.ObjectKey{Name: agentActionName, Namespace: namespace} + action := &porterv1.AgentAction{} + if err := k8sClient.Get(ctx, actionKey, action); err != nil { + Log(errors.Wrap(err, "could not retrieve the Resource's AgentAction to troubleshoot").Error()) + return "", err + } + // Find the job associated with the agent action + jobKey := client.ObjectKey{Name: action.Status.Job.Name, Namespace: action.Namespace} + job := &batchv1.Job{} + if err := k8sClient.Get(ctx, jobKey, job); err != nil { + Log(errors.Wrap(err, "could not retrieve the Job to troubleshoot").Error()) + return "", err + } + // Create a new k8s client that's use for fetching pod logs. This is not implemented on the controller-runtime client + c, err := cl.NewForConfig(testEnv.Config) + if err != nil { + Log(err.Error()) + return "", err + } + selector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector) + if err != nil { + Log(errors.Wrap(err, "could not retrieve label selector for job").Error()) + return "", err + } + // Get the pod associated with the job. There should only be 1 + pods, err := c.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector.String()}) + if err != nil { + Log(errors.Wrap(err, "could not retrive pod list for job").Error()) + return "", err + } + if len(pods.Items) != 1 { + Log(fmt.Sprintf("too many pods associated to agent action job. Expected 1 found %v", len(pods.Items))) + return "", err + } + podLogOpts := corev1.PodLogOptions{} + // Fetch the pod logs + req := c.CoreV1().Pods(namespace).GetLogs(pods.Items[0].Name, &podLogOpts) + podLogs, err := req.Stream(ctx) + if err != nil { + Log(errors.Wrap(err, "could not stream pod logs").Error()) + return "", err + } + defer podLogs.Close() + buf := new(bytes.Buffer) + _, err = io.Copy(buf, podLogs) + if err != nil { + Log(errors.Wrap(err, "could not copy pod logs to byte buffer").Error()) + return "", err + } + outputLog := buf.String() + return outputLog, nil +} + +func getAgentActionCmdOut(action *porterv1.AgentAction, aaOut string) string { + return strings.SplitAfterN(strings.Replace(aaOut, "\n", "", -1), strings.Join(action.Spec.Args, " "), 2)[1] +} + +/* Fully execute an agent action and return the associated result of the command executed. For example an agent action +that does "porter credentials list" will return just the result of the porter command from the job logs. 
This can be +used to run porter commands inside the cluster to validate porter state +*/ +func runAgentAction(ctx context.Context, actionName string, namespace string, cmd []string) string { + aa := newAgentAction(namespace, actionName, cmd) + Expect(k8sClient.Create(ctx, aa)).Should(Succeed()) + Expect(waitForPorter(ctx, aa, 1, fmt.Sprintf("waiting for action %s to run", actionName))).Should(Succeed()) + aaOut, err := getAgentActionJobOutput(ctx, aa.Name, namespace) + Expect(err).Error().ShouldNot(HaveOccurred()) + return getAgentActionCmdOut(aa, aaOut) +}