aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--filter/filter.go145
-rw-r--r--filter/global.go25
-rw-r--r--listener/deal.go16
-rw-r--r--listener/global.go2
-rw-r--r--listener/godo.go21
5 files changed, 153 insertions, 56 deletions
diff --git a/filter/filter.go b/filter/filter.go
index b2341ec..98c326c 100644
--- a/filter/filter.go
+++ b/filter/filter.go
@@ -5,6 +5,7 @@ import (
5 "fmt" 5 "fmt"
6 "log" 6 "log"
7 "os" 7 "os"
8 "path"
8 "sort" 9 "sort"
9 10
10 "go.mongodb.org/mongo-driver/bson" 11 "go.mongodb.org/mongo-driver/bson"
@@ -28,7 +29,7 @@ var findTgid map[int]int
28var helloTree map[int]*tgidNode 29var helloTree map[int]*tgidNode
29 30
30// 文件信息 31// 文件信息
31var files []*File 32var files []File
32 33
33func main() { 34func main() {
34 // 连接到MongoDB 35 // 连接到MongoDB
@@ -110,8 +111,8 @@ func main() {
110 } 111 }
111 112
112 newFileCol := newDB.Collection(newFileColName) 113 newFileCol := newDB.Collection(newFileColName)
113 for _, pFile := range files { 114 for _, file := range files {
114 newFileCol.InsertOne(context.Background(), *pFile) 115 newFileCol.InsertOne(context.Background(), file)
115 } 116 }
116} 117}
117 118
@@ -194,41 +195,48 @@ func ProMerge(a, b Process) (res Process) {
194 return res 195 return res
195} 196}
196 197
197func filtPids(pRawPidData *[]Process) { 198func mergeProcess(pRawPidData *[]Process) (merged []Process) {
198 rawPidData := *pRawPidData 199 rawPidData := *pRawPidData
199 // 合并由多线程导致的重复记录 200 // 合并由多线程导致的重复记录,顺便按照pid升序
200 merged := make(map[int]Process) // pid --> Process 201 index := make(map[int]int)
201 for _, process := range rawPidData { 202 for _, process := range rawPidData {
202 tmp, exists := merged[process.Pid] 203 i, exists := index[process.Pid]
203 if exists { 204 if exists {
204 // 合并 205 // ,合并
205 merged[process.Pid] = ProMerge(tmp, process) 206 merged[i] = ProMerge(merged[i], process)
206 } else { 207 } else {
207 // 没有,直接插入 208 // 不存在,直接添加
208 merged[process.Pid] = process 209 merged = append(merged, process)
210 index[process.Pid] = len(merged) - 1
209 } 211 }
210 } 212 }
213 sort.Slice(merged, func(i, j int) bool {
214 return merged[i].Pid < merged[j].Pid
215 })
216 return merged
217}
211 218
219func getTgidNodes(merged []Process) (tgidMap map[int]*tgidNode, starTgid int, rootfsPids []int) {
212 // 合并出来的进程整理为tgidNode 220 // 合并出来的进程整理为tgidNode
213 // var tgidMap map[int]*tgidNode // tgid --> tgidNode 221 tgidMap = make(map[int]*tgidNode)
214 tgidMap := make(map[int]*tgidNode)
215 findTgid = make(map[int]int) // pid --> tgid 222 findTgid = make(map[int]int) // pid --> tgid
216 var stared int 223 // var starTgid, rootFsPid int
217 stared = -1 224 starTgid = -1
225 // rootfsPid = -1
226 rootfsPids = make([]int, 0)
218 for _, val := range merged { 227 for _, val := range merged {
219 if val.Star { 228 if val.Star {
220 stared = val.Tgid 229 starTgid = val.Tgid
230 } else if val.RootFS != "" {
231 rootfsPids = append(rootfsPids, val.Pid)
221 } 232 }
222 // 登记tgid 233 // 登记tgid
223 findTgid[val.Pid] = val.Tgid 234 findTgid[val.Pid] = val.Tgid
224 // nodeval, ok := tgidMap.Load(val.Tgid)
225 nodeval, exists := tgidMap[val.Tgid] 235 nodeval, exists := tgidMap[val.Tgid]
226 if exists { 236 if exists {
227 // 直接记录 237 // 直接记录
228 // node := nodeval.(tgidNode)
229 nodeval.Threads = append(nodeval.Threads, val) 238 nodeval.Threads = append(nodeval.Threads, val)
230 nodeval.FindPid[val.Pid] = len(nodeval.Threads) - 1 239 nodeval.FindPid[val.Pid] = len(nodeval.Threads) - 1
231 // tgidMap.Store(val.Tgid, node)
232 } else { 240 } else {
233 node := tgidNode{ 241 node := tgidNode{
234 Tgid: val.Tgid, 242 Tgid: val.Tgid,
@@ -238,21 +246,19 @@ func filtPids(pRawPidData *[]Process) {
238 } 246 }
239 node.Threads = append(node.Threads, val) 247 node.Threads = append(node.Threads, val)
240 node.FindPid[val.Pid] = 0 248 node.FindPid[val.Pid] = 0
241 // tgidMap.Store(val.Tgid, node)
242 tgidMap[val.Tgid] = &node 249 tgidMap[val.Tgid] = &node
243 } 250 }
244 } 251 }
252 return tgidMap, starTgid, rootfsPids
253}
245 254
246 // 从tgid==stared开始,构建树 255func buildTree(tgidMap map[int]*tgidNode, starTgid int) {
256 // 从tgid==starTgid开始,构建树
247 helloTree = make(map[int]*tgidNode) // 在树上的tgid节点,tgid --> *tgidNode 257 helloTree = make(map[int]*tgidNode) // 在树上的tgid节点,tgid --> *tgidNode
248 var q Queue // 记录每一个整理好的结构体,bfs 258 var q Queue // 记录每一个整理好的结构体,bfs
249 visited := make(map[int]bool) // 哪些tgid已经访问过 259 visited := make(map[int]bool) // 哪些tgid已经访问过
250 260
251 // tmp, ok := tgidMap.Load(stared) 261 tmp, exists := tgidMap[starTgid]
252 // if !ok {
253 // return
254 // }
255 tmp, exists := tgidMap[stared]
256 if !exists { 262 if !exists {
257 return 263 return
258 } 264 }
@@ -262,7 +268,7 @@ func filtPids(pRawPidData *[]Process) {
262 // 因而所有添加子代tgid的行为只针对helloTree 268 // 因而所有添加子代tgid的行为只针对helloTree
263 // q不添加,直接把新的tgid对应的tgidNode入队就是了 269 // q不添加,直接把新的tgid对应的tgidNode入队就是了
264 q.Enqueue(tmp) 270 q.Enqueue(tmp)
265 visited[stared] = true 271 visited[starTgid] = true
266 for !q.IsEmpty() { 272 for !q.IsEmpty() {
267 tmp, ok := q.Dequeue() 273 tmp, ok := q.Dequeue()
268 if !ok { 274 if !ok {
@@ -276,7 +282,6 @@ func filtPids(pRawPidData *[]Process) {
276 _, exists := visited[tgid] 282 _, exists := visited[tgid]
277 if !exists { 283 if !exists {
278 // 子代里有没见过的tgid 284 // 子代里有没见过的tgid
279 // tgidNode, ok := tgidMap.Load(tgid)
280 tgidNode, exists := tgidMap[tgid] 285 tgidNode, exists := tgidMap[tgid]
281 if !exists { 286 if !exists {
282 continue 287 continue
@@ -288,31 +293,79 @@ func filtPids(pRawPidData *[]Process) {
288 } 293 }
289 } 294 }
290 } 295 }
296}
291 297
292 // TODO: 298func optimazePid(starTgid int, rootfsPids []int) {
293 // 1.√修改数据结构,使之自身即存储树结构,插入数据库后前端拿出来就能用 299 getDockerRootFs := make(map[string]string) // dockerId --> rootfs
294 // 2.还有其余优化要做,比如线程退出时间与进程推出时间,关系到后续的文件修理 300 // 首先处理一下记录有pivot_root信息的进程,防止pivot先于fork
295 // 3.根文件系统,问题很重大 301 for _, rootfsPid := range rootfsPids {
302 rootfsTgid := findTgid[rootfsPid]
303 i := helloTree[rootfsTgid].FindPid[rootfsPid]
304 rootfsProcess := &(helloTree[rootfsTgid].Threads[i])
305 if rootfsProcess.RootFS == "cwd" {
306 rootfsProcess.RootFS = rootfsProcess.Cwd
307 }
308 getDockerRootFs[rootfsProcess.DockerId] = rootfsProcess.RootFS
309 }
296 310
297 count := 0 311 count := 0
298 for _, val := range helloTree { 312 for _, val := range helloTree {
299 count++ 313 // 处理一下pid结束时间,顺便找找爹
300 fmt.Printf("==============================\ntgid: %6d, size: %6d, children: ", val.Tgid, len(val.Threads)) 314 // 结束时间是因为很多线程结束时间没获取到,默认按照进程退出时间处理
301 for _, child := range val.ChildTgid { 315 // Ppid是因为进程产生之初收到的信息写的爹一定是亲爹
302 fmt.Printf("%7d", child) 316 // 但是产生线程时候该进程很可能已作为孤儿被收养,导致线程里关于爹的记录是继父
303 } 317 for i := 0; i < len(val.Threads); i++ {
304 fmt.Printf("\n") 318 if i != 0 {
305 for _, process := range val.Threads { 319 if val.Threads[i].Tgid < val.Threads[0].Tgid {
306 fmt.Printf("%v\n", process) 320 val.Threads[i].ParentTgid = val.Threads[0].ParentTgid
321 val.Threads[i].Ppid = val.Threads[0].Ppid
322 }
323 if val.Threads[i].ExitTimestamp.IsZero() {
324 val.Threads[i].ExitCode = val.Threads[0].ExitCode
325 val.Threads[i].ExitTimestamp = val.Threads[0].ExitTimestamp
326 val.Threads[i].ExitSignal = val.Threads[0].ExitSignal
327 }
328 }
329
330 dockerId := val.Threads[i].DockerId
331 if dockerId != "" {
332 rootfs, exists := getDockerRootFs[dockerId]
333 if !exists {
334 fmt.Fprintf(os.Stderr, "Err: the docker rootfs of pid %d is not known!\n", val.Threads[i].Pid)
335 continue
336 }
337 val.Threads[i].RootFS = rootfs
338 }
307 } 339 }
308 fmt.Printf("\n\n\n") 340
341 count++
342 fmt.Printf("%v\n", *val)
309 } 343 }
310 fmt.Printf("Star: %d, res: %d\n", stared, count) 344 fmt.Printf("Star: %d, res: %d\n", starTgid, count)
345}
346
347func filtPids(pRawPidData *[]Process) {
348 /* ATTENTION: 把map/slice直接传参是危险的
349 * 传递的是指针,不会引起大的复制开销,
350 * 但是map/slice在callee func内被修改**可能**导致内存更改
351 * 而这样的内存更改对caller function来说是不可见的,看到的还是原来的东西
352 * 这里由于参数几乎都是只读不写,因而用一下
353 */
354
355 // 合并由多线程导致的重复记录,顺便按照pid升序
356 // 多线程已经取消了,但保险起见还是留着
357 merged := mergeProcess(pRawPidData)
358 // 将Process按照tgid合并
359 tgidMap, starTgid, rootfsPids := getTgidNodes(merged)
360 // 建树,helloTree
361 buildTree(tgidMap, starTgid)
362 // 对树上的进程做一些优化处理
363 optimazePid(starTgid, rootfsPids)
311} 364}
312 365
313func filtFiles(pRawFileData *[]File) { 366func filtFiles(pRawFileData *[]File) {
314 rawFileData := *pRawFileData 367 rawFileData := *pRawFileData
315 files = make([]*File, 0) 368 files = make([]File, 0)
316 369
317 // 所有文件按照特定顺序排 370 // 所有文件按照特定顺序排
318 sort.Slice(rawFileData, func(i, j int) bool { 371 sort.Slice(rawFileData, func(i, j int) bool {
@@ -342,6 +395,9 @@ func filtFiles(pRawFileData *[]File) {
342 }) 395 })
343 396
344 for _, file := range rawFileData { 397 for _, file := range rawFileData {
398 if file.FileName == "/root/test/1/../.hello.c.swp" {
399 fmt.Printf("Test\n")
400 }
345 tgid := findTgid[file.Pid] 401 tgid := findTgid[file.Pid]
346 pTgidNode, exists := helloTree[tgid] 402 pTgidNode, exists := helloTree[tgid]
347 if !exists { 403 if !exists {
@@ -354,6 +410,7 @@ func filtFiles(pRawFileData *[]File) {
354 } 410 }
355 file.CloseTimestamp = pTgidNode.Threads[index].ExitTimestamp 411 file.CloseTimestamp = pTgidNode.Threads[index].ExitTimestamp
356 } 412 }
357 files = append(files, &file) 413 file.FileName = path.Clean(file.FileName)
414 files = append(files, file)
358 } 415 }
359} 416}
diff --git a/filter/global.go b/filter/global.go
index 37af52b..bade895 100644
--- a/filter/global.go
+++ b/filter/global.go
@@ -22,6 +22,7 @@ type Process struct {
22 RootFS string `bson:"rootfs"` 22 RootFS string `bson:"rootfs"`
23 Cwd string `bson:"cwd"` 23 Cwd string `bson:"cwd"`
24 Children []int `bson:"children"` 24 Children []int `bson:"children"`
25 DockerId string `bson:"docker_id"`
25 Execve []Exec `bson:"execve"` 26 Execve []Exec `bson:"execve"`
26 ExitCode int `bson:"exit_code"` 27 ExitCode int `bson:"exit_code"`
27 ExitSignal int `bson:"exit_signal"` 28 ExitSignal int `bson:"exit_signal"`
@@ -44,26 +45,40 @@ func (p Process) String() string {
44 for i := 0; i < len(p.Args); i++ { 45 for i := 0; i < len(p.Args); i++ {
45 res += fmt.Sprintf("%s ", p.Args[i]) 46 res += fmt.Sprintf("%s ", p.Args[i])
46 } 47 }
47 res += fmt.Sprintf("\ncomm\t%s\ncwd\t%s\n", p.Comm, p.Cwd) 48 res += fmt.Sprintf("\ncomm\t%s\ncwd\t%s\nrootfs\t%s\ndocker_id\t%s\n", p.Comm, p.Cwd, p.RootFS, p.DockerId)
48 if len(p.Execve) != 0 { 49 if len(p.Execve) != 0 {
49 res += fmt.Sprintf("exec:\n") 50 res += "exec:\n"
50 for i := 0; i < len(p.Execve); i++ { 51 for i := 0; i < len(p.Execve); i++ {
51 res += fmt.Sprintf("\ttimestamp: %v\n\texecArgs:\t", p.Execve[i].Timestamp) 52 res += fmt.Sprintf("\ttimestamp: %v\n\texecArgs:\t", p.Execve[i].Timestamp)
52 for j := 0; j < len(p.Execve[i].ExecArgs); j++ { 53 for j := 0; j < len(p.Execve[i].ExecArgs); j++ {
53 res += fmt.Sprintf("%s ", p.Execve[i].ExecArgs[j]) 54 res += fmt.Sprintf("%s ", p.Execve[i].ExecArgs[j])
54 } 55 }
55 res += fmt.Sprintf("\n") 56 res += "\n"
56 } 57 }
57 } 58 }
58 res += fmt.Sprintf("children: ") 59 res += "children: "
59 for i := 0; i < len(p.Children); i++ { 60 for i := 0; i < len(p.Children); i++ {
60 res += fmt.Sprintf("%d ", p.Children[i]) 61 res += fmt.Sprintf("%d ", p.Children[i])
61 } 62 }
62 res += fmt.Sprintf("\n") 63 res += "\n"
63 res += fmt.Sprintf("exit_timestamp:\t%v\nexit_code:\t%d\nexit_signal:\t%d\n", p.ExitTimestamp, p.ExitCode, p.ExitSignal) 64 res += fmt.Sprintf("exit_timestamp:\t%v\nexit_code:\t%d\nexit_signal:\t%d\n", p.ExitTimestamp, p.ExitCode, p.ExitSignal)
64 return res 65 return res
65} 66}
66 67
68func (node tgidNode) String() string {
69 var res string
70 res += fmt.Sprintf("==============================\ntgid: %6d, size: %6d, children: ", node.Tgid, len(node.Threads))
71 for _, child := range node.ChildTgid {
72 res += fmt.Sprintf("%7d", child)
73 }
74 res += "\n"
75 for _, process := range node.Threads {
76 res += fmt.Sprintf("%v\n", process)
77 }
78 res += "\n"
79 return res
80}
81
67type File struct { 82type File struct {
68 OpenTimestamp time.Time `bson:"timestamp"` 83 OpenTimestamp time.Time `bson:"timestamp"`
69 FileName string `bson:"fileName"` 84 FileName string `bson:"fileName"`
diff --git a/listener/deal.go b/listener/deal.go
index 8225224..70c2827 100644
--- a/listener/deal.go
+++ b/listener/deal.go
@@ -79,19 +79,19 @@ func deal() {
79 79
80 switch cooked.tag { 80 switch cooked.tag {
81 case NEWPID: 81 case NEWPID:
82 go dealNewPid(cooked) 82 dealNewPid(cooked)
83 case EXECVE: 83 case EXECVE:
84 go dealExecve(cooked) 84 dealExecve(cooked)
85 case PIDEXIT: 85 case PIDEXIT:
86 go deletePid(cooked) 86 deletePid(cooked)
87 case FILEOPEN: 87 case FILEOPEN:
88 go fileOpen(cooked) 88 fileOpen(cooked)
89 case FILEWRITE: 89 case FILEWRITE:
90 go fileWrite(cooked) 90 fileWrite(cooked)
91 case FILECLOSE: 91 case FILECLOSE:
92 go fileClose(cooked) 92 fileClose(cooked)
93 case PIVOTROOT: 93 case PIVOTROOT:
94 go pivotRoot(cooked) 94 pivotRoot(cooked)
95 } 95 }
96 } 96 }
97} 97}
@@ -131,6 +131,7 @@ func dealNewPid(cooked Event) {
131 docRes[0].Cwd = cooked.cwd 131 docRes[0].Cwd = cooked.cwd
132 docRes[0].Comm = cooked.comm 132 docRes[0].Comm = cooked.comm
133 docRes[0].Args = cooked.argv 133 docRes[0].Args = cooked.argv
134 docRes[0].DockerId = cooked.cgroup
134 135
135 err := pidCol.ReplaceOne(bson.M{"pid": cooked.pid}, docRes[0]) 136 err := pidCol.ReplaceOne(bson.M{"pid": cooked.pid}, docRes[0])
136 if err != nil { 137 if err != nil {
@@ -149,6 +150,7 @@ func dealNewPid(cooked Event) {
149 Cwd: cooked.cwd, 150 Cwd: cooked.cwd,
150 Execve: make([]Exec, 0), 151 Execve: make([]Exec, 0),
151 Children: make([]int, 0), 152 Children: make([]int, 0),
153 DockerId: cooked.cgroup,
152 }) 154 })
153 if err != nil { 155 if err != nil {
154 fmt.Fprintf(os.Stderr, "Err inserting: %v\n", err) 156 fmt.Fprintf(os.Stderr, "Err inserting: %v\n", err)
diff --git a/listener/global.go b/listener/global.go
index 11b18bf..b782284 100644
--- a/listener/global.go
+++ b/listener/global.go
@@ -37,6 +37,7 @@ type Event struct {
37 argv []string 37 argv []string
38 comm string 38 comm string
39 cwd string 39 cwd string
40 cgroup string
40 exit_code int 41 exit_code int
41 exit_signal int 42 exit_signal int
42 srcPath string 43 srcPath string
@@ -67,6 +68,7 @@ type Process struct {
67 RootFS string `bson:"rootfs"` 68 RootFS string `bson:"rootfs"`
68 Cwd string `bson:"cwd"` 69 Cwd string `bson:"cwd"`
69 Children []int `bson:"children"` 70 Children []int `bson:"children"`
71 DockerId string `bson:"docker_id"`
70 Execve []Exec `bson:"execve"` 72 Execve []Exec `bson:"execve"`
71 ExitCode int `bson:"exit_code"` 73 ExitCode int `bson:"exit_code"`
72 ExitSignal int `bson:"exit_signal"` 74 ExitSignal int `bson:"exit_signal"`
diff --git a/listener/godo.go b/listener/godo.go
index 87e9446..8d82231 100644
--- a/listener/godo.go
+++ b/listener/godo.go
@@ -8,6 +8,7 @@ import (
8 "netlink" 8 "netlink"
9 "os" 9 "os"
10 "os/exec" 10 "os/exec"
11 "regexp"
11 "strings" 12 "strings"
12 "syscall" 13 "syscall"
13 "time" 14 "time"
@@ -176,4 +177,24 @@ func checkProc(pCooked *Event) {
176 fmt.Fprintf(os.Stderr, "Err: %v\n", err) 177 fmt.Fprintf(os.Stderr, "Err: %v\n", err)
177 pCooked.cwd = "" 178 pCooked.cwd = ""
178 } 179 }
180
181 fd, err = os.Open(fileName + "cgroup")
182 if err != nil {
183 fmt.Fprintf(os.Stderr, "Err: %v\n", err)
184 // cgroup记空,即没赶上
185 return
186 }
187 scanner = bufio.NewScanner(fd)
188 cgroupRegex := regexp.MustCompile(`/docker/([0-9a-f]+)$`)
189 scanner.Split(bufio.ScanLines)
190 for scanner.Scan() {
191 line := scanner.Text()
192 if cgroupRegex.MatchString(line) {
193 match := cgroupRegex.FindStringSubmatch(line)
194 pCooked.cgroup = match[1]
195 return
196 }
197 }
198 fd.Close()
199 pCooked.cgroup = ""
179} 200}