about | summary | refs | log | tree | commit | diff | stats
path: root/filter
diff options
context:
space:
mode:
Diffstat (limited to 'filter')
-rw-r--r-- filter/filter.go 336
-rw-r--r-- filter/global.go 20
2 files changed, 258 insertions, 98 deletions
diff --git a/filter/filter.go b/filter/filter.go
index 2a774a3..b2341ec 100644
--- a/filter/filter.go
+++ b/filter/filter.go
@@ -5,7 +5,7 @@ import (
5 "fmt" 5 "fmt"
6 "log" 6 "log"
7 "os" 7 "os"
8 "sync" 8 "sort"
9 9
10 "go.mongodb.org/mongo-driver/bson" 10 "go.mongodb.org/mongo-driver/bson"
11 "go.mongodb.org/mongo-driver/mongo" 11 "go.mongodb.org/mongo-driver/mongo"
@@ -13,15 +13,22 @@ import (
13) 13)
14 14
15const ( 15const (
16 oldDBName = "test" 16 oldDBName = "test"
17 oldPidColName = "pids" 17 oldPidColName = "pids"
18 oldFdColName = "fds"
19 oldFileColName = "files"
20
21 newDBName = "cooked"
22 newPidColName = "tgids"
23 newFileColName = "files"
18) 24)
19 25
20type treeNode struct { 26// 进程树信息
21 Tgid int 27var findTgid map[int]int
22 Threads []Process 28var helloTree map[int]*tgidNode
23 Children []int 29
24} 30// 文件信息
31var files []*File
25 32
26func main() { 33func main() {
27 // 连接到MongoDB 34 // 连接到MongoDB
@@ -31,110 +38,81 @@ func main() {
31 } 38 }
32 defer client.Disconnect(context.TODO()) 39 defer client.Disconnect(context.TODO())
33 40
34 // 选择数据库和集合 41 oldDB := client.Database(oldDBName)
35 db := client.Database(oldDBName)
36 collection := db.Collection(oldPidColName)
37 42
38 // 提取所有数据 43 /*
39 var res []Process 44 * Step 1: 进程数据处理
45 */
46 oldPidCol := oldDB.Collection(oldPidColName)
40 47
41 cursor, err := collection.Find(context.Background(), bson.M{}) 48 // 数据提取
49 var rawPidData []Process
50 cursor, err := oldPidCol.Find(context.Background(), bson.M{})
42 if err != nil { 51 if err != nil {
43 fmt.Fprintf(os.Stderr, "Err: %v\n", err) 52 fmt.Fprintf(os.Stderr, "Err: %v\n", err)
53 return
44 } 54 }
45 if err := cursor.All(context.Background(), &res); err != nil { 55 err = cursor.All(context.Background(), &rawPidData)
56 if err != nil {
46 fmt.Fprintf(os.Stderr, "Err All: %v\n", err) 57 fmt.Fprintf(os.Stderr, "Err All: %v\n", err)
58 return
47 } 59 }
60 cursor.Close(context.Background())
48 61
49 var merged sync.Map 62 filtPids(&rawPidData)
50 for _, process := range res {
51 tmp, ok := merged.Load(process.Pid)
52 if ok {
53 // 证明重复了,要合并
54 tmp := ProMerge(tmp.(Process), process)
55 merged.Store(process.Pid, tmp)
56 } else {
57 // 没有,直接插入
58 merged.Store(process.Pid, process)
59 }
60 }
61 63
62 var treeMap sync.Map 64 /*
63 findTgid := make(map[int]int) 65 * Step 2: 文件数据处理
64 var stared int 66 * - 将已经关闭的和未关闭的同等看待
65 merged.Range(func(key, val interface{}) bool { 67 * - 未关闭的将关闭时间修改为对应进程退出时间
66 tmp := val.(Process) 68 * - 值得注意的是,同一进程各线程共享文件描述符……需要处理吗?
67 if tmp.Star { 69 */
68 stared = tmp.Tgid 70 // 提取files和fds里的数据
69 } 71 // TODO:是否可以只筛选被写过的记录?
70 // 登记tgid 72 var rawFileData []File
71 findTgid[tmp.Pid] = tmp.Tgid 73 oldFileCol := oldDB.Collection(oldFileColName)
72 nodeTmp, ok := treeMap.Load(tmp.Tgid) 74 cursor, err = oldFileCol.Find(context.Background(), bson.M{})
73 if ok { 75 if err != nil {
74 // 直接记录 76 fmt.Fprintf(os.Stderr, "Err: %v\n", err)
75 node := nodeTmp.(treeNode) 77 return
76 node.Threads = append(node.Threads, tmp) 78 }
77 node.Children = append(node.Children, tmp.Children...) 79 err = cursor.All(context.Background(), &rawFileData)
78 treeMap.Store(tmp.Tgid, node) 80 if err != nil {
79 } else { 81 fmt.Fprintf(os.Stderr, "Err All: %v\n", err)
80 node := treeNode{ 82 return
81 Tgid: tmp.Tgid, 83 }
82 Threads: make([]Process, 0), 84 cursor.Close(context.Background())
83 Children: make([]int, 0),
84 }
85 node.Threads = append(node.Threads, tmp)
86 node.Children = append(node.Children, tmp.Children...)
87 treeMap.Store(tmp.Tgid, node)
88 }
89 return true
90 })
91 85
92 // 从tgid==stared开始,构建树 86 var rawFdData []File
93 var helloTree sync.Map // 在树上的tgid节点 87 oldFdCol := oldDB.Collection(oldFdColName)
94 var q Queue // 记录每一个整理好的结构体,bfs 88 cursor, err = oldFdCol.Find(context.Background(), bson.M{})
95 visited := make(map[int]bool) 89 if err != nil {
96 visited[stared] = true 90 fmt.Fprintf(os.Stderr, "Err: %v\n", err)
97 tmp, ok := treeMap.Load(stared)
98 if !ok {
99 return 91 return
100 } 92 }
93 err = cursor.All(context.Background(), &rawFdData)
94 if err != nil {
95 fmt.Fprintf(os.Stderr, "Err All: %v\n", err)
96 return
97 }
98 cursor.Close(context.Background())
101 99
102 q.Enqueue(tmp) 100 // 合并,处理
103 helloTree.Store(stared, tmp) 101 rawFileData = append(rawFileData, rawFdData...)
104 for !q.IsEmpty() { 102 filtFiles(&rawFileData)
105 tmp, ok := q.Dequeue() 103
106 if !ok { 104 // 扔回数据库
107 continue 105 newDB := client.Database(newDBName)
108 } 106 newDB.Drop(context.Background())
109 node := tmp.(treeNode) 107 newPidCol := newDB.Collection(newPidColName)
110 for i := 0; i < len(node.Children); i++ { 108 for _, pTgidNode := range helloTree {
111 tgid := findTgid[node.Children[i]] 109 newPidCol.InsertOne(context.Background(), *pTgidNode)
112 _, exists := visited[tgid]
113 if !exists {
114 visited[tgid] = true
115 tgidNode, ok := treeMap.Load(tgid)
116 if !ok {
117 continue
118 }
119 helloTree.Store(tgid, tgidNode)
120 q.Enqueue(tgidNode)
121 }
122 }
123 } 110 }
124 111
125 // TODO: 112 newFileCol := newDB.Collection(newFileColName)
126 // 1.修改数据结构,使之自身即存储树结构,插入数据库后前端拿出来就能用 113 for _, pFile := range files {
127 // 2.还有其余优化要做,比如线程退出时间与进程推出时间 114 newFileCol.InsertOne(context.Background(), *pFile)
128 115 }
129 // count := 0
130 // helloTree.Range(func(key, val interface{}) bool {
131 // count++
132 // fmt.Printf("tgid: %d\n", val.(treeNode).Tgid)
133 // return true
134 // })
135 // fmt.Printf("Star: %d, res: %d\n", stared, count)
136
137 // 接下来处理文件
138} 116}
139 117
140func ProMerge(a, b Process) (res Process) { 118func ProMerge(a, b Process) (res Process) {
@@ -215,3 +193,167 @@ func ProMerge(a, b Process) (res Process) {
215 193
216 return res 194 return res
217} 195}
196
197func filtPids(pRawPidData *[]Process) {
198 rawPidData := *pRawPidData
199 // 合并由多线程导致的重复记录
200 merged := make(map[int]Process) // pid --> Process
201 for _, process := range rawPidData {
202 tmp, exists := merged[process.Pid]
203 if exists {
204 // 证明重复了,要合并
205 merged[process.Pid] = ProMerge(tmp, process)
206 } else {
207 // 没有,直接插入
208 merged[process.Pid] = process
209 }
210 }
211
212 // 合并出来的进程整理为tgidNode
213 // var tgidMap map[int]*tgidNode // tgid --> tgidNode
214 tgidMap := make(map[int]*tgidNode)
215 findTgid = make(map[int]int) // pid --> tgid
216 var stared int
217 stared = -1
218 for _, val := range merged {
219 if val.Star {
220 stared = val.Tgid
221 }
222 // 登记tgid
223 findTgid[val.Pid] = val.Tgid
224 // nodeval, ok := tgidMap.Load(val.Tgid)
225 nodeval, exists := tgidMap[val.Tgid]
226 if exists {
227 // 直接记录
228 // node := nodeval.(tgidNode)
229 nodeval.Threads = append(nodeval.Threads, val)
230 nodeval.FindPid[val.Pid] = len(nodeval.Threads) - 1
231 // tgidMap.Store(val.Tgid, node)
232 } else {
233 node := tgidNode{
234 Tgid: val.Tgid,
235 FindPid: make(map[int]int),
236 Threads: make([]Process, 0),
237 ChildTgid: make([]int, 0),
238 }
239 node.Threads = append(node.Threads, val)
240 node.FindPid[val.Pid] = 0
241 // tgidMap.Store(val.Tgid, node)
242 tgidMap[val.Tgid] = &node
243 }
244 }
245
246 // 从tgid==stared开始,构建树
247 helloTree = make(map[int]*tgidNode) // 在树上的tgid节点,tgid --> *tgidNode
248 var q Queue // 记录每一个整理好的结构体,bfs
249 visited := make(map[int]bool) // 哪些tgid已经访问过
250
251 // tmp, ok := tgidMap.Load(stared)
252 // if !ok {
253 // return
254 // }
255 tmp, exists := tgidMap[stared]
256 if !exists {
257 return
258 }
259
260 // helloTree负责在遍历到该节点时记录
261 // 队列仅负责搞明白哪些节点在树上
262 // 因而所有添加子代tgid的行为只针对helloTree
263 // q不添加,直接把新的tgid对应的tgidNode入队就是了
264 q.Enqueue(tmp)
265 visited[stared] = true
266 for !q.IsEmpty() {
267 tmp, ok := q.Dequeue()
268 if !ok {
269 continue
270 }
271 node := tmp.(*tgidNode) // 队列里的一个节点,这里必须重新申请node
272 helloTree[node.Tgid] = node
273 for i := 0; i < len(node.Threads); i++ {
274 for j := 0; j < len(node.Threads[i].Children); j++ {
275 tgid := findTgid[node.Threads[i].Children[j]]
276 _, exists := visited[tgid]
277 if !exists {
278 // 子代里有没见过的tgid
279 // tgidNode, ok := tgidMap.Load(tgid)
280 tgidNode, exists := tgidMap[tgid]
281 if !exists {
282 continue
283 }
284 helloTree[node.Tgid].ChildTgid = append(helloTree[node.Tgid].ChildTgid, tgid)
285 q.Enqueue(tgidNode)
286 visited[tgid] = true
287 }
288 }
289 }
290 }
291
292 // TODO:
293 // 1.√修改数据结构,使之自身即存储树结构,插入数据库后前端拿出来就能用
294 // 2.还有其余优化要做,比如线程退出时间与进程推出时间,关系到后续的文件修理
295 // 3.根文件系统,问题很重大
296
297 count := 0
298 for _, val := range helloTree {
299 count++
300 fmt.Printf("==============================\ntgid: %6d, size: %6d, children: ", val.Tgid, len(val.Threads))
301 for _, child := range val.ChildTgid {
302 fmt.Printf("%7d", child)
303 }
304 fmt.Printf("\n")
305 for _, process := range val.Threads {
306 fmt.Printf("%v\n", process)
307 }
308 fmt.Printf("\n\n\n")
309 }
310 fmt.Printf("Star: %d, res: %d\n", stared, count)
311}
312
313func filtFiles(pRawFileData *[]File) {
314 rawFileData := *pRawFileData
315 files = make([]*File, 0)
316
317 // 所有文件按照特定顺序排
318 sort.Slice(rawFileData, func(i, j int) bool {
319 pi := &rawFileData[i]
320 pj := &rawFileData[j]
321
322 if pi.FileName < pj.FileName {
323 return true
324 } else if pi.FileName > pj.FileName {
325 return false
326 }
327 if pi.Pid < pj.Pid {
328 return true
329 } else if pi.Pid > pj.Pid {
330 return false
331 }
332 if pi.Fd < pj.Fd {
333 return true
334 } else if pi.Fd > pj.Fd {
335 return false
336 }
337 if pi.OpenTimestamp.Before(pj.OpenTimestamp) {
338 return true
339 } else {
340 return false
341 }
342 })
343
344 for _, file := range rawFileData {
345 tgid := findTgid[file.Pid]
346 pTgidNode, exists := helloTree[tgid]
347 if !exists {
348 continue
349 }
350 if file.CloseTimestamp.IsZero() {
351 index, exists := pTgidNode.FindPid[file.Pid]
352 if !exists || index < 0 || index >= len(pTgidNode.Threads) {
353 continue
354 }
355 file.CloseTimestamp = pTgidNode.Threads[index].ExitTimestamp
356 }
357 files = append(files, &file)
358 }
359}
diff --git a/filter/global.go b/filter/global.go
index 45706d4..37af52b 100644
--- a/filter/global.go
+++ b/filter/global.go
@@ -28,9 +28,16 @@ type Process struct {
28 ExitTimestamp time.Time `bson:"exit_timestamp"` 28 ExitTimestamp time.Time `bson:"exit_timestamp"`
29} 29}
30 30
// tgidNode collects the recorded threads of one thread group (tgid) together
// with the tgids of the thread groups attached below it in the process tree.
31type tgidNode struct {
// Tgid is the thread-group id this node was created for.
32 Tgid int `bson:"tgid"`
// FindPid maps a pid to the index of its record in Threads.
33 FindPid map[int]int `bson:"findPid"`
// Threads holds one merged Process record per pid that reported this tgid.
34 Threads []Process `bson:"threads"`
// ChildTgid lists the child tgids appended while filtPids walks the tree.
35 ChildTgid []int `bson:"child_tgid"`
36}
37
31func (p Process) String() string { 38func (p Process) String() string {
32 var res string 39 var res string
33 res = "" 40 res = "---------------------\n"
34 res += fmt.Sprintf("timestamp\t%v\n", p.StartTimestamp) 41 res += fmt.Sprintf("timestamp\t%v\n", p.StartTimestamp)
35 res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", p.Ppid, p.ParentTgid) 42 res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", p.Ppid, p.ParentTgid)
36 res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.Pid, p.Tgid) 43 res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.Pid, p.Tgid)
@@ -53,9 +60,20 @@ func (p Process) String() string {
53 res += fmt.Sprintf("%d ", p.Children[i]) 60 res += fmt.Sprintf("%d ", p.Children[i])
54 } 61 }
55 res += fmt.Sprintf("\n") 62 res += fmt.Sprintf("\n")
63 res += fmt.Sprintf("exit_timestamp:\t%v\nexit_code:\t%d\nexit_signal:\t%d\n", p.ExitTimestamp, p.ExitCode, p.ExitSignal)
56 return res 64 return res
57} 65}
58 66
// File is one open-file record; the bson tags name the document fields it is
// decoded from and encoded to.
67type File struct {
// OpenTimestamp is stored under the key "timestamp"; filtFiles uses it as
// the last sort key.
68 OpenTimestamp time.Time `bson:"timestamp"`
// FileName is the first sort key in filtFiles.
69 FileName string `bson:"fileName"`
// Pid is the id filtFiles looks up in findTgid and FindPid to locate the
// owning thread record.
70 Pid int `bson:"pid"`
// Fd is the third sort key in filtFiles (after FileName and Pid).
71 Fd int `bson:"fd"`
// NOTE(review): the meaning of the four flag words is not visible here;
// confirm against the producer of these records.
72 Flags [4]uint64 `bson:"flags"`
// Presumably the times at which the file was written; TODO confirm.
73 Written []time.Time `bson:"written"`
// CloseTimestamp is zero for a file that was never closed; filtFiles then
// substitutes the ExitTimestamp of the owning thread.
74 CloseTimestamp time.Time `bson:"close_timestamp"`
75}
76
59// Queue 定义一个队列结构体 77// Queue 定义一个队列结构体
60type Queue struct { 78type Queue struct {
61 items []interface{} 79 items []interface{}