From 61809e72c524294cb07535d0e31c80a283495f80 Mon Sep 17 00:00:00 2001 From: We-unite <3205135446@qq.com> Date: Wed, 14 Aug 2024 17:28:28 +0800 Subject: Filter mainly finished, fix sth in lintener This commit I make some changes: - The filter got mainly finished. - Build a big node by the same tgid, and use the tgid node to build th tree we need by bfs. - Filt relative files, and for the files not closed, add close time stamp according to the exit time of their pids. - Put all the results into database. Besides, I enlarge the buffer size of netlink connector and channels in lintener. TODO: - the pivot_root syscall is used only by the initial shell(`docker start` makes a shell), other processes of shell change their root by changing namespace(mnt ns?), using setns syscall. So fix it. - It's time to fix the netlink connector socket. --- filter/filter.go | 336 +++++++++++++++++++++++++++++++++++++++---------------- filter/global.go | 20 +++- listener/godo.go | 11 +- 3 files changed, 265 insertions(+), 102 deletions(-) diff --git a/filter/filter.go b/filter/filter.go index 2a774a3..b2341ec 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -5,7 +5,7 @@ import ( "fmt" "log" "os" - "sync" + "sort" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -13,15 +13,22 @@ import ( ) const ( - oldDBName = "test" - oldPidColName = "pids" + oldDBName = "test" + oldPidColName = "pids" + oldFdColName = "fds" + oldFileColName = "files" + + newDBName = "cooked" + newPidColName = "tgids" + newFileColName = "files" ) -type treeNode struct { - Tgid int - Threads []Process - Children []int -} +// 进程树信息 +var findTgid map[int]int +var helloTree map[int]*tgidNode + +// 文件信息 +var files []*File func main() { // 连接到MongoDB @@ -31,110 +38,81 @@ func main() { } defer client.Disconnect(context.TODO()) - // 选择数据库和集合 - db := client.Database(oldDBName) - collection := db.Collection(oldPidColName) + oldDB := client.Database(oldDBName) - // 提取所有数据 - var res []Process + /* + * Step 1: 进程数据处理 + */ + oldPidCol := oldDB.Collection(oldPidColName) - cursor, err := collection.Find(context.Background(), bson.M{}) + // 数据提取 + var rawPidData []Process + cursor, err := oldPidCol.Find(context.Background(), bson.M{}) if err != nil { fmt.Fprintf(os.Stderr, "Err: %v\n", err) + return } - if err := cursor.All(context.Background(), &res); err != nil { + err = cursor.All(context.Background(), &rawPidData) + if err != nil { fmt.Fprintf(os.Stderr, "Err All: %v\n", err) + return } + cursor.Close(context.Background()) - var merged sync.Map - for _, process := range res { - tmp, ok := merged.Load(process.Pid) - if ok { - // 证明重复了,要合并 - tmp := ProMerge(tmp.(Process), process) - merged.Store(process.Pid, tmp) - } else { - // 没有,直接插入 - merged.Store(process.Pid, process) - } - } + filtPids(&rawPidData) - var treeMap sync.Map - findTgid := make(map[int]int) - var stared int - merged.Range(func(key, val interface{}) bool { - tmp := val.(Process) - if tmp.Star { - stared = tmp.Tgid - } - // 登记tgid - findTgid[tmp.Pid] = tmp.Tgid - nodeTmp, ok := treeMap.Load(tmp.Tgid) - if ok { - // 直接记录 - node := nodeTmp.(treeNode) - node.Threads = append(node.Threads, tmp) - node.Children = append(node.Children, tmp.Children...) - treeMap.Store(tmp.Tgid, node) - } else { - node := treeNode{ - Tgid: tmp.Tgid, - Threads: make([]Process, 0), - Children: make([]int, 0), - } - node.Threads = append(node.Threads, tmp) - node.Children = append(node.Children, tmp.Children...) - treeMap.Store(tmp.Tgid, node) - } - return true - }) + /* + * Step 2: 文件数据处理 + * - 将已经关闭的和未关闭的同等看待 + * - 未关闭的将关闭时间修改为对应进程退出时间 + * - 值得注意的是,同一进程各线程共享文件描述符……需要处理吗? + */ + // 提取files和fds里的数据 + // TODO:是否可以只筛选被写过的记录? + var rawFileData []File + oldFileCol := oldDB.Collection(oldFileColName) + cursor, err = oldFileCol.Find(context.Background(), bson.M{}) + if err != nil { + fmt.Fprintf(os.Stderr, "Err: %v\n", err) + return + } + err = cursor.All(context.Background(), &rawFileData) + if err != nil { + fmt.Fprintf(os.Stderr, "Err All: %v\n", err) + return + } + cursor.Close(context.Background()) - // 从tgid==stared开始,构建树 - var helloTree sync.Map // 在树上的tgid节点 - var q Queue // 记录每一个整理好的结构体,bfs - visited := make(map[int]bool) - visited[stared] = true - tmp, ok := treeMap.Load(stared) - if !ok { + var rawFdData []File + oldFdCol := oldDB.Collection(oldFdColName) + cursor, err = oldFdCol.Find(context.Background(), bson.M{}) + if err != nil { + fmt.Fprintf(os.Stderr, "Err: %v\n", err) return } + err = cursor.All(context.Background(), &rawFdData) + if err != nil { + fmt.Fprintf(os.Stderr, "Err All: %v\n", err) + return + } + cursor.Close(context.Background()) - q.Enqueue(tmp) - helloTree.Store(stared, tmp) - for !q.IsEmpty() { - tmp, ok := q.Dequeue() - if !ok { - continue - } - node := tmp.(treeNode) - for i := 0; i < len(node.Children); i++ { - tgid := findTgid[node.Children[i]] - _, exists := visited[tgid] - if !exists { - visited[tgid] = true - tgidNode, ok := treeMap.Load(tgid) - if !ok { - continue - } - helloTree.Store(tgid, tgidNode) - q.Enqueue(tgidNode) - } - } + // 合并,处理 + rawFileData = append(rawFileData, rawFdData...) + filtFiles(&rawFileData) + + // 扔回数据库 + newDB := client.Database(newDBName) + newDB.Drop(context.Background()) + newPidCol := newDB.Collection(newPidColName) + for _, pTgidNode := range helloTree { + newPidCol.InsertOne(context.Background(), *pTgidNode) } - // TODO: - // 1.修改数据结构,使之自身即存储树结构,插入数据库后前端拿出来就能用 - // 2.还有其余优化要做,比如线程退出时间与进程推出时间 - - // count := 0 - // helloTree.Range(func(key, val interface{}) bool { - // count++ - // fmt.Printf("tgid: %d\n", val.(treeNode).Tgid) - // return true - // }) - // fmt.Printf("Star: %d, res: %d\n", stared, count) - - // 接下来处理文件 + newFileCol := newDB.Collection(newFileColName) + for _, pFile := range files { + newFileCol.InsertOne(context.Background(), *pFile) + } } func ProMerge(a, b Process) (res Process) { @@ -215,3 +193,167 @@ func ProMerge(a, b Process) (res Process) { return res } + +func filtPids(pRawPidData *[]Process) { + rawPidData := *pRawPidData + // 合并由多线程导致的重复记录 + merged := make(map[int]Process) // pid --> Process + for _, process := range rawPidData { + tmp, exists := merged[process.Pid] + if exists { + // 证明重复了,要合并 + merged[process.Pid] = ProMerge(tmp, process) + } else { + // 没有,直接插入 + merged[process.Pid] = process + } + } + + // 合并出来的进程整理为tgidNode + // var tgidMap map[int]*tgidNode // tgid --> tgidNode + tgidMap := make(map[int]*tgidNode) + findTgid = make(map[int]int) // pid --> tgid + var stared int + stared = -1 + for _, val := range merged { + if val.Star { + stared = val.Tgid + } + // 登记tgid + findTgid[val.Pid] = val.Tgid + // nodeval, ok := tgidMap.Load(val.Tgid) + nodeval, exists := tgidMap[val.Tgid] + if exists { + // 直接记录 + // node := nodeval.(tgidNode) + nodeval.Threads = append(nodeval.Threads, val) + nodeval.FindPid[val.Pid] = len(nodeval.Threads) - 1 + // tgidMap.Store(val.Tgid, node) + } else { + node := tgidNode{ + Tgid: val.Tgid, + FindPid: make(map[int]int), + Threads: make([]Process, 0), + ChildTgid: make([]int, 0), + } + node.Threads = append(node.Threads, val) + node.FindPid[val.Pid] = 0 + // tgidMap.Store(val.Tgid, node) + tgidMap[val.Tgid] = &node + } + } + + // 从tgid==stared开始,构建树 + helloTree = make(map[int]*tgidNode) // 在树上的tgid节点,tgid --> *tgidNode + var q Queue // 记录每一个整理好的结构体,bfs + visited := make(map[int]bool) // 哪些tgid已经访问过 + + // tmp, ok := tgidMap.Load(stared) + // if !ok { + // return + // } + tmp, exists := tgidMap[stared] + if !exists { + return + } + + // helloTree负责在遍历到该节点时记录 + // 队列仅负责搞明白哪些节点在树上 + // 因而所有添加子代tgid的行为只针对helloTree + // q不添加,直接把新的tgid对应的tgidNode入队就是了 + q.Enqueue(tmp) + visited[stared] = true + for !q.IsEmpty() { + tmp, ok := q.Dequeue() + if !ok { + continue + } + node := tmp.(*tgidNode) // 队列里的一个节点,这里必须重新申请node + helloTree[node.Tgid] = node + for i := 0; i < len(node.Threads); i++ { + for j := 0; j < len(node.Threads[i].Children); j++ { + tgid := findTgid[node.Threads[i].Children[j]] + _, exists := visited[tgid] + if !exists { + // 子代里有没见过的tgid + // tgidNode, ok := tgidMap.Load(tgid) + tgidNode, exists := tgidMap[tgid] + if !exists { + continue + } + helloTree[node.Tgid].ChildTgid = append(helloTree[node.Tgid].ChildTgid, tgid) + q.Enqueue(tgidNode) + visited[tgid] = true + } + } + } + } + + // TODO: + // 1.√修改数据结构,使之自身即存储树结构,插入数据库后前端拿出来就能用 + // 2.还有其余优化要做,比如线程退出时间与进程推出时间,关系到后续的文件修理 + // 3.根文件系统,问题很重大 + + count := 0 + for _, val := range helloTree { + count++ + fmt.Printf("==============================\ntgid: %6d, size: %6d, children: ", val.Tgid, len(val.Threads)) + for _, child := range val.ChildTgid { + fmt.Printf("%7d", child) + } + fmt.Printf("\n") + for _, process := range val.Threads { + fmt.Printf("%v\n", process) + } + fmt.Printf("\n\n\n") + } + fmt.Printf("Star: %d, res: %d\n", stared, count) +} + +func filtFiles(pRawFileData *[]File) { + rawFileData := *pRawFileData + files = make([]*File, 0) + + // 所有文件按照特定顺序排 + sort.Slice(rawFileData, func(i, j int) bool { + pi := &rawFileData[i] + pj := &rawFileData[j] + + if pi.FileName < pj.FileName { + return true + } else if pi.FileName > pj.FileName { + return false + } + if pi.Pid < pj.Pid { + return true + } else if pi.Pid > pj.Pid { + return false + } + if pi.Fd < pj.Fd { + return true + } else if pi.Fd > pj.Fd { + return false + } + if pi.OpenTimestamp.Before(pj.OpenTimestamp) { + return true + } else { + return false + } + }) + + for _, file := range rawFileData { + tgid := findTgid[file.Pid] + pTgidNode, exists := helloTree[tgid] + if !exists { + continue + } + if file.CloseTimestamp.IsZero() { + index, exists := pTgidNode.FindPid[file.Pid] + if !exists || index < 0 || index >= len(pTgidNode.Threads) { + continue + } + file.CloseTimestamp = pTgidNode.Threads[index].ExitTimestamp + } + files = append(files, &file) + } +} diff --git a/filter/global.go b/filter/global.go index 45706d4..37af52b 100644 --- a/filter/global.go +++ b/filter/global.go @@ -28,9 +28,16 @@ type Process struct { ExitTimestamp time.Time `bson:"exit_timestamp"` } +type tgidNode struct { + Tgid int `bson:"tgid"` + FindPid map[int]int `bson:"findPid"` + Threads []Process `bson:"threads"` + ChildTgid []int `bson:"child_tgid"` +} + func (p Process) String() string { var res string - res = "" + res = "---------------------\n" res += fmt.Sprintf("timestamp\t%v\n", p.StartTimestamp) res += fmt.Sprintf("ppid\t%d\nparentTgid\t%d\n", p.Ppid, p.ParentTgid) res += fmt.Sprintf("pid\t%d\ntgid\t%d\nargs: ", p.Pid, p.Tgid) @@ -53,9 +60,20 @@ func (p Process) String() string { res += fmt.Sprintf("%d ", p.Children[i]) } res += fmt.Sprintf("\n") + res += fmt.Sprintf("exit_timestamp:\t%v\nexit_code:\t%d\nexit_signal:\t%d\n", p.ExitTimestamp, p.ExitCode, p.ExitSignal) return res } +type File struct { + OpenTimestamp time.Time `bson:"timestamp"` + FileName string `bson:"fileName"` + Pid int `bson:"pid"` + Fd int `bson:"fd"` + Flags [4]uint64 `bson:"flags"` + Written []time.Time `bson:"written"` + CloseTimestamp time.Time `bson:"close_timestamp"` +} + // Queue 定义一个队列结构体 type Queue struct { items []interface{} diff --git a/listener/godo.go b/listener/godo.go index efe9585..87e9446 100644 --- a/listener/godo.go +++ b/listener/godo.go @@ -24,6 +24,8 @@ var ( receiveOnly = fs.Bool("ro", false, "receive only using multicast, requires kernel 3.16+") ) +const bufferPages = 100 + func main() { // 检查用户身份,并添加auditd规则,监听所有syscall if os.Geteuid() != 0 { @@ -44,7 +46,7 @@ func main() { var auditCmd *exec.Cmd pidSyscall := []string{"execve", "pivot_root"} - // // 设置监听规则 + // 设置监听规则 for i := 0; i < len(pidSyscall); i++ { auditCmd = exec.Command("auditctl", "-a", "exit,always", "-F", "arch=b64", "-S", pidSyscall[i]) auditCmd.Run() @@ -77,8 +79,9 @@ func main() { func coroutine(client *libaudit.AuditClient) { // 各协程至此开始 - rawChan = make(chan interface{}, 65536) - cookedChan = make(chan Event, 65536) + bufferSize := bufferPages * syscall.Getpagesize() + rawChan = make(chan interface{}, bufferSize) + cookedChan = make(chan Event, bufferSize) wg.Add(1) go procWatch() @@ -102,7 +105,7 @@ func procWatch() error { } defer ns.Close() for { - res, err := ns.Receive(20) + res, err := ns.Receive(bufferPages) if err != nil { fmt.Fprintf(os.Stderr, "Error recv: %v\n", err) continue -- cgit v1.2.3-70-g09d2