package main
import (
"fmt"
"os"
"syscall"
"go.mongodb.org/mongo-driver/bson"
)
const (
dbName string = "test"
pidColName string = "pids"
fdColName string = "fds"
fileColName string = "files"
)
// var mongoMutex sync.Mutex
var pidCol, fdCol, fileCol mongoClient
var docRes []bson.M
var err error
func deal() {
defer wg.Done()
var cooked Event
var ok bool
if err = pidCol.init(dbName, pidColName); err != nil {
fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err)
return
}
err = pidCol.InsertOne(bson.M{
"ppid": 1,
"pid": containerdPid,
"cwd": "/",
"children": []bson.M{},
"daemon": true,
})
if err != nil {
fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err)
return
}
if err = fdCol.init(dbName, fdColName); err != nil {
fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err)
return
}
if err = fileCol.init(dbName, fileColName); err != nil {
fmt.Fprintf(os.Stderr, "Error while initing the mongodb: %v\n", err)
}
fmt.Printf("Containerd: %d\n", containerdPid)
defer pidCol.Disconnect()
defer fdCol.Disconnect()
defer fileCol.Disconnect()
for {
cooked, ok = <-cookedChan
if !ok {
break
}
switch cooked.tag {
case NEWPID:
go dealNewPid(cooked)
case EXECVE:
go dealExecve(cooked)
case PIDEXIT:
go deletePid(cooked)
case FILEOPEN:
go fileOpen(cooked)
case FILEWRITE:
go fileWrite(cooked)
case FILECLOSE:
go fileClose(cooked)
case PIVOTROOT:
go pivotRoot(cooked)
}
}
}
func deletePid(cooked Event) {
// 先从老爹那里销户
pidCol.UpdateOne(bson.M{"pid": cooked.ppid}, bson.M{
"$pull": bson.M{
"children": cooked.pid,
},
})
// 在这套逻辑里,孩子是不需要收容的,因为我们根本就不看ppid来工作
// 可以去死了
pidCol.UpdateOne(bson.M{"pid": cooked.pid}, bson.M{
"$set": bson.M{
"exit_timestamp": cooked.timestamp,
"exit_code": cooked.exit_code,
"exit_signal": cooked.exit_signal,
},
})
// 理论上这里需要关闭所有文件描述符,但为了处理效率,留给后续流程
}
func dealNewPid(cooked Event) {
// 自身是否已经记录
docRes, err = pidCol.Finddoc(bson.M{"pid": cooked.pid})
if err != nil {
fmt.Fprintf(os.Stderr, "Err finding: %v\n", err)
return
}
if len(docRes) != 0 {
// 进程原本就存在,换言之别的消息先到了
// 所有先行抵达的消息必须保留execve/children字段
// 此处不再更新
// 以防把原有信息更没了
pidCol.UpdateOne(bson.M{"pid": cooked.pid}, bson.M{
"start_timestamp": cooked.timestamp,
"ppid": cooked.ppid,
"parentTgid": cooked.parentTgid,
"pid": cooked.pid,
"tgid": cooked.tgid,
"cwd": cooked.cwd,
// "execve": []bson.M{},
"args": cooked.argv,
// "children": []bson.M{},
})
} else {
// 这进程本是新修的
pidCol.InsertOne(bson.M{
"start_timestamp": cooked.timestamp,
"ppid": cooked.ppid,
"parentTgid": cooked.parentTgid,
"pid": cooked.pid,
"tgid": cooked.tgid,
"cwd": cooked.cwd,
"execve": []bson.M{},
"args": cooked.argv,
"children": []bson.M{},
})
}
pidCol.UpdateOne(bson.M{"pid": cooked.ppid}, bson.M{
"$push": bson.M{
"children": cooked.pid,
},
})
}
func dealExecve(cooked Event) {
// 父进程在不在?不在扔
docRes, err = pidCol.Finddoc(bson.M{"pid": cooked.ppid})
if err != nil || len(docRes) != 1 {
return
}
// 首先检查进程是否存在,如不存在则为之创建
docRes, err = pidCol.Finddoc(bson.M{"pid": cooked.pid})
if err != nil {
return
}
if len(docRes) == 1 {
// 自身已在,直接记录
pidCol.UpdateOne(bson.M{"pid": cooked.pid}, bson.M{
"$push": bson.M{
"execve": bson.M{
"timestamp": cooked.timestamp,
"execArgs": cooked.argv,
},
},
})
} else {
// 先fork抵达,插入
pidCol.InsertOne(bson.M{
"ppid": cooked.ppid,
"pid": cooked.pid,
"children": []bson.M{},
"execve": []bson.M{
{
"timestamp": cooked.timestamp,
"execArgs": cooked.argv,
},
},
})
}
}
func fileOpen(cooked Event) {
// 权限检查过了,不必再查
fdCol.InsertOne(bson.M{
"timestamp": cooked.timestamp,
"fileName": cooked.srcPath,
"pid": cooked.pid,
"fd": cooked.exit_code,
"flags": cooked.syscallParam,
"written": []bson.M{},
})
if cooked.syscallParam[1]&syscall.O_TRUNC != 0 {
fdCol.UpdateOne(bson.M{"pid": cooked.pid, "fd": cooked.exit_code}, bson.M{
"$push": bson.M{
"written": cooked.timestamp,
},
})
}
}
func fileClose(cooked Event) {
res, err := fdCol.FindOneAndDelete(bson.M{"pid": cooked.pid, "fd": cooked.syscallParam[0]})
if err != nil {
return
}
res["close_timestamp"] = cooked.timestamp
if err := fileCol.InsertOne(res); err != nil {
fmt.Fprintf(os.Stderr, "Err inserting files: %v\n", err)
}
}
func fileWrite(cooked Event) {
res, err := fdCol.Finddoc(bson.M{
"pid": cooked.pid,
"fd": cooked.syscallParam[0],
"close_timestamp": bson.M{"$exists": false},
})
if err != nil {
fmt.Fprintf(os.Stderr, "Err closing fd %d of pid %d: %v\n", cooked.syscallParam[0], cooked.pid, err)
}
if len(res) == 0 {
return
}
fdCol.UpdateOne(bson.M{
"pid": cooked.pid,
"fd": cooked.syscallParam[0],
"close_timestamp": bson.M{"$exists": false},
}, bson.M{"$push": bson.M{"written": cooked.timestamp}})
}
func pivotRoot(cooked Event) {
// docker的根目录信息,记录
docRes, err := pidCol.Finddoc(bson.M{"pid": cooked.pid})
if err != nil {
fmt.Fprintf(os.Stderr, "Err finding: %v\n", err)
return
}
if len(docRes) == 0 {
// fork还没到,等一下
pidCol.InsertOne(bson.M{
"start_timestamp": cooked.timestamp,
"ppid": cooked.ppid,
"pid": cooked.pid,
"rootfs": "cwd",
})
} else {
// 读取已有的工作目录
cwd := docRes[0]["cwd"]
pidCol.UpdateOne(bson.M{"pid": cooked.pid}, bson.M{
"$set": bson.M{
"rootfs": cwd,
},
})
}
}
|