diff options
author | We-unite <3205135446@qq.com> | 2024-08-14 17:28:28 +0800 |
---|---|---|
committer | We-unite <3205135446@qq.com> | 2024-08-14 17:28:28 +0800 |
commit | 61809e72c524294cb07535d0e31c80a283495f80 (patch) | |
tree | f3f8c6b4584d9071b58e8866b747399b3797c22c /filter | |
parent | dfdb18f83f7a957f99196369d97827d6209eeb9a (diff) | |
download | godo-61809e72c524294cb07535d0e31c80a283495f80.tar.gz godo-61809e72c524294cb07535d0e31c80a283495f80.zip |
Filter mainly finished, fix sth in lintener
This commit I make some changes:
- The filter got mainly finished.
- Build a big node by the same tgid, and use the tgid node to
build th tree we need by bfs.
- Filt relative files, and for the files not closed, add close
time stamp according to the exit time of their pids.
- Put all the results into database.
Besides, I enlarge the buffer size of netlink connector and channels
in lintener.
TODO:
- the pivot_root syscall is used only by the initial shell(`docker
start` makes a shell), other processes of shell change their root
by changing namespace(mnt ns?), using setns syscall. So fix it.
- It's time to fix the netlink connector socket.
Diffstat (limited to 'filter')
-rw-r--r-- | filter/filter.go | 336 | ||||
-rw-r--r-- | filter/global.go | 20 |
2 files changed, 258 insertions, 98 deletions
diff --git a/filter/filter.go b/filter/filter.go index 2a774a3..b2341ec 100644 --- a/filter/filter.go +++ b/filter/filter.go | |||
@@ -5,7 +5,7 @@ import ( | |||
5 | "fmt" | 5 | "fmt" |
6 | "log" | 6 | "log" |
7 | "os" | 7 | "os" |
8 | "sync" | 8 | "sort" |
9 | 9 | ||
10 | "go.mongodb.org/mongo-driver/bson" | 10 | "go.mongodb.org/mongo-driver/bson" |
11 | "go.mongodb.org/mongo-driver/mongo" | 11 | "go.mongodb.org/mongo-driver/mongo" |
@@ -13,15 +13,22 @@ import ( | |||
13 | ) | 13 | ) |
14 | 14 | ||
15 | const ( | 15 | const ( |
16 | oldDBName = "test" | 16 | oldDBName = "test" |
17 | oldPidColName = "pids" | 17 | oldPidColName = "pids" |
18 | oldFdColName = "fds" | ||
19 | oldFileColName = "files" | ||
20 | |||
21 | newDBName = "cooked" | ||
22 | newPidColName = "tgids" | ||
23 | newFileColName = "files" | ||
18 | ) | 24 | ) |
19 | 25 | ||
20 | type treeNode struct { | 26 | // 进程树信息 |
21 | Tgid int | 27 | var findTgid map[int]int |
22 | Threads []Process | 28 | var helloTree map[int]*tgidNode |
23 | Children []int | 29 | |
24 | } | 30 | // 文件信息 |
31 | var files []*File | ||
25 | 32 | ||
26 | func main() { | 33 | func main() { |
27 | // 连接到MongoDB | 34 | // 连接到MongoDB |
@@ -31,110 +38,81 @@ func main() { | |||
31 | } | 38 | } |
32 | defer client.Disconnect(context.TODO()) | 39 | defer client.Disconnect(context.TODO()) |
33 | 40 | ||
34 | // 选择数据库和集合 | 41 | oldDB := client.Database(oldDBName) |
35 | db := client.Database(oldDBName) | ||
36 | collection := db.Collection(oldPidColName) | ||
37 | 42 | ||
38 | // 提取所有数据 | 43 | /* |
39 | var res []Process | 44 | * Step 1: 进程数据处理 |
45 | */ | ||
46 | oldPidCol := oldDB.Collection(oldPidColName) | ||
40 | 47 | ||
41 | cursor, err := collection.Find(context.Background(), bson.M{}) | 48 | // 数据提取 |
49 | var rawPidData []Process | ||
50 | cursor, err := oldPidCol.Find(context.Background(), bson.M{}) | ||
42 | if err != nil { | 51 | if err != nil { |
43 | fmt.Fprintf(os.Stderr, "Err: %v\n", err) | 52 | fmt.Fprintf(os.Stderr, "Err: %v\n", err) |
53 | return | ||
44 | } | 54 | } |
45 | if err := cursor.All(context.Background(), &res); err != nil { | 55 | err = cursor.All(context.Background(), &rawPidData) |
56 | if err != nil { | ||
46 | fmt.Fprintf(os.Stderr, "Err All: %v\n", err) | 57 | fmt.Fprintf(os.Stderr, "Err All: %v\n", err) |
58 | return | ||
47 | } | 59 | } |
60 | cursor.Close(context.Background()) | ||
48 | 61 | ||
49 | var merged sync.Map | 62 | filtPids(&rawPidData) |
50 | for _, process := range res { | ||
51 | tmp, ok := merged.Load(process.Pid) | ||
52 | if ok { | ||
53 | // 证明重复了,要合并 | ||
54 | tmp := ProMerge(tmp.(Process), process) | ||
55 | merged.Store(process.Pid, tmp) | ||
56 | } else { | ||
57 | // 没有,直接插入 | ||
58 | merged.Store(process.Pid, process) | ||
59 | } | ||
60 | } | ||
61 | 63 | ||
62 | var treeMap sync.Map | 64 | /* |
63 | findTgid := make(map[int]int) | 65 | * Step 2: 文件数据处理 |
64 | var stared int | 66 | * - 将已经关闭的和未关闭的同等看待 |
65 | merged.Range(func(key, val interface{}) bool { | 67 | * - 未关闭的将关闭时间修改为对应进程退出时间 |
66 | tmp := val.(Process) | 68 | * - 值得注意的是,同一进程各线程共享文件描述符……需要处理吗? |
67 | if tmp.Star { | 69 | */ |
68 | stared = tmp.Tgid | 70 | // 提取files和fds里的数据 |
69 | } | 71 | // TODO:是否可以只筛选被写过的记录? |
70 | // 登记tgid | 72 | var rawFileData []File |
71 | findTgid[tmp.Pid] = tmp.Tgid | 73 | oldFileCol := oldDB.Collection(oldFileColName) |
72 | nodeTmp, ok := treeMap.Load(tmp.Tgid) | 74 | cursor, err = oldFileCol.Find(context.Background(), bson.M{}) |
73 | if ok { | 75 | if err != nil { |
74 | // 直接记录 | 76 | fmt.Fprintf(os.Stderr, "Err: %v\n", err) |
75 | node := nodeTmp.(treeNode) | 77 | return |
76 | node.Threads = append(node.Threads, tmp) | 78 | } |
77 | node.Children = append(node.Children, tmp.Children...) | 79 | err = cursor.All(context.Background(), &rawFileData) |
78 | treeMap.Store(tmp.Tgid, node) | 80 | if err != nil { |
79 | } else { | 81 | fmt.Fprintf(os.Stderr, "Err All: %v\n", err) |
80 | node := treeNode{ | 82 | return |
81 | Tgid: tmp.Tgid, | 83 | } |
82 | Threads: make([]Process, 0), | 84 | cursor.Close(context.Background()) |
83 | Children: make([]int, 0), | ||
84 | } | ||
85 | node.Threads = append(node.Threads, tmp) | ||
86 | node.Children = append(node.Children, tmp.Children...) | ||
87 | treeMap.Store(tmp.Tgid, node) | ||
88 | } | ||
89 | return true | ||
90 | }) | ||
91 | 85 | ||
92 | // 从tgid==stared开始,构建树 | 86 | var rawFdData []File |
93 | var helloTree sync.Map // 在树上的tgid节点 | 87 | oldFdCol := oldDB.Collection(oldFdColName) |
94 | var q Queue // 记录每一个整理好的结构体,bfs | 88 | cursor, err = oldFdCol.Find(context.Background(), bson.M{}) |
95 | visited := make(map[int]bool) | 89 | if err != nil { |
96 | visited[stared] = true | 90 | fmt.Fprintf(os.Stderr, "Err: %v\n", err) |
97 | tmp, ok := treeMap.Load(stared) | ||
98 | if !ok { | ||
99 | return | 91 | return |
100 | } | 92 | } |
93 | err = cursor.All(context.Background(), &rawFdData) | ||
94 | if err != nil { | ||
95 | fmt.Fprintf(os.Stderr, "Err All: %v\n", err) | ||
96 | return | ||
97 | } | ||
98 | cursor.Close(context.Background()) | ||
101 | 99 | ||
102 | q.Enqueue(tmp) | 100 | // 合并,处理 |
103 | helloTree.Store(stared, tmp) | 101 | rawFileData = append(rawFileData, rawFdData...) |
104 | for !q.IsEmpty() { | 102 | filtFiles(&rawFileData) |
105 | tmp, ok := q.Dequeue() | 103 | |
106 | if !ok { | 104 | // 扔回数据库 |
107 | continue | 105 | newDB := client.Database(newDBName) |
108 | } | 106 | newDB.Drop(context.Background()) |
109 | node := tmp.(treeNode) | 107 | newPidCol := newDB.Collection(newPidColName) |
110 | for i := 0; i < len(node.Children); i++ { | 108 | for _, pTgidNode := range helloTree { |
111 | tgid := findTgid[node.Children[i]] | 109 | newPidCol.InsertOne(context.Background(), *pTgidNode) |
112 | _, exists := visited[tgid] | ||
113 | if !exists { | ||
114 | visited[tgid] = true | ||
115 | tgidNode, ok := treeMap.Load(tgid) | ||
116 | if !ok { | ||
117 | continue | ||
118 | } | ||
119 | helloTree.Store(tgid, tgidNode) | ||
120 | q.Enqueue(tgidNode) | ||
121 | } | ||
122 | } | ||
123 | } | 110 | } |
124 | 111 | ||
125 | // TODO: | 112 | newFileCol := newDB.Collection(newFileColName) |
126 | // 1.修改数据结构,使之自身即存储树结构,插入数据库后前端拿出来就能用 | 113 | for _, pFile := range files { |
127 | // 2.还有其余优化要做,比如线程退出时间与进程推出时间 | 114 | newFileCol.InsertOne(context.Background(), *pFile) |
128 | 115 | } | |
129 | // count := 0 | ||
130 | // helloTree.Range(func(key, val interface{}) bool { | ||
131 | // count++ | ||
132 | // fmt.Printf("tgid: %d\n", val.(treeNode).Tgid) | ||
133 | // return true | ||
134 | // }) | ||
135 | // fmt.Printf("Star: %d, res: %d\n", stared, count) | ||
136 | |||
137 | // 接下来处理文件 | ||
138 | } | 116 | } |
139 | 117 | ||
140 | func ProMerge(a, b Process) (res Process) { | 118 | func ProMerge(a, b Process) (res Process) { |
@@ -215,3 +193,167 @@ func ProMerge(a, b Process) (res Process) { | |||
215 | 193 | ||
216 | return res | 194 | return res |
217 | } | 195 | } |
196 | |||
197 | func filtPids(pRawPidData *[]Process) { | ||
198 | rawPidData := *pRawPidData | ||
199 | // 合并由多线程导致的重复记录 | ||
200 | merged := make(map[int]Process) // pid --> Process | ||
201 | for _, process := range rawPidData { | ||
202 | tmp, exists := merged[process.Pid] | ||
203 | if exists { | ||
204 | // 证明重复了,要合并 | ||
205 | merged[process.Pid] = ProMerge(tmp, process) | ||
206 | } else { | ||
207 | // 没有,直接插入 | ||
208 | merged[process.Pid] = process | ||
209 | } | ||
210 | } | ||
211 | |||
212 | // 合并出来的进程整理为tgidNode | ||
213 | // var tgidMap map[int]*tgidNode // tgid --> tgidNode | ||
214 | tgidMap := make(map[int]*tgidNode) | ||
215 | findTgid = make(map[int]int) // pid --> tgid | ||
216 | var stared int | ||
217 | stared = -1 | ||
218 | for _, val := range merged { | ||
219 | if val.Star { | ||
220 | stared = val.Tgid | ||
221 | } | ||
222 | // 登记tgid | ||
223 | findTgid[val.Pid] = val.Tgid | ||
224 | // nodeval, ok := tgidMap.Load(val.Tgid) | ||
225 | nodeval, exists := tgidMap[val.Tgid] | ||
226 | if exists { | ||
227 | // 直接记录 | ||
228 | // node := nodeval.(tgidNode) | ||
229 | nodeval.Threads = append(nodeval.Threads, val) | ||
230 | nodeval.FindPid[val.Pid] = len(nodeval.Threads) - 1 | ||
231 | // tgidMap.Store(val.Tgid, node) | ||
232 | } else { | ||
233 | node := tgidNode{ | ||
234 | Tgid: val.Tgid, | ||
235 | FindPid: make(map[int]int), | ||
236 | Threads: make([]Process, 0), | ||
237 | ChildTgid: make([]int, 0), | ||
238 | } | ||
239 | node.Threads = append(node.Threads, val) | ||
240 | node.FindPid[val.Pid] = 0 | ||
241 | // tgidMap.Store(val.Tgid, node) | ||
242 | tgidMap[val.Tgid] = &node | ||
243 | } | ||
244 | } | ||
245 | |||
246 | // 从tgid==stared开始,构建树 | ||
247 | helloTree = make(map[int]*tgidNode) // 在树上的tgid节点,tgid --> *tgidNode | ||
248 | var q Queue // 记录每一个整理好的结构体,bfs | ||
249 | visited := make(map[int]bool) // 哪些tgid已经访问过 | ||
250 | |||
251 | // tmp, ok := tgidMap.Load(stared) | ||
252 | // if !ok { | ||
253 | // return | ||
254 | // } | ||
255 | tmp, exists := tgidMap[stared] | ||
256 | if !exists { | ||
257 | return | ||
258 | } | ||
259 | |||
260 | // helloTree负责在遍历到该节点时记录 | ||
261 | // 队列仅负责搞明白哪些节点在树上 | ||
262 | // 因而所有添加子代tgid的行为只针对helloTree | ||
263 | // q不添加,直接把新的tgid对应的tgidNode入队就是了 | ||
264 | q.Enqueue(tmp) | ||
265 | visited[stared] = true | ||
266 | for !q.IsEmpty() { | ||
267 | tmp, ok := q.Dequeue() | ||
268 | if !ok { | ||
269 | continue | ||
270 | } | ||
271 | node := tmp.(*tgidNode) // 队列里的一个节点,这里必须重新申请node | ||
272 | helloTree[node.Tgid] = node | ||
273 | for i := 0; i < len(node.Threads); i++ { | ||
274 | for j := 0; j < len(node.Threads[i].Children); j++ { | ||
275 | tgid := findTgid[node.Threads[i].Children[j]] | ||
276 | _, exists := visited[tgid] | ||
277 | if !exists { | ||
278 | // 子代里有没见过的tgid | ||
279 | // tgidNode, ok := tgidMap.Load(tgid) | ||
280 | tgidNode, exists := tgidMap[tgid] | ||
281 | if !exists { | ||
282 | continue | ||
283 | } | ||
284 | helloTree[node.Tgid].ChildTgid = append(helloTree[node.Tgid].ChildTgid, tgid) | ||
285 | q.Enqueue(tgidNode) | ||
286 | visited[tgid] = true | ||
287 | } | ||
288 | } | ||
289 | } | ||
290 | } | ||
291 | |||
292 | // TODO: | ||
293 | // 1.√修改数据结构,使之自身即存储树结构,插入数据库后前端拿出来就能用 | ||
294 | // 2.还有其余优化要做,比如线程退出时间与进程推出时间,关系到后续的文件修理 | ||
295 | // 3.根文件系统,问题很重大 | ||
296 | |||
297 | count := 0 | ||
298 | for _, val := range helloTree { | ||
299 | count++ | ||
300 | fmt.Printf("==============================\ntgid: %6d, size: %6d, children: ", val.Tgid, len(val.Threads)) | ||
301 | for _, child := range val.ChildTgid { | ||
302 | fmt.Printf("%7d", child) | ||
303 | } | ||
304 | fmt.Printf("\n") | ||
305 | for _, process := range val.Threads { | ||
306 | fmt.Printf("%v\n", process) | ||
307 | } | ||
308 | fmt.Printf("\n\n\n") | ||
309 | } | ||
310 | fmt.Printf("Star: %d, res: %d\n", stared, count) | ||
311 | } | ||
312 | |||
313 | func filtFiles(pRawFileData *[]File) { | ||
314 | rawFileData := *pRawFileData | ||
315 | files = make([]*File, 0) | ||
316 | |||
317 | // 所有文件按照特定顺序排 | ||
318 | sort.Slice(rawFileData, func(i, j int) bool { | ||
319 | pi := &rawFileData[i] | ||
320 | pj := &rawFileData[j] | ||
321 | |||
322 | if pi.FileName < pj.FileName { | ||
323 | return true | ||
324 | } else if pi.FileName > pj.FileName { | ||
325 | return false | ||
326 | } | ||
327 | if pi.Pid < pj.Pid { | ||
328 | return true | ||
329 | } else if pi.Pid > pj.Pid { | ||
330 | return false | ||
331 | } | ||
332 | if pi.Fd < pj.Fd { | ||
333 | return true | ||
334 | } else if pi.Fd > pj.Fd { | ||
335 | return false | ||
336 | } | ||
337 | if pi.OpenTimestamp.Before(pj.OpenTimestamp) { | ||
338 | return true | ||
339 | } else { | ||
340 | return false | ||
341 | } | ||
342 | }) | ||
343 | |||
344 | for _, file := range rawFileData { | ||
345 | tgid := findTgid[file.Pid] | ||
346 | pTgidNode, exists := helloTree[tgid] | ||
347 | if !exists { | ||
348 | continue | ||
349 | } | ||
350 | if file.CloseTimestamp.IsZero() { | ||
351 | index, exists := pTgidNode.FindPid[file.Pid] | ||
352 | if !exists || index < 0 || index >= len(pTgidNode.Threads) { | ||
353 | continue | ||
354 | } | ||
355 | file.CloseTimestamp = pTgidNode.Threads[index].ExitTimestamp | ||
356 | } | ||
357 | files = append(files, &file) | ||
358 | } | ||
359 | } | ||
diff --git a/filter/global.go b/filter/global.go index 45706d4..37af52b 100644 --- a/filter/global.go +++ b/filter/global.go | |||
@@ -28,9 +28,16 @@ type Process struct { | |||
28 | ExitTimestamp time.Time `bson:"exit_timestamp"` | 28 | ExitTimestamp time.Time `bson:"exit_timestamp"` |
29 | } | 29 | } |
30 | 30 | ||
31 | type tgidNode struct { | ||
32 | Tgid int `bson:"tgid"` | ||
33 | FindPid map[int]int `bson:"findPid"` | ||
34 | Threads []Process `bson:"threads"` | ||
35 | ChildTgid []int `bson:"child_tgid"` | ||
36 | } | ||
37 | |||
31 | func (p Process) String() string { | 38 | func (p Process) String() string { |
32 | var res string | 39 | var res string |
33 | res = "" | 40 | res = "---------------------\n" |
34 | res += fmt.Sprintf("timestamp\t%v\n", p.StartTimestamp) | 41 | res += fmt.Sprintf("timestamp\t%v\n", p.StartTimestamp) |
35 | res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", p.Ppid, p.ParentTgid) | 42 | res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", p.Ppid, p.ParentTgid) |
36 | res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.Pid, p.Tgid) | 43 | res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.Pid, p.Tgid) |
@@ -53,9 +60,20 @@ func (p Process) String() string { | |||
53 | res += fmt.Sprintf("%d ", p.Children[i]) | 60 | res += fmt.Sprintf("%d ", p.Children[i]) |
54 | } | 61 | } |
55 | res += fmt.Sprintf("\n") | 62 | res += fmt.Sprintf("\n") |
63 | res += fmt.Sprintf("exit_timestamp:\t%v\nexit_code:\t%d\nexit_signal:\t%d\n", p.ExitTimestamp, p.ExitCode, p.ExitSignal) | ||
56 | return res | 64 | return res |
57 | } | 65 | } |
58 | 66 | ||
67 | type File struct { | ||
68 | OpenTimestamp time.Time `bson:"timestamp"` | ||
69 | FileName string `bson:"fileName"` | ||
70 | Pid int `bson:"pid"` | ||
71 | Fd int `bson:"fd"` | ||
72 | Flags [4]uint64 `bson:"flags"` | ||
73 | Written []time.Time `bson:"written"` | ||
74 | CloseTimestamp time.Time `bson:"close_timestamp"` | ||
75 | } | ||
76 | |||
59 | // Queue 定义一个队列结构体 | 77 | // Queue 定义一个队列结构体 |
60 | type Queue struct { | 78 | type Queue struct { |
61 | items []interface{} | 79 | items []interface{} |