package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"path"
	"sort"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

const (
	oldDBName      = "test"
	oldPidColName  = "pids"
	oldFdColName   = "fds"
	oldFileColName = "files"

	newDBName      = "cooked"
	newPidColName  = "tgids"
	newFileColName = "files"
)

// Process-tree information
var findTgid map[int]int
var helloTree map[int]*tgidNode

// File information
var files []File

func main() {
	// Connect to MongoDB
	client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(context.TODO())

	oldDB := client.Database(oldDBName)

	/*
	 * Step 1: process records
	 */
	oldPidCol := oldDB.Collection(oldPidColName)

	// Extract the raw data
	var rawPidData []Process
	cursor, err := oldPidCol.Find(context.Background(), bson.M{})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Err: %v\n", err)
		return
	}
	err = cursor.All(context.Background(), &rawPidData)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Err All: %v\n", err)
		return
	}
	cursor.Close(context.Background())

	filtPids(&rawPidData)

	/*
	 * Step 2: file records
	 * - Treat already-closed and still-open files alike.
	 * - For files never closed, set the close time to the owning process's exit time.
	 * - Note that threads of the same process share file descriptors... does that need handling?
	 */
	// Extract the records from the files and fds collections.
	// TODO: could we filter down to records that were actually written to?
	var rawFileData []File
	oldFileCol := oldDB.Collection(oldFileColName)
	cursor, err = oldFileCol.Find(context.Background(), bson.M{})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Err: %v\n", err)
		return
	}
	err = cursor.All(context.Background(), &rawFileData)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Err All: %v\n", err)
		return
	}
	cursor.Close(context.Background())

	var rawFdData []File
	oldFdCol := oldDB.Collection(oldFdColName)
	cursor, err = oldFdCol.Find(context.Background(), bson.M{})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Err: %v\n", err)
		return
	}
	err = cursor.All(context.Background(), &rawFdData)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Err All: %v\n", err)
		return
	}
	cursor.Close(context.Background())

	// Merge and filter
	rawFileData = append(rawFileData, rawFdData...)
	filtFiles(&rawFileData)

	// Write the results back to the database
	newDB := client.Database(newDBName)
	newDB.Drop(context.Background())

	newPidCol := newDB.Collection(newPidColName)
	for _, pTgidNode := range helloTree {
		newPidCol.InsertOne(context.Background(), *pTgidNode)
	}

	newFileCol := newDB.Collection(newFileColName)
	for _, file := range files {
		newFileCol.InsertOne(context.Background(), file)
	}
}

func ProMerge(a, b Process) (res Process) {
	// What issues can come up while merging two records of the same pid?
	res.Star = false
	if a.StartTimestamp.IsZero() {
		res.StartTimestamp = b.StartTimestamp
	} else if b.StartTimestamp.IsZero() {
		res.StartTimestamp = a.StartTimestamp
	} else if a.StartTimestamp.Before(b.StartTimestamp) {
		res.StartTimestamp = a.StartTimestamp
	} else {
		res.StartTimestamp = b.StartTimestamp
	}
	res.Ppid = a.Ppid
	if a.ParentTgid == 0 {
		res.ParentTgid = b.ParentTgid
	} else {
		res.ParentTgid = a.ParentTgid
	}
	res.Pid = a.Pid
	if a.Tgid == 0 {
		res.Tgid = b.Tgid
	} else {
		res.Tgid = a.Tgid
	}
	if len(a.Args) == 0 {
		res.Args = b.Args
	} else {
		res.Args = a.Args
	}
	if a.Comm == "" {
		res.Comm = b.Comm
	} else {
		res.Comm = a.Comm
	}
	if a.RootFS == "" {
		res.RootFS = b.RootFS
	} else {
		res.RootFS = a.RootFS
	}
	if a.Cwd == "" {
		res.Cwd = b.Cwd
	} else {
		res.Cwd = a.Cwd
	}
	res.Execve = append(a.Execve, b.Execve...)
	res.Children = append(a.Children, b.Children...)
	var flag bool // true: take a's exit info; false: take b's
	if a.ExitTimestamp.IsZero() {
		flag = false
	} else if b.ExitTimestamp.IsZero() {
		flag = true
	} else if a.ExitTimestamp.Before(b.ExitTimestamp) {
		flag = true
	} else {
		flag = false
	}
	if flag {
		res.ExitCode = a.ExitCode
		res.ExitSignal = a.ExitSignal
		res.ExitTimestamp = a.ExitTimestamp
	} else {
		res.ExitCode = b.ExitCode
		res.ExitSignal = b.ExitSignal
		res.ExitTimestamp = b.ExitTimestamp
	}
	return res
}

func mergeProcess(pRawPidData *[]Process) (merged []Process) {
	rawPidData := *pRawPidData
	// Merge duplicate records caused by multithreading, sorting by pid in ascending order
	index := make(map[int]int)
	for _, process := range rawPidData {
		i, exists := index[process.Pid]
		if exists {
			// Already seen: merge into the existing record
			merged[i] = ProMerge(merged[i], process)
		} else {
			// Not seen yet: append directly
			merged = append(merged, process)
			index[process.Pid] = len(merged) - 1
		}
	}
	sort.Slice(merged, func(i, j int) bool {
		return merged[i].Pid < merged[j].Pid
	})
	return merged
}

func getTgidNodes(merged []Process) (tgidMap map[int]*tgidNode, starTgid int, rootfsPids []int) {
	// Organize the merged processes into tgidNodes
	tgidMap = make(map[int]*tgidNode)
	findTgid = make(map[int]int) // pid --> tgid
	starTgid = -1
	rootfsPids = make([]int, 0)
	for _, val := range merged {
		if val.Star {
			starTgid = val.Tgid
		} else if val.RootFS != "" {
			rootfsPids = append(rootfsPids, val.Pid)
		}
		// Register the tgid
		findTgid[val.Pid] = val.Tgid
		nodeval, exists := tgidMap[val.Tgid]
		if exists {
			// Record directly on the existing node
			nodeval.Threads = append(nodeval.Threads, val)
			nodeval.FindPid[val.Pid] = len(nodeval.Threads) - 1
		} else {
			node := tgidNode{
				Tgid:      val.Tgid,
				FindPid:   make(map[int]int),
				Threads:   make([]Process, 0),
				ChildTgid: make([]int, 0),
			}
			node.Threads = append(node.Threads, val)
			node.FindPid[val.Pid] = 0
			tgidMap[val.Tgid] = &node
		}
	}
	return tgidMap, starTgid, rootfsPids
}

func buildTree(tgidMap map[int]*tgidNode, starTgid int) {
	// Build the tree starting from tgid == starTgid
	helloTree = make(map[int]*tgidNode) // tgid nodes that are on the tree, tgid --> *tgidNode
	var q Queue                         // BFS queue of assembled nodes
	visited := make(map[int]bool)       // which tgids have already been visited
	tmp, exists := tgidMap[starTgid]
	if !exists {
		return
	}
	// helloTree records a node when the traversal reaches it.
	// The queue only keeps track of which nodes belong on the tree,
	// so child tgids are appended to helloTree only;
	// the queue just receives the tgidNode of each newly discovered tgid.
	q.Enqueue(tmp)
	visited[starTgid] = true
	for !q.IsEmpty() {
		tmp, ok := q.Dequeue()
		if !ok {
			continue
		}
		node := tmp.(*tgidNode) // a node from the queue; a fresh variable must be declared here
		helloTree[node.Tgid] = node
		for i := 0; i < len(node.Threads); i++ {
			for j := 0; j < len(node.Threads[i].Children); j++ {
				tgid := findTgid[node.Threads[i].Children[j]]
				_, exists := visited[tgid]
				if !exists {
					// A child tgid we have not seen yet
					tgidNode, exists := tgidMap[tgid]
					if !exists {
						continue
					}
					helloTree[node.Tgid].ChildTgid = append(helloTree[node.Tgid].ChildTgid, tgid)
					q.Enqueue(tgidNode)
					visited[tgid] = true
				}
			}
		}
	}
}

func optimazePid(starTgid int, rootfsPids []int) {
	getDockerRootFs := make(map[string]string) // dockerId --> rootfs
	// First handle processes that recorded pivot_root information,
	// in case the pivot happened before the fork.
	for _, rootfsPid := range rootfsPids {
		rootfsTgid := findTgid[rootfsPid]
		i := helloTree[rootfsTgid].FindPid[rootfsPid]
		rootfsProcess := &(helloTree[rootfsTgid].Threads[i])
		if rootfsProcess.RootFS == "cwd" {
			rootfsProcess.RootFS = rootfsProcess.Cwd
		}
		getDockerRootFs[rootfsProcess.DockerId] = rootfsProcess.RootFS
	}
	count := 0
	for _, val := range helloTree {
		// Fix up exit times and parent information.
		// Exit times: many threads never get an exit record, so default to the process's exit time.
		// Ppid: the record made when the process was created always names the real parent,
		// but by the time it spawns threads the process may already have been re-parented
		// as an orphan, so the thread records may name a foster parent instead.
		for i := 0; i < len(val.Threads); i++ {
			if i != 0 {
				if val.Threads[i].Tgid < val.Threads[0].Tgid {
					val.Threads[i].ParentTgid = val.Threads[0].ParentTgid
					val.Threads[i].Ppid = val.Threads[0].Ppid
				}
				if val.Threads[i].ExitTimestamp.IsZero() {
					val.Threads[i].ExitCode = val.Threads[0].ExitCode
					val.Threads[i].ExitTimestamp = val.Threads[0].ExitTimestamp
					val.Threads[i].ExitSignal = val.Threads[0].ExitSignal
				}
			}
			dockerId := val.Threads[i].DockerId
			if dockerId != "" {
				rootfs, exists := getDockerRootFs[dockerId]
				if !exists {
					fmt.Fprintf(os.Stderr, "Err: the docker rootfs of pid %d is not known!\n", val.Threads[i].Pid)
					continue
				}
				val.Threads[i].RootFS = rootfs
			}
		}
		count++
		fmt.Printf("%v\n", *val)
	}
	fmt.Printf("Star: %d, res: %d\n", starTgid, count)
}

func filtPids(pRawPidData *[]Process) {
	/* ATTENTION: passing a map/slice directly as an argument is risky.
	 * What is passed is essentially a pointer, so there is no large copy cost,
	 * but modifying the map/slice inside the callee **may** reallocate its backing memory,
	 * and such a change is invisible to the caller, which still sees the old data.
	 * Since the arguments here are almost exclusively read-only, this is acceptable.
	 */
	// Merge duplicate records caused by multithreading and sort by pid in ascending order.
	// Multithreaded collection has been removed, but keep this as a safeguard.
	merged := mergeProcess(pRawPidData)
	// Group the Processes by tgid
	tgidMap, starTgid, rootfsPids := getTgidNodes(merged)
	// Build the tree (helloTree)
	buildTree(tgidMap, starTgid)
	// Apply some fix-ups to the processes on the tree
	optimazePid(starTgid, rootfsPids)
}

func filtFiles(pRawFileData *[]File) {
	rawFileData := *pRawFileData
	files = make([]File, 0)
	// Sort all file records into a canonical order
	sort.Slice(rawFileData, func(i, j int) bool {
		pi := &rawFileData[i]
		pj := &rawFileData[j]
		if pi.FileName < pj.FileName {
			return true
		} else if pi.FileName > pj.FileName {
			return false
		}
		if pi.Pid < pj.Pid {
			return true
		} else if pi.Pid > pj.Pid {
			return false
		}
		if pi.Fd < pj.Fd {
			return true
		} else if pi.Fd > pj.Fd {
			return false
		}
		return pi.OpenTimestamp.Before(pj.OpenTimestamp)
	})
	for _, file := range rawFileData {
		if file.FileName == "/root/test/1/../.hello.c.swp" {
			// Leftover debug probe for a specific file name
			fmt.Printf("Test\n")
		}
		tgid := findTgid[file.Pid]
		pTgidNode, exists := helloTree[tgid]
		if !exists {
			continue
		}
		if file.CloseTimestamp.IsZero() {
			index, exists := pTgidNode.FindPid[file.Pid]
			if !exists || index < 0 || index >= len(pTgidNode.Threads) {
				continue
			}
			file.CloseTimestamp = pTgidNode.Threads[index].ExitTimestamp
		}
		file.FileName = path.Clean(file.FileName)
		files = append(files, file)
	}
}
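
// The types used above (Process, File, tgidNode, Queue) are assumed to be defined
// elsewhere in this package. The sketch below is NOT their actual definition; it is
// inferred purely from how the fields and methods are used in this file, and the
// concrete field types (time.Time, []string, int) are assumptions:
//
//	type Process struct {
//		Pid, Tgid, Ppid, ParentTgid int
//		Star                        bool          // marks the root ("star") process
//		Comm, RootFS, Cwd, DockerId string
//		Args, Execve                []string      // element types assumed
//		Children                    []int         // child pids
//		ExitCode, ExitSignal        int           // types assumed
//		StartTimestamp              time.Time
//		ExitTimestamp               time.Time
//	}
//
//	type File struct {
//		Pid, Fd        int
//		FileName       string
//		OpenTimestamp  time.Time
//		CloseTimestamp time.Time
//	}
//
//	type tgidNode struct {
//		Tgid      int
//		FindPid   map[int]int // pid --> index into Threads
//		Threads   []Process
//		ChildTgid []int
//	}
//
// Queue is assumed to be a simple FIFO of interface{} values with
// Enqueue(v interface{}), Dequeue() (interface{}, bool), and IsEmpty() bool.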