缘起
起源来自阿里云的文档
发现能对 kubernetes Event 进行消息推送之后,非常喜欢。但是其本身的钉钉推送方式不好用,所以决定亲自修改。
决定开发
项目源代码位于 kube-eventer ,顺便了解了一下kubernetes 的 Event 机制
- Controller Manager 会记录节点注册和销毁的事件、Deployment 扩容和升级的事件
- kubelet 会记录镜像回收事件、volume 无法挂载事件。基本上所有的事件都在
kubernetes/pkg/kubelet/events/event.go 里面定义
Event 结构体
Event 结构体定义在 "k8s.io/api/core/v1"里面
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Event is a report of an event somewhere in the cluster.
// Field names on the wire are taken from the json struct tags, which do not
// always match the Go field names (see ReportingController).
type Event struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
// The object that this event is about.
InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`
// This should be a short, machine understandable string that gives the reason
// for the transition into the object's current status.
// TODO: provide exact specification for format.
// +optional
Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`
// A human-readable description of the status of this operation.
// TODO: decide on maximum length.
// +optional
Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`
// The component reporting this event. Should be a short machine understandable string.
// +optional
Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`
// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
// NOTE(review): the sample `kubectl get event -o json` output shows
// creationTimestamp under "metadata", not among the type fields; confirm
// whether "TypeMeta" above should read "ObjectMeta".
// +optional
FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`
// The time at which the most recent occurrence of this event was recorded.
// +optional
LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`
// The number of times this event has occurred.
// +optional
Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`
// Type of this event (Normal, Warning), new types could be added in the future
// +optional
Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`
// Time when this Event was first observed.
// NOTE(review): although tagged omitempty, the sample output shows
// "eventTime": null for an unset value, so the key is not dropped; confirm
// against metav1.MicroTime's JSON marshalling.
// +optional
EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`
// Data about the Event series this event represents or nil if it's a singleton Event.
// +optional
Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
// What action was taken/failed regarding to the Regarding object.
// +optional
Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`
// Optional secondary object for more complex actions.
// +optional
Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`
// Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
// The JSON/protobuf name is reportingComponent, not reportingController, and
// the tag has no omitempty: the sample output carries "reportingComponent": "".
// +optional
ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
// ID of the controller instance, e.g. `kubelet-xyzf`.
// No omitempty here either; the sample output carries "reportingInstance": "".
// +optional
ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}
// ObjectReference contains enough information to let you inspect or modify the referred object.
// Every field below is tagged omitempty; in the sample event output the
// involvedObject carries no fieldPath key.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type ObjectReference struct {
// Kind of the referent.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds
// +optional
Kind string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
// Namespace of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
// +optional
Namespace string `json:"namespace,omitempty" protobuf:"bytes,2,opt,name=namespace"`
// Name of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
// +optional
Name string `json:"name,omitempty" protobuf:"bytes,3,opt,name=name"`
// UID of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#uids
// +optional
UID types.UID `json:"uid,omitempty" protobuf:"bytes,4,opt,name=uid,casttype=k8s.io/apimachinery/pkg/types.UID"`
// API version of the referent.
// +optional
APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,5,opt,name=apiVersion"`
// Specific resourceVersion to which this reference is made, if any.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#concurrency-control-and-consistency
// +optional
ResourceVersion string `json:"resourceVersion,omitempty" protobuf:"bytes,6,opt,name=resourceVersion"`
// If referring to a piece of an object instead of an entire object, this string
// should contain a valid JSON/Go field access statement, such as desiredState.manifest.containers[2].
// For example, if the object reference is to a container within a pod, this would take on a value like:
// "spec.containers{name}" (where "name" refers to the name of the container that triggered
// the event) or if no container name is specified "spec.containers[2]" (container with
// index 2 in this pod). This syntax is chosen only to have some well-defined way of
// referencing a part of an object.
// TODO: this design is not final and this field is subject to change in the future.
// +optional
FieldPath string `json:"fieldPath,omitempty" protobuf:"bytes,7,opt,name=fieldPath"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// kubectl get event -o json
{
"apiVersion": "v1",
"items": [{
"apiVersion": "v1",
"count": 2416,
"eventTime": null,
"firstTimestamp": "2020-02-14T12:22:43Z",
"involvedObject": {
"apiVersion": "v1",
"kind": "Service",
"name": "my-sb-svc",
"namespace": "default",
"resourceVersion": "264028180",
"uid": "96117aad-4f24-11ea-a87c-00163e04f1e0"
},
"kind": "Event",
"lastTimestamp": "2020-02-19T13:08:25Z",
"message": "Port 666 was assigned to multiple services; please recreate service",
"metadata": {
"creationTimestamp": "2020-02-14T12:22:43Z",
"name": "my-sb-svc.15f344468d77364d",
"namespace": "default",
"resourceVersion": "267629591",
"selfLink": "/api/v1/namespaces/test/events/my-sb-svc.15f344468d77364d",
"uid": "b3a56707-4f24-11ea-81ec-00163e0a865a"
},
"reason": "PortAlreadyAllocated",
"reportingComponent": "",
"reportingInstance": "",
"source": {
"component": "portallocator-repair-controller"
},
"type": "Warning"
}],
"kind": "List",
"metadata": {
"resourceVersion": "",
"selfLink": ""
}
}
设计细节
程序的入口是 eventer.go
sink 是程序的输出端,比如可以输出到钉钉,elasticsearch等等。
这一块插件会在一开始通过
1
sinkManager, err := sinks.NewEventSinkManager(sinkList, sinks.DefaultSinkExportEventsTimeout, sinks.DefaultSinkStopTimeout)
这个方法,以go func() 形式并行启动所有 sink 。
真正的主角是 manager
1
manager, err := manager.NewManager(sources[0], sinkManager, *argFrequency)
它接受 sinkManager 和其他一系列参数,启动主函数。重复展开定义之后,会找到Housekeep 这个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Housekeep runs the housekeeping loop until a stop signal arrives.
//
// Each pass waits until the next multiple of rm.frequency (computed by
// truncating the current time and adding one period) and then calls
// rm.housekeep. As soon as a receive on rm.stopChan succeeds, the sink is
// stopped and the method returns.
func (rm *realManager) Housekeep() {
	for {
		// Invoke housekeep at fixed points in time: ticks are aligned to
		// multiples of rm.frequency, not to the moment the loop started.
		current := time.Now()
		nextTick := current.Truncate(rm.frequency).Add(rm.frequency)
		wait := nextTick.Sub(current)

		tick := time.After(wait)
		select {
		case <-rm.stopChan:
			rm.sink.Stop()
			return
		case <-tick:
			rm.housekeep()
		}
	}
}
这个方法写得非常简单明了:一个无限 for 循环,每到一个周期就调用一次 housekeep,直到接收到 stopChan 这个停止信号才退出(这里是循环,并不是递归)。
除此以外,还默认监听了 0.0.0.0:8084 作为健康检查的端口。
Event 的获取也相当高效
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// NewKubernetesSource builds an event source backed by the Kubernetes API
// and starts its watch goroutine.
//
// The client configuration is obtained from uri via
// kubeconfig.GetKubeClientConfig. An error from resolving the configuration
// or from constructing the client is returned unchanged together with a nil
// source. On success the returned source holds an events client for
// kubeapi.NamespaceAll, an event channel buffered to LocalEventsBufferSize
// and an unbuffered stop channel; its watch method has already been launched
// in its own goroutine when this function returns.
func NewKubernetesSource(uri *url.URL) (*KubernetesEventSource, error) {
	cfg, err := kubeconfig.GetKubeClientConfig(uri)
	if err != nil {
		return nil, err
	}

	client, err := kubeclient.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}

	source := KubernetesEventSource{
		localEventsBuffer: make(chan *kubeapi.Event, LocalEventsBufferSize),
		stopChannel:       make(chan struct{}),
		eventClient:       client.CoreV1().Events(kubeapi.NamespaceAll),
	}
	go source.watch()

	return &source, nil
}
结语
这个项目的开发者语言表达非常精炼,这个项目很适用于学习 golang 并发。
Origin
Originated from Alibaba Cloud’s documentation
After discovering that Kubernetes Events can be pushed out as notifications, I really liked the idea. However, the built-in DingTalk push was not convenient to use, so I decided to modify it myself.
Decision to Develop
Project source code is located at kube-eventer , and I also learned about kubernetes’ Event mechanism
- Controller Manager records events of node registration and destruction, Deployment scaling and upgrade events
- kubelet records image recycling events, volume mount failure events. Basically all events are defined in
kubernetes/pkg/kubelet/events/event.go
Event Structure
Event structure is defined in "k8s.io/api/core/v1"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Event is a report of an event somewhere in the cluster.
// Field names on the wire are taken from the json struct tags, which do not
// always match the Go field names (see ReportingController).
type Event struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
// The object that this event is about.
InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`
// This should be a short, machine understandable string that gives the reason
// for the transition into the object's current status.
// TODO: provide exact specification for format.
// +optional
Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`
// A human-readable description of the status of this operation.
// TODO: decide on maximum length.
// +optional
Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`
// The component reporting this event. Should be a short machine understandable string.
// +optional
Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`
// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
// NOTE(review): the sample `kubectl get event -o json` output shows
// creationTimestamp under "metadata", not among the type fields; confirm
// whether "TypeMeta" above should read "ObjectMeta".
// +optional
FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`
// The time at which the most recent occurrence of this event was recorded.
// +optional
LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`
// The number of times this event has occurred.
// +optional
Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`
// Type of this event (Normal, Warning), new types could be added in the future
// +optional
Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`
// Time when this Event was first observed.
// NOTE(review): although tagged omitempty, the sample output shows
// "eventTime": null for an unset value, so the key is not dropped; confirm
// against metav1.MicroTime's JSON marshalling.
// +optional
EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`
// Data about the Event series this event represents or nil if it's a singleton Event.
// +optional
Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
// What action was taken/failed regarding to the Regarding object.
// +optional
Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`
// Optional secondary object for more complex actions.
// +optional
Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`
// Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
// The JSON/protobuf name is reportingComponent, not reportingController, and
// the tag has no omitempty: the sample output carries "reportingComponent": "".
// +optional
ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
// ID of the controller instance, e.g. `kubelet-xyzf`.
// No omitempty here either; the sample output carries "reportingInstance": "".
// +optional
ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}
// ObjectReference contains enough information to let you inspect or modify the referred object.
// Every field below is tagged omitempty; in the sample event output the
// involvedObject carries no fieldPath key.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type ObjectReference struct {
// Kind of the referent.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds
// +optional
Kind string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
// Namespace of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
// +optional
Namespace string `json:"namespace,omitempty" protobuf:"bytes,2,opt,name=namespace"`
// Name of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
// +optional
Name string `json:"name,omitempty" protobuf:"bytes,3,opt,name=name"`
// UID of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#uids
// +optional
UID types.UID `json:"uid,omitempty" protobuf:"bytes,4,opt,name=uid,casttype=k8s.io/apimachinery/pkg/types.UID"`
// API version of the referent.
// +optional
APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,5,opt,name=apiVersion"`
// Specific resourceVersion to which this reference is made, if any.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#concurrency-control-and-consistency
// +optional
ResourceVersion string `json:"resourceVersion,omitempty" protobuf:"bytes,6,opt,name=resourceVersion"`
// If referring to a piece of an object instead of an entire object, this string
// should contain a valid JSON/Go field access statement, such as desiredState.manifest.containers[2].
// For example, if the object reference is to a container within a pod, this would take on a value like:
// "spec.containers{name}" (where "name" refers to the name of the container that triggered
// the event) or if no container name is specified "spec.containers[2]" (container with
// index 2 in this pod). This syntax is chosen only to have some well-defined way of
// referencing a part of an object.
// TODO: this design is not final and this field is subject to change in the future.
// +optional
FieldPath string `json:"fieldPath,omitempty" protobuf:"bytes,7,opt,name=fieldPath"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// kubectl get event -o json
{
"apiVersion": "v1",
"items": [{
"apiVersion": "v1",
"count": 2416,
"eventTime": null,
"firstTimestamp": "2020-02-14T12:22:43Z",
"involvedObject": {
"apiVersion": "v1",
"kind": "Service",
"name": "my-sb-svc",
"namespace": "default",
"resourceVersion": "264028180",
"uid": "96117aad-4f24-11ea-a87c-00163e04f1e0"
},
"kind": "Event",
"lastTimestamp": "2020-02-19T13:08:25Z",
"message": "Port 666 was assigned to multiple services; please recreate service",
"metadata": {
"creationTimestamp": "2020-02-14T12:22:43Z",
"name": "my-sb-svc.15f344468d77364d",
"namespace": "default",
"resourceVersion": "267629591",
"selfLink": "/api/v1/namespaces/test/events/my-sb-svc.15f344468d77364d",
"uid": "b3a56707-4f24-11ea-81ec-00163e0a865a"
},
"reason": "PortAlreadyAllocated",
"reportingComponent": "",
"reportingInstance": "",
"source": {
"component": "portallocator-repair-controller"
},
"type": "Warning"
}],
"kind": "List",
"metadata": {
"resourceVersion": "",
"selfLink": ""
}
}
Design Details
The program’s entry point is eventer.go
sink is the program’s output end, for example, it can output to DingTalk, elasticsearch, etc.
At startup, all of the sink plugins are launched in parallel, each in its own go func(), through the following call:
1
sinkManager, err := sinks.NewEventSinkManager(sinkList, sinks.DefaultSinkExportEventsTimeout, sinks.DefaultSinkStopTimeout)
This call is what creates the sink manager and brings the sinks up.
The real protagonist is manager
1
manager, err := manager.NewManager(sources[0], sinkManager, *argFrequency)
It accepts sinkManager and a series of other parameters, starts the main function. After repeatedly expanding definitions, you’ll find the Housekeep method
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Housekeep runs the housekeeping loop until a stop signal arrives.
//
// Each pass waits until the next multiple of rm.frequency (computed by
// truncating the current time and adding one period) and then calls
// rm.housekeep. As soon as a receive on rm.stopChan succeeds, the sink is
// stopped and the method returns.
func (rm *realManager) Housekeep() {
	for {
		// Invoke housekeep at fixed points in time: ticks are aligned to
		// multiples of rm.frequency, not to the moment the loop started.
		current := time.Now()
		nextTick := current.Truncate(rm.frequency).Add(rm.frequency)
		wait := nextTick.Sub(current)

		tick := time.After(wait)
		select {
		case <-rm.stopChan:
			rm.sink.Stop()
			return
		case <-tick:
			rm.housekeep()
		}
	}
}
This method is written very simply and clearly: an infinite for loop that calls housekeep once per period and only exits when it receives the stopChan stop signal. (It is a loop, not recursion.)
In addition, it also listens to 0.0.0.0:8084 by default as the health check port.
Event retrieval is also quite efficient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// NewKubernetesSource builds an event source backed by the Kubernetes API
// and starts its watch goroutine.
//
// The client configuration is obtained from uri via
// kubeconfig.GetKubeClientConfig. An error from resolving the configuration
// or from constructing the client is returned unchanged together with a nil
// source. On success the returned source holds an events client for
// kubeapi.NamespaceAll, an event channel buffered to LocalEventsBufferSize
// and an unbuffered stop channel; its watch method has already been launched
// in its own goroutine when this function returns.
func NewKubernetesSource(uri *url.URL) (*KubernetesEventSource, error) {
	cfg, err := kubeconfig.GetKubeClientConfig(uri)
	if err != nil {
		return nil, err
	}

	client, err := kubeclient.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}

	source := KubernetesEventSource{
		localEventsBuffer: make(chan *kubeapi.Event, LocalEventsBufferSize),
		stopChannel:       make(chan struct{}),
		eventClient:       client.CoreV1().Events(kubeapi.NamespaceAll),
	}
	go source.watch()

	return &source, nil
}
Conclusion
The developers of this project write very concise code, which makes the project well suited for learning Go concurrency.
起源
阿里云のドキュメントから始まりました
kubernetes Eventでメッセージプッシュができることを発見した後、非常に気に入りました。しかし、その独自の钉钉プッシュ方法は使いにくかったため、自分で変更することにしました。
開発の決定
プロジェクトのソースコードはkube-eventerにあります。 ついでにkubernetesのEventメカニズムについても理解しました。
- Controller Managerはノードの登録と破棄のイベント、Deploymentのスケーリングとアップグレードのイベントを記録します
- kubeletはイメージリサイクルイベント、volumeマウント失敗イベントを記録します。基本的にすべてのイベントは
kubernetes/pkg/kubelet/events/event.goで定義されています
Event構造体
Event構造体は"k8s.io/api/core/v1"で定義されています
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Event is a report of an event somewhere in the cluster.
// Field names on the wire are taken from the json struct tags, which do not
// always match the Go field names (see ReportingController).
type Event struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
// The object that this event is about.
InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`
// This should be a short, machine understandable string that gives the reason
// for the transition into the object's current status.
// TODO: provide exact specification for format.
// +optional
Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`
// A human-readable description of the status of this operation.
// TODO: decide on maximum length.
// +optional
Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`
// The component reporting this event. Should be a short machine understandable string.
// +optional
Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`
// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
// NOTE(review): the sample `kubectl get event -o json` output shows
// creationTimestamp under "metadata", not among the type fields; confirm
// whether "TypeMeta" above should read "ObjectMeta".
// +optional
FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`
// The time at which the most recent occurrence of this event was recorded.
// +optional
LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`
// The number of times this event has occurred.
// +optional
Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`
// Type of this event (Normal, Warning), new types could be added in the future
// +optional
Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`
// Time when this Event was first observed.
// NOTE(review): although tagged omitempty, the sample output shows
// "eventTime": null for an unset value, so the key is not dropped; confirm
// against metav1.MicroTime's JSON marshalling.
// +optional
EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`
// Data about the Event series this event represents or nil if it's a singleton Event.
// +optional
Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
// What action was taken/failed regarding to the Regarding object.
// +optional
Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`
// Optional secondary object for more complex actions.
// +optional
Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`
// Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
// The JSON/protobuf name is reportingComponent, not reportingController, and
// the tag has no omitempty: the sample output carries "reportingComponent": "".
// +optional
ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
// ID of the controller instance, e.g. `kubelet-xyzf`.
// No omitempty here either; the sample output carries "reportingInstance": "".
// +optional
ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}
// ObjectReference contains enough information to let you inspect or modify the referred object.
// Every field below is tagged omitempty; in the sample event output the
// involvedObject carries no fieldPath key.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type ObjectReference struct {
// Kind of the referent.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds
// +optional
Kind string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
// Namespace of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
// +optional
Namespace string `json:"namespace,omitempty" protobuf:"bytes,2,opt,name=namespace"`
// Name of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
// +optional
Name string `json:"name,omitempty" protobuf:"bytes,3,opt,name=name"`
// UID of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#uids
// +optional
UID types.UID `json:"uid,omitempty" protobuf:"bytes,4,opt,name=uid,casttype=k8s.io/apimachinery/pkg/types.UID"`
// API version of the referent.
// +optional
APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,5,opt,name=apiVersion"`
// Specific resourceVersion to which this reference is made, if any.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#concurrency-control-and-consistency
// +optional
ResourceVersion string `json:"resourceVersion,omitempty" protobuf:"bytes,6,opt,name=resourceVersion"`
// If referring to a piece of an object instead of an entire object, this string
// should contain a valid JSON/Go field access statement, such as desiredState.manifest.containers[2].
// For example, if the object reference is to a container within a pod, this would take on a value like:
// "spec.containers{name}" (where "name" refers to the name of the container that triggered
// the event) or if no container name is specified "spec.containers[2]" (container with
// index 2 in this pod). This syntax is chosen only to have some well-defined way of
// referencing a part of an object.
// TODO: this design is not final and this field is subject to change in the future.
// +optional
FieldPath string `json:"fieldPath,omitempty" protobuf:"bytes,7,opt,name=fieldPath"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// kubectl get event -o json
{
"apiVersion": "v1",
"items": [{
"apiVersion": "v1",
"count": 2416,
"eventTime": null,
"firstTimestamp": "2020-02-14T12:22:43Z",
"involvedObject": {
"apiVersion": "v1",
"kind": "Service",
"name": "my-sb-svc",
"namespace": "default",
"resourceVersion": "264028180",
"uid": "96117aad-4f24-11ea-a87c-00163e04f1e0"
},
"kind": "Event",
"lastTimestamp": "2020-02-19T13:08:25Z",
"message": "Port 666 was assigned to multiple services; please recreate service",
"metadata": {
"creationTimestamp": "2020-02-14T12:22:43Z",
"name": "my-sb-svc.15f344468d77364d",
"namespace": "default",
"resourceVersion": "267629591",
"selfLink": "/api/v1/namespaces/test/events/my-sb-svc.15f344468d77364d",
"uid": "b3a56707-4f24-11ea-81ec-00163e0a865a"
},
"reason": "PortAlreadyAllocated",
"reportingComponent": "",
"reportingInstance": "",
"source": {
"component": "portallocator-repair-controller"
},
"type": "Warning"
}],
"kind": "List",
"metadata": {
"resourceVersion": "",
"selfLink": ""
}
}
設計の詳細
プログラムのエントリーポイントは eventer.goです
sinkはプログラムの出力端で、たとえば钉钉、elasticsearchなどに出力できます。
このプラグインは、最初に
1
sinkManager, err := sinks.NewEventSinkManager(sinkList, sinks.DefaultSinkExportEventsTimeout, sinks.DefaultSinkStopTimeout)
このメソッドを通じて、go func()形式で並列にすべてのsinkを起動します。
真の主役はmanagerです
1
manager, err := manager.NewManager(sources[0], sinkManager, *argFrequency)
これはsinkManagerと他の一連のパラメータを受け取り、メイン関数を起動します。定義を繰り返し展開した後、Housekeepメソッドが見つかります
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Housekeep runs the housekeeping loop until a stop signal arrives.
//
// Each pass waits until the next multiple of rm.frequency (computed by
// truncating the current time and adding one period) and then calls
// rm.housekeep. As soon as a receive on rm.stopChan succeeds, the sink is
// stopped and the method returns.
func (rm *realManager) Housekeep() {
	for {
		// Invoke housekeep at fixed points in time: ticks are aligned to
		// multiples of rm.frequency, not to the moment the loop started.
		current := time.Now()
		nextTick := current.Truncate(rm.frequency).Add(rm.frequency)
		wait := nextTick.Sub(current)

		tick := time.After(wait)
		select {
		case <-rm.stopChan:
			rm.sink.Stop()
			return
		case <-tick:
			rm.housekeep()
		}
	}
}
このメソッドは非常にシンプルで明確に書かれています。無限の for ループの中で周期ごとに housekeep を呼び出し、stopChan の停止信号を受信したときだけ終了します(再帰ではなくループです)。
これに加えて、デフォルトで0.0.0.0:8084をヘルスチェックポートとしてリッスンします。
Eventの取得も非常に効率的です
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// NewKubernetesSource builds an event source backed by the Kubernetes API
// and starts its watch goroutine.
//
// The client configuration is obtained from uri via
// kubeconfig.GetKubeClientConfig. An error from resolving the configuration
// or from constructing the client is returned unchanged together with a nil
// source. On success the returned source holds an events client for
// kubeapi.NamespaceAll, an event channel buffered to LocalEventsBufferSize
// and an unbuffered stop channel; its watch method has already been launched
// in its own goroutine when this function returns.
func NewKubernetesSource(uri *url.URL) (*KubernetesEventSource, error) {
	cfg, err := kubeconfig.GetKubeClientConfig(uri)
	if err != nil {
		return nil, err
	}

	client, err := kubeclient.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}

	source := KubernetesEventSource{
		localEventsBuffer: make(chan *kubeapi.Event, LocalEventsBufferSize),
		stopChannel:       make(chan struct{}),
		eventClient:       client.CoreV1().Events(kubeapi.NamespaceAll),
	}
	go source.watch()

	return &source, nil
}
結語
このプロジェクトの開発者の言語表現は非常に簡潔です。このプロジェクトはgolangの並行性を学習するのに非常に適しています。
Происхождение
Происходит из документации Alibaba Cloud
После обнаружения, что kubernetes Event можно использовать для push-уведомлений, мне это очень понравилось. Но его собственный метод push DingTalk был неудобен в использовании, поэтому я решил изменить его самостоятельно.
Решение о разработке
Исходный код проекта находится в kube-eventer , и я также узнал о механизме Event kubernetes
- Controller Manager записывает события регистрации и уничтожения узлов, события масштабирования и обновления Deployment
- kubelet записывает события переработки образов, события неудачной монтировки volume. В основном все события определены в
kubernetes/pkg/kubelet/events/event.go
Структура Event
Структура Event определена в "k8s.io/api/core/v1"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Event is a report of an event somewhere in the cluster.
// Field names on the wire are taken from the json struct tags, which do not
// always match the Go field names (see ReportingController).
type Event struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
// The object that this event is about.
InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`
// This should be a short, machine understandable string that gives the reason
// for the transition into the object's current status.
// TODO: provide exact specification for format.
// +optional
Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`
// A human-readable description of the status of this operation.
// TODO: decide on maximum length.
// +optional
Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`
// The component reporting this event. Should be a short machine understandable string.
// +optional
Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`
// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
// NOTE(review): the sample `kubectl get event -o json` output shows
// creationTimestamp under "metadata", not among the type fields; confirm
// whether "TypeMeta" above should read "ObjectMeta".
// +optional
FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`
// The time at which the most recent occurrence of this event was recorded.
// +optional
LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`
// The number of times this event has occurred.
// +optional
Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`
// Type of this event (Normal, Warning), new types could be added in the future
// +optional
Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`
// Time when this Event was first observed.
// NOTE(review): although tagged omitempty, the sample output shows
// "eventTime": null for an unset value, so the key is not dropped; confirm
// against metav1.MicroTime's JSON marshalling.
// +optional
EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`
// Data about the Event series this event represents or nil if it's a singleton Event.
// +optional
Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
// What action was taken/failed regarding to the Regarding object.
// +optional
Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`
// Optional secondary object for more complex actions.
// +optional
Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`
// Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
// The JSON/protobuf name is reportingComponent, not reportingController, and
// the tag has no omitempty: the sample output carries "reportingComponent": "".
// +optional
ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
// ID of the controller instance, e.g. `kubelet-xyzf`.
// No omitempty here either; the sample output carries "reportingInstance": "".
// +optional
ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}
// ObjectReference contains enough information to let you inspect or modify the referred object.
// Every field below is tagged omitempty; in the sample event output the
// involvedObject carries no fieldPath key.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type ObjectReference struct {
// Kind of the referent.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds
// +optional
Kind string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
// Namespace of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
// +optional
Namespace string `json:"namespace,omitempty" protobuf:"bytes,2,opt,name=namespace"`
// Name of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
// +optional
Name string `json:"name,omitempty" protobuf:"bytes,3,opt,name=name"`
// UID of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#uids
// +optional
UID types.UID `json:"uid,omitempty" protobuf:"bytes,4,opt,name=uid,casttype=k8s.io/apimachinery/pkg/types.UID"`
// API version of the referent.
// +optional
APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,5,opt,name=apiVersion"`
// Specific resourceVersion to which this reference is made, if any.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#concurrency-control-and-consistency
// +optional
ResourceVersion string `json:"resourceVersion,omitempty" protobuf:"bytes,6,opt,name=resourceVersion"`
// If referring to a piece of an object instead of an entire object, this string
// should contain a valid JSON/Go field access statement, such as desiredState.manifest.containers[2].
// For example, if the object reference is to a container within a pod, this would take on a value like:
// "spec.containers{name}" (where "name" refers to the name of the container that triggered
// the event) or if no container name is specified "spec.containers[2]" (container with
// index 2 in this pod). This syntax is chosen only to have some well-defined way of
// referencing a part of an object.
// TODO: this design is not final and this field is subject to change in the future.
// +optional
FieldPath string `json:"fieldPath,omitempty" protobuf:"bytes,7,opt,name=fieldPath"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// kubectl get event -o json
{
"apiVersion": "v1",
"items": [{
"apiVersion": "v1",
"count": 2416,
"eventTime": null,
"firstTimestamp": "2020-02-14T12:22:43Z",
"involvedObject": {
"apiVersion": "v1",
"kind": "Service",
"name": "my-sb-svc",
"namespace": "default",
"resourceVersion": "264028180",
"uid": "96117aad-4f24-11ea-a87c-00163e04f1e0"
},
"kind": "Event",
"lastTimestamp": "2020-02-19T13:08:25Z",
"message": "Port 666 was assigned to multiple services; please recreate service",
"metadata": {
"creationTimestamp": "2020-02-14T12:22:43Z",
"name": "my-sb-svc.15f344468d77364d",
"namespace": "default",
"resourceVersion": "267629591",
"selfLink": "/api/v1/namespaces/test/events/my-sb-svc.15f344468d77364d",
"uid": "b3a56707-4f24-11ea-81ec-00163e0a865a"
},
"reason": "PortAlreadyAllocated",
"reportingComponent": "",
"reportingInstance": "",
"source": {
"component": "portallocator-repair-controller"
},
"type": "Warning"
}],
"kind": "List",
"metadata": {
"resourceVersion": "",
"selfLink": ""
}
}
Детали дизайна
Точка входа программы — eventer.go
sink — это выходной конец программы, например, может выводить в DingTalk, elasticsearch и т.д.
Этот плагин запустит все sink параллельно в форме go func() в начале через
1
sinkManager, err := sinks.NewEventSinkManager(sinkList, sinks.DefaultSinkExportEventsTimeout, sinks.DefaultSinkStopTimeout)
Именно этот вызов создаёт менеджер sink-ов и запускает их.
Настоящий главный герой — это manager
1
manager, err := manager.NewManager(sources[0], sinkManager, *argFrequency)
Он принимает sinkManager и ряд других параметров, запускает главную функцию. После многократного развертывания определений вы найдете метод Housekeep
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Housekeep runs the housekeeping loop until a stop signal arrives.
//
// Each pass waits until the next multiple of rm.frequency (computed by
// truncating the current time and adding one period) and then calls
// rm.housekeep. As soon as a receive on rm.stopChan succeeds, the sink is
// stopped and the method returns.
func (rm *realManager) Housekeep() {
	for {
		// Invoke housekeep at fixed points in time: ticks are aligned to
		// multiples of rm.frequency, not to the moment the loop started.
		current := time.Now()
		nextTick := current.Truncate(rm.frequency).Add(rm.frequency)
		wait := nextTick.Sub(current)

		tick := time.After(wait)
		select {
		case <-rm.stopChan:
			rm.sink.Stop()
			return
		case <-tick:
			rm.housekeep()
		}
	}
}
Этот метод написан очень просто и ясно: бесконечный цикл for, который раз в период вызывает housekeep и завершается только при получении сигнала остановки stopChan (это цикл, а не рекурсия).
Кроме того, он также по умолчанию прослушивает 0.0.0.0:8084 как порт проверки работоспособности.
Получение Event также довольно эффективно
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// NewKubernetesSource builds an event source backed by the Kubernetes API
// and starts its watch goroutine.
//
// The client configuration is obtained from uri via
// kubeconfig.GetKubeClientConfig. An error from resolving the configuration
// or from constructing the client is returned unchanged together with a nil
// source. On success the returned source holds an events client for
// kubeapi.NamespaceAll, an event channel buffered to LocalEventsBufferSize
// and an unbuffered stop channel; its watch method has already been launched
// in its own goroutine when this function returns.
func NewKubernetesSource(uri *url.URL) (*KubernetesEventSource, error) {
	cfg, err := kubeconfig.GetKubeClientConfig(uri)
	if err != nil {
		return nil, err
	}

	client, err := kubeclient.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}

	source := KubernetesEventSource{
		localEventsBuffer: make(chan *kubeapi.Event, LocalEventsBufferSize),
		stopChannel:       make(chan struct{}),
		eventClient:       client.CoreV1().Events(kubeapi.NamespaceAll),
	}
	go source.watch()

	return &source, nil
}
Заключение
Языковое выражение разработчика в этом проекте очень лаконично. Этот проект очень подходит для изучения параллелизма golang.