diff options
author | We-unite <3205135446@qq.com> | 2024-08-13 10:53:24 +0800 |
---|---|---|
committer | We-unite <3205135446@qq.com> | 2024-08-13 10:53:24 +0800 |
commit | dfdb18f83f7a957f99196369d97827d6209eeb9a (patch) | |
tree | 2ac5234a8322ba0ee52bc87fcdd93d56161ab67c /filter | |
parent | 3e49a044d22635157916651f0acb5a062397b34b (diff) | |
download | godo-dfdb18f83f7a957f99196369d97827d6209eeb9a.tar.gz godo-dfdb18f83f7a957f99196369d97827d6209eeb9a.zip |
Filtering process data from mongodb
First of all, fix sth in listener to fit the function of filter. Ori-
ginally, listener mark the /usr/bin/containerd process id with star,
but the children in db is updated by ppid, which is pid of parent but
not tgid, so the stared pid has no children. To Fix this, we add all
the pid of /usr/bin/containerd into the db, and set their ptgid/tgid,
so that they're just normal process as others. Maybe we should finish
the info of these processes? haha.
Then, the filter of pid. There're some designed steps to do, and their
methods are as follows:
- Initially, because of the multithreading execution of listener,
there may be several entries for the same process, and we should
merge them. Extract data from database into a slice, and use a map
to record process info. Iterate the slice, if the pid is in the
map, then merge them, else insert into the map.
- Then, we should build process tree, but what we have is pid. So
use another data structure, iterate merged process map, and build
a map from tgid to a slice of processes. Find out the star. Build
a map from pid to its tgid.
- BFS. Design a simple queue, and build the tree from the root(stared
tgid), record all the visited tgid in another map. That's just the
tree.
As usual, let's talk about the remaining issues:
- Some pids did not recieve exit message. Check the exit time of
its tgid, or even its ppid.
- Optimize the data structure, record the tree by itself. Now the
tree is recorded by not only the last helloTree map from tgid to
slice but the map from pid to tgid. It's hard to store in the
database. Design a better ds, so the viewer can build the tree
quickly from the data in db.
- For future file filter, the close time, the same file for the same
pid, and the pathName of a file, should be paid mych attention.
Fighting!
Diffstat (limited to 'filter')
-rw-r--r-- | filter/filter.go | 267 | ||||
-rw-r--r-- | filter/global.go | 92 |
2 files changed, 275 insertions, 84 deletions
diff --git a/filter/filter.go b/filter/filter.go index c83fb13..2a774a3 100644 --- a/filter/filter.go +++ b/filter/filter.go | |||
@@ -5,114 +5,213 @@ import ( | |||
5 | "fmt" | 5 | "fmt" |
6 | "log" | 6 | "log" |
7 | "os" | 7 | "os" |
8 | "time" | 8 | "sync" |
9 | 9 | ||
10 | "go.mongodb.org/mongo-driver/bson" | 10 | "go.mongodb.org/mongo-driver/bson" |
11 | "go.mongodb.org/mongo-driver/mongo" | 11 | "go.mongodb.org/mongo-driver/mongo" |
12 | "go.mongodb.org/mongo-driver/mongo/options" | 12 | "go.mongodb.org/mongo-driver/mongo/options" |
13 | "go.mongodb.org/mongo-driver/mongo/readpref" | ||
14 | ) | 13 | ) |
15 | 14 | ||
16 | type Exec struct { | 15 | const ( |
17 | timestamp time.Time `bson:"timestamp"` | 16 | oldDBName = "test" |
18 | execArgs []string `bson:"execArgs"` | 17 | oldPidColName = "pids" |
19 | } | 18 | ) |
20 | |||
21 | type Process struct { | ||
22 | timestamp time.Time `bson:"start_timestamp"` | ||
23 | ppid int `bson:"ppid"` | ||
24 | parentTgid int `bson:"parentTgid"` | ||
25 | pid int `bson:"pid"` | ||
26 | tgid int `bson:"tgid"` | ||
27 | args []string `bson:"args"` | ||
28 | comm string `bson:"comm"` | ||
29 | cwd string `bson:"cwd"` | ||
30 | execve []Exec `bson:"execve"` | ||
31 | exit_code int `bson:"exit_code"` | ||
32 | exit_signal int `bson:"exit_signal"` | ||
33 | exit_timestamp time.Time `bson:"exit_timestamp"` | ||
34 | } | ||
35 | 19 | ||
36 | func (p Process) String() string { | 20 | type treeNode struct { |
37 | var res string | 21 | Tgid int |
38 | res = "" | 22 | Threads []Process |
39 | res += fmt.Sprintf("timestamp\t%v\n", p.timestamp) | 23 | Children []int |
40 | res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", p.ppid, p.parentTgid) | ||
41 | res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.pid, p.tgid) | ||
42 | for i := 0; i < len(p.args); i++ { | ||
43 | res += fmt.Sprintf("%s ", p.args[i]) | ||
44 | } | ||
45 | res += fmt.Sprintf("\ncomm\t%s\ncwd\t%s\n", p.comm, p.cwd) | ||
46 | return res | ||
47 | } | 24 | } |
48 | 25 | ||
49 | // type Process struct { | ||
50 | // StartTimestamp time.Time `bson:"start_timestamp"` | ||
51 | // Ppid *int `bson:"ppid"` | ||
52 | // ParentTgid *int `bson:"parentTgid"` | ||
53 | // Pid int `bson:"pid"` | ||
54 | // Tgid int `bson:"tgid"` | ||
55 | // Args []string `bson:"args"` | ||
56 | // Comm *string `bson:"comm"` | ||
57 | // Cwd *string `bson:"cwd"` | ||
58 | // Execve []Exec `bson:"execve"` | ||
59 | // ExitCode *int `bson:"exit_code"` | ||
60 | // ExitSignal *int `bson:"exit_signal"` | ||
61 | // ExitTimestamp *time.Time `bson:"exit_timestamp"` | ||
62 | // } | ||
63 | |||
64 | // func (p Process) String() string { | ||
65 | // var res string | ||
66 | // res = "" | ||
67 | // res += fmt.Sprintf("timestamp\t%v\n", p.StartTimestamp) | ||
68 | // if p.Ppid != nil && p.ParentTgid != nil { | ||
69 | // res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", *(p.Ppid), *(p.ParentTgid)) | ||
70 | // } | ||
71 | // res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.Pid, p.Tgid) | ||
72 | // for i := 0; i < len(p.Args); i++ { | ||
73 | // res += fmt.Sprintf("%s ", p.Args[i]) | ||
74 | // } | ||
75 | // if p.Comm != nil && p.Cwd != nil { | ||
76 | // res += fmt.Sprintf("\ncomm\t%s\ncwd\t%s\n", *(p.Comm), *(p.Cwd)) | ||
77 | // } | ||
78 | // return res | ||
79 | // } | ||
80 | |||
81 | func main() { | 26 | func main() { |
27 | // 连接到MongoDB | ||
82 | client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://localhost:27017")) | 28 | client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://localhost:27017")) |
83 | if err != nil { | 29 | if err != nil { |
84 | fmt.Fprintf(os.Stderr, "Err connecting mongodb: %v\n", err) | 30 | log.Fatal(err) |
85 | } | 31 | } |
86 | defer client.Disconnect(context.TODO()) | 32 | defer client.Disconnect(context.TODO()) |
87 | 33 | ||
88 | // 检查连接 | 34 | // 选择数据库和集合 |
89 | err = client.Ping(context.TODO(), readpref.Primary()) | 35 | db := client.Database(oldDBName) |
36 | collection := db.Collection(oldPidColName) | ||
37 | |||
38 | // 提取所有数据 | ||
39 | var res []Process | ||
40 | |||
41 | cursor, err := collection.Find(context.Background(), bson.M{}) | ||
90 | if err != nil { | 42 | if err != nil { |
91 | log.Fatal(err) | 43 | fmt.Fprintf(os.Stderr, "Err: %v\n", err) |
44 | } | ||
45 | if err := cursor.All(context.Background(), &res); err != nil { | ||
46 | fmt.Fprintf(os.Stderr, "Err All: %v\n", err) | ||
92 | } | 47 | } |
93 | 48 | ||
94 | pidCol := client.Database("test").Collection("pids") | 49 | var merged sync.Map |
95 | cur, err := pidCol.Find(context.TODO(), bson.M{}) // 查询所有文档 | 50 | for _, process := range res { |
96 | if err != nil { | 51 | tmp, ok := merged.Load(process.Pid) |
97 | log.Fatal(err) | 52 | if ok { |
53 | // 证明重复了,要合并 | ||
54 | tmp := ProMerge(tmp.(Process), process) | ||
55 | merged.Store(process.Pid, tmp) | ||
56 | } else { | ||
57 | // 没有,直接插入 | ||
58 | merged.Store(process.Pid, process) | ||
59 | } | ||
98 | } | 60 | } |
99 | defer cur.Close(context.TODO()) // 确保游标被关闭 | ||
100 | 61 | ||
101 | var res []Process | 62 | var treeMap sync.Map |
102 | for cur.Next(context.TODO()) { | 63 | findTgid := make(map[int]int) |
103 | var tmp Process | 64 | var stared int |
104 | // 解码到Process结构体 | 65 | merged.Range(func(key, val interface{}) bool { |
105 | if err := cur.Decode(&tmp); err != nil { | 66 | tmp := val.(Process) |
106 | log.Fatal(err) | 67 | if tmp.Star { |
68 | stared = tmp.Tgid | ||
69 | } | ||
70 | // 登记tgid | ||
71 | findTgid[tmp.Pid] = tmp.Tgid | ||
72 | nodeTmp, ok := treeMap.Load(tmp.Tgid) | ||
73 | if ok { | ||
74 | // 直接记录 | ||
75 | node := nodeTmp.(treeNode) | ||
76 | node.Threads = append(node.Threads, tmp) | ||
77 | node.Children = append(node.Children, tmp.Children...) | ||
78 | treeMap.Store(tmp.Tgid, node) | ||
79 | } else { | ||
80 | node := treeNode{ | ||
81 | Tgid: tmp.Tgid, | ||
82 | Threads: make([]Process, 0), | ||
83 | Children: make([]int, 0), | ||
84 | } | ||
85 | node.Threads = append(node.Threads, tmp) | ||
86 | node.Children = append(node.Children, tmp.Children...) | ||
87 | treeMap.Store(tmp.Tgid, node) | ||
107 | } | 88 | } |
108 | res = append(res, tmp) | 89 | return true |
90 | }) | ||
91 | |||
92 | // 从tgid==stared开始,构建树 | ||
93 | var helloTree sync.Map // 在树上的tgid节点 | ||
94 | var q Queue // 记录每一个整理好的结构体,bfs | ||
95 | visited := make(map[int]bool) | ||
96 | visited[stared] = true | ||
97 | tmp, ok := treeMap.Load(stared) | ||
98 | if !ok { | ||
99 | return | ||
109 | } | 100 | } |
110 | 101 | ||
111 | if err := cur.Err(); err != nil { | 102 | q.Enqueue(tmp) |
112 | log.Fatal(err) | 103 | helloTree.Store(stared, tmp) |
104 | for !q.IsEmpty() { | ||
105 | tmp, ok := q.Dequeue() | ||
106 | if !ok { | ||
107 | continue | ||
108 | } | ||
109 | node := tmp.(treeNode) | ||
110 | for i := 0; i < len(node.Children); i++ { | ||
111 | tgid := findTgid[node.Children[i]] | ||
112 | _, exists := visited[tgid] | ||
113 | if !exists { | ||
114 | visited[tgid] = true | ||
115 | tgidNode, ok := treeMap.Load(tgid) | ||
116 | if !ok { | ||
117 | continue | ||
118 | } | ||
119 | helloTree.Store(tgid, tgidNode) | ||
120 | q.Enqueue(tgidNode) | ||
121 | } | ||
122 | } | ||
123 | } | ||
124 | |||
125 | // TODO: | ||
126 | // 1.修改数据结构,使之自身即存储树结构,插入数据库后前端拿出来就能用 | ||
127 | // 2.还有其余优化要做,比如线程退出时间与进程推出时间 | ||
128 | |||
129 | // count := 0 | ||
130 | // helloTree.Range(func(key, val interface{}) bool { | ||
131 | // count++ | ||
132 | // fmt.Printf("tgid: %d\n", val.(treeNode).Tgid) | ||
133 | // return true | ||
134 | // }) | ||
135 | // fmt.Printf("Star: %d, res: %d\n", stared, count) | ||
136 | |||
137 | // 接下来处理文件 | ||
138 | } | ||
139 | |||
140 | func ProMerge(a, b Process) (res Process) { | ||
141 | // 合并过程中会遇到什么问题? | ||
142 | res.Star = false | ||
143 | |||
144 | if a.StartTimestamp.IsZero() { | ||
145 | res.StartTimestamp = b.StartTimestamp | ||
146 | } else if b.StartTimestamp.IsZero() { | ||
147 | res.StartTimestamp = a.StartTimestamp | ||
148 | } else if a.StartTimestamp.Before(b.StartTimestamp) { | ||
149 | res.StartTimestamp = a.StartTimestamp | ||
150 | } else { | ||
151 | res.StartTimestamp = b.StartTimestamp | ||
152 | } | ||
153 | |||
154 | res.Ppid = a.Ppid | ||
155 | if a.ParentTgid == 0 { | ||
156 | res.ParentTgid = b.ParentTgid | ||
157 | } else { | ||
158 | res.ParentTgid = a.ParentTgid | ||
159 | } | ||
160 | |||
161 | res.Pid = a.Pid | ||
162 | if a.Tgid == 0 { | ||
163 | res.Tgid = b.Tgid | ||
164 | } else { | ||
165 | res.Tgid = a.Tgid | ||
166 | } | ||
167 | |||
168 | if len(a.Args) == 0 { | ||
169 | res.Args = b.Args | ||
170 | } else { | ||
171 | res.Args = a.Args | ||
113 | } | 172 | } |
114 | 173 | ||
115 | for i := 0; i < len(res); i++ { | 174 | if a.Comm == "" { |
116 | fmt.Printf("------\n%v\n", res[i]) | 175 | res.Comm = b.Comm |
176 | } else { | ||
177 | res.Comm = a.Comm | ||
117 | } | 178 | } |
179 | |||
180 | if a.RootFS == "" { | ||
181 | res.RootFS = b.RootFS | ||
182 | } else { | ||
183 | res.RootFS = a.RootFS | ||
184 | } | ||
185 | |||
186 | if a.Cwd == "" { | ||
187 | res.Cwd = b.Cwd | ||
188 | } else { | ||
189 | res.Cwd = a.Cwd | ||
190 | } | ||
191 | |||
192 | res.Execve = append(a.Execve, b.Execve...) | ||
193 | res.Children = append(a.Children, b.Children...) | ||
194 | |||
195 | var flag bool // 真a假b | ||
196 | if a.ExitTimestamp.IsZero() { | ||
197 | flag = false | ||
198 | } else if b.ExitTimestamp.IsZero() { | ||
199 | flag = true | ||
200 | } else if a.ExitTimestamp.Before(b.ExitTimestamp) { | ||
201 | flag = true | ||
202 | } else { | ||
203 | flag = false | ||
204 | } | ||
205 | |||
206 | if flag { | ||
207 | res.ExitCode = a.ExitCode | ||
208 | res.ExitSignal = a.ExitSignal | ||
209 | res.ExitTimestamp = a.ExitTimestamp | ||
210 | } else { | ||
211 | res.ExitCode = b.ExitCode | ||
212 | res.ExitSignal = b.ExitSignal | ||
213 | res.ExitTimestamp = b.ExitTimestamp | ||
214 | } | ||
215 | |||
216 | return res | ||
118 | } | 217 | } |
diff --git a/filter/global.go b/filter/global.go new file mode 100644 index 0000000..45706d4 --- /dev/null +++ b/filter/global.go | |||
@@ -0,0 +1,92 @@ | |||
1 | package main | ||
2 | |||
3 | import ( | ||
4 | "fmt" | ||
5 | "time" | ||
6 | ) | ||
7 | |||
8 | type Exec struct { | ||
9 | Timestamp time.Time `bson:"timestamp"` | ||
10 | ExecArgs []string `bson:"execArgs"` | ||
11 | } | ||
12 | |||
13 | type Process struct { | ||
14 | Star bool `bson:"star"` | ||
15 | StartTimestamp time.Time `bson:"start_timestamp"` | ||
16 | Ppid int `bson:"ppid"` | ||
17 | ParentTgid int `bson:"parentTgid"` | ||
18 | Pid int `bson:"pid"` | ||
19 | Tgid int `bson:"tgid"` | ||
20 | Args []string `bson:"args"` | ||
21 | Comm string `bson:"comm"` | ||
22 | RootFS string `bson:"rootfs"` | ||
23 | Cwd string `bson:"cwd"` | ||
24 | Children []int `bson:"children"` | ||
25 | Execve []Exec `bson:"execve"` | ||
26 | ExitCode int `bson:"exit_code"` | ||
27 | ExitSignal int `bson:"exit_signal"` | ||
28 | ExitTimestamp time.Time `bson:"exit_timestamp"` | ||
29 | } | ||
30 | |||
31 | func (p Process) String() string { | ||
32 | var res string | ||
33 | res = "" | ||
34 | res += fmt.Sprintf("timestamp\t%v\n", p.StartTimestamp) | ||
35 | res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", p.Ppid, p.ParentTgid) | ||
36 | res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.Pid, p.Tgid) | ||
37 | for i := 0; i < len(p.Args); i++ { | ||
38 | res += fmt.Sprintf("%s ", p.Args[i]) | ||
39 | } | ||
40 | res += fmt.Sprintf("\ncomm\t%s\ncwd\t%s\n", p.Comm, p.Cwd) | ||
41 | if len(p.Execve) != 0 { | ||
42 | res += fmt.Sprintf("exec:\n") | ||
43 | for i := 0; i < len(p.Execve); i++ { | ||
44 | res += fmt.Sprintf("\ttimestamp: %v\n\texecArgs:\t", p.Execve[i].Timestamp) | ||
45 | for j := 0; j < len(p.Execve[i].ExecArgs); j++ { | ||
46 | res += fmt.Sprintf("%s ", p.Execve[i].ExecArgs[j]) | ||
47 | } | ||
48 | res += fmt.Sprintf("\n") | ||
49 | } | ||
50 | } | ||
51 | res += fmt.Sprintf("children: ") | ||
52 | for i := 0; i < len(p.Children); i++ { | ||
53 | res += fmt.Sprintf("%d ", p.Children[i]) | ||
54 | } | ||
55 | res += fmt.Sprintf("\n") | ||
56 | return res | ||
57 | } | ||
58 | |||
59 | // Queue 定义一个队列结构体 | ||
60 | type Queue struct { | ||
61 | items []interface{} | ||
62 | } | ||
63 | |||
64 | // NewQueue 创建一个新的队列 | ||
65 | func NewQueue() *Queue { | ||
66 | return &Queue{items: make([]interface{}, 0)} | ||
67 | } | ||
68 | |||
69 | // Enqueue 向队列中添加一个元素 | ||
70 | func (q *Queue) Enqueue(item interface{}) { | ||
71 | q.items = append(q.items, item) | ||
72 | } | ||
73 | |||
74 | // Dequeue 从队列中移除并返回队列前面的元素 | ||
75 | func (q *Queue) Dequeue() (interface{}, bool) { | ||
76 | if len(q.items) == 0 { | ||
77 | return nil, false | ||
78 | } | ||
79 | item := q.items[0] | ||
80 | q.items = q.items[1:] | ||
81 | return item, true | ||
82 | } | ||
83 | |||
84 | // Size 返回队列中的元素数量 | ||
85 | func (q *Queue) Size() int { | ||
86 | return len(q.items) | ||
87 | } | ||
88 | |||
89 | // IsEmpty 检查队列是否为空 | ||
90 | func (q *Queue) IsEmpty() bool { | ||
91 | return len(q.items) == 0 | ||
92 | } | ||