diff options
Diffstat (limited to '')
-rw-r--r-- | filter/filter.go | 267 | ||||
-rw-r--r-- | filter/global.go | 92 | ||||
-rw-r--r-- | listener/deal.go | 47 |
3 files changed, 308 insertions, 98 deletions
diff --git a/filter/filter.go b/filter/filter.go index c83fb13..2a774a3 100644 --- a/filter/filter.go +++ b/filter/filter.go | |||
@@ -5,114 +5,213 @@ import ( | |||
5 | "fmt" | 5 | "fmt" |
6 | "log" | 6 | "log" |
7 | "os" | 7 | "os" |
8 | "time" | 8 | "sync" |
9 | 9 | ||
10 | "go.mongodb.org/mongo-driver/bson" | 10 | "go.mongodb.org/mongo-driver/bson" |
11 | "go.mongodb.org/mongo-driver/mongo" | 11 | "go.mongodb.org/mongo-driver/mongo" |
12 | "go.mongodb.org/mongo-driver/mongo/options" | 12 | "go.mongodb.org/mongo-driver/mongo/options" |
13 | "go.mongodb.org/mongo-driver/mongo/readpref" | ||
14 | ) | 13 | ) |
15 | 14 | ||
16 | type Exec struct { | 15 | const ( |
17 | timestamp time.Time `bson:"timestamp"` | 16 | oldDBName = "test" |
18 | execArgs []string `bson:"execArgs"` | 17 | oldPidColName = "pids" |
19 | } | 18 | ) |
20 | |||
21 | type Process struct { | ||
22 | timestamp time.Time `bson:"start_timestamp"` | ||
23 | ppid int `bson:"ppid"` | ||
24 | parentTgid int `bson:"parentTgid"` | ||
25 | pid int `bson:"pid"` | ||
26 | tgid int `bson:"tgid"` | ||
27 | args []string `bson:"args"` | ||
28 | comm string `bson:"comm"` | ||
29 | cwd string `bson:"cwd"` | ||
30 | execve []Exec `bson:"execve"` | ||
31 | exit_code int `bson:"exit_code"` | ||
32 | exit_signal int `bson:"exit_signal"` | ||
33 | exit_timestamp time.Time `bson:"exit_timestamp"` | ||
34 | } | ||
35 | 19 | ||
36 | func (p Process) String() string { | 20 | type treeNode struct { |
37 | var res string | 21 | Tgid int |
38 | res = "" | 22 | Threads []Process |
39 | res += fmt.Sprintf("timestamp\t%v\n", p.timestamp) | 23 | Children []int |
40 | res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", p.ppid, p.parentTgid) | ||
41 | res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.pid, p.tgid) | ||
42 | for i := 0; i < len(p.args); i++ { | ||
43 | res += fmt.Sprintf("%s ", p.args[i]) | ||
44 | } | ||
45 | res += fmt.Sprintf("\ncomm\t%s\ncwd\t%s\n", p.comm, p.cwd) | ||
46 | return res | ||
47 | } | 24 | } |
48 | 25 | ||
49 | // type Process struct { | ||
50 | // StartTimestamp time.Time `bson:"start_timestamp"` | ||
51 | // Ppid *int `bson:"ppid"` | ||
52 | // ParentTgid *int `bson:"parentTgid"` | ||
53 | // Pid int `bson:"pid"` | ||
54 | // Tgid int `bson:"tgid"` | ||
55 | // Args []string `bson:"args"` | ||
56 | // Comm *string `bson:"comm"` | ||
57 | // Cwd *string `bson:"cwd"` | ||
58 | // Execve []Exec `bson:"execve"` | ||
59 | // ExitCode *int `bson:"exit_code"` | ||
60 | // ExitSignal *int `bson:"exit_signal"` | ||
61 | // ExitTimestamp *time.Time `bson:"exit_timestamp"` | ||
62 | // } | ||
63 | |||
64 | // func (p Process) String() string { | ||
65 | // var res string | ||
66 | // res = "" | ||
67 | // res += fmt.Sprintf("timestamp\t%v\n", p.StartTimestamp) | ||
68 | // if p.Ppid != nil && p.ParentTgid != nil { | ||
69 | // res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", *(p.Ppid), *(p.ParentTgid)) | ||
70 | // } | ||
71 | // res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.Pid, p.Tgid) | ||
72 | // for i := 0; i < len(p.Args); i++ { | ||
73 | // res += fmt.Sprintf("%s ", p.Args[i]) | ||
74 | // } | ||
75 | // if p.Comm != nil && p.Cwd != nil { | ||
76 | // res += fmt.Sprintf("\ncomm\t%s\ncwd\t%s\n", *(p.Comm), *(p.Cwd)) | ||
77 | // } | ||
78 | // return res | ||
79 | // } | ||
80 | |||
81 | func main() { | 26 | func main() { |
27 | // 连接到MongoDB | ||
82 | client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://localhost:27017")) | 28 | client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://localhost:27017")) |
83 | if err != nil { | 29 | if err != nil { |
84 | fmt.Fprintf(os.Stderr, "Err connecting mongodb: %v\n", err) | 30 | log.Fatal(err) |
85 | } | 31 | } |
86 | defer client.Disconnect(context.TODO()) | 32 | defer client.Disconnect(context.TODO()) |
87 | 33 | ||
88 | // 检查连接 | 34 | // 选择数据库和集合 |
89 | err = client.Ping(context.TODO(), readpref.Primary()) | 35 | db := client.Database(oldDBName) |
36 | collection := db.Collection(oldPidColName) | ||
37 | |||
38 | // 提取所有数据 | ||
39 | var res []Process | ||
40 | |||
41 | cursor, err := collection.Find(context.Background(), bson.M{}) | ||
90 | if err != nil { | 42 | if err != nil { |
91 | log.Fatal(err) | 43 | fmt.Fprintf(os.Stderr, "Err: %v\n", err) |
44 | } | ||
45 | if err := cursor.All(context.Background(), &res); err != nil { | ||
46 | fmt.Fprintf(os.Stderr, "Err All: %v\n", err) | ||
92 | } | 47 | } |
93 | 48 | ||
94 | pidCol := client.Database("test").Collection("pids") | 49 | var merged sync.Map |
95 | cur, err := pidCol.Find(context.TODO(), bson.M{}) // 查询所有文档 | 50 | for _, process := range res { |
96 | if err != nil { | 51 | tmp, ok := merged.Load(process.Pid) |
97 | log.Fatal(err) | 52 | if ok { |
53 | // 证明重复了,要合并 | ||
54 | tmp := ProMerge(tmp.(Process), process) | ||
55 | merged.Store(process.Pid, tmp) | ||
56 | } else { | ||
57 | // 没有,直接插入 | ||
58 | merged.Store(process.Pid, process) | ||
59 | } | ||
98 | } | 60 | } |
99 | defer cur.Close(context.TODO()) // 确保游标被关闭 | ||
100 | 61 | ||
101 | var res []Process | 62 | var treeMap sync.Map |
102 | for cur.Next(context.TODO()) { | 63 | findTgid := make(map[int]int) |
103 | var tmp Process | 64 | var stared int |
104 | // 解码到Process结构体 | 65 | merged.Range(func(key, val interface{}) bool { |
105 | if err := cur.Decode(&tmp); err != nil { | 66 | tmp := val.(Process) |
106 | log.Fatal(err) | 67 | if tmp.Star { |
68 | stared = tmp.Tgid | ||
69 | } | ||
70 | // 登记tgid | ||
71 | findTgid[tmp.Pid] = tmp.Tgid | ||
72 | nodeTmp, ok := treeMap.Load(tmp.Tgid) | ||
73 | if ok { | ||
74 | // 直接记录 | ||
75 | node := nodeTmp.(treeNode) | ||
76 | node.Threads = append(node.Threads, tmp) | ||
77 | node.Children = append(node.Children, tmp.Children...) | ||
78 | treeMap.Store(tmp.Tgid, node) | ||
79 | } else { | ||
80 | node := treeNode{ | ||
81 | Tgid: tmp.Tgid, | ||
82 | Threads: make([]Process, 0), | ||
83 | Children: make([]int, 0), | ||
84 | } | ||
85 | node.Threads = append(node.Threads, tmp) | ||
86 | node.Children = append(node.Children, tmp.Children...) | ||
87 | treeMap.Store(tmp.Tgid, node) | ||
107 | } | 88 | } |
108 | res = append(res, tmp) | 89 | return true |
90 | }) | ||
91 | |||
92 | // 从tgid==stared开始,构建树 | ||
93 | var helloTree sync.Map // 在树上的tgid节点 | ||
94 | var q Queue // 记录每一个整理好的结构体,bfs | ||
95 | visited := make(map[int]bool) | ||
96 | visited[stared] = true | ||
97 | tmp, ok := treeMap.Load(stared) | ||
98 | if !ok { | ||
99 | return | ||
109 | } | 100 | } |
110 | 101 | ||
111 | if err := cur.Err(); err != nil { | 102 | q.Enqueue(tmp) |
112 | log.Fatal(err) | 103 | helloTree.Store(stared, tmp) |
104 | for !q.IsEmpty() { | ||
105 | tmp, ok := q.Dequeue() | ||
106 | if !ok { | ||
107 | continue | ||
108 | } | ||
109 | node := tmp.(treeNode) | ||
110 | for i := 0; i < len(node.Children); i++ { | ||
111 | tgid := findTgid[node.Children[i]] | ||
112 | _, exists := visited[tgid] | ||
113 | if !exists { | ||
114 | visited[tgid] = true | ||
115 | tgidNode, ok := treeMap.Load(tgid) | ||
116 | if !ok { | ||
117 | continue | ||
118 | } | ||
119 | helloTree.Store(tgid, tgidNode) | ||
120 | q.Enqueue(tgidNode) | ||
121 | } | ||
122 | } | ||
123 | } | ||
124 | |||
125 | // TODO: | ||
126 | // 1.修改数据结构,使之自身即存储树结构,插入数据库后前端拿出来就能用 | ||
127 | // 2.还有其余优化要做,比如线程退出时间与进程推出时间 | ||
128 | |||
129 | // count := 0 | ||
130 | // helloTree.Range(func(key, val interface{}) bool { | ||
131 | // count++ | ||
132 | // fmt.Printf("tgid: %d\n", val.(treeNode).Tgid) | ||
133 | // return true | ||
134 | // }) | ||
135 | // fmt.Printf("Star: %d, res: %d\n", stared, count) | ||
136 | |||
137 | // 接下来处理文件 | ||
138 | } | ||
139 | |||
140 | func ProMerge(a, b Process) (res Process) { | ||
141 | // 合并过程中会遇到什么问题? | ||
142 | res.Star = false | ||
143 | |||
144 | if a.StartTimestamp.IsZero() { | ||
145 | res.StartTimestamp = b.StartTimestamp | ||
146 | } else if b.StartTimestamp.IsZero() { | ||
147 | res.StartTimestamp = a.StartTimestamp | ||
148 | } else if a.StartTimestamp.Before(b.StartTimestamp) { | ||
149 | res.StartTimestamp = a.StartTimestamp | ||
150 | } else { | ||
151 | res.StartTimestamp = b.StartTimestamp | ||
152 | } | ||
153 | |||
154 | res.Ppid = a.Ppid | ||
155 | if a.ParentTgid == 0 { | ||
156 | res.ParentTgid = b.ParentTgid | ||
157 | } else { | ||
158 | res.ParentTgid = a.ParentTgid | ||
159 | } | ||
160 | |||
161 | res.Pid = a.Pid | ||
162 | if a.Tgid == 0 { | ||
163 | res.Tgid = b.Tgid | ||
164 | } else { | ||
165 | res.Tgid = a.Tgid | ||
166 | } | ||
167 | |||
168 | if len(a.Args) == 0 { | ||
169 | res.Args = b.Args | ||
170 | } else { | ||
171 | res.Args = a.Args | ||
113 | } | 172 | } |
114 | 173 | ||
115 | for i := 0; i < len(res); i++ { | 174 | if a.Comm == "" { |
116 | fmt.Printf("------\n%v\n", res[i]) | 175 | res.Comm = b.Comm |
176 | } else { | ||
177 | res.Comm = a.Comm | ||
117 | } | 178 | } |
179 | |||
180 | if a.RootFS == "" { | ||
181 | res.RootFS = b.RootFS | ||
182 | } else { | ||
183 | res.RootFS = a.RootFS | ||
184 | } | ||
185 | |||
186 | if a.Cwd == "" { | ||
187 | res.Cwd = b.Cwd | ||
188 | } else { | ||
189 | res.Cwd = a.Cwd | ||
190 | } | ||
191 | |||
192 | res.Execve = append(a.Execve, b.Execve...) | ||
193 | res.Children = append(a.Children, b.Children...) | ||
194 | |||
195 | var flag bool // 真a假b | ||
196 | if a.ExitTimestamp.IsZero() { | ||
197 | flag = false | ||
198 | } else if b.ExitTimestamp.IsZero() { | ||
199 | flag = true | ||
200 | } else if a.ExitTimestamp.Before(b.ExitTimestamp) { | ||
201 | flag = true | ||
202 | } else { | ||
203 | flag = false | ||
204 | } | ||
205 | |||
206 | if flag { | ||
207 | res.ExitCode = a.ExitCode | ||
208 | res.ExitSignal = a.ExitSignal | ||
209 | res.ExitTimestamp = a.ExitTimestamp | ||
210 | } else { | ||
211 | res.ExitCode = b.ExitCode | ||
212 | res.ExitSignal = b.ExitSignal | ||
213 | res.ExitTimestamp = b.ExitTimestamp | ||
214 | } | ||
215 | |||
216 | return res | ||
118 | } | 217 | } |
diff --git a/filter/global.go b/filter/global.go new file mode 100644 index 0000000..45706d4 --- /dev/null +++ b/filter/global.go | |||
@@ -0,0 +1,92 @@ | |||
1 | package main | ||
2 | |||
3 | import ( | ||
4 | "fmt" | ||
5 | "time" | ||
6 | ) | ||
7 | |||
8 | type Exec struct { | ||
9 | Timestamp time.Time `bson:"timestamp"` | ||
10 | ExecArgs []string `bson:"execArgs"` | ||
11 | } | ||
12 | |||
13 | type Process struct { | ||
14 | Star bool `bson:"star"` | ||
15 | StartTimestamp time.Time `bson:"start_timestamp"` | ||
16 | Ppid int `bson:"ppid"` | ||
17 | ParentTgid int `bson:"parentTgid"` | ||
18 | Pid int `bson:"pid"` | ||
19 | Tgid int `bson:"tgid"` | ||
20 | Args []string `bson:"args"` | ||
21 | Comm string `bson:"comm"` | ||
22 | RootFS string `bson:"rootfs"` | ||
23 | Cwd string `bson:"cwd"` | ||
24 | Children []int `bson:"children"` | ||
25 | Execve []Exec `bson:"execve"` | ||
26 | ExitCode int `bson:"exit_code"` | ||
27 | ExitSignal int `bson:"exit_signal"` | ||
28 | ExitTimestamp time.Time `bson:"exit_timestamp"` | ||
29 | } | ||
30 | |||
31 | func (p Process) String() string { | ||
32 | var res string | ||
33 | res = "" | ||
34 | res += fmt.Sprintf("timestamp\t%v\n", p.StartTimestamp) | ||
35 | res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", p.Ppid, p.ParentTgid) | ||
36 | res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.Pid, p.Tgid) | ||
37 | for i := 0; i < len(p.Args); i++ { | ||
38 | res += fmt.Sprintf("%s ", p.Args[i]) | ||
39 | } | ||
40 | res += fmt.Sprintf("\ncomm\t%s\ncwd\t%s\n", p.Comm, p.Cwd) | ||
41 | if len(p.Execve) != 0 { | ||
42 | res += fmt.Sprintf("exec:\n") | ||
43 | for i := 0; i < len(p.Execve); i++ { | ||
44 | res += fmt.Sprintf("\ttimestamp: %v\n\texecArgs:\t", p.Execve[i].Timestamp) | ||
45 | for j := 0; j < len(p.Execve[i].ExecArgs); j++ { | ||
46 | res += fmt.Sprintf("%s ", p.Execve[i].ExecArgs[j]) | ||
47 | } | ||
48 | res += fmt.Sprintf("\n") | ||
49 | } | ||
50 | } | ||
51 | res += fmt.Sprintf("children: ") | ||
52 | for i := 0; i < len(p.Children); i++ { | ||
53 | res += fmt.Sprintf("%d ", p.Children[i]) | ||
54 | } | ||
55 | res += fmt.Sprintf("\n") | ||
56 | return res | ||
57 | } | ||
58 | |||
59 | // Queue 定义一个队列结构体 | ||
60 | type Queue struct { | ||
61 | items []interface{} | ||
62 | } | ||
63 | |||
64 | // NewQueue 创建一个新的队列 | ||
65 | func NewQueue() *Queue { | ||
66 | return &Queue{items: make([]interface{}, 0)} | ||
67 | } | ||
68 | |||
69 | // Enqueue 向队列中添加一个元素 | ||
70 | func (q *Queue) Enqueue(item interface{}) { | ||
71 | q.items = append(q.items, item) | ||
72 | } | ||
73 | |||
74 | // Dequeue 从队列中移除并返回队列前面的元素 | ||
75 | func (q *Queue) Dequeue() (interface{}, bool) { | ||
76 | if len(q.items) == 0 { | ||
77 | return nil, false | ||
78 | } | ||
79 | item := q.items[0] | ||
80 | q.items = q.items[1:] | ||
81 | return item, true | ||
82 | } | ||
83 | |||
84 | // Size 返回队列中的元素数量 | ||
85 | func (q *Queue) Size() int { | ||
86 | return len(q.items) | ||
87 | } | ||
88 | |||
89 | // IsEmpty 检查队列是否为空 | ||
90 | func (q *Queue) IsEmpty() bool { | ||
91 | return len(q.items) == 0 | ||
92 | } | ||
diff --git a/listener/deal.go b/listener/deal.go index 8f77431..8225224 100644 --- a/listener/deal.go +++ b/listener/deal.go | |||
@@ -3,6 +3,7 @@ package main | |||
3 | import ( | 3 | import ( |
4 | "fmt" | 4 | "fmt" |
5 | "os" | 5 | "os" |
6 | "strconv" | ||
6 | "syscall" | 7 | "syscall" |
7 | "time" | 8 | "time" |
8 | 9 | ||
@@ -17,27 +18,45 @@ const ( | |||
17 | ) | 18 | ) |
18 | 19 | ||
19 | var pidCol, fdCol, fileCol mongoClient | 20 | var pidCol, fdCol, fileCol mongoClient |
20 | var err error | 21 | |
22 | func initPidCol() (err error) { | ||
23 | // TODO: 这里是否需要补全一下进程信息? | ||
24 | dirs, err := os.ReadDir(fmt.Sprintf("/proc/%d/task", containerdPid)) | ||
25 | if err != nil { | ||
26 | return err | ||
27 | } | ||
28 | for _, file := range dirs { | ||
29 | pid, _ := strconv.Atoi(file.Name()) | ||
30 | process := Process{ | ||
31 | Ppid: 1, | ||
32 | ParentTgid: 1, | ||
33 | Pid: pid, | ||
34 | Tgid: containerdPid, | ||
35 | Cwd: "/", | ||
36 | Children: make([]int, 0), | ||
37 | Execve: make([]Exec, 0), | ||
38 | Args: make([]string, 0), | ||
39 | } | ||
40 | if pid == containerdPid { | ||
41 | process.Star = true | ||
42 | } | ||
43 | err = pidCol.InsertOne(process) | ||
44 | } | ||
45 | return nil | ||
46 | } | ||
21 | 47 | ||
22 | func deal() { | 48 | func deal() { |
23 | defer wg.Done() | 49 | defer wg.Done() |
24 | var cooked Event | 50 | var cooked Event |
25 | var ok bool | 51 | var ok bool |
52 | var err error | ||
26 | 53 | ||
27 | if err = pidCol.init(dbName, pidColName); err != nil { | 54 | if err = pidCol.init(dbName, pidColName); err != nil { |
28 | fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err) | 55 | fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err) |
29 | return | 56 | return |
30 | } | 57 | } |
31 | err = pidCol.InsertOne(Process{ | 58 | if err = initPidCol(); err != nil { |
32 | Ppid: 1, | 59 | fmt.Fprintf(os.Stderr, "Err while initing pidcol: %v\n", err) |
33 | Pid: containerdPid, | ||
34 | Cwd: "/", | ||
35 | Children: make([]int, 0), | ||
36 | Star: true, | ||
37 | }) | ||
38 | if err != nil { | ||
39 | fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err) | ||
40 | return | ||
41 | } | 60 | } |
42 | 61 | ||
43 | if err = fdCol.init(dbName, fdColName); err != nil { | 62 | if err = fdCol.init(dbName, fdColName); err != nil { |
@@ -96,7 +115,7 @@ func deletePid(cooked Event) { | |||
96 | func dealNewPid(cooked Event) { | 115 | func dealNewPid(cooked Event) { |
97 | // 自身是否已经记录 | 116 | // 自身是否已经记录 |
98 | var docRes []Process | 117 | var docRes []Process |
99 | err = pidCol.Finddoc(bson.M{"pid": cooked.pid}, &docRes) | 118 | err := pidCol.Finddoc(bson.M{"pid": cooked.pid}, &docRes) |
100 | if err != nil { | 119 | if err != nil { |
101 | fmt.Fprintf(os.Stderr, "Err finding: %v\n", err) | 120 | fmt.Fprintf(os.Stderr, "Err finding: %v\n", err) |
102 | return | 121 | return |
@@ -136,7 +155,7 @@ func dealNewPid(cooked Event) { | |||
136 | } | 155 | } |
137 | } | 156 | } |
138 | 157 | ||
139 | err := pidCol.UpdateOne(bson.M{"pid": cooked.ppid}, bson.M{ | 158 | err = pidCol.UpdateOne(bson.M{"pid": cooked.ppid}, bson.M{ |
140 | "$push": bson.M{ | 159 | "$push": bson.M{ |
141 | "children": cooked.pid, | 160 | "children": cooked.pid, |
142 | }, | 161 | }, |
@@ -149,7 +168,7 @@ func dealNewPid(cooked Event) { | |||
149 | func dealExecve(cooked Event) { | 168 | func dealExecve(cooked Event) { |
150 | var docRes []Process | 169 | var docRes []Process |
151 | // 首先检查进程是否存在,如不存在则为之创建 | 170 | // 首先检查进程是否存在,如不存在则为之创建 |
152 | err = pidCol.Finddoc(bson.M{"pid": cooked.pid}, &docRes) | 171 | err := pidCol.Finddoc(bson.M{"pid": cooked.pid}, &docRes) |
153 | if err != nil { | 172 | if err != nil { |
154 | return | 173 | return |
155 | } | 174 | } |