diff options
author | 2025-03-08 22:04:20 +0800 | |
---|---|---|
committer | 2025-03-08 22:04:20 +0800 | |
commit | a07bb8fd1299070229f0e8f3dcb57ffd5ef9870a (patch) | |
tree | 84f21bd0bf7071bc5fc7dd989e77d7ceb5476682 /net/smc/smc_rx.c | |
download | ohosKernel-a07bb8fd1299070229f0e8f3dcb57ffd5ef9870a.tar.gz ohosKernel-a07bb8fd1299070229f0e8f3dcb57ffd5ef9870a.zip |
Initial commit: OpenHarmony-v4.0-Release
Diffstat (limited to 'net/smc/smc_rx.c')
-rw-r--r-- | net/smc/smc_rx.c | 444 |
1 files changed, 444 insertions, 0 deletions
diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c new file mode 100644 index 000000000..7f7e983e4 --- /dev/null +++ b/net/smc/smc_rx.c | |||
@@ -0,0 +1,444 @@ | |||
1 | // SPDX-License-Identifier: GPL-2.0 | ||
2 | /* | ||
3 | * Shared Memory Communications over RDMA (SMC-R) and RoCE | ||
4 | * | ||
5 | * Manage RMBE | ||
6 | * copy new RMBE data into user space | ||
7 | * | ||
8 | * Copyright IBM Corp. 2016 | ||
9 | * | ||
10 | * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com> | ||
11 | */ | ||
12 | |||
13 | #include <linux/net.h> | ||
14 | #include <linux/rcupdate.h> | ||
15 | #include <linux/sched/signal.h> | ||
16 | |||
17 | #include <net/sock.h> | ||
18 | |||
19 | #include "smc.h" | ||
20 | #include "smc_core.h" | ||
21 | #include "smc_cdc.h" | ||
22 | #include "smc_tx.h" /* smc_tx_consumer_update() */ | ||
23 | #include "smc_rx.h" | ||
24 | |||
25 | /* callback implementation to wakeup consumers blocked with smc_rx_wait(). | ||
26 | * indirectly called by smc_cdc_msg_recv_action(). | ||
27 | */ | ||
28 | static void smc_rx_wake_up(struct sock *sk) | ||
29 | { | ||
30 | struct socket_wq *wq; | ||
31 | |||
32 | /* derived from sock_def_readable() */ | ||
33 | /* called already in smc_listen_work() */ | ||
34 | rcu_read_lock(); | ||
35 | wq = rcu_dereference(sk->sk_wq); | ||
36 | if (skwq_has_sleeper(wq)) | ||
37 | wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI | | ||
38 | EPOLLRDNORM | EPOLLRDBAND); | ||
39 | sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); | ||
40 | if ((sk->sk_shutdown == SHUTDOWN_MASK) || | ||
41 | (sk->sk_state == SMC_CLOSED)) | ||
42 | sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP); | ||
43 | rcu_read_unlock(); | ||
44 | } | ||
45 | |||
46 | /* Update consumer cursor | ||
47 | * @conn connection to update | ||
48 | * @cons consumer cursor | ||
49 | * @len number of Bytes consumed | ||
50 | * Returns: | ||
51 | * 1 if we should end our receive, 0 otherwise | ||
52 | */ | ||
53 | static int smc_rx_update_consumer(struct smc_sock *smc, | ||
54 | union smc_host_cursor cons, size_t len) | ||
55 | { | ||
56 | struct smc_connection *conn = &smc->conn; | ||
57 | struct sock *sk = &smc->sk; | ||
58 | bool force = false; | ||
59 | int diff, rc = 0; | ||
60 | |||
61 | smc_curs_add(conn->rmb_desc->len, &cons, len); | ||
62 | |||
63 | /* did we process urgent data? */ | ||
64 | if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) { | ||
65 | diff = smc_curs_comp(conn->rmb_desc->len, &cons, | ||
66 | &conn->urg_curs); | ||
67 | if (sock_flag(sk, SOCK_URGINLINE)) { | ||
68 | if (diff == 0) { | ||
69 | force = true; | ||
70 | rc = 1; | ||
71 | conn->urg_state = SMC_URG_READ; | ||
72 | } | ||
73 | } else { | ||
74 | if (diff == 1) { | ||
75 | /* skip urgent byte */ | ||
76 | force = true; | ||
77 | smc_curs_add(conn->rmb_desc->len, &cons, 1); | ||
78 | conn->urg_rx_skip_pend = false; | ||
79 | } else if (diff < -1) | ||
80 | /* we read past urgent byte */ | ||
81 | conn->urg_state = SMC_URG_READ; | ||
82 | } | ||
83 | } | ||
84 | |||
85 | smc_curs_copy(&conn->local_tx_ctrl.cons, &cons, conn); | ||
86 | |||
87 | /* send consumer cursor update if required */ | ||
88 | /* similar to advertising new TCP rcv_wnd if required */ | ||
89 | smc_tx_consumer_update(conn, force); | ||
90 | |||
91 | return rc; | ||
92 | } | ||
93 | |||
94 | static void smc_rx_update_cons(struct smc_sock *smc, size_t len) | ||
95 | { | ||
96 | struct smc_connection *conn = &smc->conn; | ||
97 | union smc_host_cursor cons; | ||
98 | |||
99 | smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn); | ||
100 | smc_rx_update_consumer(smc, cons, len); | ||
101 | } | ||
102 | |||
103 | struct smc_spd_priv { | ||
104 | struct smc_sock *smc; | ||
105 | size_t len; | ||
106 | }; | ||
107 | |||
108 | static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe, | ||
109 | struct pipe_buffer *buf) | ||
110 | { | ||
111 | struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private; | ||
112 | struct smc_sock *smc = priv->smc; | ||
113 | struct smc_connection *conn; | ||
114 | struct sock *sk = &smc->sk; | ||
115 | |||
116 | if (sk->sk_state == SMC_CLOSED || | ||
117 | sk->sk_state == SMC_PEERFINCLOSEWAIT || | ||
118 | sk->sk_state == SMC_APPFINCLOSEWAIT) | ||
119 | goto out; | ||
120 | conn = &smc->conn; | ||
121 | lock_sock(sk); | ||
122 | smc_rx_update_cons(smc, priv->len); | ||
123 | release_sock(sk); | ||
124 | if (atomic_sub_and_test(priv->len, &conn->splice_pending)) | ||
125 | smc_rx_wake_up(sk); | ||
126 | out: | ||
127 | kfree(priv); | ||
128 | put_page(buf->page); | ||
129 | sock_put(sk); | ||
130 | } | ||
131 | |||
132 | static const struct pipe_buf_operations smc_pipe_ops = { | ||
133 | .release = smc_rx_pipe_buf_release, | ||
134 | .get = generic_pipe_buf_get | ||
135 | }; | ||
136 | |||
137 | static void smc_rx_spd_release(struct splice_pipe_desc *spd, | ||
138 | unsigned int i) | ||
139 | { | ||
140 | put_page(spd->pages[i]); | ||
141 | } | ||
142 | |||
143 | static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len, | ||
144 | struct smc_sock *smc) | ||
145 | { | ||
146 | struct splice_pipe_desc spd; | ||
147 | struct partial_page partial; | ||
148 | struct smc_spd_priv *priv; | ||
149 | int bytes; | ||
150 | |||
151 | priv = kzalloc(sizeof(*priv), GFP_KERNEL); | ||
152 | if (!priv) | ||
153 | return -ENOMEM; | ||
154 | priv->len = len; | ||
155 | priv->smc = smc; | ||
156 | partial.offset = src - (char *)smc->conn.rmb_desc->cpu_addr; | ||
157 | partial.len = len; | ||
158 | partial.private = (unsigned long)priv; | ||
159 | |||
160 | spd.nr_pages_max = 1; | ||
161 | spd.nr_pages = 1; | ||
162 | spd.pages = &smc->conn.rmb_desc->pages; | ||
163 | spd.partial = &partial; | ||
164 | spd.ops = &smc_pipe_ops; | ||
165 | spd.spd_release = smc_rx_spd_release; | ||
166 | |||
167 | bytes = splice_to_pipe(pipe, &spd); | ||
168 | if (bytes > 0) { | ||
169 | sock_hold(&smc->sk); | ||
170 | get_page(smc->conn.rmb_desc->pages); | ||
171 | atomic_add(bytes, &smc->conn.splice_pending); | ||
172 | } | ||
173 | |||
174 | return bytes; | ||
175 | } | ||
176 | |||
177 | static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn) | ||
178 | { | ||
179 | return atomic_read(&conn->bytes_to_rcv) && | ||
180 | !atomic_read(&conn->splice_pending); | ||
181 | } | ||
182 | |||
183 | /* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted | ||
184 | * @smc smc socket | ||
185 | * @timeo pointer to max seconds to wait, pointer to value 0 for no timeout | ||
186 | * @fcrit add'l criterion to evaluate as function pointer | ||
187 | * Returns: | ||
188 | * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown. | ||
189 | * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted). | ||
190 | */ | ||
191 | int smc_rx_wait(struct smc_sock *smc, long *timeo, | ||
192 | int (*fcrit)(struct smc_connection *conn)) | ||
193 | { | ||
194 | DEFINE_WAIT_FUNC(wait, woken_wake_function); | ||
195 | struct smc_connection *conn = &smc->conn; | ||
196 | struct smc_cdc_conn_state_flags *cflags = | ||
197 | &conn->local_tx_ctrl.conn_state_flags; | ||
198 | struct sock *sk = &smc->sk; | ||
199 | int rc; | ||
200 | |||
201 | if (fcrit(conn)) | ||
202 | return 1; | ||
203 | sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk); | ||
204 | add_wait_queue(sk_sleep(sk), &wait); | ||
205 | rc = sk_wait_event(sk, timeo, | ||
206 | sk->sk_err || | ||
207 | cflags->peer_conn_abort || | ||
208 | sk->sk_shutdown & RCV_SHUTDOWN || | ||
209 | conn->killed || | ||
210 | fcrit(conn), | ||
211 | &wait); | ||
212 | remove_wait_queue(sk_sleep(sk), &wait); | ||
213 | sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk); | ||
214 | return rc; | ||
215 | } | ||
216 | |||
217 | static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len, | ||
218 | int flags) | ||
219 | { | ||
220 | struct smc_connection *conn = &smc->conn; | ||
221 | union smc_host_cursor cons; | ||
222 | struct sock *sk = &smc->sk; | ||
223 | int rc = 0; | ||
224 | |||
225 | if (sock_flag(sk, SOCK_URGINLINE) || | ||
226 | !(conn->urg_state == SMC_URG_VALID) || | ||
227 | conn->urg_state == SMC_URG_READ) | ||
228 | return -EINVAL; | ||
229 | |||
230 | if (conn->urg_state == SMC_URG_VALID) { | ||
231 | if (!(flags & MSG_PEEK)) | ||
232 | smc->conn.urg_state = SMC_URG_READ; | ||
233 | msg->msg_flags |= MSG_OOB; | ||
234 | if (len > 0) { | ||
235 | if (!(flags & MSG_TRUNC)) | ||
236 | rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1); | ||
237 | len = 1; | ||
238 | smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn); | ||
239 | if (smc_curs_diff(conn->rmb_desc->len, &cons, | ||
240 | &conn->urg_curs) > 1) | ||
241 | conn->urg_rx_skip_pend = true; | ||
242 | /* Urgent Byte was already accounted for, but trigger | ||
243 | * skipping the urgent byte in non-inline case | ||
244 | */ | ||
245 | if (!(flags & MSG_PEEK)) | ||
246 | smc_rx_update_consumer(smc, cons, 0); | ||
247 | } else { | ||
248 | msg->msg_flags |= MSG_TRUNC; | ||
249 | } | ||
250 | |||
251 | return rc ? -EFAULT : len; | ||
252 | } | ||
253 | |||
254 | if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN) | ||
255 | return 0; | ||
256 | |||
257 | return -EAGAIN; | ||
258 | } | ||
259 | |||
260 | static bool smc_rx_recvmsg_data_available(struct smc_sock *smc) | ||
261 | { | ||
262 | struct smc_connection *conn = &smc->conn; | ||
263 | |||
264 | if (smc_rx_data_available(conn)) | ||
265 | return true; | ||
266 | else if (conn->urg_state == SMC_URG_VALID) | ||
267 | /* we received a single urgent Byte - skip */ | ||
268 | smc_rx_update_cons(smc, 0); | ||
269 | return false; | ||
270 | } | ||
271 | |||
272 | /* smc_rx_recvmsg - receive data from RMBE | ||
273 | * @msg: copy data to receive buffer | ||
274 | * @pipe: copy data to pipe if set - indicates splice() call | ||
275 | * | ||
276 | * rcvbuf consumer: main API called by socket layer. | ||
277 | * Called under sk lock. | ||
278 | */ | ||
279 | int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, | ||
280 | struct pipe_inode_info *pipe, size_t len, int flags) | ||
281 | { | ||
282 | size_t copylen, read_done = 0, read_remaining = len; | ||
283 | size_t chunk_len, chunk_off, chunk_len_sum; | ||
284 | struct smc_connection *conn = &smc->conn; | ||
285 | int (*func)(struct smc_connection *conn); | ||
286 | union smc_host_cursor cons; | ||
287 | int readable, chunk; | ||
288 | char *rcvbuf_base; | ||
289 | struct sock *sk; | ||
290 | int splbytes; | ||
291 | long timeo; | ||
292 | int target; /* Read at least these many bytes */ | ||
293 | int rc; | ||
294 | |||
295 | if (unlikely(flags & MSG_ERRQUEUE)) | ||
296 | return -EINVAL; /* future work for sk.sk_family == AF_SMC */ | ||
297 | |||
298 | sk = &smc->sk; | ||
299 | if (sk->sk_state == SMC_LISTEN) | ||
300 | return -ENOTCONN; | ||
301 | if (flags & MSG_OOB) | ||
302 | return smc_rx_recv_urg(smc, msg, len, flags); | ||
303 | timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); | ||
304 | target = sock_rcvlowat(sk, flags & MSG_WAITALL, len); | ||
305 | |||
306 | /* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */ | ||
307 | rcvbuf_base = conn->rx_off + conn->rmb_desc->cpu_addr; | ||
308 | |||
309 | do { /* while (read_remaining) */ | ||
310 | if (read_done >= target || (pipe && read_done)) | ||
311 | break; | ||
312 | |||
313 | if (conn->killed) | ||
314 | break; | ||
315 | |||
316 | if (smc_rx_recvmsg_data_available(smc)) | ||
317 | goto copy; | ||
318 | |||
319 | if (sk->sk_shutdown & RCV_SHUTDOWN) { | ||
320 | /* smc_cdc_msg_recv_action() could have run after | ||
321 | * above smc_rx_recvmsg_data_available() | ||
322 | */ | ||
323 | if (smc_rx_recvmsg_data_available(smc)) | ||
324 | goto copy; | ||
325 | break; | ||
326 | } | ||
327 | |||
328 | if (read_done) { | ||
329 | if (sk->sk_err || | ||
330 | sk->sk_state == SMC_CLOSED || | ||
331 | !timeo || | ||
332 | signal_pending(current)) | ||
333 | break; | ||
334 | } else { | ||
335 | if (sk->sk_err) { | ||
336 | read_done = sock_error(sk); | ||
337 | break; | ||
338 | } | ||
339 | if (sk->sk_state == SMC_CLOSED) { | ||
340 | if (!sock_flag(sk, SOCK_DONE)) { | ||
341 | /* This occurs when user tries to read | ||
342 | * from never connected socket. | ||
343 | */ | ||
344 | read_done = -ENOTCONN; | ||
345 | break; | ||
346 | } | ||
347 | break; | ||
348 | } | ||
349 | if (!timeo) | ||
350 | return -EAGAIN; | ||
351 | if (signal_pending(current)) { | ||
352 | read_done = sock_intr_errno(timeo); | ||
353 | break; | ||
354 | } | ||
355 | } | ||
356 | |||
357 | if (!smc_rx_data_available(conn)) { | ||
358 | smc_rx_wait(smc, &timeo, smc_rx_data_available); | ||
359 | continue; | ||
360 | } | ||
361 | |||
362 | copy: | ||
363 | /* initialize variables for 1st iteration of subsequent loop */ | ||
364 | /* could be just 1 byte, even after waiting on data above */ | ||
365 | readable = atomic_read(&conn->bytes_to_rcv); | ||
366 | splbytes = atomic_read(&conn->splice_pending); | ||
367 | if (!readable || (msg && splbytes)) { | ||
368 | if (splbytes) | ||
369 | func = smc_rx_data_available_and_no_splice_pend; | ||
370 | else | ||
371 | func = smc_rx_data_available; | ||
372 | smc_rx_wait(smc, &timeo, func); | ||
373 | continue; | ||
374 | } | ||
375 | |||
376 | smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn); | ||
377 | /* subsequent splice() calls pick up where previous left */ | ||
378 | if (splbytes) | ||
379 | smc_curs_add(conn->rmb_desc->len, &cons, splbytes); | ||
380 | if (conn->urg_state == SMC_URG_VALID && | ||
381 | sock_flag(&smc->sk, SOCK_URGINLINE) && | ||
382 | readable > 1) | ||
383 | readable--; /* always stop at urgent Byte */ | ||
384 | /* not more than what user space asked for */ | ||
385 | copylen = min_t(size_t, read_remaining, readable); | ||
386 | /* determine chunks where to read from rcvbuf */ | ||
387 | /* either unwrapped case, or 1st chunk of wrapped case */ | ||
388 | chunk_len = min_t(size_t, copylen, conn->rmb_desc->len - | ||
389 | cons.count); | ||
390 | chunk_len_sum = chunk_len; | ||
391 | chunk_off = cons.count; | ||
392 | smc_rmb_sync_sg_for_cpu(conn); | ||
393 | for (chunk = 0; chunk < 2; chunk++) { | ||
394 | if (!(flags & MSG_TRUNC)) { | ||
395 | if (msg) { | ||
396 | rc = memcpy_to_msg(msg, rcvbuf_base + | ||
397 | chunk_off, | ||
398 | chunk_len); | ||
399 | } else { | ||
400 | rc = smc_rx_splice(pipe, rcvbuf_base + | ||
401 | chunk_off, chunk_len, | ||
402 | smc); | ||
403 | } | ||
404 | if (rc < 0) { | ||
405 | if (!read_done) | ||
406 | read_done = -EFAULT; | ||
407 | smc_rmb_sync_sg_for_device(conn); | ||
408 | goto out; | ||
409 | } | ||
410 | } | ||
411 | read_remaining -= chunk_len; | ||
412 | read_done += chunk_len; | ||
413 | |||
414 | if (chunk_len_sum == copylen) | ||
415 | break; /* either on 1st or 2nd iteration */ | ||
416 | /* prepare next (== 2nd) iteration */ | ||
417 | chunk_len = copylen - chunk_len; /* remainder */ | ||
418 | chunk_len_sum += chunk_len; | ||
419 | chunk_off = 0; /* modulo offset in recv ring buffer */ | ||
420 | } | ||
421 | smc_rmb_sync_sg_for_device(conn); | ||
422 | |||
423 | /* update cursors */ | ||
424 | if (!(flags & MSG_PEEK)) { | ||
425 | /* increased in recv tasklet smc_cdc_msg_rcv() */ | ||
426 | smp_mb__before_atomic(); | ||
427 | atomic_sub(copylen, &conn->bytes_to_rcv); | ||
428 | /* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */ | ||
429 | smp_mb__after_atomic(); | ||
430 | if (msg && smc_rx_update_consumer(smc, cons, copylen)) | ||
431 | goto out; | ||
432 | } | ||
433 | } while (read_remaining); | ||
434 | out: | ||
435 | return read_done; | ||
436 | } | ||
437 | |||
438 | /* Initialize receive properties on connection establishment. NB: not __init! */ | ||
439 | void smc_rx_init(struct smc_sock *smc) | ||
440 | { | ||
441 | smc->sk.sk_data_ready = smc_rx_wake_up; | ||
442 | atomic_set(&smc->conn.splice_pending, 0); | ||
443 | smc->conn.urg_state = SMC_URG_READ; | ||
444 | } | ||