0%

k8s client_go源码解析(1) 源码结构及客户端

本文为《Kubernetes 源码剖析》读书笔记,书籍简介: http://www.broadview.com.cn/book/6104

1.client_go源码结构

client_go在Kubernetes的vendor/k8s.io/client-go目录下

tree  -L 1
.
├── discovery  提供DiscoveryClient发现客户端
├── dynamic  提供DynamicClient动态客户端
├── informers 每种kubernetes资源的informer实现
├── kubernetes  提供ClientSet客户端
├── listers  为每个kubernetes资源提供Lister功能,该功能对Get和List提供只读的缓存数据
├── plugin 提供OpenStack、Gcp和Azure等云服务商授权插件
├── rest 提供RestClient客户端,对api server执行RESTful操作
├── scale 提供ScaleClient客户端,用于扩容或缩容Deployment、ReplicaSet、Replication Controller等资源对象
├── tools 提供常用工具,例如SharedInformer、Reflector、DealtFIFO、Indexers。提供Client查询和缓存机制,减少向api server发起的请求数等
├── transport 提供安全的TCP连接,支持Http Stream,某些操作需要在需要在客户端和容器之间传输二进制流,例如exec、attach等操作。该功能由内部的spdy包提供
└── util 提供常用方法,例如WorkQueue工作队列、Certificate证书管理等

2.Client客户端对象

client_go 包含4种客户端对象与API Server交互,分别是:RESTClient、ClientSet、DynamicClient、DiscoveryClient

客户端 说明
RESTClient 是最基础的客户端,其他三种客户端都是基于它实现的,它对Http Request进行了封装,实现了RESTful风格API
ClientSet 封装了对Resource和Version的管理方法,每一个Resource可以理解为一个客户端,而ClientSet是多个客户端的集合,每一个Resource和Version都以函数的方式暴露给开发者。同时ClientSet只能处理Kubernetes的内置资源,通过client-gen代码生成器自动生成
DynamicClient 可以处理Kubernetes中所有资源对象,包括内置资源和CRD自定义资源
DiscoveryClient 发现客户端,用于发现api server所支持的资源组、资源版本、资源信息(Group、Versions、Resources)

以上4种客户端都可以通过kubeconfig连接到指定的api server。

RESTClient客户端

example代码示例

//加载kubeconfig配置信息
config,error:=clientcmd.BuildConfigFromFlags("","/Users/.kube/config")
if error!=nil{
    panic(error)
}
//设置APIPath请求的Http路径
config.APIPath="api"
//设置请求的资源组和资源版本
config.GroupVersion=&corev1.SchemeGroupVersion
//设置解码器
config.NegotiatedSerializer=scheme.Codecs

//根据config信息实例化restClient
restClient,error:=rest.RESTClientFor(config)
if error != nil {
    panic(error)
}
result:=&corev1.PodList{}

error = restClient.Get().
    Namespace("dev").
    Resource("pods").
    VersionedParams(&metav1.ListOptions{Limit: 500},scheme.ParameterCodec).
    //执行请求
    Do(context.TODO()).
    //解析对象到result中
    Into(result)
for _,d:=range result.Items{
    fmt.Printf("NAMESPACE:%v \t NAME:%v \t STATUS:%+v \n",d.Namespace,d.Name,d.Status.Phase)
}

运行以上代码,列出dev命名空间下所有Pod资源信息。

Do方法是发起http请求的代码,代码路径在:rest/request.go

Do函数代码:

func (r *Request) Do(ctx context.Context) Result {
    var result Result
    err := r.request(ctx, func(req *http.Request, resp *http.Response) {
        result = r.transformResponse(resp, req)
    })
    if err != nil {
        return Result{err: err}
    }
    return result
}

Do函数调用的是Request的request函数

request函数代码:

func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
    ...
    for {
        url := r.URL().String()
        req, err := http.NewRequest(r.verb, url, r.body)
        if err != nil {
            return err
        }
        req = req.WithContext(ctx)
        req.Header = r.headers
        ...
        //发起调用
        resp, err := client.Do(req)
        
        ...
    
        if err != nil {
            ...
            //构建response
                resp = &http.Response{
                    StatusCode: http.StatusInternalServerError,
                    Header:     http.Header{"Retry-After": []string{"1"}},
                    Body:       ioutil.NopCloser(bytes.NewReader([]byte{})),
                }
            } else {
                return err
            }
        }

        done := func() bool {
                 
            defer func() {
            ...
                resp.Body.Close()
            }()
               ....
              fn(req, resp)
            return true
        }()
        if done {
            return nil
        }
    }
}

