package main import ( "fmt" "os" "strconv" "syscall" "time" "go.mongodb.org/mongo-driver/bson" ) const ( dbName string = "test" pidColName string = "pids" fdColName string = "fds" fileColName string = "files" ) var pidCol, fdCol, fileCol mongoClient func initPidCol() (err error) { // TODO: 这里是否需要补全一下进程信息? dirs, err := os.ReadDir(fmt.Sprintf("/proc/%d/task", containerdPid)) if err != nil { return err } for _, file := range dirs { pid, _ := strconv.Atoi(file.Name()) process := Process{ Ppid: 1, ParentTgid: 1, Pid: pid, Tgid: containerdPid, Cwd: "/", Children: make([]int, 0), Execve: make([]Exec, 0), Args: make([]string, 0), } if pid == containerdPid { process.Star = true } err = pidCol.InsertOne(process) } return nil } func deal() { defer wg.Done() var cooked Event var ok bool var err error if err = pidCol.init(dbName, pidColName); err != nil { fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err) return } if err = initPidCol(); err != nil { fmt.Fprintf(os.Stderr, "Err while initing pidcol: %v\n", err) } if err = fdCol.init(dbName, fdColName); err != nil { fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err) return } if err = fileCol.init(dbName, fileColName); err != nil { fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err) } defer pidCol.Disconnect() defer fdCol.Disconnect() defer fileCol.Disconnect() for { cooked, ok = <-cookedChan if !ok { break } switch cooked.tag { case NEWPID: go dealNewPid(cooked) case EXECVE: go dealExecve(cooked) case PIDEXIT: go deletePid(cooked) case FILEOPEN: go fileOpen(cooked) case FILEWRITE: go fileWrite(cooked) case FILECLOSE: go fileClose(cooked) case PIVOTROOT: go pivotRoot(cooked) } } } func deletePid(cooked Event) { // 在这套逻辑里,孩子是不需要收容的,因为我们根本就不看ppid来工作 // 父节点那里也不需要销户 // 理论上这里需要关闭所有文件描述符,但为了处理效率,留给后续流程 err := pidCol.UpdateOne(bson.M{"pid": cooked.pid}, bson.M{ "$set": bson.M{ "exit_timestamp": cooked.timestamp, "exit_code": cooked.exit_code, "exit_signal": cooked.exit_signal, }, }) if err != nil { fmt.Fprintf(os.Stderr, "Err updating: %v\n", err) } } func dealNewPid(cooked Event) { // 自身是否已经记录 var docRes []Process err := pidCol.Finddoc(bson.M{"pid": cooked.pid}, &docRes) if err != nil { fmt.Fprintf(os.Stderr, "Err finding: %v\n", err) return } if len(docRes) != 0 { // 进程原本就存在,换言之别的消息先到了 // 所有先行抵达的消息必须保留execve/children字段 docRes[0].Ppid = cooked.ppid docRes[0].ParentTgid = cooked.parentTgid docRes[0].Pid = cooked.pid docRes[0].Tgid = cooked.tgid docRes[0].Cwd = cooked.cwd docRes[0].Comm = cooked.comm docRes[0].Args = cooked.argv err := pidCol.ReplaceOne(bson.M{"pid": cooked.pid}, docRes[0]) if err != nil { fmt.Fprintf(os.Stderr, "Err replaceing: %v\n", err) } } else { // 这进程本是新修的 err := pidCol.InsertOne(Process{ StartTimestamp: cooked.timestamp, Ppid: cooked.ppid, ParentTgid: cooked.parentTgid, Pid: cooked.pid, Tgid: cooked.tgid, Args: cooked.argv, Comm: cooked.comm, Cwd: cooked.cwd, Execve: make([]Exec, 0), Children: make([]int, 0), }) if err != nil { fmt.Fprintf(os.Stderr, "Err inserting: %v\n", err) } } err = pidCol.UpdateOne(bson.M{"pid": cooked.ppid}, bson.M{ "$push": bson.M{ "children": cooked.pid, }, }) if err != nil { fmt.Fprintf(os.Stderr, "Err updating: %v\n", err) } } func dealExecve(cooked Event) { var docRes []Process // 首先检查进程是否存在,如不存在则为之创建 err := pidCol.Finddoc(bson.M{"pid": cooked.pid}, &docRes) if err != nil { return } if len(docRes) == 1 { // 自身已在,直接记录 docRes[0].Execve = append(docRes[0].Execve, Exec{ Timestamp: cooked.timestamp, ExecArgs: cooked.argv, }) err := pidCol.ReplaceOne(bson.M{"pid": cooked.pid}, docRes[0]) if err != nil { fmt.Fprintf(os.Stderr, "Err replacing: %v\n", err) } } else { // 先fork抵达,插入 process := Process{ Ppid: cooked.ppid, Pid: cooked.pid, Execve: make([]Exec, 0), Children: make([]int, 0), } process.Execve = append(process.Execve, Exec{ Timestamp: cooked.timestamp, ExecArgs: cooked.argv, }) err := pidCol.InsertOne(process) if err != nil { fmt.Fprintf(os.Stderr, "Err inserting: %v\n", err) } } } func fileOpen(cooked Event) { // 权限检查过了,不必再查 file := File{ OpenTimestamp: cooked.timestamp, FileName: cooked.srcPath, Pid: cooked.pid, Fd: cooked.exit_code, Flags: cooked.syscallParam, Written: make([]time.Time, 0), } if cooked.syscallParam[1]&syscall.O_TRUNC != 0 { // 文件以清空方式打开 file.Written = append(file.Written, cooked.timestamp) } err := fdCol.InsertOne(file) if err != nil { fmt.Fprintf(os.Stderr, "Err inserting: %v\n", err) } } func fileClose(cooked Event) { res, err := fdCol.FindOneAndDelete(bson.M{"pid": cooked.pid, "fd": cooked.syscallParam[0]}) if err != nil { return } res["close_timestamp"] = cooked.timestamp err = fileCol.InsertOne(res) if err != nil { fmt.Fprintf(os.Stderr, "Err inserting files: %v\n", err) } } func fileWrite(cooked Event) { var res []File err := fdCol.Finddoc(bson.M{ "pid": cooked.pid, "fd": cooked.syscallParam[0], }, &res) if err != nil { fmt.Fprintf(os.Stderr, "Err closing fd %d of pid %d: %v\n", cooked.syscallParam[0], cooked.pid, err) } if len(res) == 0 { return } err = fdCol.UpdateOne(bson.M{ "pid": cooked.pid, "fd": cooked.syscallParam[0], }, bson.M{"$push": bson.M{"written": cooked.timestamp}}) if err != nil { fmt.Fprintf(os.Stderr, "Err updating: %v\n", err) } } func pivotRoot(cooked Event) { var docRes []Process // docker的根目录信息,记录 err := pidCol.Finddoc(bson.M{"pid": cooked.pid}, &docRes) if err != nil { fmt.Fprintf(os.Stderr, "Err finding: %v\n", err) return } if len(docRes) == 0 { // fork还没到,等一下 pidCol.InsertOne(bson.M{ "start_timestamp": cooked.timestamp, "ppid": cooked.ppid, "pid": cooked.pid, "rootfs": "cwd", }) } else { // 读取已有的工作目录 pidCol.UpdateOne(bson.M{"pid": cooked.pid}, bson.M{ "$set": bson.M{ "rootfs": docRes[0].Cwd, }, }) } }