/*
 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * This file contains code imported from the OFED rds source file recv.c
 * Oracle elects to have and use the contents of rds_recv.c under and governed
 * by the OpenIB.org BSD license (see below for full license text). However,
 * the following notice accompanied the original version of this file:
 */

/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <sys/rds.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdma.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

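/*
 * Initialize an incoming message as it arrives from the transport: take the
 * initial reference, record the connection and source address, and start
 * with no RDMA destination cookie.
 */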
void
rdsv3_inc_init(struct rdsv3_incoming *inc, struct rdsv3_connection *conn,
    uint32_be_t saddr)
{
        RDSV3_DPRINTF5("rdsv3_inc_init", "Enter(inc: %p, conn: %p)", inc, conn);
        inc->i_refcount = 1;
        list_link_init(&inc->i_item);
        inc->i_conn = conn;
        inc->i_saddr = saddr;
        inc->i_rdma_cookie = 0;
}

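/*
 * Take an additional reference on an incoming message.
 */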
void
rdsv3_inc_addref(struct rdsv3_incoming *inc)
{
        RDSV3_DPRINTF4("rdsv3_inc_addref",
            "addref inc %p ref %d", inc, atomic_get(&inc->i_refcount));
        atomic_inc_32(&inc->i_refcount);
}

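/*
 * Drop a reference on an incoming message; the last put hands the message
 * back to the owning transport to be freed.
 */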
void
rdsv3_inc_put(struct rdsv3_incoming *inc)
{
        RDSV3_DPRINTF4("rdsv3_inc_put", "put inc %p ref %d",
            inc, atomic_get(&inc->i_refcount));
        if (atomic_dec_and_test(&inc->i_refcount)) {
                ASSERT(!list_link_active(&inc->i_item));

                inc->i_conn->c_trans->inc_free(inc);
        }
}

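/*
 * Account for 'delta' bytes joining or leaving a socket's receive queue and
 * update the local congestion map when the socket crosses into, or drains
 * back out of, the congested state.
 */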
/*ARGSUSED*/
static void
rdsv3_recv_rcvbuf_delta(struct rdsv3_sock *rs, struct rsock *sk,
    struct rdsv3_cong_map *map,
    int delta, uint16_be_t port)
{
        int now_congested;

        RDSV3_DPRINTF4("rdsv3_recv_rcvbuf_delta",
            "Enter(rs: %p, map: %p, delta: %d, port: %d)",
            rs, map, delta, port);

        if (delta == 0)
                return;

        rs->rs_rcv_bytes += delta;
        now_congested = rs->rs_rcv_bytes > rdsv3_sk_rcvbuf(rs);

        RDSV3_DPRINTF5("rdsv3_recv_rcvbuf_delta",
            "rs %p (%u.%u.%u.%u:%u) recv bytes %d buf %d "
            "now_cong %d delta %d",
            rs, NIPQUAD(rs->rs_bound_addr),
            (int)ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
            rdsv3_sk_rcvbuf(rs), now_congested, delta);

        /* wasn't -> am congested */
        if (!rs->rs_congested && now_congested) {
                rs->rs_congested = 1;
                rdsv3_cong_set_bit(map, port);
                rdsv3_cong_queue_updates(map);
        }
        /* was -> aren't congested */
        /*
         * Require more free space before reporting uncongested to prevent
         * bouncing cong/uncong state too often
         */
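        /*
         * Illustration only (buffer sizes vary): with a 64KB receive buffer
         * the port is flagged congested once rs_rcv_bytes exceeds 64KB, and
         * the bit is only cleared again once the backlog drains below 32KB.
         */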
        else if (rs->rs_congested &&
            (rs->rs_rcv_bytes < (rdsv3_sk_rcvbuf(rs)/2))) {
                rs->rs_congested = 0;
                rdsv3_cong_clear_bit(map, port);
                rdsv3_cong_queue_updates(map);
        }

        /* do nothing if no change in cong state */

        RDSV3_DPRINTF4("rdsv3_recv_rcvbuf_delta", "Return(rs: %p)", rs);
}

/*
 * Process all extension headers that come with this message.
 */
static void
rdsv3_recv_incoming_exthdrs(struct rdsv3_incoming *inc, struct rdsv3_sock *rs)
{
        struct rdsv3_header *hdr = &inc->i_hdr;
        unsigned int pos = 0, type, len;
        union {
                struct rdsv3_ext_header_version version;
                struct rdsv3_ext_header_rdma rdma;
                struct rdsv3_ext_header_rdma_dest rdma_dest;
        } buffer;

        RDSV3_DPRINTF4("rdsv3_recv_incoming_exthdrs", "Enter");
        while (1) {
                len = sizeof (buffer);
                type = rdsv3_message_next_extension(hdr, &pos, &buffer, &len);
                if (type == RDSV3_EXTHDR_NONE)
                        break;
                RDSV3_DPRINTF4("recv_incoming_exthdrs", "type %d", type);
                /* Process extension header here */
                switch (type) {
                case RDSV3_EXTHDR_RDMA:
                        rdsv3_rdma_unuse(rs, ntohl(buffer.rdma.h_rdma_rkey),
                            0);
                        break;

                case RDSV3_EXTHDR_RDMA_DEST:
                        /*
                         * We ignore the size for now. We could stash it
                         * somewhere and use it for error checking.
                         */
                        inc->i_rdma_cookie = rdsv3_rdma_make_cookie(
                            ntohl(buffer.rdma_dest.h_rdma_rkey),
                            ntohl(buffer.rdma_dest.h_rdma_offset));

                        break;
                }
        }
        RDSV3_DPRINTF4("rdsv3_recv_incoming_exthdrs", "Return");
}

/*
 * The transport must make sure that this is serialized against other
 * rx and conn reset on this specific conn.
 *
 * We currently assert that only one fragmented message will be sent
 * down a connection at a time.  This lets us reassemble in the conn
 * instead of per-flow which means that we don't have to go digging through
 * flows to tear down partial reassembly progress on conn failure and
 * we save flow lookup and locking for each frag arrival.  It does mean
 * that small messages will wait behind large ones.  Fragmenting at all
 * is only to reduce the memory consumption of pre-posted buffers.
 *
 * The caller passes in saddr and daddr instead of us getting it from the
 * conn.  This lets loopback, who only has one conn for both directions,
 * tell us which roles the addrs in the conn are playing for this message.
 */
/* ARGSUSED */
void
rdsv3_recv_incoming(struct rdsv3_connection *conn, uint32_be_t saddr,
    uint32_be_t daddr, struct rdsv3_incoming *inc, int gfp)
{
        struct rdsv3_sock *rs = NULL;
        struct rsock *sk;

        inc->i_conn = conn;
        inc->i_rx_jiffies = jiffies;

        RDSV3_DPRINTF5("rdsv3_recv_incoming",
            "conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
            "flags 0x%x rx_jiffies %lu", conn,
            (unsigned long long)conn->c_next_rx_seq,
            inc,
            (unsigned long long)ntohll(inc->i_hdr.h_sequence),
            ntohl(inc->i_hdr.h_len),
            ntohs(inc->i_hdr.h_sport),
            ntohs(inc->i_hdr.h_dport),
            inc->i_hdr.h_flags,
            inc->i_rx_jiffies);

        /*
         * Sequence numbers should only increase.  Messages get their
         * sequence number as they're queued in a sending conn.  They
         * can be dropped, though, if the sending socket is closed before
         * they hit the wire.  So sequence numbers can skip forward
         * under normal operation.  They can also drop back in the conn
         * failover case as previously sent messages are resent down the
         * new instance of a conn.  We drop those, otherwise we have
         * to assume that the next valid seq does not come after a
         * hole in the fragment stream.
         *
         * The headers don't give us a way to realize if fragments of
         * a message have been dropped.  We assume that frags that arrive
         * to a flow are part of the current message on the flow that is
         * being reassembled.  This means that senders can't drop messages
         * from the sending conn until all their frags are sent.
         *
         * XXX we could spend more on the wire to get more robust failure
         * detection, arguably worth it to avoid data corruption.
         */
        if (ntohll(inc->i_hdr.h_sequence) < conn->c_next_rx_seq &&
            (inc->i_hdr.h_flags & RDSV3_FLAG_RETRANSMITTED)) {
                rdsv3_stats_inc(s_recv_drop_old_seq);
                goto out;
        }
        conn->c_next_rx_seq = ntohll(inc->i_hdr.h_sequence) + 1;

        if (rdsv3_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
                rdsv3_stats_inc(s_recv_ping);
                (void) rdsv3_send_pong(conn, inc->i_hdr.h_sport);
                goto out;
        }

        rs = rdsv3_find_bound(conn, inc->i_hdr.h_dport);
        if (!rs) {
                rdsv3_stats_inc(s_recv_drop_no_sock);
                goto out;
        }

        /* Process extension headers */
        rdsv3_recv_incoming_exthdrs(inc, rs);

        /* We can be racing with rdsv3_release() which marks the socket dead. */
        sk = rdsv3_rs_to_sk(rs);

        /* serialize with rdsv3_release -> sock_orphan */
        rw_enter(&rs->rs_recv_lock, RW_WRITER);
        if (!rdsv3_sk_sock_flag(sk, SOCK_DEAD)) {
                int error, bytes;
                RDSV3_DPRINTF5("rdsv3_recv_incoming",
                    "adding inc %p to rs %p's recv queue", inc, rs);
                rdsv3_stats_inc(s_recv_queued);
                rdsv3_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                    ntohl(inc->i_hdr.h_len),
                    inc->i_hdr.h_dport);
                rdsv3_inc_addref(inc);
                list_insert_tail(&rs->rs_recv_queue, inc);
                bytes = rs->rs_rcv_bytes;
                rw_exit(&rs->rs_recv_lock);

                __rdsv3_wake_sk_sleep(sk);

                /* wake up anyone waiting in poll */
                sk->sk_upcalls->su_recv(sk->sk_upper_handle, NULL,
                    bytes, 0, &error, NULL);
                if (error != 0) {
                        RDSV3_DPRINTF2("rdsv3_recv_incoming",
                            "su_recv returned: %d", error);
                }
        } else {
                rdsv3_stats_inc(s_recv_drop_dead_sock);
                rw_exit(&rs->rs_recv_lock);
        }

out:
        if (rs)
                rdsv3_sock_put(rs);
}

/*
 * Be very careful here.  This is called as the condition in wait_event_*(),
 * so it needs to cope with being called many times.
 */
static int
rdsv3_next_incoming(struct rdsv3_sock *rs, struct rdsv3_incoming **inc)
{
        if (!*inc) {
                rw_enter(&rs->rs_recv_lock, RW_READER);
                if (!list_is_empty(&rs->rs_recv_queue)) {
                        *inc = list_head(&rs->rs_recv_queue);
                        rdsv3_inc_addref(*inc);
                }
                rw_exit(&rs->rs_recv_lock);
        }

        return (*inc != NULL);
}

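/*
 * Check whether 'inc' is still on the socket's receive queue.  If 'drop' is
 * set and it is, remove it, return its bytes to the rcvbuf accounting and
 * drop the queue's reference.  Returns nonzero if the message was still
 * queued when we looked.
 */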
static int
rdsv3_still_queued(struct rdsv3_sock *rs, struct rdsv3_incoming *inc,
    int drop)
{
        struct rsock *sk = rdsv3_rs_to_sk(rs);
        int ret = 0;

        RDSV3_DPRINTF4("rdsv3_still_queued", "Enter rs: %p inc: %p drop: %d",
            rs, inc, drop);

        rw_enter(&rs->rs_recv_lock, RW_WRITER);
        if (list_link_active(&inc->i_item)) {
                ret = 1;
                if (drop) {
                        /* XXX make sure this i_conn is reliable */
                        rdsv3_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                            -ntohl(inc->i_hdr.h_len),
                            inc->i_hdr.h_dport);
                        list_remove_node(&inc->i_item);
                        rdsv3_inc_put(inc);
                }
        }
        rw_exit(&rs->rs_recv_lock);

        RDSV3_DPRINTF5("rdsv3_still_queued",
            "inc %p rs %p still %d dropped %d", inc, rs, ret, drop);
        return (ret);
}

/*
 * Pull errors off the error queue.
 * If msghdr is NULL, we will just purge the error queue.
 */
int
rdsv3_notify_queue_get(struct rdsv3_sock *rs, struct msghdr *msghdr)
{
        struct rdsv3_notifier *notifier;
        struct rds_rdma_notify cmsg;
        unsigned int count = 0, max_messages = ~0U;
        list_t copy;
        int err = 0;

        RDSV3_DPRINTF4("rdsv3_notify_queue_get", "Enter(rs: %p)", rs);

        list_create(&copy, sizeof (struct rdsv3_notifier),
            offsetof(struct rdsv3_notifier, n_list));

        /*
         * put_cmsg copies to user space and thus may sleep.  We can't do this
         * with rs_lock held, so first grab as many notifications as we can
         * stuff into the user-provided cmsg buffer.  We don't try to copy
         * more, to avoid losing notifications - except when the buffer is so
         * small that it wouldn't even hold a single notification.  Then we
         * give the caller as much of this single msg as we can squeeze in,
         * and set MSG_CTRUNC.
         */
        if (msghdr) {
                max_messages =
                    msghdr->msg_controllen / CMSG_SPACE(sizeof (cmsg));
                if (!max_messages)
                        max_messages = 1;
        }

        mutex_enter(&rs->rs_lock);
        while (!list_is_empty(&rs->rs_notify_queue) && count < max_messages) {
                notifier = list_remove_head(&rs->rs_notify_queue);
                list_insert_tail(&copy, notifier);
                count++;
        }
        mutex_exit(&rs->rs_lock);

        if (!count)
                return (0);

        while (!list_is_empty(&copy)) {
                notifier = list_remove_head(&copy);

                if (msghdr) {
                        cmsg.user_token = notifier->n_user_token;
                        cmsg.status = notifier->n_status;

                        err = rdsv3_put_cmsg(msghdr, SOL_RDS,
                            RDS_CMSG_RDMA_STATUS, sizeof (cmsg), &cmsg);
                        if (err)
                                break;
                }

                kmem_free(notifier, sizeof (struct rdsv3_notifier));
        }

        /*
         * If we bailed out because of an error in put_cmsg,
         * we may be left with one or more notifications that we
         * didn't process. Return them to the head of the list.
         */
        if (!list_is_empty(&copy)) {
                mutex_enter(&rs->rs_lock);
                list_splice(&copy, &rs->rs_notify_queue);
                mutex_exit(&rs->rs_lock);
        }

        RDSV3_DPRINTF4("rdsv3_notify_queue_get", "Return(rs: %p)", rs);

        return (err);
}

/*
 * Queue a congestion notification
 */
static int
rdsv3_notify_cong(struct rdsv3_sock *rs, struct msghdr *msghdr)
{
        uint64_t notify = rs->rs_cong_notify;
        int err;

        err = rdsv3_put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
            sizeof (notify), &notify);
        if (err)
                return (err);

        mutex_enter(&rs->rs_lock);
        rs->rs_cong_notify &= ~notify;
        mutex_exit(&rs->rs_lock);

        return (0);
}

/*
 * Receive any control messages.
 */
static int
rdsv3_cmsg_recv(struct rdsv3_incoming *inc, struct msghdr *msg)
{
        int ret = 0;
        if (inc->i_rdma_cookie) {
                ret = rdsv3_put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
                    sizeof (inc->i_rdma_cookie), &inc->i_rdma_cookie);
        }
        return (ret);
}

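/*
 * recvmsg() entry point for RDS sockets.  Pending RDMA notifications and
 * congestion updates are delivered ahead of data; otherwise the next queued
 * incoming message is copied into the caller's uio, blocking (unless
 * non-blocking I/O was requested) until something arrives.
 */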
int
rdsv3_recvmsg(struct rdsv3_sock *rs, uio_t *uio,
    struct nmsghdr *msg, size_t size, int msg_flags)
{
        struct rsock *sk = rdsv3_rs_to_sk(rs);
        long timeo;
        int ret = 0;
        struct sockaddr_in *sin = NULL;
        struct rdsv3_incoming *inc = NULL;
        boolean_t nonblock = B_FALSE;

        RDSV3_DPRINTF4("rdsv3_recvmsg",
            "Enter(rs: %p size: %d msg_flags: 0x%x)", rs, size, msg_flags);

        if ((uio->uio_fmode & (FNDELAY | FNONBLOCK)) ||
            (msg_flags & MSG_DONTWAIT))
                nonblock = B_TRUE;

        /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
        timeo = rdsv3_rcvtimeo(sk, nonblock);

        if (msg_flags & MSG_OOB)
                goto out;

        /* mark the first cmsg position */
        if (msg) {
                msg->msg_control = NULL;
        }

        while (1) {
                /*
                 * If there are pending notifications, do those -
                 * and nothing else
                 */
                if (!list_is_empty(&rs->rs_notify_queue)) {
                        ret = rdsv3_notify_queue_get(rs, msg);

                        if (msg && msg->msg_namelen) {
                                sin = kmem_zalloc(sizeof (struct sockaddr_in),
                                    KM_SLEEP);
                                sin->sin_family = AF_INET_OFFLOAD;
                                if (inc) {
                                        sin->sin_port = inc->i_hdr.h_sport;
                                        sin->sin_addr.s_addr = inc->i_saddr;
                                }
                                msg->msg_namelen = sizeof (struct sockaddr_in);
                                msg->msg_name = sin;
                        }
                        break;
                }

                if (rs->rs_cong_notify) {
                        ret = rdsv3_notify_cong(rs, msg);
                        goto out;
                }

                if (!rdsv3_next_incoming(rs, &inc)) {
                        if (nonblock) {
                                ret = -EAGAIN;
                                break;
                        }

                        RDSV3_DPRINTF3("rdsv3_recvmsg",
                            "Before wait (rs: %p)", rs);

#if 0
                        ret = rdsv3_wait_sig(sk->sk_sleep,
                            !(list_is_empty(&rs->rs_notify_queue) &&
                            !rs->rs_cong_notify &&
                            !rdsv3_next_incoming(rs, &inc)));
                        if (ret == 0) {
                                /* signal/timeout pending */
                                RDSV3_DPRINTF2("rdsv3_recvmsg",
                                    "woke due to signal");
                                ret = -ERESTART;
                        }
#else
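                        /*
                         * Sleep on the socket's wait queue until there is
                         * something to deliver (a notification, a congestion
                         * update or a new incoming message) or a signal is
                         * pending.
                         */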
                        mutex_enter(&sk->sk_sleep->waitq_mutex);
                        sk->sk_sleep->waitq_waiters++;
                        while ((list_is_empty(&rs->rs_notify_queue) &&
                            !rs->rs_cong_notify &&
                            !rdsv3_next_incoming(rs, &inc))) {
                                ret = cv_wait_sig(&sk->sk_sleep->waitq_cv,
                                    &sk->sk_sleep->waitq_mutex);
                                if (ret == 0) {
                                        /* signal/timeout pending */
                                        RDSV3_DPRINTF2("rdsv3_recvmsg",
                                            "woke due to signal");
                                        ret = -EINTR;
                                        break;
                                }
                        }
                        sk->sk_sleep->waitq_waiters--;
                        mutex_exit(&sk->sk_sleep->waitq_mutex);
#endif

                        RDSV3_DPRINTF5("rdsv3_recvmsg",
                            "recvmsg woke rs: %p inc %p ret %d",
                            rs, inc, -ret);

                        if (ret < 0)
                                break;

                        /*
                         * if the wakeup was due to rs_notify_queue or
                         * rs_cong_notify then we need to handle those first.
                         */
                        continue;
                }

                RDSV3_DPRINTF5("rdsv3_recvmsg",
                    "copying inc %p from %u.%u.%u.%u:%u to user", inc,
                    NIPQUAD(inc->i_conn->c_faddr),
                    ntohs(inc->i_hdr.h_sport));

                ret = inc->i_conn->c_trans->inc_copy_to_user(inc, uio, size);
                if (ret < 0)
                        break;

                /*
                 * if the message we just copied isn't at the head of the
                 * recv queue then someone else raced us to return it, try
                 * to get the next message.
                 */
                if (!rdsv3_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
                        rdsv3_inc_put(inc);
                        inc = NULL;
                        rdsv3_stats_inc(s_recv_deliver_raced);
                        continue;
                }

                if (ret < ntohl(inc->i_hdr.h_len)) {
                        if (msg_flags & MSG_TRUNC)
                                ret = ntohl(inc->i_hdr.h_len);
                        msg->msg_flags |= MSG_TRUNC;
                }

                if (rdsv3_cmsg_recv(inc, msg)) {
                        ret = -EFAULT;
                        goto out;
                }

                rdsv3_stats_inc(s_recv_delivered);

                if (msg->msg_namelen) {
                        sin = kmem_alloc(sizeof (struct sockaddr_in), KM_SLEEP);
                        sin->sin_family = AF_INET_OFFLOAD;
                        sin->sin_port = inc->i_hdr.h_sport;
                        sin->sin_addr.s_addr = inc->i_saddr;
                        (void) memset(sin->sin_zero, 0,
                            sizeof (sin->sin_zero));
                        msg->msg_namelen = sizeof (struct sockaddr_in);
                        msg->msg_name = sin;
                }
                break;
        }

        if (inc)
                rdsv3_inc_put(inc);

out:
        if (msg && msg->msg_control == NULL)
                msg->msg_controllen = 0;

        RDSV3_DPRINTF4("rdsv3_recvmsg", "Return(rs: %p, ret: %d)", rs, ret);

        return (ret);
}

/*
 * The socket is being shut down and we're asked to drop messages that were
 * queued for recvmsg.  The caller has unbound the socket so the receive path
 * won't queue any more incoming fragments or messages on the socket.
 */
void
rdsv3_clear_recv_queue(struct rdsv3_sock *rs)
{
        struct rsock *sk = rdsv3_rs_to_sk(rs);
        struct rdsv3_incoming *inc, *tmp;

        RDSV3_DPRINTF4("rdsv3_clear_recv_queue", "Enter(rs: %p)", rs);

        rw_enter(&rs->rs_recv_lock, RW_WRITER);
        RDSV3_FOR_EACH_LIST_NODE_SAFE(inc, tmp, &rs->rs_recv_queue, i_item) {
                rdsv3_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                    -ntohl(inc->i_hdr.h_len),
                    inc->i_hdr.h_dport);
                list_remove_node(&inc->i_item);
                rdsv3_inc_put(inc);
        }
        rw_exit(&rs->rs_recv_lock);

        RDSV3_DPRINTF4("rdsv3_clear_recv_queue", "Return(rs: %p)", rs);
}

/*
 * inc->i_saddr isn't used here because it is only set in the receive
 * path.
 */
void
rdsv3_inc_info_copy(struct rdsv3_incoming *inc,
    struct rdsv3_info_iterator *iter,
    uint32_be_t saddr, uint32_be_t daddr, int flip)
{
        struct rds_info_message minfo;

        minfo.seq = ntohll(inc->i_hdr.h_sequence);
        minfo.len = ntohl(inc->i_hdr.h_len);

        if (flip) {
                minfo.laddr = daddr;
                minfo.faddr = saddr;
                minfo.lport = inc->i_hdr.h_dport;
                minfo.fport = inc->i_hdr.h_sport;
        } else {
                minfo.laddr = saddr;
                minfo.faddr = daddr;
                minfo.lport = inc->i_hdr.h_sport;
                minfo.fport = inc->i_hdr.h_dport;
        }

        rdsv3_info_copy(iter, &minfo, sizeof (minfo));
}