Refactor tenant client

Deal with tenantID provided case

Change-Id: Ia01028d21fe258a97175fcc8ce103cc82df37291
This commit is contained in:
Harry Zhang 2017-07-18 22:52:03 +08:00
parent e2f1a14c1d
commit a0de58d447
16 changed files with 232 additions and 192 deletions

View File

@ -13,6 +13,7 @@ import (
kubestacktypes "git.openstack.org/openstack/stackube/pkg/kubestack/types" kubestacktypes "git.openstack.org/openstack/stackube/pkg/kubestack/types"
"git.openstack.org/openstack/stackube/pkg/openstack" "git.openstack.org/openstack/stackube/pkg/openstack"
"git.openstack.org/openstack/stackube/pkg/util" "git.openstack.org/openstack/stackube/pkg/util"
"github.com/containernetworking/cni/pkg/skel" "github.com/containernetworking/cni/pkg/skel"
"github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current" "github.com/containernetworking/cni/pkg/types/current"
@ -92,10 +93,16 @@ func initOpenstack(stdinData []byte) (OpenStack, string, error) {
if n.KubestackConfig == "" { if n.KubestackConfig == "" {
return OpenStack{}, "", fmt.Errorf("kubestack-config not specified") return OpenStack{}, "", fmt.Errorf("kubestack-config not specified")
} }
openStackClient, err := openstack.NewClient(n.KubestackConfig)
if n.KubernetesConfig == "" {
return OpenStack{}, "", fmt.Errorf("kubernetes-config not specified")
}
openStackClient, err := openstack.NewClient(n.KubestackConfig, n.KubernetesConfig)
if err != nil { if err != nil {
return OpenStack{}, "", err return OpenStack{}, "", err
} }
os := OpenStack{ os := OpenStack{
Client: *openStackClient, Client: *openStackClient,
} }

View File

@ -44,8 +44,7 @@ func startControllers(kubeconfig, cloudconfig string) error {
// Creates a new RBAC controller // Creates a new RBAC controller
rm, err := rbacmanager.New(kubeconfig, rm, err := rbacmanager.New(kubeconfig,
tc.GetTenantClient(), tc.GetKubeCRDClient(),
nc.GetNetworkClient(),
*systemCIDR, *systemCIDR,
*systemGateway, *systemGateway,
) )
@ -92,7 +91,7 @@ func verifyClientSetting() error {
return fmt.Errorf("Init kubernetes clientset failed: %v", err) return fmt.Errorf("Init kubernetes clientset failed: %v", err)
} }
_, err = openstack.NewClient(*cloudconfig) _, err = openstack.NewClient(*cloudconfig, *kubeconfig)
if err != nil { if err != nil {
return fmt.Errorf("Init openstack client failed: %v", err) return fmt.Errorf("Init openstack client failed: %v", err)
} }

View File

@ -126,7 +126,7 @@ EOF'
function install_master { function install_master {
sed -i "s/KEYSTONE_HOST/${SERVICE_HOST}/g" ${STACKUBE_ROOT}/kubeadm.yaml sed -i "s/KEYSTONE_HOST/${SERVICE_HOST}/g" ${STACKUBE_ROOT}/kubeadm.yaml
sudo kubeadm init kubeadm init --pod-network-cidr ${CLUSTER_CIDR} --config ${STACKUBE_ROOT}/kubeadm.yaml sudo kubeadm init --pod-network-cidr ${CLUSTER_CIDR} --config ${STACKUBE_ROOT}/kubeadm.yaml
# Enable schedule pods on the master for testing. # Enable schedule pods on the master for testing.
sudo cp /etc/kubernetes/admin.conf $HOME/ sudo cp /etc/kubernetes/admin.conf $HOME/
sudo chown $(id -u):$(id -g) $HOME/admin.conf sudo chown $(id -u):$(id -g) $HOME/admin.conf

View File

@ -1,29 +0,0 @@
package auth
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1"
)
func NewClient(cfg *rest.Config) (*rest.RESTClient, *runtime.Scheme, error) {
scheme := runtime.NewScheme()
if err := crv1.AddToScheme(scheme); err != nil {
return nil, nil, err
}
config := *cfg
config.GroupVersion = &crv1.SchemeGroupVersion
config.APIPath = "/apis"
config.ContentType = runtime.ContentTypeJSON
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)}
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, nil, err
}
return client, scheme, nil
}

