package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"sort"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// Source ("raw") and destination ("cooked") database/collection names.
const (
	oldDBName      = "test"
	oldPidColName  = "pids"
	oldFdColName   = "fds"
	oldFileColName = "files"

	newDBName      = "cooked"
	newPidColName  = "tgids"
	newFileColName = "files"
)

// Process-tree state, populated by filtPids and consumed by filtFiles/main.
var (
	findTgid  map[int]int       // pid -> tgid for every merged process record
	helloTree map[int]*tgidNode // tgid -> node, only tgids reachable from the starred root
)

// files holds the cooked file records produced by filtFiles.
var files []*File

// fetchAll reads every document of col into out, which must be a pointer to
// a slice of the appropriate document type (e.g. *[]Process).
func fetchAll(ctx context.Context, col *mongo.Collection, out interface{}) error {
	cursor, err := col.Find(ctx, bson.M{})
	if err != nil {
		return fmt.Errorf("find %s: %w", col.Name(), err)
	}
	// cursor.All closes the cursor on success; the deferred Close only
	// matters if All fails part-way, and closing twice is harmless.
	defer cursor.Close(ctx)
	if err := cursor.All(ctx, out); err != nil {
		return fmt.Errorf("decode %s: %w", col.Name(), err)
	}
	return nil
}

func main() {
	ctx := context.Background()

	// Connect to MongoDB.
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(ctx)

	oldDB := client.Database(oldDBName)

	/*
	 * Step 1: process records — merge per-thread duplicates and build
	 * the tgid tree rooted at the starred process.
	 */
	var rawPidData []Process
	if err := fetchAll(ctx, oldDB.Collection(oldPidColName), &rawPidData); err != nil {
		fmt.Fprintf(os.Stderr, "Err: %v\n", err)
		return
	}
	filtPids(&rawPidData)

	/*
	 * Step 2: file records.
	 * - Treat already-closed and still-open files uniformly.
	 * - For still-open files, substitute the owning thread's exit time
	 *   as the close time.
	 * - NOTE(review): threads of one process share the fd table — it is
	 *   unclear whether that needs special handling here; confirm.
	 */
	// Pull both the "files" and the "fds" collections.
	// TODO: could we filter down to records that were actually written?
	var rawFileData []File
	if err := fetchAll(ctx, oldDB.Collection(oldFileColName), &rawFileData); err != nil {
		fmt.Fprintf(os.Stderr, "Err: %v\n", err)
		return
	}
	var rawFdData []File
	if err := fetchAll(ctx, oldDB.Collection(oldFdColName), &rawFdData); err != nil {
		fmt.Fprintf(os.Stderr, "Err: %v\n", err)
		return
	}

	// Merge the two sources and cook them.
	rawFileData = append(rawFileData, rawFdData...)
	filtFiles(&rawFileData)

	// Write the cooked data back, replacing any previous run's output.
	newDB := client.Database(newDBName)
	if err := newDB.Drop(ctx); err != nil {
		fmt.Fprintf(os.Stderr, "Err: %v\n", err)
		return
	}
	newPidCol := newDB.Collection(newPidColName)
	for _, node := range helloTree {
		if _, err := newPidCol.InsertOne(ctx, *node); err != nil {
			fmt.Fprintf(os.Stderr, "Err: %v\n", err)
		}
	}
	newFileCol := newDB.Collection(newFileColName)
	for _, f := range files {
		if _, err := newFileCol.InsertOne(ctx, *f); err != nil {
			fmt.Fprintf(os.Stderr, "Err: %v\n", err)
		}
	}
}

// ProMerge combines two records that describe the same pid (duplicates
// produced by multi-threaded tracing). Scalar fields prefer a's value when
// set, timestamps take the earliest non-zero one, and slices are
// concatenated.
func ProMerge(a, b Process) (res Process) {
	// Keep the star flag if either record carries it. (Previously it was
	// forced to false, which lost the tree root whenever the starred pid
	// had duplicate records.)
	res.Star = a.Star || b.Star

	// Earliest non-zero start time wins.
	switch {
	case a.StartTimestamp.IsZero():
		res.StartTimestamp = b.StartTimestamp
	case b.StartTimestamp.IsZero():
		res.StartTimestamp = a.StartTimestamp
	case a.StartTimestamp.Before(b.StartTimestamp):
		res.StartTimestamp = a.StartTimestamp
	default:
		res.StartTimestamp = b.StartTimestamp
	}

	res.Ppid = a.Ppid
	if a.ParentTgid == 0 {
		res.ParentTgid = b.ParentTgid
	} else {
		res.ParentTgid = a.ParentTgid
	}
	res.Pid = a.Pid
	if a.Tgid == 0 {
		res.Tgid = b.Tgid
	} else {
		res.Tgid = a.Tgid
	}
	if len(a.Args) == 0 {
		res.Args = b.Args
	} else {
		res.Args = a.Args
	}
	if a.Comm == "" {
		res.Comm = b.Comm
	} else {
		res.Comm = a.Comm
	}
	if a.RootFS == "" {
		res.RootFS = b.RootFS
	} else {
		res.RootFS = a.RootFS
	}
	if a.Cwd == "" {
		res.Cwd = b.Cwd
	} else {
		res.Cwd = a.Cwd
	}
	res.Execve = append(a.Execve, b.Execve...)
	res.Children = append(a.Children, b.Children...)

	// Exit info: take the record with the earliest non-zero exit time;
	// a zero timestamp means "did not exit in this record".
	useA := !a.ExitTimestamp.IsZero() &&
		(b.ExitTimestamp.IsZero() || a.ExitTimestamp.Before(b.ExitTimestamp))
	if useA {
		res.ExitCode = a.ExitCode
		res.ExitSignal = a.ExitSignal
		res.ExitTimestamp = a.ExitTimestamp
	} else {
		res.ExitCode = b.ExitCode
		res.ExitSignal = b.ExitSignal
		res.ExitTimestamp = b.ExitTimestamp
	}
	return res
}

