简介
访问 k8s 集群获取资源有多种方式
- 命令行 kubectl
- http k8s REST API
- 代码库 client-go
- ClientSet
- Dynamic Client
- RESTClient
- informer
Kubernetes的client-go库介绍client-go是一个调用kubernetes集群资源对象http API的客户端(是一个典型的web服务客户端库),即通过client-go实现对kubernetes集群中资源对象(包括deployment、service、ingress、replicaSet、pod、namespace、node等)的增删改查等操作。
ClientSet 方式
使用示例
以node resource 为例,展示使用client-go 对 resource 进行查询和更新
clientset, err := kubernetes.NewForConfig(config)
// 获取node 列表
nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
// 更新
_, err = a.client.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
// 发送patch 指令更新
patchTemplate := map[string]interface{}{
"metadata": map[string]interface{}{
"labels": map[string]interface{}{
labelkey: labelvaule,
},
},
}
patchdata, _ := json.Marshal(patchTemplate)
_, err := clientset.CoreV1().Nodes().Patch(ctx, Nodes[i].Name, types.StrategicMergePatchType, patchdata, metav1.PatchOptions{})
k8s.io/client-go
/rest // 底层rest client 定义 RESTClient struct
/kubernetes // 访问 Kubernetes API的一系列的clientset
/typed
/core/v1
/pod.go // pod 相关api
/extensions/v1beta1
/deployment.go // deployment 相关api
/dynamic // 对任意Kubernetes对象执行通用操作的动态client
/informer
k8s.io/api
/core/v1
/types.go // 定义了pod service 等struct
/register.go
类似于 /core/v1
和 /extensions/v1beta1
这些GroupVersion 在 k8s.io/client-go
和 k8s.io/api
都有对应目录。
config,err := clientcmd.BuildConfigFromFlags("",kubeconfig)
clientset, err := kubernetes.NewForConfig(config)
pod,err := clientset
.CoreV1() // 选择APIGroupVersion 即 /api/v1
.Pods("book") // 命名空间
.Get("example",metav1.GetOptions{}) // 访问 /api/v1/namespaces/book/pods/example
从上到下来说:Clientset是调用Kubernetes资源对象最常用的client,可以操作所有的资源对象。需要指定Group、Version,然后根据Resource获取 对应的XXInterface。
pod /node 等API Resource 按GroupVersion(CoreV1/ExtensionsV1beta1) 进行了聚合,对外提供CoreV1Client/ExtensionsV1beta1Client,各个GroupVersion Interface 聚合为 clientset
type CoreV1Interface interface {
RESTClient() rest.Interface
ConfigMapsGetter
EventsGetter
NamespacesGetter
NodesGetter
PersistentVolumesGetter
PersistentVolumeClaimsGetter
PodsGetter
PodTemplatesGetter
ReplicationControllersGetter
SecretsGetter
ServicesGetter
...
}
type CoreV1Client struct {
restClient rest.Interface // 通用的REST 客户端
}
以pod 为例,对外提供了 PodInterface 封装了对Pod 的api。 Pod 的schema 数据 k8s.io/api
对应GroupVesion 路径下的 register.go 文件中 注册到 统一的 Schema 中,schema 数据在client-go 中用于 http 数据的解封装。
// k8s.io/client-go/deprecated/typed/core/v1/pod.go
type PodInterface interface {
Create(*v1.Pod) (*v1.Pod, error)
Update(*v1.Pod) (*v1.Pod, error)
Delete(name string, options *metav1.DeleteOptions) error
Get(name string, options metav1.GetOptions) (*v1.Pod, error)
List(opts metav1.ListOptions) (*v1.PodList, error)
Watch(opts metav1.ListOptions) (watch.Interface, error)
Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.Pod, err error)
...
}
// pods implements PodInterface
type pods struct {
client rest.Interface
ns string
}
// k8s.io/client-go/rest/client.go
type RESTClient struct {
base *url.URL
Client *http.Client
...
}
func (c *pods) Get(name string, options metav1.GetOptions) (result *v1.Pod, err error) {
result = &v1.Pod{}
err = c.client.Get(). // 新建Request 对象
Namespace(c.ns). // 设置Request.namespace
Resource("pods"). // 设置Request.resource
Name(name). // 设置Request.resourceName
VersionedParams(&options, scheme.ParameterCodec).
Do(context.TODO()). // 执行Request.request
Into(result)
return
}
client-go 包含了 k8s 一些核心对象的访问,此外一些非核心对象 或用户crd 对象可以独立提供类似 client-go 功能
- 比如metric 机制相关的 PodMetrics/NodeMetrics对象,其代码都在
k8s.io/metrics
包里。 - controller-runtime 为cr 生成对应的client,scheme中 包含了cr 的信息。
informer 方式
Informer是一个带有本地缓存和索引机制的、可以注册 EventHandler 的 client,本地缓存被称为 Store,索引被称为 Index。使用 informer 的目的是为了减轻 apiserver 数据交互的压力而抽象出来的一个 cache 层, 客户端对 apiserver 数据的 “读取” 和 “监听” 操作都通过本地 informer 进行(相对于直接监听apiserverresp, err := http.Get("http://apiserver:8080/api/v1/watch/pods?watch=yes")
)。Informer 实例的Lister()方法可以直接查找缓存在本地内存中的数据。
使用示例
// 通过informer 获取node 列表
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
nodeInformer := factory.Core().V1().Nodes()
go nodeInformer.Informer().Run(stopCh)
if !cache.WaitForCacheSync(stopCh, nodeInformer.Informer().HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
nodes, err := nodeInformer.Lister().List(labels.NewSelector())
“高冷”的 Kubernetes Informer 一探究竟为了让 Client-go 更快地返回 List/Get 请求的结果、减少对 Kubenetes API 的直接调用,Informer 被设计实现为一个依赖(并且只依赖) Kubernetes List/Watch API 、可监听事件并触发回调函数的二级缓存工具包。PS:这点zk/etcd 等client 也提供类似能力,只是zk/etcd client 存储的是通用数据,没有封装资源对象。
源码分析
Kubernetes: Controllers, Informers, Reflectors and StoresKubernetes offers these powerful structures to get a local representation of the API server’s resources.The Informer just a convenient wrapper to automagically syncs the upstream data to a downstream store and even offers you some handy event hooks.
k8s.io/client-go
/rest
/informer
/core
/v1
/pod.go
/interface.go
/interface.go
/factory.go // 定义sharedInformerFactory struct
/tools
/cache // informer 机制的的重点在cache 包里
/shared_informer.go // 定义了 sharedIndexInformer struct
/controller.go
/reflector.go
/delta_fifo.go
实际使用中 每一个资源对象(比如Pod、Deployment)都对应一个Informer,底层都用到了SharedIndexInformer
Kubernetes Informer 详解 Informer 只会调用 Kubernetes List 和 Watch 两种类型的 API,Informer 在初始化的时,先调用 Kubernetes List API 获得某种 resource 的全部 Object(真的只调了一次),缓存在内存中; 然后,调用 Watch API 去 watch 这种 resource,去维护这份缓存; 之后,Informer 就不再调用 Kubernetes 的任何 API。
informer 机制主要两个流程
- Reflector 通过ListWatcher 同步apiserver 数据(只启动时搞一次),并watch apiserver ,将event 加入到 Queue 中
- controller 从 Queue中获取event,更新存储,并触发Processor 业务层注册的 ResourceEventHandler
Reflector
// k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
name string
expectedTypeName string
expectedType reflect.Type // 放到 Store 中的对象类型
expectedGVK *schema.GroupVersionKind
// 与 watch 源同步的目标 Store
store Store
// 用来执行 lists 和 watches 操作的 listerWatcher 接口(最重要的)
listerWatcher ListerWatcher
WatchListPageSize int64
...
Reflector 对象通过 Run 函数来启动监控并处理监控事件
// k8s.io/client-go/tools/cache/reflector.go
// Run 函数反复使用 ListAndWatch 函数来获取所有对象和后续的 deltas。
// 当 stopCh 被关闭的时候,Run函数才会退出。
func (r *Reflector) Run(stopCh <-chan struct{}) {
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.backoffManager, true, stopCh)
}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
if err := func() error {
var list runtime.Object
listCh := make(chan struct{}, 1)
go func() {
pager := pager.New(...)
pager.PageSize = xx
list, paginatedResult, err = pager.List(context.Background(), options)
close(listCh) //close listCh后,下面的select 会解除阻塞
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
...
r.setLastSyncResourceVersion(resourceVersion)
return nil
}()
// 处理resync 逻辑
for {
options = metav1.ListOptions{...}
w, err := r.listerWatcher.Watch(options)
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {...}
}
}
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
meta, err := meta.Accessor(event.Object)
newResourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
case watch.Modified:
err := r.store.Update(event.Object)
case watch.Deleted:
err := r.store.Delete(event.Object)
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
if rvu, ok := r.store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(newResourceVersion)
}
}
}
return nil
}
Reflector.Run ==> pager.List + listerWatcher.Watch ==> Reflector.watchHandler ==> store.Add/Update/Delete ==> DeltaFIFO.Add obj 加入DeltaFIFO
首先通过Reflector的 relistResourceVersion 函数获得Reflector relist 的资源版本,如果资源版本非 0,则表示根据资源版本号继续获取,当传输过程中遇到网络故障或者其他原因导致中断,下次再连接时,会根据资源版本号继续传输未完成的部分。
ResourceVersion(资源版本号)非常重要,Kubernetes 中所有的资源都拥有该字段,它标识当前资源对象的版本号,每次修改(CUD)当前资源对象时,Kubernetes API Server 都会更改 ResourceVersion,这样 client-go 执行 Watch 操作时可以根据ResourceVersion 来确定当前资源对象是否发生了变化。
DeltaFIFO 和 FIFO 一样也是一个队列,DeltaFIFO里面的元素是一个 Delta。DeltaFIFO实现了Store和 Queue Interface
// k8s.io/client-go/tools/cache/delta_fifo.go
type Delta struct {
Type DeltaType
Object interface{}
}
type DeltaFIFO struct {
items map[string]Deltas // 存储key到元素对象的Map,提供Store能力
queue []string // key的队列,提供Queue能力
...
}
ADD/UPDATE 时判断items 是否包含元素,若包含则更新,不包含则加入items并写入queue。DELETE时直接从items 中移除,queue中不管。因此 items和queue中所包含的Key可能不一致,会定期resync。
Watch event 消费
sharedIndexInformer.Run ==> controller.Run ==> controller.processLoop ==> for Queue.Pop 也就是 sharedIndexInformer.HandleDeltas ==> 更新LocalStore + processor.distribute
watch 到的event(包含资源对象数据),先看本地cache 有没有,有就是更新,没有就是新增。
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {...}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {...}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {...}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
processor 是如何处理数据的
两条主线
- sharedIndexInformer.Run ==> sharedProcessor.run ==> sharedProcessor.run/pop 从channel 读取数据并执行
- sharedIndexInformer.HandleDeltas ==> sharedProcessor.distribute ==> processorListener.addCh 往channel 里塞数据
// k8s.io/client-go/tools/cache/shared_informer.go
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
for _, listener := range p.listeners {
// 加入到processorListener 的addCh 中,随后进入pendingNotifications,因为这里不能阻塞
listener.add(obj)
}
}
// k8s.io/client-go/tools/cache/shared_informer.go
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
handler ResourceEventHandler
pendingNotifications buffer.RingGrowing
...
}
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
for _, listener := range p.listeners {
p.wg.Start(listener.run) // 消费nextCh
p.wg.Start(listener.pop) // 消费addCh 经过 mq 转到 nextCh
}
p.listenersStarted = true
}()
...
}
消息流转的具体路径:addCh ==> notificationToAdd ==> pendingNotifications ==> notification ==> nextCh
func (p *processorListener) pop() {
var nextCh chan<- interface{}
var notification interface{} // 用来做消息的中转,并在最开始的时候标记pendingNotifications 为空
for {
// select case channel 更多是事件驱动的感觉,哪个channel 来数据了或者可以 接收数据了就处理哪个 case 内逻辑
select {
case nextCh <- notification:
// Notification dispatched
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
可以看到,对于handler 来说,除非特殊场景,否则一般不需要另起协程了。
watch 是如何实现的?
理解 K8S 的设计精髓之 List-Watch机制和Informer模块HTTP 分块传输编码允许服务器为动态生成的内容维持 HTTP 持久链接。通常,持久链接需要服务器在开始发送消息体前发送Content-Length消息头字段,但是对于动态生成的内容来说,在内容创建完之前是不可知的。使用分块传输编码,数据分解成一系列数据块,并以一个或多个块发送,这样服务器可以发送数据而不需要预先知道发送内容的总大小。
当客户端调用watch API时,apiserver 在response的HTTP Header中设置Transfer-Encoding的值为chunked,表示采用分块传输编码,客户端收到该信息后,便和服务端保持该链接,并等待下一个数据块,即资源的事件信息。例如:
$ curl -i http://{kube-api-server-ip}:8080/api/v1/watch/pods?watch=yes
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 02 Jan 2019 20:22:59 GMT
Transfer-Encoding: chunked
{"type":"ADDED", "object":{"kind":"Pod","apiVersion":"v1",...}}
{"type":"ADDED", "object":{"kind":"Pod","apiVersion":"v1",...}}
{"type":"MODIFIED", "object":{"kind":"Pod","apiVersion":"v1",...}}
从k8s.io/apimachinery/pkg/watch
返回的watch.Interface
type Interface interface{
Stop()
ResultChan() <- Event
}
type Event struct{
Type EventType // ADDED/MODIFIED/DELETED/ERROR
Object runtime.Object
}
其它
Scheme
k8s.io/api
/apps
/v1beta1/types.go // 定义了StatefulSet.go
/v1/types.go // 定义了StatefulSet.go
Scheme defines methods for serializing and deserializing API objects, a type registry for converting group, version, and kind information to and from Go schemas, and mappings between Go schemas of different versions. A scheme is the foundation for a versioned API and versioned configuration over time.
// k8s.io/apimachinery/pkg/runtime/scheme.go
type Scheme struct {
// a Type is a particular Go struct,比如k8s.io/api/apps/v1.StatefulSet
gvkToType map[schema.GroupVersionKind]reflect.Type
typeToGVK map[reflect.Type][]schema.GroupVersionKind
...
}
func (s *Scheme) New(kind schema.GroupVersionKind) (Object, error) {
if t, exists := s.gvkToType[kind]; exists {
return reflect.New(t).Interface().(Object), nil
}
...
return nil, NewNotRegisteredErrForKind(s.schemeName, kind)
}
Dynamic client
Dynamic client 是一种动态的 client,它能处理 kubernetes 所有的资源。不同于 clientset,dynamic client 对GVK 一无所知, 返回的对象unstructured.Unstructured(在k8s.io/apimachinery 中定义,并注册到了schema 中) 是一个 map[string]interface{}
,如果一个 controller 中需要控制所有的 API,可以使用dynamic client,目前它在 garbage collector 和 namespace controller中被使用。
k8s.io/client-go
/dynamic
/dynamicinformer
/dynamiclister
/interface.go
相比底层的 RESTClient,基于 unstructured.Unstructured 实现了 数据的解封装 及watch 机制。
// k8s.io/client-go/dynamic/interface.go
type ResourceInterface interface {
Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error)
Update(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error)
Delete(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error
Get(ctx context.Context, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error)
List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
...
}
// k8s.io/client-go/dynamic/simple.go
func (c *dynamicResourceClient) Get(ctx context.Context, name string, opts metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error) {
// 这里直接拼接了 api resource 的请求路径
result := c.client.client.Get().AbsPath(append(c.makeURLSegments(name), subresources...)...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do(ctx)
retBytes, err := result.Raw()
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
return uncastObj.(*unstructured.Unstructured), nil
}
更新status
更新status,以Deployment 为例,/apis/apps/v1beta1/namespaces/${ns}/deployments/${name}
只能更新deployment 的 spec。/apis/apps/v1beta1/namespaces/${ns}/deployments/${name}/status
只能更新 deployment 的status。
kube-proxy
可以通过 kubectl proxy
(监听127.0.0.1:8001
) 来访问 apiserver http://127.0.0.1:8001/api/v1/namespaces/default/pods/$podName
。也可以 kubectl get --raw /api/v1/namespaces/default/pods/$podName
访问http 接口拿到pod 数据,为了方便 数据的访问, client-go 提供了RESTClient 来进行初步的封装。类似于 java 的HttpUtils 类。
// k8s.io/client-go/rest/client.go
type Interface interface {
GetRateLimiter() flowcontrol.RateLimiter
Verb(verb string) *Request
Post() *Request
Put() *Request
Patch(pt types.PatchType) *Request
Get() *Request
Delete() *Request
APIVersion() schema.GroupVersion
}
// 你可以直接 根据 rest.Config 初始化RESTClient,从apiServer 中拿到数据
type RESTClient struct {
// base is the root URL for all invocations of the client
base *url.URL
// versionedAPIPath is a path segment connecting the base URL to the resource root
versionedAPIPath string
// Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client
...
}
// k8s.io/client-go/rest/request.go
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
client := r.c.Client // http.Client
// Right now we make about ten retry attempts if we get a Retry-After response.
retries := 0
for {
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
req = req.WithContext(ctx)
req.Header = r.headers
resp, err := client.Do(req)
done := func() bool {
...
}()
if done {
return nil
}
}
}