diff --git a/cmd/phase/apply.go b/cmd/phase/apply.go
index 8b2567217..851c7e06f 100644
--- a/cmd/phase/apply.go
+++ b/cmd/phase/apply.go
@@ -17,10 +17,11 @@ limitations under the License.
 package phase
 
 import (
+	"time"
+
 	"github.com/spf13/cobra"
 
 	"opendev.org/airship/airshipctl/pkg/environment"
-	"opendev.org/airship/airshipctl/pkg/k8s/client"
 	"opendev.org/airship/airshipctl/pkg/phase/apply"
 )
 
@@ -35,9 +36,10 @@ airshipctl phase apply initinfra
 )
 
 // NewApplyCommand creates a command to apply phase to k8s cluster.
-func NewApplyCommand(rootSettings *environment.AirshipCTLSettings, factory client.Factory) *cobra.Command {
-	i := apply.NewOptions(rootSettings)
-
+func NewApplyCommand(rootSettings *environment.AirshipCTLSettings) *cobra.Command {
+	i := &apply.Options{
+		RootSettings: rootSettings,
+	}
 	applyCmd := &cobra.Command{
 		Use:     "apply PHASE_NAME",
 		Short:   "Apply phase to a cluster",
@@ -46,12 +48,7 @@ func NewApplyCommand(rootSettings *environment.AirshipCTLSettings, factory clien
 		Example: applyExample,
 		RunE: func(cmd *cobra.Command, args []string) error {
 			i.PhaseName = args[0]
-			client, err := factory(rootSettings)
-			if err != nil {
-				return err
-			}
-			i.Client = client
-
+			i.Initialize()
 			return i.Run()
 		},
 	}
@@ -73,4 +70,10 @@ func addApplyFlags(i *apply.Options, cmd *cobra.Command) {
 		false,
 		`if set to true, command will delete all kubernetes resources that are not`+
 			` defined in airship documents and have airshipit.org/deployed=apply label`)
+
+	flags.DurationVar(
+		&i.WaitTimeout,
+		"wait-timeout",
+		time.Second*120,
+		`number of seconds to wait for resources to become ready, if set to 0 will not wait`)
 }
diff --git a/cmd/phase/apply_test.go b/cmd/phase/apply_test.go
index 56e28cb04..f141af6a4 100644
--- a/cmd/phase/apply_test.go
+++ b/cmd/phase/apply_test.go
@@ -19,8 +19,6 @@ import (
 
 	"opendev.org/airship/airshipctl/cmd/phase"
 	"opendev.org/airship/airshipctl/pkg/environment"
-	"opendev.org/airship/airshipctl/pkg/k8s/client"
-	"opendev.org/airship/airshipctl/pkg/k8s/client/fake"
 	"opendev.org/airship/airshipctl/testutil"
 )
 
@@ -30,15 +28,12 @@ func TestNewApplyCommand(t *testing.T) {
 		KubeConfigPath:    "../../testdata/k8s/kubeconfig.yaml",
 	}
 	fakeRootSettings.InitConfig()
-	testClientFactory := func(_ *environment.AirshipCTLSettings) (client.Interface, error) {
-		return fake.NewClient(), nil
-	}
 
 	tests := []*testutil.CmdTest{
 		{
 			Name:    "phase-apply-cmd-with-help",
 			CmdLine: "--help",
-			Cmd:     phase.NewApplyCommand(fakeRootSettings, testClientFactory),
+			Cmd:     phase.NewApplyCommand(fakeRootSettings),
 		},
 	}
 	for _, testcase := range tests {
diff --git a/cmd/phase/phase.go b/cmd/phase/phase.go
index d130caed7..18181668e 100644
--- a/cmd/phase/phase.go
+++ b/cmd/phase/phase.go
@@ -18,7 +18,6 @@ import (
 	"github.com/spf13/cobra"
 
 	"opendev.org/airship/airshipctl/pkg/environment"
-	"opendev.org/airship/airshipctl/pkg/k8s/client"
 	"opendev.org/airship/airshipctl/pkg/log"
 )
 
@@ -37,13 +36,12 @@ func NewPhaseCommand(rootSettings *environment.AirshipCTLSettings) *cobra.Comman
 		Long:  clusterLong[1:],
 		PersistentPreRun: func(cmd *cobra.Command, args []string) {
 			log.Init(rootSettings.Debug, cmd.OutOrStderr())
-
 			// Load or Initialize airship Config
 			rootSettings.InitConfig()
 		},
 	}
 
-	phaseRootCmd.AddCommand(NewApplyCommand(rootSettings, client.DefaultClient))
+	phaseRootCmd.AddCommand(NewApplyCommand(rootSettings))
 	phaseRootCmd.AddCommand(NewRenderCommand(rootSettings))
 	phaseRootCmd.AddCommand(NewPlanCommand(rootSettings))
 
diff --git a/cmd/phase/testdata/TestNewApplyCommandGoldenOutput/phase-apply-cmd-with-help.golden b/cmd/phase/testdata/TestNewApplyCommandGoldenOutput/phase-apply-cmd-with-help.golden
index df9a3066a..f94a59179 100644
--- a/cmd/phase/testdata/TestNewApplyCommandGoldenOutput/phase-apply-cmd-with-help.golden
+++ b/cmd/phase/testdata/TestNewApplyCommandGoldenOutput/phase-apply-cmd-with-help.golden
@@ -10,6 +10,7 @@ airshipctl phase apply initinfra
 
 
 Flags:
-      --dry-run   don't deliver documents to the cluster, simulate the changes instead
-  -h, --help      help for apply
-      --prune     if set to true, command will delete all kubernetes resources that are not defined in airship documents and have airshipit.org/deployed=apply label
+      --dry-run                 don't deliver documents to the cluster, simulate the changes instead
+  -h, --help                    help for apply
+      --prune                   if set to true, command will delete all kubernetes resources that are not defined in airship documents and have airshipit.org/deployed=apply label
+      --wait-timeout duration   number of seconds to wait for resources to become ready, if set to 0 will not wait (default 2m0s)
diff --git a/docs/source/cli/airshipctl_phase_apply.md b/docs/source/cli/airshipctl_phase_apply.md
index 7ebb768a5..314795a3a 100644
--- a/docs/source/cli/airshipctl_phase_apply.md
+++ b/docs/source/cli/airshipctl_phase_apply.md
@@ -23,9 +23,10 @@ airshipctl phase apply initinfra
 ### Options
 
 ```
-      --dry-run   don't deliver documents to the cluster, simulate the changes instead
-  -h, --help      help for apply
-      --prune     if set to true, command will delete all kubernetes resources that are not defined in airship documents and have airshipit.org/deployed=apply label
+      --dry-run                 don't deliver documents to the cluster, simulate the changes instead
+  -h, --help                    help for apply
+      --prune                   if set to true, command will delete all kubernetes resources that are not defined in airship documents and have airshipit.org/deployed=apply label
+      --wait-timeout duration   number of seconds to wait for resources to become ready, if set to 0 will not wait (default 2m0s)
 ```
 
 ### Options inherited from parent commands
diff --git a/pkg/phase/apply/apply.go b/pkg/phase/apply/apply.go
index 8cc0418ca..a06578d82 100644
--- a/pkg/phase/apply/apply.go
+++ b/pkg/phase/apply/apply.go
@@ -15,66 +15,72 @@
 package apply
 
 import (
+	"fmt"
+	"time"
+
 	"opendev.org/airship/airshipctl/pkg/document"
 	"opendev.org/airship/airshipctl/pkg/environment"
-	"opendev.org/airship/airshipctl/pkg/k8s/client"
+	"opendev.org/airship/airshipctl/pkg/events"
+	"opendev.org/airship/airshipctl/pkg/k8s/applier"
+	"opendev.org/airship/airshipctl/pkg/k8s/utils"
+	"opendev.org/airship/airshipctl/pkg/log"
 )
 
 // Options is an abstraction used to apply the phase
 type Options struct {
 	RootSettings *environment.AirshipCTLSettings
-	Client       client.Interface
+	Applier      *applier.Applier
+	Processor    events.EventProcessor
 
-	DryRun    bool
-	Prune     bool
-	PhaseName string
+	WaitTimeout time.Duration
+	DryRun      bool
+	Prune       bool
+	PhaseName   string
 }
 
-// NewOptions return instance of Options
-func NewOptions(settings *environment.AirshipCTLSettings) *Options {
-	// At this point AirshipCTLSettings may not be fully initialized
-	applyOptions := &Options{RootSettings: settings}
-	return applyOptions
+// Initialize Options with required field, such as Applier
+func (o *Options) Initialize() {
+	f := utils.FactoryFromKubeConfigPath(o.RootSettings.KubeConfigPath)
+	streams := utils.Streams()
+	o.Applier = applier.NewApplier(f, streams)
+	o.Processor = events.NewDefaultProcessor(streams)
 }
 
 // Run apply subcommand logic
-func (applyOptions *Options) Run() error {
-	kctl := applyOptions.Client.Kubectl()
-	ao, err := kctl.ApplyOptions()
+func (o *Options) Run() error {
+	ao := applier.ApplyOptions{
+		DryRun:      o.DryRun,
+		Prune:       o.Prune,
+		WaitTimeout: o.WaitTimeout,
+	}
+	globalConf := o.RootSettings.Config
+
+	if err := globalConf.EnsureComplete(); err != nil {
+		return err
+	}
+	clusterName, err := globalConf.CurrentContextClusterName()
 	if err != nil {
 		return err
 	}
-
-	ao.SetDryRun(applyOptions.DryRun)
-	// If prune is true, set selector for pruning
-	if applyOptions.Prune {
-		ao.SetPrune(document.ApplyPhaseSelector + applyOptions.PhaseName)
-	}
-
-	globalConf := applyOptions.RootSettings.Config
-
-	if err = globalConf.EnsureComplete(); err != nil {
-		return err
-	}
-
-	kustomizePath, err := globalConf.CurrentContextEntryPoint(applyOptions.PhaseName)
+	clusterType, err := globalConf.CurrentContextClusterType()
 	if err != nil {
 		return err
 	}
-
+	ao.BundleName = fmt.Sprintf("%s-%s-%s", clusterName, clusterType, o.PhaseName)
+	kustomizePath, err := globalConf.CurrentContextEntryPoint(o.PhaseName)
+	if err != nil {
+		return err
+	}
+	log.Debugf("building bundle from kustomize path %s", kustomizePath)
 	b, err := document.NewBundleByPath(kustomizePath)
 	if err != nil {
 		return err
 	}
-
 	// Returns all documents for this phase
-	docs, err := b.Select(document.NewDeployToK8sSelector())
+	bundle, err := b.SelectBundle(document.NewDeployToK8sSelector())
 	if err != nil {
 		return err
 	}
-	if len(docs) == 0 {
-		return document.ErrDocNotFound{}
-	}
-
-	return kctl.Apply(docs, ao)
+	ch := o.Applier.ApplyBundle(bundle, ao)
+	return o.Processor.Process(ch)
 }
diff --git a/pkg/phase/apply/apply_test.go b/pkg/phase/apply/apply_test.go
index 9e3b59ffe..3cd43ca2f 100644
--- a/pkg/phase/apply/apply_test.go
+++ b/pkg/phase/apply/apply_test.go
@@ -15,19 +15,19 @@
 package apply_test
 
 import (
-	"errors"
+	"os"
 	"path/filepath"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/cli-runtime/pkg/genericclioptions"
 
+	"opendev.org/airship/airshipctl/pkg/config"
 	"opendev.org/airship/airshipctl/pkg/document"
 	"opendev.org/airship/airshipctl/pkg/environment"
-	"opendev.org/airship/airshipctl/pkg/k8s/client"
-	"opendev.org/airship/airshipctl/pkg/k8s/client/fake"
-	"opendev.org/airship/airshipctl/pkg/k8s/kubectl"
+	"opendev.org/airship/airshipctl/pkg/k8s/applier"
 	"opendev.org/airship/airshipctl/pkg/phase/apply"
 	"opendev.org/airship/airshipctl/testutil"
 	"opendev.org/airship/airshipctl/testutil/k8sutils"
@@ -38,12 +38,7 @@ const (
 	airshipConfigFile = "testdata/config.yaml"
 )
 
-var (
-	ErrDynamicClientError = errors.New("ErrDynamicClientError")
-)
-
 func TestDeploy(t *testing.T) {
-	rs := makeNewFakeRootSettings(t, kubeconfigPath, airshipConfigFile)
 	bundle := testutil.NewTestBundle(t, "testdata/primary/site/test-site/ephemeral/initinfra")
 	replicationController, err := bundle.SelectOne(document.NewSelector().ByKind("ReplicationController"))
 	require.NoError(t, err)
@@ -61,44 +56,61 @@ func TestDeploy(t *testing.T) {
 			},
 		})
 	defer f.Cleanup()
-
-	ao := apply.NewOptions(rs)
-	ao.PhaseName = "initinfra"
-	ao.DryRun = true
-
-	kctl := kubectl.NewKubectl(f)
-
 	tests := []struct {
-		theApplyOptions *apply.Options
-		client          client.Interface
-		prune           bool
-		expectedError   error
+		name                string
+		expectedErrorString string
+		cliApplier          *applier.Applier
+		clusterPurposes     map[string]*config.ClusterPurpose
+		phaseName           string
 	}{
 		{
-
-			client: fake.NewClient(fake.WithKubectl(
-				kubectl.NewKubectl(k8sutils.
-					NewMockKubectlFactory().
-					WithDynamicClientByError(nil, ErrDynamicClientError)))),
-			expectedError: ErrDynamicClientError,
+			name:                "success",
+			expectedErrorString: "",
+			cliApplier: applier.NewFakeApplier(genericclioptions.IOStreams{
+				In:     os.Stdin,
+				Out:    os.Stdout,
+				ErrOut: os.Stderr,
+			}, k8sutils.SuccessEvents(), f),
 		},
 		{
-			expectedError: nil,
-			prune:         false,
-			client:        fake.NewClient(fake.WithKubectl(kctl)),
+			name:                "missing clusters",
+			expectedErrorString: "At least one cluster needs to be defined",
+			clusterPurposes:     map[string]*config.ClusterPurpose{},
 		},
 		{
-			expectedError: nil,
-			prune:         true,
-			client:        fake.NewClient(fake.WithKubectl(kctl)),
+			name:                "missing phase",
+			expectedErrorString: "Phase document 'missingPhase' was not found",
+			phaseName:           "missingPhase",
 		},
 	}
 
-	for _, test := range tests {
-		ao.Prune = test.prune
-		ao.Client = test.client
-		actualErr := ao.Run()
-		assert.Equal(t, test.expectedError, actualErr)
+	for _, tt := range tests {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			rs := makeNewFakeRootSettings(t, kubeconfigPath, airshipConfigFile)
+			ao := &apply.Options{
+				RootSettings: rs,
+			}
+			ao.Initialize()
+			ao.PhaseName = "initinfra"
+			ao.DryRun = true
+			if tt.cliApplier != nil {
+				ao.Applier = tt.cliApplier
+			}
+			if tt.clusterPurposes != nil {
+				ao.RootSettings.Config.Clusters = tt.clusterPurposes
+			}
+			if tt.phaseName != "" {
+				ao.PhaseName = tt.phaseName
+			}
+			actualErr := ao.Run()
+			if tt.expectedErrorString != "" {
+				require.Error(t, actualErr)
+				assert.Contains(t, actualErr.Error(), tt.expectedErrorString)
+			} else {
+				assert.NoError(t, actualErr)
+			}
+		})
 	}
 }
 
diff --git a/tools/deployment/26_deploy_metal3_capi_ephemeral_node.sh b/tools/deployment/26_deploy_metal3_capi_ephemeral_node.sh
index b402a1ca2..2433bc153 100755
--- a/tools/deployment/26_deploy_metal3_capi_ephemeral_node.sh
+++ b/tools/deployment/26_deploy_metal3_capi_ephemeral_node.sh
@@ -17,10 +17,9 @@ set -xe
 export KUBECONFIG=${KUBECONFIG:-"$HOME/.airship/kubeconfig"}
 
 echo "Deploy metal3.io components to ephemeral node"
-airshipctl phase apply initinfra --debug
+airshipctl phase apply initinfra --wait-timeout 1000s --debug
 
-echo "Waiting for metal3 pods to come up"
-kubectl --kubeconfig $KUBECONFIG wait --for=condition=ready pods --all --timeout=1000s -A
+echo "Getting metal3 pods as debug information"
 kubectl --kubeconfig $KUBECONFIG --namespace metal3 get pods
 
 echo "Deploy cluster components to ephemeral node"