// filtPids merges duplicate per-thread process records, groups them into
// tgidNodes, and builds helloTree: the subtree of tgids reachable from the
// starred process via BFS. It also fills the global findTgid map.
func filtPids(pRawPidData *[]Process) {
	rawPidData := *pRawPidData

	// Merge duplicate records caused by multi-threading: pid -> Process.
	merged := make(map[int]Process)
	for _, process := range rawPidData {
		if tmp, exists := merged[process.Pid]; exists {
			merged[process.Pid] = ProMerge(tmp, process)
		} else {
			merged[process.Pid] = process
		}
	}

	// Group the merged processes by tgid: tgid -> *tgidNode.
	tgidMap := make(map[int]*tgidNode)
	findTgid = make(map[int]int) // pid -> tgid
	stared := -1                 // tgid of the starred (root) process, -1 if none
	for _, val := range merged {
		if val.Star {
			stared = val.Tgid
		}
		findTgid[val.Pid] = val.Tgid
		if node, exists := tgidMap[val.Tgid]; exists {
			node.Threads = append(node.Threads, val)
			node.FindPid[val.Pid] = len(node.Threads) - 1
		} else {
			node := tgidNode{
				Tgid:      val.Tgid,
				FindPid:   make(map[int]int),
				Threads:   make([]Process, 0),
				ChildTgid: make([]int, 0),
			}
			node.Threads = append(node.Threads, val)
			node.FindPid[val.Pid] = 0
			tgidMap[val.Tgid] = &node
		}
	}

	// BFS from tgid == stared to build the tree. helloTree records the
	// nodes as they are visited; the queue only tracks which nodes remain
	// to be expanded, so ChildTgid is appended via helloTree only.
	helloTree = make(map[int]*tgidNode) // tgids on the tree
	var q Queue                         // BFS work queue of *tgidNode
	visited := make(map[int]bool)       // tgids already enqueued
	root, exists := tgidMap[stared]
	if !exists {
		return
	}
	q.Enqueue(root)
	visited[stared] = true
	for !q.IsEmpty() {
		item, ok := q.Dequeue()
		if !ok {
			continue
		}
		node := item.(*tgidNode)
		helloTree[node.Tgid] = node
		for i := range node.Threads {
			for _, childPid := range node.Threads[i].Children {
				tgid := findTgid[childPid]
				if visited[tgid] {
					continue
				}
				child, exists := tgidMap[tgid]
				if !exists {
					continue
				}
				helloTree[node.Tgid].ChildTgid = append(helloTree[node.Tgid].ChildTgid, tgid)
				q.Enqueue(child)
				visited[tgid] = true
			}
		}
	}

	// TODO:
	// 1. (done) restructure the data so the stored form is already a tree
	//    the frontend can use directly.
	// 2. reconcile thread exit time vs. process exit time — this matters
	//    for the file fix-up below.
	// 3. root filesystem handling is a significant open problem.
	count := 0
	for _, val := range helloTree {
		count++
		fmt.Printf("==============================\ntgid: %6d, size: %6d, children: ", val.Tgid, len(val.Threads))
		for _, child := range val.ChildTgid {
			fmt.Printf("%7d", child)
		}
		fmt.Printf("\n")
		for _, process := range val.Threads {
			fmt.Printf("%v\n", process)
		}
		fmt.Printf("\n\n\n")
	}
	fmt.Printf("Star: %d, res: %d\n", stared, count)
}

// filtFiles sorts the raw file records, drops those whose process is not on
// the tree, substitutes the owning thread's exit time for a missing close
// time, and stores the survivors in the global files slice.
func filtFiles(pRawFileData *[]File) {
	rawFileData := *pRawFileData
	files = make([]*File, 0)

	// Deterministic order: file name, then pid, then fd, then open time.
	sort.Slice(rawFileData, func(i, j int) bool {
		a, b := &rawFileData[i], &rawFileData[j]
		if a.FileName != b.FileName {
			return a.FileName < b.FileName
		}
		if a.Pid != b.Pid {
			return a.Pid < b.Pid
		}
		if a.Fd != b.Fd {
			return a.Fd < b.Fd
		}
		return a.OpenTimestamp.Before(b.OpenTimestamp)
	})

	for i := range rawFileData {
		// Take a per-iteration copy before storing its address; storing
		// the address of a shared loop variable made every element of
		// files point at the last record (pre-Go 1.22 semantics).
		file := rawFileData[i]
		tgid := findTgid[file.Pid]
		pTgidNode, exists := helloTree[tgid]
		if !exists {
			continue // owning process is not on the tree
		}
		if file.CloseTimestamp.IsZero() {
			// Never closed: use the owning thread's exit time instead.
			index, exists := pTgidNode.FindPid[file.Pid]
			if !exists || index < 0 || index >= len(pTgidNode.Threads) {
				continue
			}
			file.CloseTimestamp = pTgidNode.Threads[index].ExitTimestamp
		}
		files = append(files, &file)
	}
}