aboutsummaryrefslogtreecommitdiffstats
path: root/filter/filter.go
diff options
context:
space:
mode:
Diffstat (limited to 'filter/filter.go')
-rw-r--r--filter/filter.go328
1 files changed, 36 insertions, 292 deletions
diff --git a/filter/filter.go b/filter/filter.go
index 98c326c..6391afc 100644
--- a/filter/filter.go
+++ b/filter/filter.go
@@ -2,11 +2,10 @@ package main
2 2
3import ( 3import (
4 "context" 4 "context"
5 "encoding/json"
5 "fmt" 6 "fmt"
6 "log" 7 "log"
7 "os" 8 "os"
8 "path"
9 "sort"
10 9
11 "go.mongodb.org/mongo-driver/bson" 10 "go.mongodb.org/mongo-driver/bson"
12 "go.mongodb.org/mongo-driver/mongo" 11 "go.mongodb.org/mongo-driver/mongo"
@@ -33,7 +32,7 @@ var files []File
33 32
34func main() { 33func main() {
35 // 连接到MongoDB 34 // 连接到MongoDB
36 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://localhost:27017")) 35 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://192.168.192.136:27017"))
37 if err != nil { 36 if err != nil {
38 log.Fatal(err) 37 log.Fatal(err)
39 } 38 }
@@ -114,303 +113,48 @@ func main() {
114 for _, file := range files { 113 for _, file := range files {
115 newFileCol.InsertOne(context.Background(), file) 114 newFileCol.InsertOne(context.Background(), file)
116 } 115 }
117}
118
119func ProMerge(a, b Process) (res Process) {
120 // 合并过程中会遇到什么问题?
121 res.Star = false
122
123 if a.StartTimestamp.IsZero() {
124 res.StartTimestamp = b.StartTimestamp
125 } else if b.StartTimestamp.IsZero() {
126 res.StartTimestamp = a.StartTimestamp
127 } else if a.StartTimestamp.Before(b.StartTimestamp) {
128 res.StartTimestamp = a.StartTimestamp
129 } else {
130 res.StartTimestamp = b.StartTimestamp
131 }
132
133 res.Ppid = a.Ppid
134 if a.ParentTgid == 0 {
135 res.ParentTgid = b.ParentTgid
136 } else {
137 res.ParentTgid = a.ParentTgid
138 }
139
140 res.Pid = a.Pid
141 if a.Tgid == 0 {
142 res.Tgid = b.Tgid
143 } else {
144 res.Tgid = a.Tgid
145 }
146
147 if len(a.Args) == 0 {
148 res.Args = b.Args
149 } else {
150 res.Args = a.Args
151 }
152
153 if a.Comm == "" {
154 res.Comm = b.Comm
155 } else {
156 res.Comm = a.Comm
157 }
158
159 if a.RootFS == "" {
160 res.RootFS = b.RootFS
161 } else {
162 res.RootFS = a.RootFS
163 }
164
165 if a.Cwd == "" {
166 res.Cwd = b.Cwd
167 } else {
168 res.Cwd = a.Cwd
169 }
170
171 res.Execve = append(a.Execve, b.Execve...)
172 res.Children = append(a.Children, b.Children...)
173 116
174 var flag bool // 真a假b 117 /* Step 3: 输出到文件
175 if a.ExitTimestamp.IsZero() { 118 * - 所有内容输出到logs目录,所有文本存在则覆盖,不存在则创建
176 flag = false 119 * - 进程树输出到logs/tree.log
177 } else if b.ExitTimestamp.IsZero() { 120 * - 每个进程以json格式输出到logs/pids.log
178 flag = true 121 * - 文件信息输出到logs/files.log
179 } else if a.ExitTimestamp.Before(b.ExitTimestamp) { 122 */
180 flag = true 123 stat, err := os.Stat("logs")
181 } else { 124 if err != nil || !stat.IsDir() {
182 flag = false 125 os.Mkdir("logs", 0755)
183 }
184
185 if flag {
186 res.ExitCode = a.ExitCode
187 res.ExitSignal = a.ExitSignal
188 res.ExitTimestamp = a.ExitTimestamp
189 } else {
190 res.ExitCode = b.ExitCode
191 res.ExitSignal = b.ExitSignal
192 res.ExitTimestamp = b.ExitTimestamp
193 }
194
195 return res
196}
197
198func mergeProcess(pRawPidData *[]Process) (merged []Process) {
199 rawPidData := *pRawPidData
200 // 合并由多线程导致的重复记录,顺便按照pid升序
201 index := make(map[int]int)
202 for _, process := range rawPidData {
203 i, exists := index[process.Pid]
204 if exists {
205 // 已存在,合并
206 merged[i] = ProMerge(merged[i], process)
207 } else {
208 // 不存在,直接添加
209 merged = append(merged, process)
210 index[process.Pid] = len(merged) - 1
211 }
212 }
213 sort.Slice(merged, func(i, j int) bool {
214 return merged[i].Pid < merged[j].Pid
215 })
216 return merged
217}
218
219func getTgidNodes(merged []Process) (tgidMap map[int]*tgidNode, starTgid int, rootfsPids []int) {
220 // 合并出来的进程整理为tgidNode
221 tgidMap = make(map[int]*tgidNode)
222 findTgid = make(map[int]int) // pid --> tgid
223 // var starTgid, rootFsPid int
224 starTgid = -1
225 // rootfsPid = -1
226 rootfsPids = make([]int, 0)
227 for _, val := range merged {
228 if val.Star {
229 starTgid = val.Tgid
230 } else if val.RootFS != "" {
231 rootfsPids = append(rootfsPids, val.Pid)
232 }
233 // 登记tgid
234 findTgid[val.Pid] = val.Tgid
235 nodeval, exists := tgidMap[val.Tgid]
236 if exists {
237 // 直接记录
238 nodeval.Threads = append(nodeval.Threads, val)
239 nodeval.FindPid[val.Pid] = len(nodeval.Threads) - 1
240 } else {
241 node := tgidNode{
242 Tgid: val.Tgid,
243 FindPid: make(map[int]int),
244 Threads: make([]Process, 0),
245 ChildTgid: make([]int, 0),
246 }
247 node.Threads = append(node.Threads, val)
248 node.FindPid[val.Pid] = 0
249 tgidMap[val.Tgid] = &node
250 }
251 } 126 }
252 return tgidMap, starTgid, rootfsPids
253}
254 127
255func buildTree(tgidMap map[int]*tgidNode, starTgid int) { 128 // 进程树
256 // 从tgid==starTgid开始,构建树 129 treeFile, err := os.OpenFile("logs/tree.log", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
257 helloTree = make(map[int]*tgidNode) // 在树上的tgid节点,tgid --> *tgidNode 130 if err != nil {
258 var q Queue // 记录每一个整理好的结构体,bfs 131 fmt.Fprintf(os.Stderr, "Err: %v\n", err)
259 visited := make(map[int]bool) // 哪些tgid已经访问过
260
261 tmp, exists := tgidMap[starTgid]
262 if !exists {
263 return 132 return
264 } 133 }
265 134 defer treeFile.Close()
266 // helloTree负责在遍历到该节点时记录 135 pidFile, err := os.OpenFile("logs/pid.log", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
267 // 队列仅负责搞明白哪些节点在树上 136 if err != nil {
268 // 因而所有添加子代tgid的行为只针对helloTree 137 fmt.Fprintf(os.Stderr, "Err: %v\n", err)
269 // q不添加,直接把新的tgid对应的tgidNode入队就是了 138 return
270 q.Enqueue(tmp)
271 visited[starTgid] = true
272 for !q.IsEmpty() {
273 tmp, ok := q.Dequeue()
274 if !ok {
275 continue
276 }
277 node := tmp.(*tgidNode) // 队列里的一个节点,这里必须重新申请node
278 helloTree[node.Tgid] = node
279 for i := 0; i < len(node.Threads); i++ {
280 for j := 0; j < len(node.Threads[i].Children); j++ {
281 tgid := findTgid[node.Threads[i].Children[j]]
282 _, exists := visited[tgid]
283 if !exists {
284 // 子代里有没见过的tgid
285 tgidNode, exists := tgidMap[tgid]
286 if !exists {
287 continue
288 }
289 helloTree[node.Tgid].ChildTgid = append(helloTree[node.Tgid].ChildTgid, tgid)
290 q.Enqueue(tgidNode)
291 visited[tgid] = true
292 }
293 }
294 }
295 }
296}
297
298func optimazePid(starTgid int, rootfsPids []int) {
299 getDockerRootFs := make(map[string]string) // dockerId --> rootfs
300 // 首先处理一下记录有pivot_root信息的进程,防止pivot先于fork
301 for _, rootfsPid := range rootfsPids {
302 rootfsTgid := findTgid[rootfsPid]
303 i := helloTree[rootfsTgid].FindPid[rootfsPid]
304 rootfsProcess := &(helloTree[rootfsTgid].Threads[i])
305 if rootfsProcess.RootFS == "cwd" {
306 rootfsProcess.RootFS = rootfsProcess.Cwd
307 }
308 getDockerRootFs[rootfsProcess.DockerId] = rootfsProcess.RootFS
309 } 139 }
140 defer pidFile.Close()
141 // 从starTgid开始,按照树的形状输出
142 drawTree(treeFile, pidFile, helloTree[starTgid], "", true)
310 143
311 count := 0 144 // 文件信息,json格式
312 for _, val := range helloTree { 145 fileFile, err := os.OpenFile("logs/files.log", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
313 // 处理一下pid结束时间,顺便找找爹 146 if err != nil {
314 // 结束时间是因为很多线程结束时间没获取到,默认按照进程退出时间处理 147 fmt.Fprintf(os.Stderr, "Err: %v\n", err)
315 // Ppid是因为进程产生之初收到的信息写的爹一定是亲爹 148 return
316 // 但是产生线程时候该进程很可能已作为孤儿被收养,导致线程里关于爹的记录是继父
317 for i := 0; i < len(val.Threads); i++ {
318 if i != 0 {
319 if val.Threads[i].Tgid < val.Threads[0].Tgid {
320 val.Threads[i].ParentTgid = val.Threads[0].ParentTgid
321 val.Threads[i].Ppid = val.Threads[0].Ppid
322 }
323 if val.Threads[i].ExitTimestamp.IsZero() {
324 val.Threads[i].ExitCode = val.Threads[0].ExitCode
325 val.Threads[i].ExitTimestamp = val.Threads[0].ExitTimestamp
326 val.Threads[i].ExitSignal = val.Threads[0].ExitSignal
327 }
328 }
329
330 dockerId := val.Threads[i].DockerId
331 if dockerId != "" {
332 rootfs, exists := getDockerRootFs[dockerId]
333 if !exists {
334 fmt.Fprintf(os.Stderr, "Err: the docker rootfs of pid %d is not known!\n", val.Threads[i].Pid)
335 continue
336 }
337 val.Threads[i].RootFS = rootfs
338 }
339 }
340
341 count++
342 fmt.Printf("%v\n", *val)
343 } 149 }
344 fmt.Printf("Star: %d, res: %d\n", starTgid, count) 150 defer fileFile.Close()
345} 151 for _, file := range files {
346 152 jsonData, err := json.MarshalIndent(file, "", " ")
347func filtPids(pRawPidData *[]Process) { 153 if err != nil {
348 /* ATTENTION: 把map/slice直接传参是危险的 154 fmt.Fprintf(os.Stderr, "Err: %v\n", err)
349 * 传递的是指针,不会引起大的复制开销, 155 return
350 * 但是map/slice在callee func内被修改**可能**导致内存更改
351 * 而这样的内存更改对caller function来说是不可见的,看到的还是原来的东西
352 * 这里由于参数几乎都是只读不写,因而用一下
353 */
354
355 // 合并由多线程导致的重复记录,顺便按照pid升序
356 // 多线程已经取消了,但保险起见还是留着
357 merged := mergeProcess(pRawPidData)
358 // 将Process按照tgid合并
359 tgidMap, starTgid, rootfsPids := getTgidNodes(merged)
360 // 建树,helloTree
361 buildTree(tgidMap, starTgid)
362 // 对树上的进程做一些优化处理
363 optimazePid(starTgid, rootfsPids)
364}
365
366func filtFiles(pRawFileData *[]File) {
367 rawFileData := *pRawFileData
368 files = make([]File, 0)
369
370 // 所有文件按照特定顺序排
371 sort.Slice(rawFileData, func(i, j int) bool {
372 pi := &rawFileData[i]
373 pj := &rawFileData[j]
374
375 if pi.FileName < pj.FileName {
376 return true
377 } else if pi.FileName > pj.FileName {
378 return false
379 }
380 if pi.Pid < pj.Pid {
381 return true
382 } else if pi.Pid > pj.Pid {
383 return false
384 }
385 if pi.Fd < pj.Fd {
386 return true
387 } else if pi.Fd > pj.Fd {
388 return false
389 }
390 if pi.OpenTimestamp.Before(pj.OpenTimestamp) {
391 return true
392 } else {
393 return false
394 }
395 })
396
397 for _, file := range rawFileData {
398 if file.FileName == "/root/test/1/../.hello.c.swp" {
399 fmt.Printf("Test\n")
400 }
401 tgid := findTgid[file.Pid]
402 pTgidNode, exists := helloTree[tgid]
403 if !exists {
404 continue
405 }
406 if file.CloseTimestamp.IsZero() {
407 index, exists := pTgidNode.FindPid[file.Pid]
408 if !exists || index < 0 || index >= len(pTgidNode.Threads) {
409 continue
410 }
411 file.CloseTimestamp = pTgidNode.Threads[index].ExitTimestamp
412 } 156 }
413 file.FileName = path.Clean(file.FileName) 157 fileFile.Write(jsonData)
414 files = append(files, file) 158 fileFile.WriteString("\n\n")
415 } 159 }
416} 160}