diff options
Diffstat (limited to 'filter/pids.go')
-rw-r--r-- | filter/pids.go | 301 |
1 files changed, 301 insertions, 0 deletions
diff --git a/filter/pids.go b/filter/pids.go new file mode 100644 index 0000000..bd4b095 --- /dev/null +++ b/filter/pids.go | |||
@@ -0,0 +1,301 @@ | |||
1 | package main | ||
2 | |||
3 | import ( | ||
4 | "encoding/json" | ||
5 | "fmt" | ||
6 | "os" | ||
7 | "sort" | ||
8 | ) | ||
9 | |||
10 | var count int | ||
11 | var starTgid int | ||
12 | |||
13 | func filtPids(pRawPidData *[]Process) { | ||
14 | /* ATTENTION: 把map/slice直接传参是危险的 | ||
15 | * 传递的是指针,不会引起大的复制开销, | ||
16 | * 但是map/slice在callee func内被修改**可能**导致内存更改 | ||
17 | * 而这样的内存更改对caller function来说是不可见的,看到的还是原来的东西 | ||
18 | * 这里由于参数几乎都是只读不写,因而用一下 | ||
19 | */ | ||
20 | |||
21 | // 合并由多线程导致的重复记录,顺便按照pid升序 | ||
22 | // 多线程已经取消了,但保险起见还是留着 | ||
23 | merged := mergeProcess(pRawPidData) | ||
24 | // 将Process按照tgid合并 | ||
25 | var tgidMap map[int]*tgidNode | ||
26 | var rootfsPids []int | ||
27 | tgidMap, starTgid, rootfsPids = getTgidNodes(merged) | ||
28 | // 建树,helloTree | ||
29 | buildTree(tgidMap, starTgid) | ||
30 | // 对树上的进程做一些优化处理 | ||
31 | optimazePid(starTgid, rootfsPids) | ||
32 | } | ||
33 | |||
34 | func ProMerge(a, b Process) (res Process) { | ||
35 | // 合并过程中会遇到什么问题? | ||
36 | res.Star = false | ||
37 | |||
38 | if a.StartTimestamp.IsZero() { | ||
39 | res.StartTimestamp = b.StartTimestamp | ||
40 | } else if b.StartTimestamp.IsZero() { | ||
41 | res.StartTimestamp = a.StartTimestamp | ||
42 | } else if a.StartTimestamp.Before(b.StartTimestamp) { | ||
43 | res.StartTimestamp = a.StartTimestamp | ||
44 | } else { | ||
45 | res.StartTimestamp = b.StartTimestamp | ||
46 | } | ||
47 | |||
48 | res.Ppid = a.Ppid | ||
49 | if a.ParentTgid == 0 { | ||
50 | res.ParentTgid = b.ParentTgid | ||
51 | } else { | ||
52 | res.ParentTgid = a.ParentTgid | ||
53 | } | ||
54 | |||
55 | res.Pid = a.Pid | ||
56 | if a.Tgid == 0 { | ||
57 | res.Tgid = b.Tgid | ||
58 | } else { | ||
59 | res.Tgid = a.Tgid | ||
60 | } | ||
61 | |||
62 | if len(a.Args) == 0 { | ||
63 | res.Args = b.Args | ||
64 | } else { | ||
65 | res.Args = a.Args | ||
66 | } | ||
67 | |||
68 | if a.Comm == "" { | ||
69 | res.Comm = b.Comm | ||
70 | } else { | ||
71 | res.Comm = a.Comm | ||
72 | } | ||
73 | |||
74 | if a.RootFS == "" { | ||
75 | res.RootFS = b.RootFS | ||
76 | } else { | ||
77 | res.RootFS = a.RootFS | ||
78 | } | ||
79 | |||
80 | if a.Cwd == "" { | ||
81 | res.Cwd = b.Cwd | ||
82 | } else { | ||
83 | res.Cwd = a.Cwd | ||
84 | } | ||
85 | |||
86 | res.Execve = append(a.Execve, b.Execve...) | ||
87 | res.Children = append(a.Children, b.Children...) | ||
88 | |||
89 | var flag bool // 真a假b | ||
90 | if a.ExitTimestamp.IsZero() { | ||
91 | flag = false | ||
92 | } else if b.ExitTimestamp.IsZero() { | ||
93 | flag = true | ||
94 | } else if a.ExitTimestamp.Before(b.ExitTimestamp) { | ||
95 | flag = true | ||
96 | } else { | ||
97 | flag = false | ||
98 | } | ||
99 | |||
100 | if flag { | ||
101 | res.ExitCode = a.ExitCode | ||
102 | res.ExitSignal = a.ExitSignal | ||
103 | res.ExitTimestamp = a.ExitTimestamp | ||
104 | } else { | ||
105 | res.ExitCode = b.ExitCode | ||
106 | res.ExitSignal = b.ExitSignal | ||
107 | res.ExitTimestamp = b.ExitTimestamp | ||
108 | } | ||
109 | |||
110 | return res | ||
111 | } | ||
112 | |||
113 | func mergeProcess(pRawPidData *[]Process) (merged []Process) { | ||
114 | rawPidData := *pRawPidData | ||
115 | // 合并由多线程导致的重复记录,顺便按照pid升序 | ||
116 | index := make(map[int]int) | ||
117 | for _, process := range rawPidData { | ||
118 | i, exists := index[process.Pid] | ||
119 | if exists { | ||
120 | // 已存在,合并 | ||
121 | merged[i] = ProMerge(merged[i], process) | ||
122 | } else { | ||
123 | // 不存在,直接添加 | ||
124 | merged = append(merged, process) | ||
125 | index[process.Pid] = len(merged) - 1 | ||
126 | } | ||
127 | } | ||
128 | sort.Slice(merged, func(i, j int) bool { | ||
129 | return merged[i].Pid < merged[j].Pid | ||
130 | }) | ||
131 | return merged | ||
132 | } | ||
133 | |||
134 | func getTgidNodes(merged []Process) (tgidMap map[int]*tgidNode, starTgid int, rootfsPids []int) { | ||
135 | // 合并出来的进程整理为tgidNode | ||
136 | tgidMap = make(map[int]*tgidNode) | ||
137 | findTgid = make(map[int]int) // pid --> tgid | ||
138 | // var starTgid, rootFsPid int | ||
139 | starTgid = -1 | ||
140 | // rootfsPid = -1 | ||
141 | rootfsPids = make([]int, 0) | ||
142 | for _, val := range merged { | ||
143 | if val.Star { | ||
144 | starTgid = val.Tgid | ||
145 | } else if val.RootFS != "" { | ||
146 | rootfsPids = append(rootfsPids, val.Pid) | ||
147 | } | ||
148 | // 登记tgid | ||
149 | findTgid[val.Pid] = val.Tgid | ||
150 | nodeval, exists := tgidMap[val.Tgid] | ||
151 | if exists { | ||
152 | // 直接记录 | ||
153 | nodeval.Threads = append(nodeval.Threads, val) | ||
154 | nodeval.FindPid[val.Pid] = len(nodeval.Threads) - 1 | ||
155 | } else { | ||
156 | node := tgidNode{ | ||
157 | Tgid: val.Tgid, | ||
158 | FindPid: make(map[int]int), | ||
159 | Threads: make([]Process, 0), | ||
160 | ChildTgid: make([]int, 0), | ||
161 | } | ||
162 | node.Threads = append(node.Threads, val) | ||
163 | node.FindPid[val.Pid] = 0 | ||
164 | tgidMap[val.Tgid] = &node | ||
165 | } | ||
166 | } | ||
167 | return tgidMap, starTgid, rootfsPids | ||
168 | } | ||
169 | |||
170 | func buildTree(tgidMap map[int]*tgidNode, starTgid int) { | ||
171 | // 从tgid==starTgid开始,构建树 | ||
172 | helloTree = make(map[int]*tgidNode) // 在树上的tgid节点,tgid --> *tgidNode | ||
173 | var q Queue // 记录每一个整理好的结构体,bfs | ||
174 | visited := make(map[int]bool) // 哪些tgid已经访问过 | ||
175 | |||
176 | tmp, exists := tgidMap[starTgid] | ||
177 | if !exists { | ||
178 | return | ||
179 | } | ||
180 | |||
181 | // helloTree负责在遍历到该节点时记录 | ||
182 | // 队列仅负责搞明白哪些节点在树上 | ||
183 | // 因而所有添加子代tgid的行为只针对helloTree | ||
184 | // q不添加,直接把新的tgid对应的tgidNode入队就是了 | ||
185 | q.Enqueue(tmp) | ||
186 | visited[starTgid] = true | ||
187 | for !q.IsEmpty() { | ||
188 | tmp, ok := q.Dequeue() | ||
189 | if !ok { | ||
190 | continue | ||
191 | } | ||
192 | node := tmp.(*tgidNode) // 队列里的一个节点,这里必须重新申请node | ||
193 | helloTree[node.Tgid] = node | ||
194 | for i := 0; i < len(node.Threads); i++ { | ||
195 | for j := 0; j < len(node.Threads[i].Children); j++ { | ||
196 | tgid := findTgid[node.Threads[i].Children[j]] | ||
197 | _, exists := visited[tgid] | ||
198 | if !exists { | ||
199 | // 子代里有没见过的tgid | ||
200 | tgidNode, exists := tgidMap[tgid] | ||
201 | if !exists { | ||
202 | continue | ||
203 | } | ||
204 | helloTree[node.Tgid].ChildTgid = append(helloTree[node.Tgid].ChildTgid, tgid) | ||
205 | q.Enqueue(tgidNode) | ||
206 | visited[tgid] = true | ||
207 | } | ||
208 | } | ||
209 | } | ||
210 | } | ||
211 | } | ||
212 | |||
213 | func optimazePid(starTgid int, rootfsPids []int) { | ||
214 | getDockerRootFs := make(map[string]string) // dockerId --> rootfs | ||
215 | // 首先处理一下记录有pivot_root信息的进程,防止pivot先于fork | ||
216 | for _, rootfsPid := range rootfsPids { | ||
217 | rootfsTgid := findTgid[rootfsPid] | ||
218 | i := helloTree[rootfsTgid].FindPid[rootfsPid] | ||
219 | rootfsProcess := &(helloTree[rootfsTgid].Threads[i]) | ||
220 | if rootfsProcess.RootFS == "cwd" { | ||
221 | rootfsProcess.RootFS = rootfsProcess.Cwd | ||
222 | } | ||
223 | getDockerRootFs[rootfsProcess.DockerId] = rootfsProcess.RootFS | ||
224 | } | ||
225 | |||
226 | count = 0 | ||
227 | for _, val := range helloTree { | ||
228 | // 处理一下pid结束时间,顺便找找爹 | ||
229 | // 结束时间是因为很多线程结束时间没获取到,默认按照进程退出时间处理 | ||
230 | // Ppid是因为进程产生之初收到的信息写的爹一定是亲爹 | ||
231 | // 但是产生线程时候该进程很可能已作为孤儿被收养,导致线程里关于爹的记录是继父 | ||
232 | for i := 0; i < len(val.Threads); i++ { | ||
233 | if i != 0 { | ||
234 | if val.Threads[i].Tgid < val.Threads[0].Tgid { | ||
235 | val.Threads[i].ParentTgid = val.Threads[0].ParentTgid | ||
236 | val.Threads[i].Ppid = val.Threads[0].Ppid | ||
237 | } | ||
238 | if val.Threads[i].ExitTimestamp.IsZero() { | ||
239 | val.Threads[i].ExitCode = val.Threads[0].ExitCode | ||
240 | val.Threads[i].ExitTimestamp = val.Threads[0].ExitTimestamp | ||
241 | val.Threads[i].ExitSignal = val.Threads[0].ExitSignal | ||
242 | } | ||
243 | } | ||
244 | |||
245 | dockerId := val.Threads[i].DockerId | ||
246 | if dockerId != "" { | ||
247 | rootfs, exists := getDockerRootFs[dockerId] | ||
248 | if !exists { | ||
249 | fmt.Fprintf(os.Stderr, "Err: the docker rootfs of pid %d is not known!\n", val.Threads[i].Pid) | ||
250 | continue | ||
251 | } | ||
252 | val.Threads[i].RootFS = rootfs | ||
253 | } | ||
254 | } | ||
255 | |||
256 | count++ | ||
257 | } | ||
258 | } | ||
259 | |||
260 | // 绘制进程树 | ||
261 | func drawTree(treeFile *os.File, pidFile *os.File, node *tgidNode, prefix string, isLast bool) { | ||
262 | if node == nil { | ||
263 | return | ||
264 | } | ||
265 | |||
266 | fmt.Fprintf(treeFile, "%s", prefix) | ||
267 | if isLast { | ||
268 | fmt.Fprintf(treeFile, "└── ") | ||
269 | prefix += " " | ||
270 | } else { | ||
271 | fmt.Fprintf(treeFile, "├── ") | ||
272 | prefix += "│ " | ||
273 | } | ||
274 | // 将当前进程的参数整理为一行命令 | ||
275 | argv := "" | ||
276 | for i, arg := range node.Threads[0].Args { | ||
277 | if i == 0 { | ||
278 | argv = arg | ||
279 | } else { | ||
280 | argv += " " + arg | ||
281 | } | ||
282 | } | ||
283 | fmt.Fprintf(treeFile, "%d: %s\n", node.Tgid, argv) | ||
284 | |||
285 | // 当前节点信息以json格式写入pidFile | ||
286 | jsonData, err := json.MarshalIndent(node, "", " ") | ||
287 | if err != nil { | ||
288 | fmt.Fprintf(os.Stderr, "Err: %v\n", err) | ||
289 | return | ||
290 | } | ||
291 | pidFile.Write(jsonData) | ||
292 | pidFile.WriteString("\n\n") | ||
293 | |||
294 | // 递归打印子节点 | ||
295 | for i, childTgid := range node.ChildTgid { | ||
296 | childNode, exists := helloTree[childTgid] | ||
297 | if exists { | ||
298 | drawTree(treeFile, pidFile, childNode, prefix, i == len(node.ChildTgid)-1) | ||
299 | } | ||
300 | } | ||
301 | } | ||