aboutsummaryrefslogtreecommitdiffstats
path: root/old
diff options
context:
space:
mode:
authorWe-unite <3205135446@qq.com>2024-07-17 11:47:03 +0800
committerWe-unite <3205135446@qq.com>2024-07-17 14:03:06 +0800
commitf055b3940f999c2e26448812e67b68da363dcbad (patch)
tree145411eb93d96ecd4b5b24783d10da5e345791f7 /old
downloadgodo-f055b3940f999c2e26448812e67b68da363dcbad.tar.gz
godo-f055b3940f999c2e26448812e67b68da363dcbad.zip
Initial commit
This repo is to supervise all processes in containers, in other words inspect behaviors of dockers, and get the pid tree. There are several ways for programs in user space to intereact with kernel space: - system calls, which can be found out in source path arch/x86/syscalls - ioctl - /proc virtual file system, to read kernel realtime info - nerlink socket the pid we should pay attention to is /usr/bin/containerd, which may come from service docker-daemon and ppid is 1. Each time a docker is start or stop, this forks a pid, the pid then forks, that's the main process of the docker. To grub the info of pid create or exit, this program is based on go-libauditd, which uses netlink socket to hear from kernel about audit log. What's worrying is that one event is always devided into several entries, and several events may be received alternately. So, from my point of view, which program has 3 coroutines and 2 channels. the first receives raw event message from audit, then throw it to channel 1; the second listen to channel 1, and organizes each event until there's a EOE, then throw to channel 2; the third discover event from channel 2, deal with th event, such as create or delete pid. Specially, since two relative infomation(pid 1 fork pid2, then pid 1 exits)may comes out of order, deletion mast be delayed for some time(may 1 second), to keep the process tree correct.
Diffstat (limited to 'old')
-rw-r--r--old/audit.go127
-rw-r--r--old/auparse.go218
2 files changed, 345 insertions, 0 deletions
diff --git a/old/audit.go b/old/audit.go
new file mode 100644
index 0000000..2b9faa5
--- /dev/null
+++ b/old/audit.go
@@ -0,0 +1,127 @@
1package main
2
3import (
4 "errors"
5 "flag"
6 "fmt"
7 "io"
8 "log"
9 "os"
10
11 "github.com/elastic/go-libaudit/v2"
12 "github.com/elastic/go-libaudit/v2/auparse"
13)
14
15var (
16 fs = flag.NewFlagSet("audit", flag.ExitOnError)
17 diag = fs.String("diag", "", "dump raw information from kernel to file")
18 rate = fs.Uint("rate", 0, "rate limit in kernel (default 0, no rate limit)")
19 backlog = fs.Uint("backlog", 8192, "backlog limit")
20 immutable = fs.Bool("immutable", false, "make kernel audit settings immutable (requires reboot to undo)")
21 receiveOnly = fs.Bool("ro", false, "receive only using multicast, requires kernel 3.16+")
22)
23
24func main() {
25 if err := fs.Parse(os.Args[1:]); err != nil {
26 log.Fatal(err)
27 }
28
29 if err := read(); err != nil {
30 log.Fatalf("error: %v", err)
31 }
32}
33
34func read() error {
35 if os.Geteuid() != 0 {
36 return errors.New("you must be root to receive audit data")
37 }
38
39 // Write netlink response to a file for further analysis or for writing
40 // tests cases.
41 var diagWriter io.Writer
42 if *diag != "" {
43 f, err := os.OpenFile(*diag, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o600)
44 if err != nil {
45 return err
46 }
47 defer f.Close()
48 diagWriter = f
49 }
50
51 log.Println("starting netlink client")
52
53 var err error
54 var client *libaudit.AuditClient
55 if *receiveOnly {
56 client, err = libaudit.NewMulticastAuditClient(diagWriter)
57 if err != nil {
58 return fmt.Errorf("failed to create receive-only audit client: %w", err)
59 }
60 defer client.Close()
61 } else {
62 client, err = libaudit.NewAuditClient(diagWriter)
63 if err != nil {
64 return fmt.Errorf("failed to create audit client: %w", err)
65 }
66 defer client.Close()
67
68 status, err := client.GetStatus()
69 if err != nil {
70 return fmt.Errorf("failed to get audit status: %w", err)
71 }
72 log.Printf("received audit status=%+v", status)
73
74 if status.Enabled == 0 {
75 log.Println("enabling auditing in the kernel")
76 if err = client.SetEnabled(true, libaudit.WaitForReply); err != nil {
77 return fmt.Errorf("failed to set enabled=true: %w", err)
78 }
79 }
80
81 if status.RateLimit != uint32(*rate) {
82 log.Printf("setting rate limit in kernel to %v", *rate)
83 if err = client.SetRateLimit(uint32(*rate), libaudit.NoWait); err != nil {
84 return fmt.Errorf("failed to set rate limit to unlimited: %w", err)
85 }
86 }
87
88 if status.BacklogLimit != uint32(*backlog) {
89 log.Printf("setting backlog limit in kernel to %v", *backlog)
90 if err = client.SetBacklogLimit(uint32(*backlog), libaudit.NoWait); err != nil {
91 return fmt.Errorf("failed to set backlog limit: %w", err)
92 }
93 }
94
95 if status.Enabled != 2 && *immutable {
96 log.Printf("setting kernel settings as immutable")
97 if err = client.SetImmutable(libaudit.NoWait); err != nil {
98 return fmt.Errorf("failed to set kernel as immutable: %w", err)
99 }
100 }
101
102 log.Printf("sending message to kernel registering our PID (%v) as the audit daemon", os.Getpid())
103 if err = client.SetPID(libaudit.NoWait); err != nil {
104 return fmt.Errorf("failed to set audit PID: %w", err)
105 }
106 }
107
108 return receive(client)
109}
110
111func receive(r *libaudit.AuditClient) error {
112 for {
113 rawEvent, err := r.Receive(false)
114 if err != nil {
115 return fmt.Errorf("receive failed: %w", err)
116 }
117
118 // Messages from 1300-2999 are valid audit messages.
119 if rawEvent.Type < auparse.AUDIT_USER_AUTH ||
120 rawEvent.Type > auparse.AUDIT_LAST_USER_MSG2 {
121 continue
122 }
123
124 fmt.Printf("type=%v msg=%s\n", rawEvent.Type, rawEvent.Data)
125 // fmt.Printf("type=%v\n", rawEvent.Type)
126 }
127}
diff --git a/old/auparse.go b/old/auparse.go
new file mode 100644
index 0000000..53b0c92
--- /dev/null
+++ b/old/auparse.go
@@ -0,0 +1,218 @@
1package main
2
3import (
4 "bufio"
5 "encoding/json"
6 "flag"
7 "fmt"
8 "io"
9 "log"
10 "os"
11 "time"
12
13 "gopkg.in/yaml.v3"
14
15 "github.com/elastic/go-libaudit/v2"
16 "github.com/elastic/go-libaudit/v2/aucoalesce"
17 "github.com/elastic/go-libaudit/v2/auparse"
18)
19
20var (
21 fs = flag.NewFlagSet("auparse", flag.ExitOnError)
22 in = fs.String("in", "-", "input file (defaults to stdin)")
23 out = fs.String("out", "-", "output file (defaults to stdout)")
24 interpret = fs.Bool("i", false, "interpret and normalize messages")
25 idLookup = fs.Bool("id", true, "lookup uid and gid values in messages (requires -i)")
26 format = fs.String("format", "", "output format, possible values - json, yaml, text (default)")
27)
28
29func main() {
30 if err := fs.Parse(os.Args[1:]); err != nil {
31 log.Fatal(err)
32 }
33
34 if err := processLogs(); err != nil {
35 log.Fatalf("error: %v", err)
36 }
37}
38
39func input() (io.ReadCloser, error) {
40 if *in == "-" {
41 return os.Stdin, nil
42 }
43
44 return os.Open(*in)
45}
46
47func output() (io.WriteCloser, error) {
48 if *out == "-" {
49 return os.Stdout, nil
50 }
51
52 return os.OpenFile(*out, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o600)
53}
54
55func processLogs() error {
56 input, err := input()
57 if err != nil {
58 return err
59 }
60 defer input.Close()
61
62 output, err := output()
63 if err != nil {
64 return err
65 }
66 defer output.Close()
67
68 reassembler, err := libaudit.NewReassembler(5, 2*time.Second, &streamHandler{output})
69 if err != nil {
70 return fmt.Errorf("failed to create reassmbler: %w", err)
71 }
72 defer reassembler.Close()
73
74 // Start goroutine to periodically purge timed-out events.
75 go func() {
76 t := time.NewTicker(500 * time.Millisecond)
77 defer t.Stop()
78 for range t.C {
79 if reassembler.Maintain() != nil {
80 return
81 }
82 }
83 }()
84
85 // Process lines from the input.
86 s := bufio.NewScanner(input)
87 for s.Scan() {
88 line := s.Text()
89
90 auditMsg, err := auparse.ParseLogLine(line)
91 if err != nil {
92 log.Printf("failed to parse message header: %v", err)
93 }
94
95 reassembler.PushMessage(auditMsg)
96 }
97
98 return nil
99}
100
101type streamHandler struct {
102 output io.Writer
103}
104
105func (s *streamHandler) ReassemblyComplete(msgs []*auparse.AuditMessage) {
106 if err := s.outputMultipleMessages(msgs); err != nil {
107 log.Printf("[WARN] failed writing message to output: %v", err)
108 }
109}
110
111func (*streamHandler) EventsLost(count int) {
112 log.Printf("detected the loss of %v sequences.", count)
113}
114
115func (s *streamHandler) outputMultipleMessages(msgs []*auparse.AuditMessage) error {
116 var err error
117 if !*interpret {
118 if _, err = s.output.Write([]byte("---\n")); err != nil {
119 return err
120 }
121 for _, m := range msgs {
122 if err = s.outputSingleMessage(m); err != nil {
123 return err
124 }
125 }
126 return nil
127 }
128
129 event, err := aucoalesce.CoalesceMessages(msgs)
130 if err != nil {
131 log.Printf("failed to coalesce messages: %v", err)
132 return nil
133 }
134
135 if *idLookup {
136 aucoalesce.ResolveIDs(event)
137 }
138
139 switch *format {
140 case "json":
141 if err := s.printJSON(event); err != nil {
142 log.Printf("failed to marshal event to JSON: %v", err)
143 }
144 case "yaml":
145 if _, err := s.output.Write([]byte("---\n")); err != nil {
146 return err
147 }
148 if err := s.printYAML(event); err != nil {
149 log.Printf("failed to marshal message to YAML: %v", err)
150 }
151 default:
152 sm := event.Summary
153 if _, err := s.output.Write([]byte("---\n")); err != nil {
154 return err
155 }
156
157 _, err := fmt.Fprintf(
158 s.output,
159 `time="%v" sequence=%v category=%v type=%v actor=%v/%v action=%v thing=%v/%v how=%v tags=%v`+"\n",
160 event.Timestamp, event.Sequence, event.Category, event.Type, sm.Actor.Primary, sm.Actor.Secondary,
161 sm.Action, sm.Object.Primary, sm.Object.Secondary, sm.How, event.Tags,
162 )
163 if err != nil {
164 return err
165 }
166 }
167 return nil
168}
169
170func (s *streamHandler) outputSingleMessage(m *auparse.AuditMessage) error {
171 switch *format {
172 case "json":
173 if err := s.printJSON(m.ToMapStr()); err != nil {
174 log.Printf("failed to marshal message to JSON: %v", err)
175 }
176 case "yaml":
177 if err := s.printYAML(m.ToMapStr()); err != nil {
178 log.Printf("failed to marshal message to YAML: %v", err)
179 }
180 default:
181 if _, err := fmt.Fprintf(
182 s.output,
183 "type=%v msg=%v\n",
184 m.RecordType, m.RawData,
185 ); err != nil {
186 return err
187 }
188 }
189 return nil
190}
191
192func (s *streamHandler) printJSON(v interface{}) error {
193 jsonBytes, err := json.Marshal(v)
194 if err != nil {
195 return err
196 }
197 if _, err = s.output.Write(jsonBytes); err != nil {
198 return err
199 }
200 if _, err = s.output.Write([]byte("\n")); err != nil {
201 return err
202 }
203 return nil
204}
205
206func (s *streamHandler) printYAML(v interface{}) error {
207 yamlBytes, err := yaml.Marshal(v)
208 if err != nil {
209 return err
210 }
211 if _, err = s.output.Write(yamlBytes); err != nil {
212 return err
213 }
214 if _, err = s.output.Write([]byte("\n")); err != nil {
215 return err
216 }
217 return nil
218}