Modify applier to use event from constructor
In previos implementation applier returned channel instead of taking the one that is provided by higher level caller. To better facilitate our needs, we need to change it in a way that we can provide the channel instead of using the returned one. Change-Id: I9b5f5872327854759ccb42593427eb94a9c8996b
This commit is contained in:
parent
81fb7de6e1
commit
f75995f361
@ -52,13 +52,14 @@ type Applier struct {
|
||||
Streams genericclioptions.IOStreams
|
||||
Poller poller.Poller
|
||||
ManifestReaderFactory utils.ManifestReaderFactory
|
||||
eventChannel chan events.Event
|
||||
}
|
||||
|
||||
// ReaderFactory function that returns reader factory interface
|
||||
type ReaderFactory func(validate bool, bundle document.Bundle, factory cmdutil.Factory) manifestreader.ManifestReader
|
||||
|
||||
// NewApplier returns instance of Applier
|
||||
func NewApplier(f cmdutil.Factory, streams genericclioptions.IOStreams) *Applier {
|
||||
func NewApplier(eventCh chan events.Event, f cmdutil.Factory, streams genericclioptions.IOStreams) *Applier {
|
||||
return &Applier{
|
||||
Factory: f,
|
||||
Streams: streams,
|
||||
@ -66,76 +67,68 @@ func NewApplier(f cmdutil.Factory, streams genericclioptions.IOStreams) *Applier
|
||||
Driver: &Adaptor{
|
||||
CliUtilsApplier: cliapply.NewApplier(f, streams),
|
||||
},
|
||||
eventChannel: eventCh,
|
||||
}
|
||||
}
|
||||
|
||||
// ApplyBundle apply bundle to kubernetes cluster
|
||||
func (a *Applier) ApplyBundle(bundle document.Bundle, ao ApplyOptions) <-chan events.Event {
|
||||
eventCh := make(chan events.Event)
|
||||
go func() {
|
||||
defer close(eventCh)
|
||||
if bundle == nil {
|
||||
// TODO add this to errors
|
||||
handleError(eventCh, ErrApplyNilBundle{})
|
||||
return
|
||||
func (a *Applier) ApplyBundle(bundle document.Bundle, ao ApplyOptions) {
|
||||
defer close(a.eventChannel)
|
||||
log.Debugf("Getting infos for bundle, inventory id is %s", ao.BundleName)
|
||||
infos, err := a.getInfos(ao.BundleName, bundle)
|
||||
if err != nil {
|
||||
handleError(a.eventChannel, err)
|
||||
return
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
ch := a.Driver.Run(ctx, infos, cliApplyOptions(ao))
|
||||
for e := range ch {
|
||||
a.eventChannel <- events.Event{
|
||||
Type: events.ApplierType,
|
||||
ApplierEvent: e,
|
||||
}
|
||||
log.Printf("Applying bundle, inventory id: %s", ao.BundleName)
|
||||
// TODO Get this selector from document package instead
|
||||
// Selector to filter invenotry document from bundle
|
||||
selector := document.
|
||||
NewSelector().
|
||||
ByLabel(clicommon.InventoryLabel).
|
||||
ByKind(document.ConfigMapKind)
|
||||
// if we could find exactly one inventory document, we don't do anything else with it
|
||||
_, err := bundle.SelectOne(selector)
|
||||
// if we got an error, which means we could not find Config Map with invetory ID at rest
|
||||
// now we need to generate and inject one at runtime
|
||||
if err != nil && errors.As(err, &document.ErrDocNotFound{}) {
|
||||
log.Debug("Inventory Object config Map not found, auto generating Invetory object")
|
||||
invDoc, innerErr := NewInventoryDocument(ao.BundleName)
|
||||
if innerErr != nil {
|
||||
// this should never happen
|
||||
log.Debug("Failed to create new invetory document")
|
||||
handleError(eventCh, innerErr)
|
||||
return
|
||||
}
|
||||
log.Debugf("Injecting Invetory Object: %v into bundle", invDoc)
|
||||
innerErr = bundle.Append(invDoc)
|
||||
if innerErr != nil {
|
||||
log.Debug("Couldn't append bunlde with inventory document")
|
||||
handleError(eventCh, innerErr)
|
||||
return
|
||||
}
|
||||
log.Debugf("Making sure that inventory object namespace %s exists", invDoc.GetNamespace())
|
||||
innerErr = a.ensureNamespaceExists(invDoc.GetNamespace())
|
||||
if innerErr != nil {
|
||||
handleError(eventCh, innerErr)
|
||||
return
|
||||
}
|
||||
} else if err != nil {
|
||||
handleError(eventCh, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Applier) getInfos(bundleName string, bundle document.Bundle) ([]*resource.Info, error) {
|
||||
if bundle == nil {
|
||||
return nil, ErrApplyNilBundle{}
|
||||
}
|
||||
selector := document.
|
||||
NewSelector().
|
||||
ByLabel(clicommon.InventoryLabel).
|
||||
ByKind(document.ConfigMapKind)
|
||||
// if we could find exactly one inventory document, we don't do anything else with it
|
||||
_, err := bundle.SelectOne(selector)
|
||||
// if we got an error, which means we could not find Config Map with invetory ID at rest
|
||||
// now we need to generate and inject one at runtime
|
||||
if err != nil && errors.As(err, &document.ErrDocNotFound{}) {
|
||||
log.Debug("Inventory Object config Map not found, auto generating Invetory object")
|
||||
invDoc, innerErr := NewInventoryDocument(bundleName)
|
||||
if innerErr != nil {
|
||||
// this should never happen
|
||||
log.Debug("Failed to create new invetory document")
|
||||
return nil, innerErr
|
||||
}
|
||||
err = a.Driver.Initialize(a.Poller)
|
||||
if err != nil {
|
||||
handleError(eventCh, err)
|
||||
return
|
||||
log.Debugf("Injecting Invetory Object: %v into bundle", invDoc)
|
||||
innerErr = bundle.Append(invDoc)
|
||||
if innerErr != nil {
|
||||
log.Debug("Couldn't append bunlde with inventory document")
|
||||
return nil, innerErr
|
||||
}
|
||||
ctx := context.Background()
|
||||
infos, err := a.ManifestReaderFactory(false, bundle, a.Factory).Read()
|
||||
if err != nil {
|
||||
handleError(eventCh, err)
|
||||
return
|
||||
log.Debugf("Making sure that inventory object namespace %s exists", invDoc.GetNamespace())
|
||||
innerErr = a.ensureNamespaceExists(invDoc.GetNamespace())
|
||||
if innerErr != nil {
|
||||
return nil, innerErr
|
||||
}
|
||||
ch := a.Driver.Run(ctx, infos, cliApplyOptions(ao))
|
||||
for e := range ch {
|
||||
eventCh <- events.Event{
|
||||
Type: events.ApplierType,
|
||||
ApplierEvent: e,
|
||||
}
|
||||
}
|
||||
}()
|
||||
return eventCh
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = a.Driver.Initialize(a.Poller); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return a.ManifestReaderFactory(false, bundle, a.Factory).Read()
|
||||
}
|
||||
|
||||
func (a *Applier) ensureNamespaceExists(name string) error {
|
||||
|
@ -36,11 +36,13 @@ import (
|
||||
)
|
||||
|
||||
func TestFakeApplier(t *testing.T) {
|
||||
a := applier.NewFakeApplier(genericclioptions.IOStreams{
|
||||
In: os.Stdin,
|
||||
Out: os.Stdout,
|
||||
ErrOut: os.Stderr,
|
||||
}, k8stest.SuccessEvents(), k8stest.FakeFactory(t, []k8stest.ClientHandler{}))
|
||||
ch := make(chan events.Event)
|
||||
a := applier.NewFakeApplier(ch,
|
||||
genericclioptions.IOStreams{
|
||||
In: os.Stdin,
|
||||
Out: os.Stdout,
|
||||
ErrOut: os.Stderr,
|
||||
}, k8stest.SuccessEvents(), k8stest.FakeFactory(t, []k8stest.ClientHandler{}))
|
||||
assert.NotNil(t, a)
|
||||
}
|
||||
|
||||
@ -120,7 +122,8 @@ func TestApplierRun(t *testing.T) {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// create default applier
|
||||
a := applier.NewApplier(f, s)
|
||||
eventChan := make(chan events.Event)
|
||||
a := applier.NewApplier(eventChan, f, s)
|
||||
opts := applier.ApplyOptions{
|
||||
WaitTimeout: time.Second * 5,
|
||||
BundleName: "test-bundle",
|
||||
@ -132,9 +135,10 @@ func TestApplierRun(t *testing.T) {
|
||||
if tt.poller != nil {
|
||||
a.Poller = tt.poller
|
||||
}
|
||||
ch := a.ApplyBundle(tt.bundle, opts)
|
||||
// start writing to channel
|
||||
go a.ApplyBundle(tt.bundle, opts)
|
||||
var airEvents []events.Event
|
||||
for e := range ch {
|
||||
for e := range eventChan {
|
||||
airEvents = append(airEvents, e)
|
||||
}
|
||||
var errs []error
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
|
||||
"sigs.k8s.io/cli-utils/pkg/object"
|
||||
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/k8s/utils"
|
||||
)
|
||||
|
||||
@ -75,12 +76,16 @@ func (fa FakeAdaptor) WithInitError(err error) FakeAdaptor {
|
||||
}
|
||||
|
||||
// NewFakeApplier returns applier with events you want
|
||||
func NewFakeApplier(streams genericclioptions.IOStreams, events []applyevent.Event, f cmdutil.Factory) *Applier {
|
||||
func NewFakeApplier(
|
||||
eventCh chan events.Event,
|
||||
streams genericclioptions.IOStreams,
|
||||
events []applyevent.Event, f cmdutil.Factory) *Applier {
|
||||
return &Applier{
|
||||
Driver: NewFakeAdaptor().WithEvents(events),
|
||||
Poller: &FakePoller{},
|
||||
Factory: f,
|
||||
ManifestReaderFactory: utils.DefaultManifestReaderFactory,
|
||||
eventChannel: eventCh,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -30,21 +30,23 @@ import (
|
||||
|
||||
// Options is an abstraction used to apply the phase
|
||||
type Options struct {
|
||||
RootSettings *environment.AirshipCTLSettings
|
||||
Applier *applier.Applier
|
||||
Processor events.EventProcessor
|
||||
|
||||
WaitTimeout time.Duration
|
||||
DryRun bool
|
||||
Prune bool
|
||||
PhaseName string
|
||||
WaitTimeout time.Duration
|
||||
|
||||
RootSettings *environment.AirshipCTLSettings
|
||||
Applier *applier.Applier
|
||||
Processor events.EventProcessor
|
||||
EventChannel chan events.Event
|
||||
}
|
||||
|
||||
// Initialize Options with required field, such as Applier
|
||||
func (o *Options) Initialize() {
|
||||
f := utils.FactoryFromKubeConfigPath(o.RootSettings.KubeConfigPath)
|
||||
streams := utils.Streams()
|
||||
o.Applier = applier.NewApplier(f, streams)
|
||||
o.EventChannel = make(chan events.Event)
|
||||
o.Applier = applier.NewApplier(o.EventChannel, f, streams)
|
||||
o.Processor = events.NewDefaultProcessor(streams)
|
||||
}
|
||||
|
||||
@ -87,6 +89,6 @@ func (o *Options) Run() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ch := o.Applier.ApplyBundle(bundle, ao)
|
||||
return o.Processor.Process(ch)
|
||||
go o.Applier.ApplyBundle(bundle, ao)
|
||||
return o.Processor.Process(o.EventChannel)
|
||||
}
|
||||
|
@ -23,10 +23,12 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/cli-runtime/pkg/genericclioptions"
|
||||
applyevent "sigs.k8s.io/cli-utils/pkg/apply/event"
|
||||
|
||||
"opendev.org/airship/airshipctl/pkg/config"
|
||||
"opendev.org/airship/airshipctl/pkg/document"
|
||||
"opendev.org/airship/airshipctl/pkg/environment"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/k8s/applier"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/apply"
|
||||
"opendev.org/airship/airshipctl/testutil"
|
||||
@ -59,18 +61,14 @@ func TestDeploy(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
expectedErrorString string
|
||||
cliApplier *applier.Applier
|
||||
clusterPurposes map[string]*config.ClusterPurpose
|
||||
phaseName string
|
||||
events []applyevent.Event
|
||||
}{
|
||||
{
|
||||
name: "success",
|
||||
expectedErrorString: "",
|
||||
cliApplier: applier.NewFakeApplier(genericclioptions.IOStreams{
|
||||
In: os.Stdin,
|
||||
Out: os.Stdout,
|
||||
ErrOut: os.Stderr,
|
||||
}, k8sutils.SuccessEvents(), f),
|
||||
events: k8sutils.SuccessEvents(),
|
||||
},
|
||||
{
|
||||
name: "missing clusters",
|
||||
@ -94,8 +92,17 @@ func TestDeploy(t *testing.T) {
|
||||
ao.Initialize()
|
||||
ao.PhaseName = "initinfra"
|
||||
ao.DryRun = true
|
||||
if tt.cliApplier != nil {
|
||||
ao.Applier = tt.cliApplier
|
||||
if tt.events != nil {
|
||||
ch := make(chan events.Event)
|
||||
cliApplier := applier.NewFakeApplier(
|
||||
ch,
|
||||
genericclioptions.IOStreams{
|
||||
In: os.Stdin,
|
||||
Out: os.Stdout,
|
||||
ErrOut: os.Stderr,
|
||||
}, k8sutils.SuccessEvents(), f)
|
||||
ao.Applier = cliApplier
|
||||
ao.EventChannel = ch
|
||||
}
|
||||
if tt.clusterPurposes != nil {
|
||||
ao.RootSettings.Config.Clusters = tt.clusterPurposes
|
||||
|
Loading…
x
Reference in New Issue
Block a user