其中 r.URL().String() 对请求的信息进行解析为 REST请求 URL; 比如上述的Demo解析的地址为 http://x.x.x.x/api/v1/namespaces/dev/pods?limit=500,最后通过net/http想该url(即api server发送请求),将请求结果放在http.Response的Body对象中,在调用fn函数(即transformResponse)将结果转为资源对象,在通过resp.Body.close命令进行关闭。

ClientSet客户端

example代码示例

//加载kubeconfig配置信息
    config,error:=clientcmd.BuildConfigFromFlags("","/Users/.kube/config")
    if error!=nil{
        panic(error)
    }
    //实例化client
    clientSet,error :=kubernetes.NewForConfig(config)
    podClient :=clientSet.CoreV1().Pods("kube-system")
    //查询pod
    list,error :=podClient.List(context.TODO(),metav1.ListOptions{Limit: 500})
    if error!=nil{
        panic(error)
    }
    for _,d:=range list.Items{
        fmt.Printf("NAMESPACE:%v \t NAME:%v \t STATUS:%+v \n",d.Namespace,d.Name,d.Status.Phase)
    }

podClient.List底层调用的是RESTClient获取pod列表

    func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
    var timeout time.Duration
    if opts.TimeoutSeconds != nil {
        timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
    }
    result = &v1.PodList{}
    err = c.client.Get().
        Namespace(c.ns).
        Resource("pods").
        VersionedParams(&opts, scheme.ParameterCodec).
        Timeout(timeout).
        Do(ctx).
        Into(result)
    return
}    

DynamicClient客户端

ClientSet只能处理k8s内部资源类型是因为它k8s内部资源是已知的数据结构,ClientSet预先实现了对Resource和Version的操作。

但是CRD的自定义资源是未知的数据结构,DynamicClient能访问CRD自定义资源,就依赖于内部实现了Unstructured,来处理非结构化的数据结构。
处理过程:客户端首先会将resource转换成Unstructured结构。

type UnstructuredList struct {
    Object map[string]interface{}

    // Items is a list of unstructured objects.
    Items []Unstructured `json:"items"`
}    

unstructured其实就是一个map[string]interface{}数据结构。

再通过

runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(),podList)

方法,反射将map[string]interface{}类型转为目标类型

Demo:

//加载kubeconfig配置信息
    config,error:=clientcmd.BuildConfigFromFlags("","/Users/.kube/config")
    if error!=nil{
        panic(error)
    }
    //实例化client
    dyClient,error:=dynamic.NewForConfig(config)
    if error!=nil{
        panic(error)
    }
    gvr:=schema.GroupVersionResource{Version: "v1",Resource: "pods"}
    unstructObj,error:=dyClient.Resource(gvr).Namespace("dev").List(context.TODO(),metav1.ListOptions{Limit: 500})
    if error!=nil{
        panic(error)
    }
    podList:=&corev1.PodList{}
    error=runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(),podList)
    if error!=nil{
        panic(error)
    }
    for _,d:=range podList.Items{
        fmt.Printf("NAMESPACE:%v \t NAME:%v \t STATUS:%+v \n",d.Namespace,d.Name,d.Status.Phase)
    }

DiscoveryClient客户端

DiscoveryClient功能主要用于发现api server所支持的资源组、资源版本、资源信息,同时还可以将这些信息缓存在本地,减少对api server的压力,
默认缓存信息存储在/kube/cache和/.kube/http-cache下
Demo:

//加载kubeconfig配置信息
    config,error:=clientcmd.BuildConfigFromFlags("","/Users/.kube/config")
    if error!=nil{
        panic(error)
    }
    discoveryClient,error:=discovery.NewDiscoveryClientForConfig(config)
    if error!=nil{
        panic(error)
    }
    _, apiResourceList,error:=discoveryClient.ServerGroupsAndResources()
    if error!=nil{
        panic(error)
    }
    for _,list:=range  apiResourceList{
        gv,error:=schema.ParseGroupVersion(list.GroupVersion)
        if error!=nil{
            panic(error)
        }
        for _,resource :=range list.APIResources{
            fmt.Printf("name:%v ,group:%v,version:%v\n",resource.Name,gv.Group,gv.Version)
        }
    }