about | summary | refs | log | tree | commit | diff | stats
path: root/filter/filter.go
diff options
context:
space:
mode:
Diffstat (limited to 'filter/filter.go')
-rw-r--r-- filter/filter.go 145
1 files changed, 101 insertions, 44 deletions
diff --git a/filter/filter.go b/filter/filter.go
index b2341ec..98c326c 100644
--- a/filter/filter.go
+++ b/filter/filter.go
@@ -5,6 +5,7 @@ import (
5 "fmt" 5 "fmt"
6 "log" 6 "log"
7 "os" 7 "os"
8 "path"
8 "sort" 9 "sort"
9 10
10 "go.mongodb.org/mongo-driver/bson" 11 "go.mongodb.org/mongo-driver/bson"
@@ -28,7 +29,7 @@ var findTgid map[int]int
28var helloTree map[int]*tgidNode 29var helloTree map[int]*tgidNode
29 30
30// 文件信息 31// 文件信息
31var files []*File 32var files []File
32 33
33func main() { 34func main() {
34 // 连接到MongoDB 35 // 连接到MongoDB
@@ -110,8 +111,8 @@ func main() {
110 } 111 }
111 112
112 newFileCol := newDB.Collection(newFileColName) 113 newFileCol := newDB.Collection(newFileColName)
113 for _, pFile := range files { 114 for _, file := range files {
114 newFileCol.InsertOne(context.Background(), *pFile) 115 newFileCol.InsertOne(context.Background(), file)
115 } 116 }
116} 117}
117 118
@@ -194,41 +195,48 @@ func ProMerge(a, b Process) (res Process) {
194 return res 195 return res
195} 196}
196 197
197func filtPids(pRawPidData *[]Process) { 198func mergeProcess(pRawPidData *[]Process) (merged []Process) {
198 rawPidData := *pRawPidData 199 rawPidData := *pRawPidData
199 // 合并由多线程导致的重复记录 200 // 合并由多线程导致的重复记录,顺便按照pid升序
200 merged := make(map[int]Process) // pid --> Process 201 index := make(map[int]int)
201 for _, process := range rawPidData { 202 for _, process := range rawPidData {
202 tmp, exists := merged[process.Pid] 203 i, exists := index[process.Pid]
203 if exists { 204 if exists {
204 // 合并 205 // ,合并
205 merged[process.Pid] = ProMerge(tmp, process) 206 merged[i] = ProMerge(merged[i], process)
206 } else { 207 } else {
207 // 没有,直接插入 208 // 不存在,直接添加
208 merged[process.Pid] = process 209 merged = append(merged, process)
210 index[process.Pid] = len(merged) - 1
209 } 211 }
210 } 212 }
213 sort.Slice(merged, func(i, j int) bool {
214 return merged[i].Pid < merged[j].Pid
215 })
216 return merged
217}
211 218
219func getTgidNodes(merged []Process) (tgidMap map[int]*tgidNode, starTgid int, rootfsPids []int) {
212 // 合并出来的进程整理为tgidNode 220 // 合并出来的进程整理为tgidNode
213 // var tgidMap map[int]*tgidNode // tgid --> tgidNode 221 tgidMap = make(map[int]*tgidNode)
214 tgidMap := make(map[int]*tgidNode)
215 findTgid = make(map[int]int) // pid --> tgid 222 findTgid = make(map[int]int) // pid --> tgid
216 var stared int 223 // var starTgid, rootFsPid int
217 stared = -1 224 starTgid = -1
225 // rootfsPid = -1
226 rootfsPids = make([]int, 0)
218 for _, val := range merged { 227 for _, val := range merged {
219 if val.Star { 228 if val.Star {
220 stared = val.Tgid 229 starTgid = val.Tgid
230 } else if val.RootFS != "" {
231 rootfsPids = append(rootfsPids, val.Pid)
221 } 232 }
222 // 登记tgid 233 // 登记tgid
223 findTgid[val.Pid] = val.Tgid 234 findTgid[val.Pid] = val.Tgid
224 // nodeval, ok := tgidMap.Load(val.Tgid)
225 nodeval, exists := tgidMap[val.Tgid] 235 nodeval, exists := tgidMap[val.Tgid]
226 if exists { 236 if exists {
227 // 直接记录 237 // 直接记录
228 // node := nodeval.(tgidNode)
229 nodeval.Threads = append(nodeval.Threads, val) 238 nodeval.Threads = append(nodeval.Threads, val)
230 nodeval.FindPid[val.Pid] = len(nodeval.Threads) - 1 239 nodeval.FindPid[val.Pid] = len(nodeval.Threads) - 1
231 // tgidMap.Store(val.Tgid, node)
232 } else { 240 } else {
233 node := tgidNode{ 241 node := tgidNode{
234 Tgid: val.Tgid, 242 Tgid: val.Tgid,
@@ -238,21 +246,19 @@ func filtPids(pRawPidData *[]Process) {
238 } 246 }
239 node.Threads = append(node.Threads, val) 247 node.Threads = append(node.Threads, val)
240 node.FindPid[val.Pid] = 0 248 node.FindPid[val.Pid] = 0
241 // tgidMap.Store(val.Tgid, node)
242 tgidMap[val.Tgid] = &node 249 tgidMap[val.Tgid] = &node
243 } 250 }
244 } 251 }
252 return tgidMap, starTgid, rootfsPids
253}
245 254
246 // 从tgid==stared开始,构建树 255func buildTree(tgidMap map[int]*tgidNode, starTgid int) {
256 // 从tgid==starTgid开始,构建树
247 helloTree = make(map[int]*tgidNode) // 在树上的tgid节点,tgid --> *tgidNode 257 helloTree = make(map[int]*tgidNode) // 在树上的tgid节点,tgid --> *tgidNode
248 var q Queue // 记录每一个整理好的结构体,bfs 258 var q Queue // 记录每一个整理好的结构体,bfs
249 visited := make(map[int]bool) // 哪些tgid已经访问过 259 visited := make(map[int]bool) // 哪些tgid已经访问过
250 260
251 // tmp, ok := tgidMap.Load(stared) 261 tmp, exists := tgidMap[starTgid]
252 // if !ok {
253 // return
254 // }
255 tmp, exists := tgidMap[stared]
256 if !exists { 262 if !exists {
257 return 263 return
258 } 264 }
@@ -262,7 +268,7 @@ func filtPids(pRawPidData *[]Process) {
262 // 因而所有添加子代tgid的行为只针对helloTree 268 // 因而所有添加子代tgid的行为只针对helloTree
263 // q不添加,直接把新的tgid对应的tgidNode入队就是了 269 // q不添加,直接把新的tgid对应的tgidNode入队就是了
264 q.Enqueue(tmp) 270 q.Enqueue(tmp)
265 visited[stared] = true 271 visited[starTgid] = true
266 for !q.IsEmpty() { 272 for !q.IsEmpty() {
267 tmp, ok := q.Dequeue() 273 tmp, ok := q.Dequeue()
268 if !ok { 274 if !ok {
@@ -276,7 +282,6 @@ func filtPids(pRawPidData *[]Process) {
276 _, exists := visited[tgid] 282 _, exists := visited[tgid]
277 if !exists { 283 if !exists {
278 // 子代里有没见过的tgid 284 // 子代里有没见过的tgid
279 // tgidNode, ok := tgidMap.Load(tgid)
280 tgidNode, exists := tgidMap[tgid] 285 tgidNode, exists := tgidMap[tgid]
281 if !exists { 286 if !exists {
282 continue 287 continue
@@ -288,31 +293,79 @@ func filtPids(pRawPidData *[]Process) {
288 } 293 }
289 } 294 }
290 } 295 }
296}
291 297
292 // TODO: 298func optimazePid(starTgid int, rootfsPids []int) {
293 // 1.√修改数据结构,使之自身即存储树结构,插入数据库后前端拿出来就能用 299 getDockerRootFs := make(map[string]string) // dockerId --> rootfs
294 // 2.还有其余优化要做,比如线程退出时间与进程推出时间,关系到后续的文件修理 300 // 首先处理一下记录有pivot_root信息的进程,防止pivot先于fork
295 // 3.根文件系统,问题很重大 301 for _, rootfsPid := range rootfsPids {
302 rootfsTgid := findTgid[rootfsPid]
303 i := helloTree[rootfsTgid].FindPid[rootfsPid]
304 rootfsProcess := &(helloTree[rootfsTgid].Threads[i])
305 if rootfsProcess.RootFS == "cwd" {
306 rootfsProcess.RootFS = rootfsProcess.Cwd
307 }
308 getDockerRootFs[rootfsProcess.DockerId] = rootfsProcess.RootFS
309 }
296 310
297 count := 0 311 count := 0
298 for _, val := range helloTree { 312 for _, val := range helloTree {
299 count++ 313 // 处理一下pid结束时间,顺便找找爹
300 fmt.Printf("==============================\ntgid: %6d, size: %6d, children: ", val.Tgid, len(val.Threads)) 314 // 结束时间是因为很多线程结束时间没获取到,默认按照进程退出时间处理
301 for _, child := range val.ChildTgid { 315 // Ppid是因为进程产生之初收到的信息写的爹一定是亲爹
302 fmt.Printf("%7d", child) 316 // 但是产生线程时候该进程很可能已作为孤儿被收养,导致线程里关于爹的记录是继父
303 } 317 for i := 0; i < len(val.Threads); i++ {
304 fmt.Printf("\n") 318 if i != 0 {
305 for _, process := range val.Threads { 319 if val.Threads[i].Tgid < val.Threads[0].Tgid {
306 fmt.Printf("%v\n", process) 320 val.Threads[i].ParentTgid = val.Threads[0].ParentTgid
321 val.Threads[i].Ppid = val.Threads[0].Ppid
322 }
323 if val.Threads[i].ExitTimestamp.IsZero() {
324 val.Threads[i].ExitCode = val.Threads[0].ExitCode
325 val.Threads[i].ExitTimestamp = val.Threads[0].ExitTimestamp
326 val.Threads[i].ExitSignal = val.Threads[0].ExitSignal
327 }
328 }
329
330 dockerId := val.Threads[i].DockerId
331 if dockerId != "" {
332 rootfs, exists := getDockerRootFs[dockerId]
333 if !exists {
334 fmt.Fprintf(os.Stderr, "Err: the docker rootfs of pid %d is not known!\n", val.Threads[i].Pid)
335 continue
336 }
337 val.Threads[i].RootFS = rootfs
338 }
307 } 339 }
308 fmt.Printf("\n\n\n") 340
341 count++
342 fmt.Printf("%v\n", *val)
309 } 343 }
310 fmt.Printf("Star: %d, res: %d\n", stared, count) 344 fmt.Printf("Star: %d, res: %d\n", starTgid, count)
345}
346
347func filtPids(pRawPidData *[]Process) {
348 /* ATTENTION: 把map/slice直接传参是危险的
349 * 传递的是指针,不会引起大的复制开销,
350 * 但是map/slice在callee func内被修改**可能**导致内存更改
351 * 而这样的内存更改对caller function来说是不可见的,看到的还是原来的东西
352 * 这里由于参数几乎都是只读不写,因而用一下
353 */
354
355 // 合并由多线程导致的重复记录,顺便按照pid升序
356 // 多线程已经取消了,但保险起见还是留着
357 merged := mergeProcess(pRawPidData)
358 // 将Process按照tgid合并
359 tgidMap, starTgid, rootfsPids := getTgidNodes(merged)
360 // 建树,helloTree
361 buildTree(tgidMap, starTgid)
362 // 对树上的进程做一些优化处理
363 optimazePid(starTgid, rootfsPids)
311} 364}
312 365
313func filtFiles(pRawFileData *[]File) { 366func filtFiles(pRawFileData *[]File) {
314 rawFileData := *pRawFileData 367 rawFileData := *pRawFileData
315 files = make([]*File, 0) 368 files = make([]File, 0)
316 369
317 // 所有文件按照特定顺序排 370 // 所有文件按照特定顺序排
318 sort.Slice(rawFileData, func(i, j int) bool { 371 sort.Slice(rawFileData, func(i, j int) bool {
@@ -342,6 +395,9 @@ func filtFiles(pRawFileData *[]File) {
342 }) 395 })
343 396
344 for _, file := range rawFileData { 397 for _, file := range rawFileData {
398 if file.FileName == "/root/test/1/../.hello.c.swp" {
399 fmt.Printf("Test\n")
400 }
345 tgid := findTgid[file.Pid] 401 tgid := findTgid[file.Pid]
346 pTgidNode, exists := helloTree[tgid] 402 pTgidNode, exists := helloTree[tgid]
347 if !exists { 403 if !exists {
@@ -354,6 +410,7 @@ func filtFiles(pRawFileData *[]File) {
354 } 410 }
355 file.CloseTimestamp = pTgidNode.Threads[index].ExitTimestamp 411 file.CloseTimestamp = pTgidNode.Threads[index].ExitTimestamp
356 } 412 }
357 files = append(files, &file) 413 file.FileName = path.Clean(file.FileName)
414 files = append(files, file)
358 } 415 }
359} 416}