aboutsummaryrefslogtreecommitdiffstats
path: root/filter/pids.go
diff options
context:
space:
mode:
Diffstat (limited to 'filter/pids.go')
-rw-r--r--filter/pids.go301
1 files changed, 301 insertions, 0 deletions
diff --git a/filter/pids.go b/filter/pids.go
new file mode 100644
index 0000000..bd4b095
--- /dev/null
+++ b/filter/pids.go
@@ -0,0 +1,301 @@
1package main
2
3import (
4 "encoding/json"
5 "fmt"
6 "os"
7 "sort"
8)
9
10var count int
11var starTgid int
12
13func filtPids(pRawPidData *[]Process) {
14 /* ATTENTION: 把map/slice直接传参是危险的
15 * 传递的是指针,不会引起大的复制开销,
16 * 但是map/slice在callee func内被修改**可能**导致内存更改
17 * 而这样的内存更改对caller function来说是不可见的,看到的还是原来的东西
18 * 这里由于参数几乎都是只读不写,因而用一下
19 */
20
21 // 合并由多线程导致的重复记录,顺便按照pid升序
22 // 多线程已经取消了,但保险起见还是留着
23 merged := mergeProcess(pRawPidData)
24 // 将Process按照tgid合并
25 var tgidMap map[int]*tgidNode
26 var rootfsPids []int
27 tgidMap, starTgid, rootfsPids = getTgidNodes(merged)
28 // 建树,helloTree
29 buildTree(tgidMap, starTgid)
30 // 对树上的进程做一些优化处理
31 optimazePid(starTgid, rootfsPids)
32}
33
34func ProMerge(a, b Process) (res Process) {
35 // 合并过程中会遇到什么问题?
36 res.Star = false
37
38 if a.StartTimestamp.IsZero() {
39 res.StartTimestamp = b.StartTimestamp
40 } else if b.StartTimestamp.IsZero() {
41 res.StartTimestamp = a.StartTimestamp
42 } else if a.StartTimestamp.Before(b.StartTimestamp) {
43 res.StartTimestamp = a.StartTimestamp
44 } else {
45 res.StartTimestamp = b.StartTimestamp
46 }
47
48 res.Ppid = a.Ppid
49 if a.ParentTgid == 0 {
50 res.ParentTgid = b.ParentTgid
51 } else {
52 res.ParentTgid = a.ParentTgid
53 }
54
55 res.Pid = a.Pid
56 if a.Tgid == 0 {
57 res.Tgid = b.Tgid
58 } else {
59 res.Tgid = a.Tgid
60 }
61
62 if len(a.Args) == 0 {
63 res.Args = b.Args
64 } else {
65 res.Args = a.Args
66 }
67
68 if a.Comm == "" {
69 res.Comm = b.Comm
70 } else {
71 res.Comm = a.Comm
72 }
73
74 if a.RootFS == "" {
75 res.RootFS = b.RootFS
76 } else {
77 res.RootFS = a.RootFS
78 }
79
80 if a.Cwd == "" {
81 res.Cwd = b.Cwd
82 } else {
83 res.Cwd = a.Cwd
84 }
85
86 res.Execve = append(a.Execve, b.Execve...)
87 res.Children = append(a.Children, b.Children...)
88
89 var flag bool // 真a假b
90 if a.ExitTimestamp.IsZero() {
91 flag = false
92 } else if b.ExitTimestamp.IsZero() {
93 flag = true
94 } else if a.ExitTimestamp.Before(b.ExitTimestamp) {
95 flag = true
96 } else {
97 flag = false
98 }
99
100 if flag {
101 res.ExitCode = a.ExitCode
102 res.ExitSignal = a.ExitSignal
103 res.ExitTimestamp = a.ExitTimestamp
104 } else {
105 res.ExitCode = b.ExitCode
106 res.ExitSignal = b.ExitSignal
107 res.ExitTimestamp = b.ExitTimestamp
108 }
109
110 return res
111}
112
113func mergeProcess(pRawPidData *[]Process) (merged []Process) {
114 rawPidData := *pRawPidData
115 // 合并由多线程导致的重复记录,顺便按照pid升序
116 index := make(map[int]int)
117 for _, process := range rawPidData {
118 i, exists := index[process.Pid]
119 if exists {
120 // 已存在,合并
121 merged[i] = ProMerge(merged[i], process)
122 } else {
123 // 不存在,直接添加
124 merged = append(merged, process)
125 index[process.Pid] = len(merged) - 1
126 }
127 }
128 sort.Slice(merged, func(i, j int) bool {
129 return merged[i].Pid < merged[j].Pid
130 })
131 return merged
132}
133
134func getTgidNodes(merged []Process) (tgidMap map[int]*tgidNode, starTgid int, rootfsPids []int) {
135 // 合并出来的进程整理为tgidNode
136 tgidMap = make(map[int]*tgidNode)
137 findTgid = make(map[int]int) // pid --> tgid
138 // var starTgid, rootFsPid int
139 starTgid = -1
140 // rootfsPid = -1
141 rootfsPids = make([]int, 0)
142 for _, val := range merged {
143 if val.Star {
144 starTgid = val.Tgid
145 } else if val.RootFS != "" {
146 rootfsPids = append(rootfsPids, val.Pid)
147 }
148 // 登记tgid
149 findTgid[val.Pid] = val.Tgid
150 nodeval, exists := tgidMap[val.Tgid]
151 if exists {
152 // 直接记录
153 nodeval.Threads = append(nodeval.Threads, val)
154 nodeval.FindPid[val.Pid] = len(nodeval.Threads) - 1
155 } else {
156 node := tgidNode{
157 Tgid: val.Tgid,
158 FindPid: make(map[int]int),
159 Threads: make([]Process, 0),
160 ChildTgid: make([]int, 0),
161 }
162 node.Threads = append(node.Threads, val)
163 node.FindPid[val.Pid] = 0
164 tgidMap[val.Tgid] = &node
165 }
166 }
167 return tgidMap, starTgid, rootfsPids
168}
169
170func buildTree(tgidMap map[int]*tgidNode, starTgid int) {
171 // 从tgid==starTgid开始,构建树
172 helloTree = make(map[int]*tgidNode) // 在树上的tgid节点,tgid --> *tgidNode
173 var q Queue // 记录每一个整理好的结构体,bfs
174 visited := make(map[int]bool) // 哪些tgid已经访问过
175
176 tmp, exists := tgidMap[starTgid]
177 if !exists {
178 return
179 }
180
181 // helloTree负责在遍历到该节点时记录
182 // 队列仅负责搞明白哪些节点在树上
183 // 因而所有添加子代tgid的行为只针对helloTree
184 // q不添加,直接把新的tgid对应的tgidNode入队就是了
185 q.Enqueue(tmp)
186 visited[starTgid] = true
187 for !q.IsEmpty() {
188 tmp, ok := q.Dequeue()
189 if !ok {
190 continue
191 }
192 node := tmp.(*tgidNode) // 队列里的一个节点,这里必须重新申请node
193 helloTree[node.Tgid] = node
194 for i := 0; i < len(node.Threads); i++ {
195 for j := 0; j < len(node.Threads[i].Children); j++ {
196 tgid := findTgid[node.Threads[i].Children[j]]
197 _, exists := visited[tgid]
198 if !exists {
199 // 子代里有没见过的tgid
200 tgidNode, exists := tgidMap[tgid]
201 if !exists {
202 continue
203 }
204 helloTree[node.Tgid].ChildTgid = append(helloTree[node.Tgid].ChildTgid, tgid)
205 q.Enqueue(tgidNode)
206 visited[tgid] = true
207 }
208 }
209 }
210 }
211}
212
213func optimazePid(starTgid int, rootfsPids []int) {
214 getDockerRootFs := make(map[string]string) // dockerId --> rootfs
215 // 首先处理一下记录有pivot_root信息的进程,防止pivot先于fork
216 for _, rootfsPid := range rootfsPids {
217 rootfsTgid := findTgid[rootfsPid]
218 i := helloTree[rootfsTgid].FindPid[rootfsPid]
219 rootfsProcess := &(helloTree[rootfsTgid].Threads[i])
220 if rootfsProcess.RootFS == "cwd" {
221 rootfsProcess.RootFS = rootfsProcess.Cwd
222 }
223 getDockerRootFs[rootfsProcess.DockerId] = rootfsProcess.RootFS
224 }
225
226 count = 0
227 for _, val := range helloTree {
228 // 处理一下pid结束时间,顺便找找爹
229 // 结束时间是因为很多线程结束时间没获取到,默认按照进程退出时间处理
230 // Ppid是因为进程产生之初收到的信息写的爹一定是亲爹
231 // 但是产生线程时候该进程很可能已作为孤儿被收养,导致线程里关于爹的记录是继父
232 for i := 0; i < len(val.Threads); i++ {
233 if i != 0 {
234 if val.Threads[i].Tgid < val.Threads[0].Tgid {
235 val.Threads[i].ParentTgid = val.Threads[0].ParentTgid
236 val.Threads[i].Ppid = val.Threads[0].Ppid
237 }
238 if val.Threads[i].ExitTimestamp.IsZero() {
239 val.Threads[i].ExitCode = val.Threads[0].ExitCode
240 val.Threads[i].ExitTimestamp = val.Threads[0].ExitTimestamp
241 val.Threads[i].ExitSignal = val.Threads[0].ExitSignal
242 }
243 }
244
245 dockerId := val.Threads[i].DockerId
246 if dockerId != "" {
247 rootfs, exists := getDockerRootFs[dockerId]
248 if !exists {
249 fmt.Fprintf(os.Stderr, "Err: the docker rootfs of pid %d is not known!\n", val.Threads[i].Pid)
250 continue
251 }
252 val.Threads[i].RootFS = rootfs
253 }
254 }
255
256 count++
257 }
258}
259
260// 绘制进程树
261func drawTree(treeFile *os.File, pidFile *os.File, node *tgidNode, prefix string, isLast bool) {
262 if node == nil {
263 return
264 }
265
266 fmt.Fprintf(treeFile, "%s", prefix)
267 if isLast {
268 fmt.Fprintf(treeFile, "└── ")
269 prefix += " "
270 } else {
271 fmt.Fprintf(treeFile, "├── ")
272 prefix += "│ "
273 }
274 // 将当前进程的参数整理为一行命令
275 argv := ""
276 for i, arg := range node.Threads[0].Args {
277 if i == 0 {
278 argv = arg
279 } else {
280 argv += " " + arg
281 }
282 }
283 fmt.Fprintf(treeFile, "%d: %s\n", node.Tgid, argv)
284
285 // 当前节点信息以json格式写入pidFile
286 jsonData, err := json.MarshalIndent(node, "", " ")
287 if err != nil {
288 fmt.Fprintf(os.Stderr, "Err: %v\n", err)
289 return
290 }
291 pidFile.Write(jsonData)
292 pidFile.WriteString("\n\n")
293
294 // 递归打印子节点
295 for i, childTgid := range node.ChildTgid {
296 childNode, exists := helloTree[childTgid]
297 if exists {
298 drawTree(treeFile, pidFile, childNode, prefix, i == len(node.ChildTgid)-1)
299 }
300 }
301}