View File

@ -6,6 +6,7 @@ import (
crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1" crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1"
"git.openstack.org/openstack/stackube/pkg/auth-controller/rbacmanager/rbac" "git.openstack.org/openstack/stackube/pkg/auth-controller/rbacmanager/rbac"
crdClient "git.openstack.org/openstack/stackube/pkg/kubecrd"
"git.openstack.org/openstack/stackube/pkg/util" "git.openstack.org/openstack/stackube/pkg/util"
"github.com/golang/glog" "github.com/golang/glog"
@ -15,7 +16,6 @@ import (
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
) )
@ -28,16 +28,14 @@ type Controller struct {
kclient *kubernetes.Clientset kclient *kubernetes.Clientset
nsInf cache.SharedIndexInformer nsInf cache.SharedIndexInformer
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
tenantClient *rest.RESTClient kubeCRDClient *crdClient.CRDClient
networkClient *rest.RESTClient
systemCIDR string systemCIDR string
systemGateway string systemGateway string
} }
// New creates a new RBAC controller. // New creates a new RBAC controller.
func New(kubeconfig string, func New(kubeconfig string,
tenantClient *rest.RESTClient, kubeCRDClient *crdClient.CRDClient,
networkClient *rest.RESTClient,
systemCIDR string, systemCIDR string,
systemGateway string, systemGateway string,
) (*Controller, error) { ) (*Controller, error) {
@ -53,8 +51,7 @@ func New(kubeconfig string,
o := &Controller{ o := &Controller{
kclient: client, kclient: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "rbacmanager"), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "rbacmanager"),
tenantClient: tenantClient, kubeCRDClient: kubeCRDClient,
networkClient: networkClient,
systemCIDR: systemCIDR, systemCIDR: systemCIDR,
systemGateway: systemGateway, systemGateway: systemGateway,
} }
@ -157,7 +154,8 @@ func (c *Controller) handleNamespaceAdd(obj interface{}) {
func (c *Controller) initSystemReservedTenantNetwork() error { func (c *Controller) initSystemReservedTenantNetwork() error {
tenant := &crv1.Tenant{ tenant := &crv1.Tenant{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: util.SystemTenant, Name: util.SystemTenant,
Namespace: util.SystemTenant,
}, },
Spec: crv1.TenantSpec{ Spec: crv1.TenantSpec{
UserName: util.SystemTenant, UserName: util.SystemTenant,
@ -165,13 +163,8 @@ func (c *Controller) initSystemReservedTenantNetwork() error {
}, },
} }
err := c.tenantClient.Post(). if err := c.kubeCRDClient.AddTenant(tenant); err != nil {
Namespace(util.SystemTenant). return err
Resource(crv1.TenantResourcePlural).
Body(tenant).
Do().Error()
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create system Tenant: %v", err)
} }
// NOTE(harry): we do not support update Network, so although configurable, // NOTE(harry): we do not support update Network, so although configurable,
@ -179,7 +172,8 @@ func (c *Controller) initSystemReservedTenantNetwork() error {
// that system network. We may need to document this. // that system network. We may need to document this.
network := &crv1.Network{ network := &crv1.Network{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: util.SystemNetwork, Name: util.SystemNetwork,
Namespace: util.SystemTenant,
}, },
Spec: crv1.NetworkSpec{ Spec: crv1.NetworkSpec{
CIDR: c.systemCIDR, CIDR: c.systemCIDR,
@ -188,13 +182,8 @@ func (c *Controller) initSystemReservedTenantNetwork() error {
} }
// network controller will always check if Tenant is ready so we will not wait here // network controller will always check if Tenant is ready so we will not wait here
err = c.networkClient.Post(). if err := c.kubeCRDClient.AddNetwork(network); err != nil {
Resource(crv1.NetworkResourcePlural). return err
Namespace(util.SystemTenant).
Body(network).
Do().Error()
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create system Network: %v", err)
} }
return nil return nil

View File

@ -4,41 +4,32 @@ import (
"fmt" "fmt"
crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1" crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1"
crdClient "git.openstack.org/openstack/stackube/pkg/auth-controller/client/auth" crdClient "git.openstack.org/openstack/stackube/pkg/kubecrd"
"git.openstack.org/openstack/stackube/pkg/openstack" "git.openstack.org/openstack/stackube/pkg/openstack"
"git.openstack.org/openstack/stackube/pkg/util"
"github.com/golang/glog" "github.com/golang/glog"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
apismetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apismetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
apiv1 "k8s.io/client-go/pkg/api/v1" apiv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
) )
// TenantController manages lify cycle of Tenant. // TenantController manages life cycle of Tenant.
type TenantController struct { type TenantController struct {
k8sClient *kubernetes.Clientset k8sClient *kubernetes.Clientset
tenantClient *rest.RESTClient kubeCRDClient *crdClient.CRDClient
tenantScheme *runtime.Scheme
openstackClient *openstack.Client openstackClient *openstack.Client
} }
// NewTenantController creates a new tenant controller. // NewTenantController creates a new tenant controller.
func NewTenantController(kubeconfig, cloudconfig string) (*TenantController, error) { func NewTenantController(kubeconfig, cloudconfig string) (*TenantController, error) {
// Create OpenStack client from config
openStackClient, err := openstack.NewClient(cloudconfig)
if err != nil {
return nil, fmt.Errorf("init openstack client failed: %v", err)
}
// Create the client config. Use kubeconfig if given, otherwise assume in-cluster. // Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
config, err := buildConfig(kubeconfig) config, err := util.NewClusterConfig(kubeconfig)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to build kubeconfig: %v", err) return nil, fmt.Errorf("failed to build kubeconfig: %v", err)
} }
@ -58,15 +49,14 @@ func NewTenantController(kubeconfig, cloudconfig string) (*TenantController, err
return nil, fmt.Errorf("failed to create kubernetes client: %v", err) return nil, fmt.Errorf("failed to create kubernetes client: %v", err)
} }
// make a new config for our extension's API group, using the first config as a baseline // Create OpenStack client from config
tenantClient, tenantScheme, err := crdClient.NewClient(config) openStackClient, err := openstack.NewClient(cloudconfig, kubeconfig)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create client for CRD: %v", err) return nil, fmt.Errorf("init openstack client failed: %v", err)
} }
c := &TenantController{ c := &TenantController{
tenantClient: tenantClient, kubeCRDClient: openStackClient.CRDClient,
tenantScheme: tenantScheme,
k8sClient: k8sClient, k8sClient: k8sClient,
openstackClient: openStackClient, openstackClient: openStackClient,
} }
@ -78,15 +68,8 @@ func NewTenantController(kubeconfig, cloudconfig string) (*TenantController, err
return c, nil return c, nil
} }
func buildConfig(kubeconfig string) (*rest.Config, error) { func (c *TenantController) GetKubeCRDClient() *crdClient.CRDClient {
if kubeconfig != "" { return c.kubeCRDClient
return clientcmd.BuildConfigFromFlags("", kubeconfig)
}
return rest.InClusterConfig()
}
func (c *TenantController) GetTenantClient() *rest.RESTClient {
return c.tenantClient
} }
// Run the controller. // Run the controller.
@ -94,7 +77,7 @@ func (c *TenantController) Run(stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
source := cache.NewListWatchFromClient( source := cache.NewListWatchFromClient(
c.tenantClient, c.kubeCRDClient.Client,
crv1.TenantResourcePlural, crv1.TenantResourcePlural,
apiv1.NamespaceAll, apiv1.NamespaceAll,
fields.Everything()) fields.Everything())
@ -116,11 +99,11 @@ func (c *TenantController) Run(stopCh <-chan struct{}) error {
func (c *TenantController) onAdd(obj interface{}) { func (c *TenantController) onAdd(obj interface{}) {
tenant := obj.(*crv1.Tenant) tenant := obj.(*crv1.Tenant)
glog.V(3).Infof("Tenant controller received new object %v\n", tenant) glog.V(3).Infof("Tenant controller received new object %#v\n", tenant)
copyObj, err := c.tenantScheme.Copy(tenant) copyObj, err := c.kubeCRDClient.Scheme.Copy(tenant)
if err != nil { if err != nil {
glog.Errorf("ERROR creating a deep copy of tenant object: %v\n", err) glog.Errorf("ERROR creating a deep copy of tenant object: %#v\n", err)
return return
} }
@ -138,7 +121,7 @@ func (c *TenantController) onDelete(obj interface{}) {
return return
} }
glog.V(3).Infof("Tenant controller received deleted tenant %v\n", tenant) glog.V(3).Infof("Tenant controller received deleted tenant %#v\n", tenant)
deleteOptions := &apismetav1.DeleteOptions{ deleteOptions := &apismetav1.DeleteOptions{
TypeMeta: apismetav1.TypeMeta{ TypeMeta: apismetav1.TypeMeta{

View File

@ -11,12 +11,12 @@ import (
apiv1 "k8s.io/client-go/pkg/api/v1" apiv1 "k8s.io/client-go/pkg/api/v1"
) )
func (c *TenantController) syncTenant(tenant *crv1.Tenant) error { func (c *TenantController) syncTenant(tenant *crv1.Tenant) {
roleBinding := rbac.GenerateClusterRoleBindingByTenant(tenant.Name) roleBinding := rbac.GenerateClusterRoleBindingByTenant(tenant.Name)
_, err := c.k8sClient.Rbac().ClusterRoleBindings().Create(roleBinding) _, err := c.k8sClient.Rbac().ClusterRoleBindings().Create(roleBinding)
if err != nil && !apierrors.IsAlreadyExists(err) { if err != nil && !apierrors.IsAlreadyExists(err) {
glog.Errorf("Failed create ClusterRoleBinding for tenant %s: %v", tenant.Name, err) glog.Errorf("Failed create ClusterRoleBinding for tenant %s: %v", tenant.Name, err)
return err return
} }
glog.V(4).Infof("Created ClusterRoleBindings %s-namespace-creater for tenant %s", tenant.Name, tenant.Name) glog.V(4).Infof("Created ClusterRoleBindings %s-namespace-creater for tenant %s", tenant.Name, tenant.Name)
if tenant.Spec.TenantID != "" { if tenant.Spec.TenantID != "" {
@ -24,29 +24,30 @@ func (c *TenantController) syncTenant(tenant *crv1.Tenant) error {
err = c.openstackClient.CreateUser(tenant.Spec.UserName, tenant.Spec.Password, tenant.Spec.TenantID) err = c.openstackClient.CreateUser(tenant.Spec.UserName, tenant.Spec.Password, tenant.Spec.TenantID)
if err != nil && !openstack.IsAlreadyExists(err) { if err != nil && !openstack.IsAlreadyExists(err) {
glog.Errorf("Failed create user %s: %v", tenant.Spec.UserName, err) glog.Errorf("Failed create user %s: %v", tenant.Spec.UserName, err)
return err return
} }
} else { } else {
// Create tenant if the tenant not exist in keystone // Create tenant if the tenant not exist in keystone
tenantID, err := c.openstackClient.CreateTenant(tenant.Name) tenantID, err := c.openstackClient.CreateTenant(tenant.Name)
if err != nil { if err != nil {
return err glog.Errorf("Failed create tenant %s: %v", tenant, err)
return
} }
// Create user with the spec username and password in the created tenant // Create user with the spec username and password in the created tenant
err = c.openstackClient.CreateUser(tenant.Spec.UserName, tenant.Spec.Password, tenantID) err = c.openstackClient.CreateUser(tenant.Spec.UserName, tenant.Spec.Password, tenantID)
if err != nil { if err != nil {
return err glog.Errorf("Failed create user %s: %v", tenant.Spec.UserName, err)
return
} }
} }
// Create namespace which name is the same as the tenant's name // Create namespace which name is the same as the tenant's name
err = c.createNamespace(tenant.Name) err = c.createNamespace(tenant.Name)
if err != nil { if err != nil {
return err glog.Errorf("Failed create namespace %s: %v", tenant.Name, err)
return
} }
glog.V(4).Infof("Created namespace %s for tenant %s", tenant.Name, tenant.Name) glog.V(4).Infof("Created namespace %s for tenant %s", tenant.Name, tenant.Name)
return nil
} }
func (c *TenantController) createClusterRoles() error { func (c *TenantController) createClusterRoles() error {
@ -76,7 +77,6 @@ func (c *TenantController) createNamespace(namespace string) error {
func (c *TenantController) deleteNamespace(namespace string) error { func (c *TenantController) deleteNamespace(namespace string) error {
err := c.k8sClient.CoreV1().Namespaces().Delete(namespace, apismetav1.NewDeleteOptions(0)) err := c.k8sClient.CoreV1().Namespaces().Delete(namespace, apismetav1.NewDeleteOptions(0))
if err != nil { if err != nil {
glog.Errorf("Failed delete namespace %s: %v", namespace, err)
return err return err
} }
return nil return nil

113
pkg/kubecrd/crdclient.go Normal file
View File

@ -0,0 +1,113 @@
package kubecrd
import (
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1"
"github.com/golang/glog"
)
type CRDClient struct {
Client *rest.RESTClient
Scheme *runtime.Scheme
}
func NewCRDClient(cfg *rest.Config) (*CRDClient, error) {
scheme := runtime.NewScheme()
if err := crv1.AddToScheme(scheme); err != nil {
return nil, err
}
config := *cfg
config.GroupVersion = &crv1.SchemeGroupVersion
config.APIPath = "/apis"
config.ContentType = runtime.ContentTypeJSON
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)}
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, err
}
return &CRDClient{
Client: client,
Scheme: scheme,
}, nil
}
// UpdateNetwork updates Network CRD object by given object
func (c *CRDClient) UpdateNetwork(network *crv1.Network) {
err := c.Client.Put().
Name(network.Name).
Namespace(network.Namespace).
Resource(crv1.NetworkResourcePlural).
Body(network).
Do().
Error()
if err != nil {
glog.Errorf("ERROR updating network: %v\n", err)
} else {
glog.V(3).Infof("UPDATED network: %#v\n", network)
}
}
// UpdateTenant updates Network CRD object by given object
func (c *CRDClient) UpdateTenant(tenant *crv1.Tenant) {
err := c.Client.Put().
Name(tenant.Name).
Namespace(tenant.Namespace).
Resource(crv1.TenantResourcePlural).
Body(tenant).
Do().
Error()
if err != nil {
glog.Errorf("ERROR updating tenant: %v\n", err)
} else {
glog.V(3).Infof("UPDATED tenant: %#v\n", tenant)
}
}
func (c *CRDClient) GetTenant(tenantName string) (*crv1.Tenant, error) {
tenant := crv1.Tenant{}
// tenant always has same name and namespace
err := c.Client.Get().
Resource(crv1.TenantResourcePlural).
Namespace(tenantName).
Name(tenantName).
Do().Into(&tenant)
if err != nil {
return nil, err
}
return &tenant, nil
}
func (c *CRDClient) AddTenant(tenant *crv1.Tenant) error {
err := c.Client.Post().
Namespace(tenant.GetNamespace()).
Resource(crv1.TenantResourcePlural).
Body(tenant).
Do().Error()
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create Tenant: %v", err)
}
return nil
}
func (c *CRDClient) AddNetwork(network *crv1.Network) error {
err := c.Client.Post().
Resource(crv1.NetworkResourcePlural).
Namespace(network.GetNamespace()).
Body(network).
Do().Error()
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create Network: %v", err)
}
return nil
}

View File

@ -1,4 +1,4 @@
package client package kubecrd
import ( import (
"reflect" "reflect"

View File

@ -1,4 +1,4 @@
package auth package kubecrd
import ( import (
"reflect" "reflect"

View File

@ -8,7 +8,8 @@ import (
type NetConf struct { type NetConf struct {
types.NetConf types.NetConf
KubestackConfig string `json:"kubestack-config"` KubestackConfig string `json:"kubestack-config"`
KubernetesConfig string `json:"kubernetes-config"`
} }
// K8sArgs is the valid CNI_ARGS used for Kubernetes // K8sArgs is the valid CNI_ARGS used for Kubernetes

View File

@ -1,29 +0,0 @@
package client
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1"
)
func NewClient(cfg *rest.Config) (*rest.RESTClient, *runtime.Scheme, error) {
scheme := runtime.NewScheme()
if err := crv1.AddToScheme(scheme); err != nil {
return nil, nil, err
}
config := *cfg
config.GroupVersion = &crv1.SchemeGroupVersion
config.APIPath = "/apis"
config.ContentType = runtime.ContentTypeJSON
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)}
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, nil, err
}
return client, scheme, nil
}

View File

@ -7,14 +7,11 @@ import (
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1" crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1"
crdClient "git.openstack.org/openstack/stackube/pkg/network-controller/client" crdClient "git.openstack.org/openstack/stackube/pkg/kubecrd"
osDriver "git.openstack.org/openstack/stackube/pkg/openstack" osDriver "git.openstack.org/openstack/stackube/pkg/openstack"
"git.openstack.org/openstack/stackube/pkg/util" "git.openstack.org/openstack/stackube/pkg/util"
@ -23,20 +20,19 @@ import (
// Watcher is an network of watching on resource create/update/delete events // Watcher is an network of watching on resource create/update/delete events
type NetworkController struct { type NetworkController struct {
networkClient *rest.RESTClient kubeCRDClient *crdClient.CRDClient
networkScheme *runtime.Scheme
driver *osDriver.Client driver *osDriver.Client
} }
func (c *NetworkController) GetNetworkClient() *rest.RESTClient { func (c *NetworkController) GetKubeCRDClient() *crdClient.CRDClient {
return c.networkClient return c.kubeCRDClient
} }
func (c *NetworkController) Run(stopCh <-chan struct{}) error { func (c *NetworkController) Run(stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
source := cache.NewListWatchFromClient( source := cache.NewListWatchFromClient(
c.networkClient, c.kubeCRDClient.Client,
crv1.NetworkResourcePlural, crv1.NetworkResourcePlural,
apiv1.NamespaceAll, apiv1.NamespaceAll,
fields.Everything()) fields.Everything())
@ -57,22 +53,9 @@ func (c *NetworkController) Run(stopCh <-chan struct{}) error {
return nil return nil
} }
func buildConfig(kubeconfig string) (*rest.Config, error) {
if kubeconfig != "" {
return clientcmd.BuildConfigFromFlags("", kubeconfig)
}
return rest.InClusterConfig()
}
func NewNetworkController(kubeconfig, openstackConfigFile string) (*NetworkController, error) { func NewNetworkController(kubeconfig, openstackConfigFile string) (*NetworkController, error) {
// Create OpenStack client from config
openstack, err := osDriver.NewClient(openstackConfigFile)
if err != nil {
return nil, fmt.Errorf("Couldn't initialize openstack: %#v", err)
}
// Create the client config. Use kubeconfig if given, otherwise assume in-cluster. // Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
config, err := buildConfig(kubeconfig) config, err := util.NewClusterConfig(kubeconfig)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to build kubeconfig: %v", err) return nil, fmt.Errorf("failed to build kubeconfig: %v", err)
} }
@ -87,15 +70,14 @@ func NewNetworkController(kubeconfig, openstackConfigFile string) (*NetworkContr
return nil, fmt.Errorf("failed to create CRD to kube-apiserver: %v", err) return nil, fmt.Errorf("failed to create CRD to kube-apiserver: %v", err)
} }
// make a new config for our extension's API group, using the first config as a baseline // Create OpenStack client from config
networkClient, networkScheme, err := crdClient.NewClient(config) openstack, err := osDriver.NewClient(openstackConfigFile, kubeconfig)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create client for CRD: %v", err) return nil, fmt.Errorf("Couldn't initialize openstack: %#v", err)
} }
networkController := &NetworkController{ networkController := &NetworkController{
networkClient: networkClient, kubeCRDClient: openstack.CRDClient,
networkScheme: networkScheme,
driver: openstack, driver: openstack,
} }
return networkController, nil return networkController, nil
@ -109,7 +91,7 @@ func (c *NetworkController) onAdd(obj interface{}) {
// NEVER modify objects from the store. It's a read-only, local cache. // NEVER modify objects from the store. It's a read-only, local cache.
// You can use networkScheme.Copy() to make a deep copy of original object and modify this copy // You can use networkScheme.Copy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance // Or create a copy manually for better performance
copyObj, err := c.networkScheme.Copy(network) copyObj, err := c.GetKubeCRDClient().Scheme.Copy(network)
if err != nil { if err != nil {
glog.Errorf("ERROR creating a deep copy of network object: %v\n", err) glog.Errorf("ERROR creating a deep copy of network object: %v\n", err)
return return

View File

@ -26,18 +26,21 @@ func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) {
if err != nil || tenantID == "" { if err != nil || tenantID == "" {
err = wait.Poll(2*time.Second, 10*time.Second, func() (bool, error) { err = wait.Poll(2*time.Second, 10*time.Second, func() (bool, error) {
tenantID, err = c.driver.GetTenantIDFromName(kubeNetwork.GetNamespace()) tenantID, err = c.driver.GetTenantIDFromName(kubeNetwork.GetNamespace())
if err != nil || tenantID == "" { if err != nil {
glog.V(5).Infof("failed to fetch tenantID for tenantName: %v, retrying\n", tenantName) glog.Errorf("failed to fetch tenantID for tenantName: %v, error: %v retrying\n", tenantName, err)
return false, nil
}
if tenantID == "" {
glog.V(5).Infof("tenantID is empty for tenantName: %v, retrying\n", tenantName)
return false, nil return false, nil
} }
return true, nil return true, nil
}) })
} }
if err != nil || tenantID == "" { if err != nil || tenantID == "" {
glog.Errorf("failed to fetch tenantID for tenantName: %v, abort! \n", tenantName) glog.Errorf("failed to fetch tenantID for tenantName: %v, error: %v abort! \n", tenantName, err)
return return
} else {
glog.V(3).Infof("Got tenantID: %v for tenantName: %v", tenantID, tenantName)
} }
networkName := util.BuildNetworkName(tenantName, kubeNetwork.GetName()) networkName := util.BuildNetworkName(tenantName, kubeNetwork.GetName())
@ -69,7 +72,7 @@ func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) {
if !check { if !check {
glog.Warningf("[NetworkController]: tenantID %s doesn't exist in network provider", driverNetwork.TenantID) glog.Warningf("[NetworkController]: tenantID %s doesn't exist in network provider", driverNetwork.TenantID)
kubeNetwork.Status.State = crv1.NetworkFailed kubeNetwork.Status.State = crv1.NetworkFailed
c.updateNetwork(kubeNetwork) c.kubeCRDClient.UpdateNetwork(kubeNetwork)
return return
} }
@ -104,22 +107,5 @@ func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) {
} }
kubeNetwork.Status.State = newNetworkStatus kubeNetwork.Status.State = newNetworkStatus
c.updateNetwork(kubeNetwork) c.kubeCRDClient.UpdateNetwork(kubeNetwork)
}
// updateNetwork updates Network CRD object by given object
func (c *NetworkController) updateNetwork(network *crv1.Network) {
err := c.networkClient.Put().
Name(network.Name).
Namespace(network.Namespace).
Resource(crv1.NetworkResourcePlural).
Body(network).
Do().
Error()
if err != nil {
glog.Errorf("ERROR updating network status: %v\n", err)
} else {
glog.V(3).Infof("UPDATED network status: %#v\n", network)
}
} }

View File

@ -5,8 +5,11 @@ import (
"fmt" "fmt"
"os" "os"
crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1"
crdClient "git.openstack.org/openstack/stackube/pkg/kubecrd"
drivertypes "git.openstack.org/openstack/stackube/pkg/openstack/types" drivertypes "git.openstack.org/openstack/stackube/pkg/openstack/types"
"git.openstack.org/openstack/stackube/pkg/util" "git.openstack.org/openstack/stackube/pkg/util"
"github.com/docker/distribution/uuid" "github.com/docker/distribution/uuid"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/gophercloud/gophercloud" "github.com/gophercloud/gophercloud"
@ -21,6 +24,7 @@ import (
"github.com/gophercloud/gophercloud/openstack/networking/v2/ports" "github.com/gophercloud/gophercloud/openstack/networking/v2/ports"
"github.com/gophercloud/gophercloud/openstack/networking/v2/subnets" "github.com/gophercloud/gophercloud/openstack/networking/v2/subnets"
"github.com/gophercloud/gophercloud/pagination" "github.com/gophercloud/gophercloud/pagination"
gcfg "gopkg.in/gcfg.v1" gcfg "gopkg.in/gcfg.v1"
) )
@ -51,6 +55,7 @@ type Client struct {
ExtNetID string ExtNetID string
PluginName string PluginName string
IntegrationBridge string IntegrationBridge string
CRDClient *crdClient.CRDClient
} }
type PluginOpts struct { type PluginOpts struct {
@ -79,7 +84,7 @@ func toAuthOptions(cfg Config) gophercloud.AuthOptions {
} }
} }
func NewClient(config string) (*Client, error) { func NewClient(config string, kubeConfig string) (*Client, error) {
var opts gophercloud.AuthOptions var opts gophercloud.AuthOptions
cfg, err := readConfig(config) cfg, err := readConfig(config)
if err != nil { if err != nil {
@ -118,6 +123,16 @@ func NewClient(config string) (*Client, error) {
return nil, err return nil, err
} }
// Create CRD client
k8sConfig, err := util.NewClusterConfig(kubeConfig)
if err != nil {
return nil, fmt.Errorf("failed to build kubeconfig: %v", err)
}
kubeCRDClient, err := crdClient.NewCRDClient(k8sConfig)
if err != nil {
return nil, fmt.Errorf("failed to create client for CRD: %v", err)
}
client := &Client{ client := &Client{
Identity: identity, Identity: identity,
Provider: provider, Provider: provider,
@ -126,6 +141,7 @@ func NewClient(config string) (*Client, error) {
ExtNetID: cfg.Global.ExtNetID, ExtNetID: cfg.Global.ExtNetID,
PluginName: cfg.Plugin.PluginName, PluginName: cfg.Plugin.PluginName,
IntegrationBridge: cfg.Plugin.IntegrationBridge, IntegrationBridge: cfg.Plugin.IntegrationBridge,
CRDClient: kubeCRDClient,
} }
return client, nil return client, nil
} }
@ -147,8 +163,22 @@ func (c *Client) GetTenantIDFromName(tenantName string) (string, error) {
if util.IsSystemNamespace(tenantName) { if util.IsSystemNamespace(tenantName) {
tenantName = util.SystemTenant tenantName = util.SystemTenant
} }
// If tenantID is specified, return it directly
var (
tenant *crv1.Tenant
err error
)
if tenant, err = c.CRDClient.GetTenant(tenantName); err != nil {
return "", err
}
if tenant.Spec.TenantID != "" {
return tenant.Spec.TenantID, nil
}
// Otherwise, fetch tenantID from OpenStack
var tenantID string var tenantID string
err := tenants.List(c.Identity, nil).EachPage(func(page pagination.Page) (bool, error) { err = tenants.List(c.Identity, nil).EachPage(func(page pagination.Page) (bool, error) {
tenantList, err1 := tenants.ExtractTenants(page) tenantList, err1 := tenants.ExtractTenants(page)
if err1 != nil { if err1 != nil {
return false, err1 return false, err1
@ -164,6 +194,9 @@ func (c *Client) GetTenantIDFromName(tenantName string) (string, error) {
if err != nil { if err != nil {
return "", err return "", err
} }
glog.V(3).Infof("Got tenantID: %v for tenantName: %v", tenantID, tenantName)
return tenantID, nil return tenantID, nil
} }

View File

@ -14,13 +14,18 @@ import (
) )
func NewClusterConfig(kubeConfig string) (*rest.Config, error) { func NewClusterConfig(kubeConfig string) (*rest.Config, error) {
cfg, err := clientcmd.BuildConfigFromFlags("", kubeConfig) if kubeConfig != "" {
if err != nil { cfg, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
return nil, err if err != nil {
return nil, err
}
cfg.QPS = 100
cfg.Burst = 100
return cfg, nil
} else {
return rest.InClusterConfig()
} }
cfg.QPS = 100
cfg.Burst = 100
return cfg, nil
} }
func WaitForCRDReady(clientset apiextensionsclient.Interface, crdName string) error { func WaitForCRDReady(clientset apiextensionsclient.Interface, crdName string) error {