aboutsummaryrefslogtreecommitdiffstats
path: root/filter
diff options
context:
space:
mode:
authorWe-unite <3205135446@qq.com>2024-08-13 10:53:24 +0800
committerWe-unite <3205135446@qq.com>2024-08-13 10:53:24 +0800
commitdfdb18f83f7a957f99196369d97827d6209eeb9a (patch)
tree2ac5234a8322ba0ee52bc87fcdd93d56161ab67c /filter
parent3e49a044d22635157916651f0acb5a062397b34b (diff)
downloadgodo-dfdb18f83f7a957f99196369d97827d6209eeb9a.tar.gz
godo-dfdb18f83f7a957f99196369d97827d6209eeb9a.zip
Filtering process data from mongodb
First of all, fix sth in listener to fit the function of filter. Ori- ginally, listener mark the /usr/bin/containerd process id with star, but the children in db is updated by ppid, which is pid of parent but not tgid, so the stared pid has no children. To Fix this, we add all the pid of /usr/bin/containerd into the db, and set their ptgid/tgid, so that they're just normal process as others. Maybe we should finish the info of these processes? haha. Then, the filter of pid. There're some designed steps to do, and their methods are as follows: - Initially, because of the multithreading execution of listener, there may be several entries for the same process, and we should merge them. Extract data from database into a slice, and use a map to record process info. Iterate the slice, if the pid is in the map, then merge them, else insert into the map. - Then, we should build process tree, but what we have is pid. So use another data structure, iterate merged process map, and build a map from tgid to a slice of processes. Find out the star. Build a map from pid to its tgid. - BFS. Design a simple queue, and build the tree from the root(stared tgid), record all the visited tgid in another map. That's just the tree. As usual, let's talk about the remaining issues: - Some pids did not recieve exit message. Check the exit time of its tgid, or even its ppid. - Optimize the data structure, record the tree by itself. Now the tree is recorded by not only the last helloTree map from tgid to slice but the map from pid to tgid. It's hard to store in the database. Design a better ds, so the viewer can build the tree quickly from the data in db. - For future file filter, the close time, the same file for the same pid, and the pathName of a file, should be paid mych attention. Fighting!
Diffstat (limited to 'filter')
-rw-r--r--filter/filter.go267
-rw-r--r--filter/global.go92
2 files changed, 275 insertions, 84 deletions
diff --git a/filter/filter.go b/filter/filter.go
index c83fb13..2a774a3 100644
--- a/filter/filter.go
+++ b/filter/filter.go
@@ -5,114 +5,213 @@ import (
5 "fmt" 5 "fmt"
6 "log" 6 "log"
7 "os" 7 "os"
8 "time" 8 "sync"
9 9
10 "go.mongodb.org/mongo-driver/bson" 10 "go.mongodb.org/mongo-driver/bson"
11 "go.mongodb.org/mongo-driver/mongo" 11 "go.mongodb.org/mongo-driver/mongo"
12 "go.mongodb.org/mongo-driver/mongo/options" 12 "go.mongodb.org/mongo-driver/mongo/options"
13 "go.mongodb.org/mongo-driver/mongo/readpref"
14) 13)
15 14
16type Exec struct { 15const (
17 timestamp time.Time `bson:"timestamp"` 16 oldDBName = "test"
18 execArgs []string `bson:"execArgs"` 17 oldPidColName = "pids"
19} 18)
20
21type Process struct {
22 timestamp time.Time `bson:"start_timestamp"`
23 ppid int `bson:"ppid"`
24 parentTgid int `bson:"parentTgid"`
25 pid int `bson:"pid"`
26 tgid int `bson:"tgid"`
27 args []string `bson:"args"`
28 comm string `bson:"comm"`
29 cwd string `bson:"cwd"`
30 execve []Exec `bson:"execve"`
31 exit_code int `bson:"exit_code"`
32 exit_signal int `bson:"exit_signal"`
33 exit_timestamp time.Time `bson:"exit_timestamp"`
34}
35 19
36func (p Process) String() string { 20type treeNode struct {
37 var res string 21 Tgid int
38 res = "" 22 Threads []Process
39 res += fmt.Sprintf("timestamp\t%v\n", p.timestamp) 23 Children []int
40 res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", p.ppid, p.parentTgid)
41 res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.pid, p.tgid)
42 for i := 0; i < len(p.args); i++ {
43 res += fmt.Sprintf("%s ", p.args[i])
44 }
45 res += fmt.Sprintf("\ncomm\t%s\ncwd\t%s\n", p.comm, p.cwd)
46 return res
47} 24}
48 25
49// type Process struct {
50// StartTimestamp time.Time `bson:"start_timestamp"`
51// Ppid *int `bson:"ppid"`
52// ParentTgid *int `bson:"parentTgid"`
53// Pid int `bson:"pid"`
54// Tgid int `bson:"tgid"`
55// Args []string `bson:"args"`
56// Comm *string `bson:"comm"`
57// Cwd *string `bson:"cwd"`
58// Execve []Exec `bson:"execve"`
59// ExitCode *int `bson:"exit_code"`
60// ExitSignal *int `bson:"exit_signal"`
61// ExitTimestamp *time.Time `bson:"exit_timestamp"`
62// }
63
64// func (p Process) String() string {
65// var res string
66// res = ""
67// res += fmt.Sprintf("timestamp\t%v\n", p.StartTimestamp)
68// if p.Ppid != nil && p.ParentTgid != nil {
69// res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", *(p.Ppid), *(p.ParentTgid))
70// }
71// res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.Pid, p.Tgid)
72// for i := 0; i < len(p.Args); i++ {
73// res += fmt.Sprintf("%s ", p.Args[i])
74// }
75// if p.Comm != nil && p.Cwd != nil {
76// res += fmt.Sprintf("\ncomm\t%s\ncwd\t%s\n", *(p.Comm), *(p.Cwd))
77// }
78// return res
79// }
80
81func main() { 26func main() {
27 // 连接到MongoDB
82 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://localhost:27017")) 28 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://localhost:27017"))
83 if err != nil { 29 if err != nil {
84 fmt.Fprintf(os.Stderr, "Err connecting mongodb: %v\n", err) 30 log.Fatal(err)
85 } 31 }
86 defer client.Disconnect(context.TODO()) 32 defer client.Disconnect(context.TODO())
87 33
88 // 检查连接 34 // 选择数据库和集合
89 err = client.Ping(context.TODO(), readpref.Primary()) 35 db := client.Database(oldDBName)
36 collection := db.Collection(oldPidColName)
37
38 // 提取所有数据
39 var res []Process
40
41 cursor, err := collection.Find(context.Background(), bson.M{})
90 if err != nil { 42 if err != nil {
91 log.Fatal(err) 43 fmt.Fprintf(os.Stderr, "Err: %v\n", err)
44 }
45 if err := cursor.All(context.Background(), &res); err != nil {
46 fmt.Fprintf(os.Stderr, "Err All: %v\n", err)
92 } 47 }
93 48
94 pidCol := client.Database("test").Collection("pids") 49 var merged sync.Map
95 cur, err := pidCol.Find(context.TODO(), bson.M{}) // 查询所有文档 50 for _, process := range res {
96 if err != nil { 51 tmp, ok := merged.Load(process.Pid)
97 log.Fatal(err) 52 if ok {
53 // 证明重复了,要合并
54 tmp := ProMerge(tmp.(Process), process)
55 merged.Store(process.Pid, tmp)
56 } else {
57 // 没有,直接插入
58 merged.Store(process.Pid, process)
59 }
98 } 60 }
99 defer cur.Close(context.TODO()) // 确保游标被关闭
100 61
101 var res []Process 62 var treeMap sync.Map
102 for cur.Next(context.TODO()) { 63 findTgid := make(map[int]int)
103 var tmp Process 64 var stared int
104 // 解码到Process结构体 65 merged.Range(func(key, val interface{}) bool {
105 if err := cur.Decode(&tmp); err != nil { 66 tmp := val.(Process)
106 log.Fatal(err) 67 if tmp.Star {
68 stared = tmp.Tgid
69 }
70 // 登记tgid
71 findTgid[tmp.Pid] = tmp.Tgid
72 nodeTmp, ok := treeMap.Load(tmp.Tgid)
73 if ok {
74 // 直接记录
75 node := nodeTmp.(treeNode)
76 node.Threads = append(node.Threads, tmp)
77 node.Children = append(node.Children, tmp.Children...)
78 treeMap.Store(tmp.Tgid, node)
79 } else {
80 node := treeNode{
81 Tgid: tmp.Tgid,
82 Threads: make([]Process, 0),
83 Children: make([]int, 0),
84 }
85 node.Threads = append(node.Threads, tmp)
86 node.Children = append(node.Children, tmp.Children...)
87 treeMap.Store(tmp.Tgid, node)
107 } 88 }
108 res = append(res, tmp) 89 return true
90 })
91
92 // 从tgid==stared开始,构建树
93 var helloTree sync.Map // 在树上的tgid节点
94 var q Queue // 记录每一个整理好的结构体,bfs
95 visited := make(map[int]bool)
96 visited[stared] = true
97 tmp, ok := treeMap.Load(stared)
98 if !ok {
99 return
109 } 100 }
110 101
111 if err := cur.Err(); err != nil { 102 q.Enqueue(tmp)
112 log.Fatal(err) 103 helloTree.Store(stared, tmp)
104 for !q.IsEmpty() {
105 tmp, ok := q.Dequeue()
106 if !ok {
107 continue
108 }
109 node := tmp.(treeNode)
110 for i := 0; i < len(node.Children); i++ {
111 tgid := findTgid[node.Children[i]]
112 _, exists := visited[tgid]
113 if !exists {
114 visited[tgid] = true
115 tgidNode, ok := treeMap.Load(tgid)
116 if !ok {
117 continue
118 }
119 helloTree.Store(tgid, tgidNode)
120 q.Enqueue(tgidNode)
121 }
122 }
123 }
124
125 // TODO:
126 // 1.修改数据结构,使之自身即存储树结构,插入数据库后前端拿出来就能用
127 // 2.还有其余优化要做,比如线程退出时间与进程推出时间
128
129 // count := 0
130 // helloTree.Range(func(key, val interface{}) bool {
131 // count++
132 // fmt.Printf("tgid: %d\n", val.(treeNode).Tgid)
133 // return true
134 // })
135 // fmt.Printf("Star: %d, res: %d\n", stared, count)
136
137 // 接下来处理文件
138}
139
140func ProMerge(a, b Process) (res Process) {
141 // 合并过程中会遇到什么问题?
142 res.Star = false
143
144 if a.StartTimestamp.IsZero() {
145 res.StartTimestamp = b.StartTimestamp
146 } else if b.StartTimestamp.IsZero() {
147 res.StartTimestamp = a.StartTimestamp
148 } else if a.StartTimestamp.Before(b.StartTimestamp) {
149 res.StartTimestamp = a.StartTimestamp
150 } else {
151 res.StartTimestamp = b.StartTimestamp
152 }
153
154 res.Ppid = a.Ppid
155 if a.ParentTgid == 0 {
156 res.ParentTgid = b.ParentTgid
157 } else {
158 res.ParentTgid = a.ParentTgid
159 }
160
161 res.Pid = a.Pid
162 if a.Tgid == 0 {
163 res.Tgid = b.Tgid
164 } else {
165 res.Tgid = a.Tgid
166 }
167
168 if len(a.Args) == 0 {
169 res.Args = b.Args
170 } else {
171 res.Args = a.Args
113 } 172 }
114 173
115 for i := 0; i < len(res); i++ { 174 if a.Comm == "" {
116 fmt.Printf("------\n%v\n", res[i]) 175 res.Comm = b.Comm
176 } else {
177 res.Comm = a.Comm
117 } 178 }
179
180 if a.RootFS == "" {
181 res.RootFS = b.RootFS
182 } else {
183 res.RootFS = a.RootFS
184 }
185
186 if a.Cwd == "" {
187 res.Cwd = b.Cwd
188 } else {
189 res.Cwd = a.Cwd
190 }
191
192 res.Execve = append(a.Execve, b.Execve...)
193 res.Children = append(a.Children, b.Children...)
194
195 var flag bool // 真a假b
196 if a.ExitTimestamp.IsZero() {
197 flag = false
198 } else if b.ExitTimestamp.IsZero() {
199 flag = true
200 } else if a.ExitTimestamp.Before(b.ExitTimestamp) {
201 flag = true
202 } else {
203 flag = false
204 }
205
206 if flag {
207 res.ExitCode = a.ExitCode
208 res.ExitSignal = a.ExitSignal
209 res.ExitTimestamp = a.ExitTimestamp
210 } else {
211 res.ExitCode = b.ExitCode
212 res.ExitSignal = b.ExitSignal
213 res.ExitTimestamp = b.ExitTimestamp
214 }
215
216 return res
118} 217}
diff --git a/filter/global.go b/filter/global.go
new file mode 100644
index 0000000..45706d4
--- /dev/null
+++ b/filter/global.go
@@ -0,0 +1,92 @@
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8type Exec struct {
9 Timestamp time.Time `bson:"timestamp"`
10 ExecArgs []string `bson:"execArgs"`
11}
12
13type Process struct {
14 Star bool `bson:"star"`
15 StartTimestamp time.Time `bson:"start_timestamp"`
16 Ppid int `bson:"ppid"`
17 ParentTgid int `bson:"parentTgid"`
18 Pid int `bson:"pid"`
19 Tgid int `bson:"tgid"`
20 Args []string `bson:"args"`
21 Comm string `bson:"comm"`
22 RootFS string `bson:"rootfs"`
23 Cwd string `bson:"cwd"`
24 Children []int `bson:"children"`
25 Execve []Exec `bson:"execve"`
26 ExitCode int `bson:"exit_code"`
27 ExitSignal int `bson:"exit_signal"`
28 ExitTimestamp time.Time `bson:"exit_timestamp"`
29}
30
31func (p Process) String() string {
32 var res string
33 res = ""
34 res += fmt.Sprintf("timestamp\t%v\n", p.StartTimestamp)
35 res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", p.Ppid, p.ParentTgid)
36 res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.Pid, p.Tgid)
37 for i := 0; i < len(p.Args); i++ {
38 res += fmt.Sprintf("%s ", p.Args[i])
39 }
40 res += fmt.Sprintf("\ncomm\t%s\ncwd\t%s\n", p.Comm, p.Cwd)
41 if len(p.Execve) != 0 {
42 res += fmt.Sprintf("exec:\n")
43 for i := 0; i < len(p.Execve); i++ {
44 res += fmt.Sprintf("\ttimestamp: %v\n\texecArgs:\t", p.Execve[i].Timestamp)
45 for j := 0; j < len(p.Execve[i].ExecArgs); j++ {
46 res += fmt.Sprintf("%s ", p.Execve[i].ExecArgs[j])
47 }
48 res += fmt.Sprintf("\n")
49 }
50 }
51 res += fmt.Sprintf("children: ")
52 for i := 0; i < len(p.Children); i++ {
53 res += fmt.Sprintf("%d ", p.Children[i])
54 }
55 res += fmt.Sprintf("\n")
56 return res
57}
58
59// Queue 定义一个队列结构体
60type Queue struct {
61 items []interface{}
62}
63
64// NewQueue 创建一个新的队列
65func NewQueue() *Queue {
66 return &Queue{items: make([]interface{}, 0)}
67}
68
69// Enqueue 向队列中添加一个元素
70func (q *Queue) Enqueue(item interface{}) {
71 q.items = append(q.items, item)
72}
73
74// Dequeue 从队列中移除并返回队列前面的元素
75func (q *Queue) Dequeue() (interface{}, bool) {
76 if len(q.items) == 0 {
77 return nil, false
78 }
79 item := q.items[0]
80 q.items = q.items[1:]
81 return item, true
82}
83
84// Size 返回队列中的元素数量
85func (q *Queue) Size() int {
86 return len(q.items)
87}
88
89// IsEmpty 检查队列是否为空
90func (q *Queue) IsEmpty() bool {
91 return len(q.items) == 0
92}