aboutsummaryrefslogtreecommitdiffstats
path: root/listener/deal.go
diff options
context:
space:
mode:
Diffstat (limited to 'listener/deal.go')
-rw-r--r--listener/deal.go269
1 files changed, 269 insertions, 0 deletions
diff --git a/listener/deal.go b/listener/deal.go
new file mode 100644
index 0000000..8f77431
--- /dev/null
+++ b/listener/deal.go
@@ -0,0 +1,269 @@
1package main
2
3import (
4 "fmt"
5 "os"
6 "syscall"
7 "time"
8
9 "go.mongodb.org/mongo-driver/bson"
10)
11
12const (
13 dbName string = "test"
14 pidColName string = "pids"
15 fdColName string = "fds"
16 fileColName string = "files"
17)
18
19var pidCol, fdCol, fileCol mongoClient
20var err error
21
22func deal() {
23 defer wg.Done()
24 var cooked Event
25 var ok bool
26
27 if err = pidCol.init(dbName, pidColName); err != nil {
28 fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err)
29 return
30 }
31 err = pidCol.InsertOne(Process{
32 Ppid: 1,
33 Pid: containerdPid,
34 Cwd: "/",
35 Children: make([]int, 0),
36 Star: true,
37 })
38 if err != nil {
39 fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err)
40 return
41 }
42
43 if err = fdCol.init(dbName, fdColName); err != nil {
44 fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err)
45 return
46 }
47 if err = fileCol.init(dbName, fileColName); err != nil {
48 fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err)
49 }
50
51 defer pidCol.Disconnect()
52 defer fdCol.Disconnect()
53 defer fileCol.Disconnect()
54
55 for {
56 cooked, ok = <-cookedChan
57 if !ok {
58 break
59 }
60
61 switch cooked.tag {
62 case NEWPID:
63 go dealNewPid(cooked)
64 case EXECVE:
65 go dealExecve(cooked)
66 case PIDEXIT:
67 go deletePid(cooked)
68 case FILEOPEN:
69 go fileOpen(cooked)
70 case FILEWRITE:
71 go fileWrite(cooked)
72 case FILECLOSE:
73 go fileClose(cooked)
74 case PIVOTROOT:
75 go pivotRoot(cooked)
76 }
77 }
78}
79
80func deletePid(cooked Event) {
81 // 在这套逻辑里,孩子是不需要收容的,因为我们根本就不看ppid来工作
82 // 父节点那里也不需要销户
83 // 理论上这里需要关闭所有文件描述符,但为了处理效率,留给后续流程
84 err := pidCol.UpdateOne(bson.M{"pid": cooked.pid}, bson.M{
85 "$set": bson.M{
86 "exit_timestamp": cooked.timestamp,
87 "exit_code": cooked.exit_code,
88 "exit_signal": cooked.exit_signal,
89 },
90 })
91 if err != nil {
92 fmt.Fprintf(os.Stderr, "Err updating: %v\n", err)
93 }
94}
95
96func dealNewPid(cooked Event) {
97 // 自身是否已经记录
98 var docRes []Process
99 err = pidCol.Finddoc(bson.M{"pid": cooked.pid}, &docRes)
100 if err != nil {
101 fmt.Fprintf(os.Stderr, "Err finding: %v\n", err)
102 return
103 }
104
105 if len(docRes) != 0 {
106 // 进程原本就存在,换言之别的消息先到了
107 // 所有先行抵达的消息必须保留execve/children字段
108 docRes[0].Ppid = cooked.ppid
109 docRes[0].ParentTgid = cooked.parentTgid
110 docRes[0].Pid = cooked.pid
111 docRes[0].Tgid = cooked.tgid
112 docRes[0].Cwd = cooked.cwd
113 docRes[0].Comm = cooked.comm
114 docRes[0].Args = cooked.argv
115
116 err := pidCol.ReplaceOne(bson.M{"pid": cooked.pid}, docRes[0])
117 if err != nil {
118 fmt.Fprintf(os.Stderr, "Err replaceing: %v\n", err)
119 }
120 } else {
121 // 这进程本是新修的
122 err := pidCol.InsertOne(Process{
123 StartTimestamp: cooked.timestamp,
124 Ppid: cooked.ppid,
125 ParentTgid: cooked.parentTgid,
126 Pid: cooked.pid,
127 Tgid: cooked.tgid,
128 Args: cooked.argv,
129 Comm: cooked.comm,
130 Cwd: cooked.cwd,
131 Execve: make([]Exec, 0),
132 Children: make([]int, 0),
133 })
134 if err != nil {
135 fmt.Fprintf(os.Stderr, "Err inserting: %v\n", err)
136 }
137 }
138
139 err := pidCol.UpdateOne(bson.M{"pid": cooked.ppid}, bson.M{
140 "$push": bson.M{
141 "children": cooked.pid,
142 },
143 })
144 if err != nil {
145 fmt.Fprintf(os.Stderr, "Err updating: %v\n", err)
146 }
147}
148
149func dealExecve(cooked Event) {
150 var docRes []Process
151 // 首先检查进程是否存在,如不存在则为之创建
152 err = pidCol.Finddoc(bson.M{"pid": cooked.pid}, &docRes)
153 if err != nil {
154 return
155 }
156
157 if len(docRes) == 1 {
158 // 自身已在,直接记录
159 docRes[0].Execve = append(docRes[0].Execve, Exec{
160 Timestamp: cooked.timestamp,
161 ExecArgs: cooked.argv,
162 })
163
164 err := pidCol.ReplaceOne(bson.M{"pid": cooked.pid}, docRes[0])
165 if err != nil {
166 fmt.Fprintf(os.Stderr, "Err replacing: %v\n", err)
167 }
168 } else {
169 // 先fork抵达,插入
170 process := Process{
171 Ppid: cooked.ppid,
172 Pid: cooked.pid,
173 Execve: make([]Exec, 0),
174 Children: make([]int, 0),
175 }
176 process.Execve = append(process.Execve, Exec{
177 Timestamp: cooked.timestamp,
178 ExecArgs: cooked.argv,
179 })
180
181 err := pidCol.InsertOne(process)
182 if err != nil {
183 fmt.Fprintf(os.Stderr, "Err inserting: %v\n", err)
184 }
185 }
186}
187
188func fileOpen(cooked Event) {
189 // 权限检查过了,不必再查
190 file := File{
191 OpenTimestamp: cooked.timestamp,
192 FileName: cooked.srcPath,
193 Pid: cooked.pid,
194 Fd: cooked.exit_code,
195 Flags: cooked.syscallParam,
196 Written: make([]time.Time, 0),
197 }
198 if cooked.syscallParam[1]&syscall.O_TRUNC != 0 {
199 // 文件以清空方式打开
200 file.Written = append(file.Written, cooked.timestamp)
201 }
202
203 err := fdCol.InsertOne(file)
204 if err != nil {
205 fmt.Fprintf(os.Stderr, "Err inserting: %v\n", err)
206 }
207}
208
209func fileClose(cooked Event) {
210 res, err := fdCol.FindOneAndDelete(bson.M{"pid": cooked.pid, "fd": cooked.syscallParam[0]})
211 if err != nil {
212 return
213 }
214 res["close_timestamp"] = cooked.timestamp
215
216 err = fileCol.InsertOne(res)
217 if err != nil {
218 fmt.Fprintf(os.Stderr, "Err inserting files: %v\n", err)
219 }
220}
221
222func fileWrite(cooked Event) {
223 var res []File
224 err := fdCol.Finddoc(bson.M{
225 "pid": cooked.pid,
226 "fd": cooked.syscallParam[0],
227 }, &res)
228 if err != nil {
229 fmt.Fprintf(os.Stderr, "Err closing fd %d of pid %d: %v\n", cooked.syscallParam[0], cooked.pid, err)
230 }
231 if len(res) == 0 {
232 return
233 }
234
235 err = fdCol.UpdateOne(bson.M{
236 "pid": cooked.pid,
237 "fd": cooked.syscallParam[0],
238 }, bson.M{"$push": bson.M{"written": cooked.timestamp}})
239 if err != nil {
240 fmt.Fprintf(os.Stderr, "Err updating: %v\n", err)
241 }
242}
243
244func pivotRoot(cooked Event) {
245 var docRes []Process
246 // docker的根目录信息,记录
247 err := pidCol.Finddoc(bson.M{"pid": cooked.pid}, &docRes)
248 if err != nil {
249 fmt.Fprintf(os.Stderr, "Err finding: %v\n", err)
250 return
251 }
252
253 if len(docRes) == 0 {
254 // fork还没到,等一下
255 pidCol.InsertOne(bson.M{
256 "start_timestamp": cooked.timestamp,
257 "ppid": cooked.ppid,
258 "pid": cooked.pid,
259 "rootfs": "cwd",
260 })
261 } else {
262 // 读取已有的工作目录
263 pidCol.UpdateOne(bson.M{"pid": cooked.pid}, bson.M{
264 "$set": bson.M{
265 "rootfs": docRes[0].Cwd,
266 },
267 })
268 }
269}