/*
 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * This file contains code imported from the OFED rds source file send.c
 * Oracle elects to have and use the contents of send.c under and governed
 * by the OpenIB.org BSD license (see below for full license text). However,
 * the following notice accompanied the original version of this file:
 */

/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <sys/stropts.h>
#include <sys/systm.h>

#include <sys/rds.h>
#include <sys/socket.h>
#include <sys/socketvar.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdma.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

/*
 * When transmitting messages in rdsv3_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the softlock watchdog
 * will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = 64;

extern void rdsv3_ib_send_unmap_rdma(void *ic, struct rdsv3_rdma_op *op);
/*
 * Reset the send state. Caller must hold c_send_lock when calling here.
 */
void
rdsv3_send_reset(struct rdsv3_connection *conn)
{
        struct rdsv3_message *rm, *tmp;
        struct rdsv3_rdma_op *ro;

        RDSV3_DPRINTF4("rdsv3_send_reset", "Enter(conn: %p)", conn);

        ASSERT(MUTEX_HELD(&conn->c_send_lock));

        if (conn->c_xmit_rm) {
                rm = conn->c_xmit_rm;
                ro = rm->m_rdma_op;
                if (ro && ro->r_mapped) {
                        RDSV3_DPRINTF2("rdsv3_send_reset",
                            "rm %p mflg 0x%x map %d mihdl %p sgl %p",
                            rm, rm->m_flags, ro->r_mapped,
                            ro->r_rdma_sg[0].mihdl,
                            ro->r_rdma_sg[0].swr.wr_sgl);
                        rdsv3_ib_send_unmap_rdma(conn->c_transport_data, ro);
                }
                /*
                 * Tell the user the RDMA op is no longer mapped by the
                 * transport. This isn't entirely true (it's flushed out
                 * independently) but as the connection is down, there's
                 * no ongoing RDMA to/from that memory
                 */
                rdsv3_message_unmapped(conn->c_xmit_rm);
                rdsv3_message_put(conn->c_xmit_rm);
                conn->c_xmit_rm = NULL;
        }

        conn->c_xmit_sg = 0;
        conn->c_xmit_hdr_off = 0;
        conn->c_xmit_data_off = 0;
        conn->c_xmit_rdma_sent = 0;
        conn->c_map_queued = 0;

        conn->c_unacked_packets = rdsv3_sysctl_max_unacked_packets;
        conn->c_unacked_bytes = rdsv3_sysctl_max_unacked_bytes;

        /* Mark messages as retransmissions, and move them to the send q */
        mutex_enter(&conn->c_lock);
        RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
                set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);
                set_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags);
                if (rm->m_rdma_op && rm->m_rdma_op->r_mapped) {
                        RDSV3_DPRINTF4("_send_reset",
                            "RT rm %p mflg 0x%x sgl %p",
                            rm, rm->m_flags,
                            rm->m_rdma_op->r_rdma_sg[0].swr.wr_sgl);
                }
        }
        list_move_tail(&conn->c_send_queue, &conn->c_retrans);
        mutex_exit(&conn->c_lock);

        RDSV3_DPRINTF4("rdsv3_send_reset", "Return(conn: %p)", conn);
}

/*
 * We're making the conscious trade-off here to only send one message
 * down the connection at a time.
 *   Pro:
 *      - tx queueing is a simple fifo list
 *      - reassembly is optional and easily done by transports per conn
 *      - no per flow rx lookup at all, straight to the socket
 *      - less per-frag memory and wire overhead
 *   Con:
 *      - queued acks can be delayed behind large messages
 *   Depends:
 *      - small message latency is higher behind queued large messages
 *      - large message latency isn't starved by intervening small sends
 */
int
rdsv3_send_xmit(struct rdsv3_connection *conn)
{
        struct rdsv3_message *rm;
        unsigned int tmp;
        unsigned int send_quota = send_batch_count;
        struct rdsv3_scatterlist *sg;
        int ret = 0;
        int was_empty = 0;
        list_t to_be_dropped;

restart:
        if (!rdsv3_conn_up(conn))
                goto out;

        RDSV3_DPRINTF4("rdsv3_send_xmit", "Enter(conn: %p)", conn);

        list_create(&to_be_dropped, sizeof (struct rdsv3_message),
            offsetof(struct rdsv3_message, m_conn_item));

        /*
         * sendmsg calls here after having queued its message on the send
         * queue.  We only have one task feeding the connection at a time.  If
         * another thread is already feeding the queue then we back off.  This
         * avoids blocking the caller and trading per-connection data between
         * caches per message.
         */
        if (!mutex_tryenter(&conn->c_send_lock)) {
                RDSV3_DPRINTF4("rdsv3_send_xmit",
                    "Another thread running(conn: %p)", conn);
                rdsv3_stats_inc(s_send_sem_contention);
                ret = -ENOMEM;
                goto out;
        }
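        /*
         * Note that this thread is an active sender on the connection;
         * connection teardown can use c_senders to wait for in-flight
         * senders to drain.
         */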
        atomic_add_32(&conn->c_senders, 1);

        if (conn->c_trans->xmit_prepare)
                conn->c_trans->xmit_prepare(conn);

        /*
         * spin trying to push headers and data down the connection until
         * the connection doesn't make forward progress.
         */
        while (--send_quota) {
                /*
                 * See if we need to send a congestion map update if we're
                 * between sending messages.  The send_sem protects our sole
                 * use of c_map_offset and _bytes.
                 * Note this is used only by transports that define a special
                 * xmit_cong_map function. For all others, we allocate
                 * a cong_map message and treat it just like any other send.
                 */
                if (conn->c_map_bytes) {
                        ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
                            conn->c_map_offset);
                        if (ret <= 0)
                                break;

                        conn->c_map_offset += ret;
                        conn->c_map_bytes -= ret;
                        if (conn->c_map_bytes)
                                continue;
                }

                /*
                 * If we're done sending the current message, clear the
                 * offset and S/G temporaries.
                 */
                rm = conn->c_xmit_rm;
                if (rm != NULL &&
                    conn->c_xmit_hdr_off == sizeof (struct rdsv3_header) &&
                    conn->c_xmit_sg == rm->m_nents) {
                        conn->c_xmit_rm = NULL;
                        conn->c_xmit_sg = 0;
                        conn->c_xmit_hdr_off = 0;
                        conn->c_xmit_data_off = 0;
                        conn->c_xmit_rdma_sent = 0;

                        /* Release the reference to the previous message. */
                        rdsv3_message_put(rm);
                        rm = NULL;
                }

                /* If we're asked to send a cong map update, do so. */
                if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) {
                        if (conn->c_trans->xmit_cong_map != NULL) {
                                conn->c_map_offset = 0;
                                conn->c_map_bytes =
                                    sizeof (struct rdsv3_header) +
                                    RDSV3_CONG_MAP_BYTES;
                                continue;
                        }

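                        /*
                         * The transport has no xmit_cong_map hook; send the
                         * update as an ordinary message built by
                         * rdsv3_cong_update_alloc() below.
                         */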
                        rm = rdsv3_cong_update_alloc(conn);
                        if (IS_ERR(rm)) {
                                ret = PTR_ERR(rm);
                                break;
                        }

                        conn->c_xmit_rm = rm;
                }

                /*
                 * Grab the next message from the send queue, if there is one.
                 *
                 * c_xmit_rm holds a ref while we're sending this message down
                 * the connection.  We can use this ref while holding the
                 * send_sem; rdsv3_send_reset() is serialized with it.
                 */
                if (rm == NULL) {
                        unsigned int len;

                        mutex_enter(&conn->c_lock);

                        if (!list_is_empty(&conn->c_send_queue)) {
                                rm = list_remove_head(&conn->c_send_queue);
                                rdsv3_message_addref(rm);

                                /*
                                 * Move the message from the send queue to the
                                 * retransmit list right away.
                                 */
                                list_insert_tail(&conn->c_retrans, rm);
                        }

                        mutex_exit(&conn->c_lock);

                        if (rm == NULL) {
                                was_empty = 1;
                                break;
                        }

                        /*
                         * Unfortunately, the way Infiniband deals with
                         * RDMA to a bad MR key is by moving the entire
                         * queue pair to error state. We could possibly
                         * recover from that, but right now we drop the
                         * connection.
                         * Therefore, we never retransmit messages with
                         * RDMA ops.
                         */
                        if (rm->m_rdma_op &&
                            test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags)) {
                                mutex_enter(&conn->c_lock);
                                if (test_and_clear_bit(RDSV3_MSG_ON_CONN,
                                    &rm->m_flags)) {
                                        list_remove_node(&rm->m_conn_item);
                                        list_insert_tail(&to_be_dropped, rm);
                                }
                                mutex_exit(&conn->c_lock);
                                rdsv3_message_put(rm);
                                continue;
                        }

                        /* Require an ACK every once in a while */
                        len = ntohl(rm->m_inc.i_hdr.h_len);
                        if (conn->c_unacked_packets == 0 ||
                            conn->c_unacked_bytes < len) {
                                set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);

                                conn->c_unacked_packets =
                                    rdsv3_sysctl_max_unacked_packets;
                                conn->c_unacked_bytes =
                                    rdsv3_sysctl_max_unacked_bytes;
                                rdsv3_stats_inc(s_send_ack_required);
                        } else {
                                conn->c_unacked_bytes -= len;
                                conn->c_unacked_packets--;
                        }

                        conn->c_xmit_rm = rm;
                }

                /*
                 * Try and send an rdma message.  Let's see if we can
                 * keep this simple and require that the transport either
                 * send the whole rdma or none of it.
                 */
                if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) {
                        ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op);
                        if (ret)
                                break;
                        conn->c_xmit_rdma_sent = 1;
                        /*
                         * The transport owns the mapped memory for now.
                         * You can't unmap it while it's on the send queue
                         */
                        set_bit(RDSV3_MSG_MAPPED, &rm->m_flags);
                }

                if (conn->c_xmit_hdr_off < sizeof (struct rdsv3_header) ||
                    conn->c_xmit_sg < rm->m_nents) {
                        ret = conn->c_trans->xmit(conn, rm,
                            conn->c_xmit_hdr_off,
                            conn->c_xmit_sg,
                            conn->c_xmit_data_off);
                        if (ret <= 0)
                                break;

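                        /*
                         * The transport accepted ret bytes.  Consume them
                         * against the header first, then against the data
                         * scatterlist, so a partial send resumes where it
                         * left off on the next pass.
                         */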
                        if (conn->c_xmit_hdr_off <
                            sizeof (struct rdsv3_header)) {
                                tmp = min(ret,
                                    sizeof (struct rdsv3_header) -
                                    conn->c_xmit_hdr_off);
                                conn->c_xmit_hdr_off += tmp;
                                ret -= tmp;
                        }

                        sg = &rm->m_sg[conn->c_xmit_sg];
                        while (ret) {
                                tmp = min(ret, rdsv3_sg_len(sg) -
                                    conn->c_xmit_data_off);
                                conn->c_xmit_data_off += tmp;
                                ret -= tmp;
                                if (conn->c_xmit_data_off == rdsv3_sg_len(sg)) {
                                        conn->c_xmit_data_off = 0;
                                        sg++;
                                        conn->c_xmit_sg++;
                                        ASSERT(!(ret != 0 &&
                                            conn->c_xmit_sg == rm->m_nents));
                                }
                        }
                }
        }

        /* Nuke any messages we decided not to retransmit. */
        if (!list_is_empty(&to_be_dropped))
                rdsv3_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);

        if (conn->c_trans->xmit_complete)
                conn->c_trans->xmit_complete(conn);

        /*
         * We might be racing with another sender who queued a message but
         * backed off on noticing that we held the c_send_lock.  If we check
         * for queued messages after dropping the sem then either we'll
         * see the queued message or the queuer will get the sem.  If we
         * notice the queued message then we trigger an immediate retry.
         *
         * We need to be careful only to do this when we stopped processing
         * the send queue because it was empty.  It's the only way we
         * stop processing the loop when the transport hasn't taken
         * responsibility for forward progress.
         */
        mutex_exit(&conn->c_send_lock);

        if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
                /*
                 * We exhausted the send quota, but there's work left to
                 * do. Return and (re-)schedule the send worker.
                 */
                ret = -EAGAIN;
        }

        atomic_dec_32(&conn->c_senders);

        if (ret == 0 && was_empty) {
                /*
                 * A simple bit test would be way faster than taking the
                 * spin lock
                 */
                mutex_enter(&conn->c_lock);
                if (!list_is_empty(&conn->c_send_queue)) {
                        rdsv3_stats_inc(s_send_sem_queue_raced);
                        ret = -EAGAIN;
                }
                mutex_exit(&conn->c_lock);
        }

out:
        RDSV3_DPRINTF4("rdsv3_send_xmit", "Return(conn: %p, ret: %d)",
            conn, ret);
        return (ret);
}

static void
rdsv3_send_sndbuf_remove(struct rdsv3_sock *rs, struct rdsv3_message *rm)
{
        uint32_t len = ntohl(rm->m_inc.i_hdr.h_len);

        ASSERT(mutex_owned(&rs->rs_lock));

        ASSERT(rs->rs_snd_bytes >= len);
        rs->rs_snd_bytes -= len;

        if (rs->rs_snd_bytes == 0)
                rdsv3_stats_inc(s_send_queue_empty);
}

static inline int
rdsv3_send_is_acked(struct rdsv3_message *rm, uint64_t ack,
    is_acked_func is_acked)
{
        if (is_acked)
                return (is_acked(rm, ack));
        return (ntohll(rm->m_inc.i_hdr.h_sequence) <= ack);
}

/*
 * Returns true if there are no messages on the send and retransmit queues
 * with a sequence number less than the given sequence number (i.e. every
 * message before it has been acked).
 */
int
rdsv3_send_acked_before(struct rdsv3_connection *conn, uint64_t seq)
{
        struct rdsv3_message *rm;
        int ret = 1;

        RDSV3_DPRINTF4("rdsv3_send_acked_before", "Enter(conn: %p)", conn);

        mutex_enter(&conn->c_lock);

        /* XXX - original code spits out warning */
        rm = list_head(&conn->c_retrans);
        if (rm != NULL && ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
                ret = 0;

        /* XXX - original code spits out warning */
        rm = list_head(&conn->c_send_queue);
        if (rm != NULL && ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
                ret = 0;

        mutex_exit(&conn->c_lock);

        RDSV3_DPRINTF4("rdsv3_send_acked_before", "Return(conn: %p)", conn);

        return (ret);
}

/*
 * This is pretty similar to what happens below in the ACK
 * handling code - except that we call here as soon as we get
 * the IB send completion on the RDMA op and the accompanying
 * message.
 */
void
rdsv3_rdma_send_complete(struct rdsv3_message *rm, int status)
{
        struct rdsv3_sock *rs = NULL;
        struct rdsv3_rdma_op *ro;
        struct rdsv3_notifier *notifier;

        RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Enter(rm: %p)", rm);

        mutex_enter(&rm->m_rs_lock);

        ro = rm->m_rdma_op;
        if (test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags) &&
            ro && ro->r_notify && ro->r_notifier) {
                notifier = ro->r_notifier;
                rs = rm->m_rs;
                rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));

                notifier->n_status = status;
                mutex_enter(&rs->rs_lock);
                list_insert_tail(&rs->rs_notify_queue, notifier);
                mutex_exit(&rs->rs_lock);
                ro->r_notifier = NULL;
        }

        mutex_exit(&rm->m_rs_lock);

        if (rs) {
                struct rsock *sk = rdsv3_rs_to_sk(rs);
                int error;

                rdsv3_wake_sk_sleep(rs);

                /* wake up anyone waiting in poll */
                sk->sk_upcalls->su_recv(sk->sk_upper_handle, NULL,
                    0, 0, &error, NULL);
                if (error != 0) {
                        RDSV3_DPRINTF2("rdsv3_rdma_send_complete",
                            "su_recv returned: %d", error);
                }

                rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
        }

        RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Return(rm: %p)", rm);
}

/*
 * This is the same as rdsv3_rdma_send_complete except we
 * don't do any locking - we have all the ingredients (message,
 * socket, socket lock) and can just move the notifier.
 */
static inline void
__rdsv3_rdma_send_complete(struct rdsv3_sock *rs, struct rdsv3_message *rm,
    int status)
{
        struct rdsv3_rdma_op *ro;

        RDSV3_DPRINTF4("__rdsv3_rdma_send_complete",
            "Enter(rs: %p, rm: %p)", rs, rm);

        ro = rm->m_rdma_op;
        if (ro && ro->r_notify && ro->r_notifier) {
                ro->r_notifier->n_status = status;
                list_insert_tail(&rs->rs_notify_queue, ro->r_notifier);
                ro->r_notifier = NULL;
        }

        /* No need to wake the app - caller does this */
}

/*
 * This is called from the IB send completion when we detect
 * an RDMA operation that failed with a remote access error.
 * So speed is not an issue here.
 */
struct rdsv3_message *
rdsv3_send_get_message(struct rdsv3_connection *conn,
    struct rdsv3_rdma_op *op)
{
        struct rdsv3_message *rm, *tmp, *found = NULL;

        RDSV3_DPRINTF4("rdsv3_send_get_message", "Enter(conn: %p)", conn);

        mutex_enter(&conn->c_lock);

        RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
                if (rm->m_rdma_op == op) {
                        atomic_add_32(&rm->m_refcount, 1);
                        found = rm;
                        goto out;
                }
        }

        RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_send_queue,
            m_conn_item) {
                if (rm->m_rdma_op == op) {
                        atomic_add_32(&rm->m_refcount, 1);
                        found = rm;
                        break;
                }
        }

out:
        mutex_exit(&conn->c_lock);

        return (found);
}

/*
 * This removes messages from the socket's list if they're on it.  The list
 * argument must be private to the caller, we must be able to modify it
 * without locks.  The messages must have a reference held for their
 * position on the list.  This function will drop that reference after
 * removing the messages from the 'messages' list regardless of whether it
 * found the messages on the socket list or not.
 */
void
rdsv3_send_remove_from_sock(struct list *messages, int status)
{
        struct rdsv3_sock *rs = NULL;
        struct rdsv3_message *rm;

        RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Enter");

        while (!list_is_empty(messages)) {
                int was_on_sock = 0;
                rm = list_remove_head(messages);

                /*
                 * If we see this flag cleared then we're *sure* that someone
                 * else beat us to removing it from the sock.  If we race
                 * with their flag update we'll get the lock and then really
                 * see that the flag has been cleared.
                 *
                 * The message spinlock makes sure nobody clears rm->m_rs
                 * while we're messing with it. It does not prevent the
                 * message from being removed from the socket, though.
                 */
                mutex_enter(&rm->m_rs_lock);
                if (!test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags))
                        goto unlock_and_drop;

                if (rs != rm->m_rs) {
                        if (rs) {
                                rdsv3_wake_sk_sleep(rs);
                                rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
                        }
                        rs = rm->m_rs;
                        rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));
                }

                mutex_enter(&rs->rs_lock);
                if (test_and_clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags)) {
                        struct rdsv3_rdma_op *ro = rm->m_rdma_op;
                        struct rdsv3_notifier *notifier;

                        list_remove_node(&rm->m_sock_item);
                        rdsv3_send_sndbuf_remove(rs, rm);
                        if (ro && ro->r_notifier &&
                            (status || ro->r_notify)) {
                                notifier = ro->r_notifier;
                                list_insert_tail(&rs->rs_notify_queue,
                                    notifier);
                                if (!notifier->n_status)
                                        notifier->n_status = status;
                                rm->m_rdma_op->r_notifier = NULL;
                        }
                        was_on_sock = 1;
                        rm->m_rs = NULL;
                }
                mutex_exit(&rs->rs_lock);

unlock_and_drop:
                mutex_exit(&rm->m_rs_lock);
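                /*
                 * Drop the reference held for the caller's list; if the
                 * message was still on the socket, drop the socket list's
                 * reference as well.
                 */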
                rdsv3_message_put(rm);
                if (was_on_sock)
                        rdsv3_message_put(rm);
        }

        if (rs) {
                rdsv3_wake_sk_sleep(rs);
                rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
        }

        RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Return");
}

/*
 * Transports call here when they've determined that the receiver queued
 * messages up to, and including, the given sequence number.  Messages are
 * moved to the retrans queue when rdsv3_send_xmit picks them off the send
 * queue. This means that in the TCP case, the message may not have been
 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 * checks the RDSV3_MSG_HAS_ACK_SEQ bit.
 *
 * XXX It's not clear to me how this is safely serialized with socket
 * destruction.  Maybe it should bail if it sees SOCK_DEAD.
 */
void
rdsv3_send_drop_acked(struct rdsv3_connection *conn, uint64_t ack,
    is_acked_func is_acked)
{
        struct rdsv3_message *rm, *tmp;
        list_t list;

        RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Enter(conn: %p)", conn);

        list_create(&list, sizeof (struct rdsv3_message),
            offsetof(struct rdsv3_message, m_conn_item));

        mutex_enter(&conn->c_lock);

        RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
                if (!rdsv3_send_is_acked(rm, ack, is_acked))
                        break;

                list_remove_node(&rm->m_conn_item);
                list_insert_tail(&list, rm);
                clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
        }

#if 0
XXX
        /* order flag updates with spin locks */
        if (!list_is_empty(&list))
                smp_mb__after_clear_bit();
#endif

        mutex_exit(&conn->c_lock);

        /* now remove the messages from the sock list as needed */
        rdsv3_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);

        RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Return(conn: %p)", conn);
}

void
rdsv3_send_drop_to(struct rdsv3_sock *rs, struct sockaddr_in *dest)
{
        struct rdsv3_message *rm, *tmp;
        struct rdsv3_connection *conn;
        list_t list;
        int wake = 0;

        RDSV3_DPRINTF4("rdsv3_send_drop_to", "Enter(rs: %p)", rs);

        list_create(&list, sizeof (struct rdsv3_message),
            offsetof(struct rdsv3_message, m_sock_item));

        /* get all the messages we're dropping under the rs lock */
        mutex_enter(&rs->rs_lock);

        RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &rs->rs_send_queue,
            m_sock_item) {
                if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
                    dest->sin_port != rm->m_inc.i_hdr.h_dport))
                        continue;
                wake = 1;
                list_remove(&rs->rs_send_queue, rm);
                list_insert_tail(&list, rm);
                rdsv3_send_sndbuf_remove(rs, rm);
                clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);
        }

        mutex_exit(&rs->rs_lock);

        conn = NULL;

        /* now remove the messages from the conn list as needed */
        RDSV3_FOR_EACH_LIST_NODE(rm, &list, m_sock_item) {
                /*
                 * We do this here rather than in the loop above, so that
                 * we don't have to nest m_rs_lock under rs->rs_lock
                 */
                mutex_enter(&rm->m_rs_lock);
                /* If this is an RDMA operation, notify the app. */
                __rdsv3_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED);
                rm->m_rs = NULL;
                mutex_exit(&rm->m_rs_lock);

                /*
                 * If we see this flag cleared then we're *sure* that someone
                 * else beat us to removing it from the conn.  If we race
                 * with their flag update we'll get the lock and then really
                 * see that the flag has been cleared.
                 */
                if (!test_bit(RDSV3_MSG_ON_CONN, &rm->m_flags))
                        continue;

                if (conn != rm->m_inc.i_conn) {
                        if (conn)
                                mutex_exit(&conn->c_lock);
                        conn = rm->m_inc.i_conn;
                        mutex_enter(&conn->c_lock);
                }

                if (test_and_clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags)) {
                        list_remove_node(&rm->m_conn_item);
                        rdsv3_message_put(rm);
                }
        }

        if (conn)
                mutex_exit(&conn->c_lock);

        if (wake)
                rdsv3_wake_sk_sleep(rs);

        while (!list_is_empty(&list)) {
                rm = list_remove_head(&list);

                rdsv3_message_wait(rm);
                rdsv3_message_put(rm);
        }

        RDSV3_DPRINTF4("rdsv3_send_drop_to", "Return(rs: %p)", rs);
}

/*
 * we only want this to fire once so we use the caller's 'queued'.  It's
 * possible that another thread can race with us and remove the
 * message from the flow with RDSV3_CANCEL_SENT_TO.
 */
static int
rdsv3_send_queue_rm(struct rdsv3_sock *rs, struct rdsv3_connection *conn,
    struct rdsv3_message *rm, uint16_be_t sport,
    uint16_be_t dport, int *queued)
{
        uint32_t len;

        RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Enter(rs: %p, rm: %p)", rs, rm);

        if (*queued)
                goto out;

        len = ntohl(rm->m_inc.i_hdr.h_len);

        /*
         * this is the only place which holds both the socket's rs_lock
         * and the connection's c_lock
         */
        mutex_enter(&rs->rs_lock);

        /*
         * If there is a little space in sndbuf, we don't queue anything,
         * and userspace gets -EAGAIN. But poll() indicates there's send
         * room. This can lead to bad behavior (spinning) if snd_bytes isn't
         * freed up by incoming acks. So we check the *old* value of
         * rs_snd_bytes here to allow the last msg to exceed the buffer,
         * and poll() now knows no more data can be sent.
         */
        if (rs->rs_snd_bytes < rdsv3_sk_sndbuf(rs)) {
                rs->rs_snd_bytes += len;

                /*
                 * let recv side know we are close to send space exhaustion.
                 * This is probably not the optimal way to do it, as this
                 * means we set the flag on *all* messages as soon as our
                 * throughput hits a certain threshold.
                 */
                if (rs->rs_snd_bytes >= rdsv3_sk_sndbuf(rs) / 2)
                        set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);

                list_insert_tail(&rs->rs_send_queue, rm);
                set_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);

                /* one reference for the socket's send queue */
                rdsv3_message_addref(rm);
                rm->m_rs = rs;

                /*
                 * The code ordering is a little weird, but we're
                 * trying to minimize the time we hold c_lock
                 */
                rdsv3_message_populate_header(&rm->m_inc.i_hdr, sport,
                    dport, 0);
                rm->m_inc.i_conn = conn;
                /* and a second reference for the connection's send queue */
                rdsv3_message_addref(rm);

                mutex_enter(&conn->c_lock);
                rm->m_inc.i_hdr.h_sequence = htonll(conn->c_next_tx_seq++);
                list_insert_tail(&conn->c_send_queue, rm);
                set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
                mutex_exit(&conn->c_lock);

                RDSV3_DPRINTF5("rdsv3_send_queue_rm",
                    "queued msg %p len %d, rs %p bytes %d seq %llu",
                    rm, len, rs, rs->rs_snd_bytes,
                    (unsigned long long)ntohll(
                    rm->m_inc.i_hdr.h_sequence));

                *queued = 1;
        }

        mutex_exit(&rs->rs_lock);

        RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Return(rs: %p)", rs);
out:
        return (*queued);
}

static int
rdsv3_cmsg_send(struct rdsv3_sock *rs, struct rdsv3_message *rm,
    struct msghdr *msg, int *allocated_mr)
{
        struct cmsghdr *cmsg;
        int ret = 0;

        RDSV3_DPRINTF4("rdsv3_cmsg_send", "Enter(rs: %p)", rs);

        for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {

                if (cmsg->cmsg_level != SOL_RDS)
                        continue;

                RDSV3_DPRINTF4("rdsv3_cmsg_send", "cmsg(%p, %p) type %d",
                    cmsg, rm, cmsg->cmsg_type);
                /*
                 * As a side effect, RDMA_DEST and RDMA_MAP will set
                 * rm->m_rdma_cookie and rm->m_rdma_mr.
                 */
                switch (cmsg->cmsg_type) {
                case RDS_CMSG_RDMA_ARGS:
                        ret = rdsv3_cmsg_rdma_args(rs, rm, cmsg);
                        break;

                case RDS_CMSG_RDMA_DEST:
                        ret = rdsv3_cmsg_rdma_dest(rs, rm, cmsg);
                        break;

                case RDS_CMSG_RDMA_MAP:
                        ret = rdsv3_cmsg_rdma_map(rs, rm, cmsg);
                        if (!ret)
                                *allocated_mr = 1;
                        break;

                default:
                        return (-EINVAL);
                }

                if (ret)
                        break;
        }

        RDSV3_DPRINTF4("rdsv3_cmsg_send", "Return(rs: %p)", rs);

        return (ret);
}

extern unsigned long rdsv3_max_bcopy_size;

int
rdsv3_sendmsg(struct rdsv3_sock *rs, uio_t *uio, struct nmsghdr *msg,
    size_t payload_len)
{
        struct rsock *sk = rdsv3_rs_to_sk(rs);
        struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name;
        uint32_be_t daddr;
        uint16_be_t dport;
        struct rdsv3_message *rm = NULL;
        struct rdsv3_connection *conn;
        int ret = 0;
        int queued = 0, allocated_mr = 0;
        int nonblock = msg->msg_flags & MSG_DONTWAIT;
        long timeo = rdsv3_sndtimeo(sk, nonblock);

        RDSV3_DPRINTF4("rdsv3_sendmsg", "Enter(rs: %p)", rs);

        if (msg->msg_namelen) {
                /* XXX fail non-unicast destination IPs? */
                if (msg->msg_namelen < sizeof (*usin) ||
                    usin->sin_family != AF_INET_OFFLOAD) {
                        ret = -EINVAL;
                        RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
                        goto out;
                }
                daddr = usin->sin_addr.s_addr;
                dport = usin->sin_port;
        } else {
                /* We only care about consistency with ->connect() */
                mutex_enter(&sk->sk_lock);
                daddr = rs->rs_conn_addr;
                dport = rs->rs_conn_port;
                mutex_exit(&sk->sk_lock);
        }

        /* racing with another thread binding seems ok here */
        if (daddr == 0 || rs->rs_bound_addr == 0) {
                ret = -ENOTCONN; /* XXX not a great errno */
                RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
                goto out;
        }

        if (payload_len > rdsv3_max_bcopy_size) {
                RDSV3_DPRINTF2("rdsv3_sendmsg", "Message too large: %d",
                    payload_len);
                ret = -EMSGSIZE;
                goto out;
        }

        rm = rdsv3_message_copy_from_user(uio, payload_len);
        if (IS_ERR(rm)) {
                ret = PTR_ERR(rm);
                RDSV3_DPRINTF2("rdsv3_sendmsg",
                    "rdsv3_message_copy_from_user failed %d", -ret);
                rm = NULL;
                goto out;
        }

        rm->m_daddr = daddr;

        /* Parse any control messages the user may have included. */
        ret = rdsv3_cmsg_send(rs, rm, msg, &allocated_mr);
        if (ret) {
                RDSV3_DPRINTF2("rdsv3_sendmsg",
                    "rdsv3_cmsg_send(rs: %p rm: %p msg: %p) returned: %d",
                    rs, rm, msg, ret);
                goto out;
        }

        /*
         * rdsv3_conn_create has a spinlock that runs with IRQ off.
         * Caching the conn in the socket helps a lot.
         */
        mutex_enter(&rs->rs_conn_lock);
        if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) {
                conn = rs->rs_conn;
        } else {
                conn = rdsv3_conn_create_outgoing(rs->rs_bound_addr,
                    daddr, rs->rs_transport, KM_NOSLEEP);
                if (IS_ERR(conn)) {
                        mutex_exit(&rs->rs_conn_lock);
                        ret = PTR_ERR(conn);
                        RDSV3_DPRINTF2("rdsv3_sendmsg",
                            "rdsv3_conn_create_outgoing failed %d",
                            -ret);
                        goto out;
                }
                rs->rs_conn = conn;
        }
        mutex_exit(&rs->rs_conn_lock);

        if ((rm->m_rdma_cookie || rm->m_rdma_op) &&
            conn->c_trans->xmit_rdma == NULL) {
                RDSV3_DPRINTF2("rdsv3_sendmsg", "rdma_op %p conn xmit_rdma %p",
                    rm->m_rdma_op, conn->c_trans->xmit_rdma);
                ret = -EOPNOTSUPP;
                goto out;
        }

        /*
         * If the connection is down, trigger a connect. We may
         * have scheduled a delayed reconnect however - in this case
         * we should not interfere.
         */
        if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
            !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
                rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);

        ret = rdsv3_cong_wait(conn->c_fcong, dport, nonblock, rs);
        if (ret) {
                mutex_enter(&rs->rs_congested_lock);
                rs->rs_seen_congestion = 1;
                cv_signal(&rs->rs_congested_cv);
                mutex_exit(&rs->rs_congested_lock);

                RDSV3_DPRINTF2("rdsv3_sendmsg",
                    "rdsv3_cong_wait (dport: %d) returned: %d", dport, ret);
                goto out;
        }

        (void) rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, dport,
            &queued);
        if (!queued) {
                /* rdsv3_stats_inc(s_send_queue_full); */
                /* XXX make sure this is reasonable */
                if (payload_len > rdsv3_sk_sndbuf(rs)) {
                        ret = -EMSGSIZE;
                        RDSV3_DPRINTF2("rdsv3_sendmsg",
                            "msgsize(%d) too big, returning: %d",
                            payload_len, -ret);
                        goto out;
                }
                if (nonblock) {
                        ret = -EAGAIN;
                        RDSV3_DPRINTF3("rdsv3_sendmsg",
                            "send queue full (%d), returning: %d",
                            payload_len, -ret);
                        goto out;
                }

#if 0
                ret = rdsv3_wait_sig(sk->sk_sleep,
                    (rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
                    dport, &queued)));
                if (ret == 0) {
                        /* signal/timeout pending */
                        RDSV3_DPRINTF2("rdsv3_sendmsg",
                            "woke due to signal: %d", ret);
                        ret = -ERESTART;
                        goto out;
                }
#else
                mutex_enter(&sk->sk_sleep->waitq_mutex);
                sk->sk_sleep->waitq_waiters++;
                while (!rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
                    dport, &queued)) {
                        ret = cv_wait_sig(&sk->sk_sleep->waitq_cv,
                            &sk->sk_sleep->waitq_mutex);
                        if (ret == 0) {
                                /* signal/timeout pending */
                                RDSV3_DPRINTF2("rdsv3_sendmsg",
                                    "woke due to signal: %d", ret);
                                ret = -EINTR;
                                sk->sk_sleep->waitq_waiters--;
                                mutex_exit(&sk->sk_sleep->waitq_mutex);
                                goto out;
                        }
                }
                sk->sk_sleep->waitq_waiters--;
                mutex_exit(&sk->sk_sleep->waitq_mutex);
#endif

                RDSV3_DPRINTF5("rdsv3_sendmsg", "sendmsg woke queued %d",
                    queued);

                ASSERT(queued);
                ret = 0;
        }

        /*
         * By now we've committed to the send.  We reuse rdsv3_send_worker()
         * to retry sends in the rds thread if the transport asks us to.
         */
        rdsv3_stats_inc(s_send_queued);

        if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
                (void) rdsv3_send_worker(&conn->c_send_w.work);

        rdsv3_message_put(rm);
        RDSV3_DPRINTF4("rdsv3_sendmsg", "Return(rs: %p, len: %d)",
            rs, payload_len);
        return (payload_len);

out:
        /*
         * If the user included an RDMA_MAP cmsg, we allocated an MR on the
         * fly. If the sendmsg goes through, we keep the MR. If it fails with
         * EAGAIN or in any other way, we need to destroy the MR again.
         */
        if (allocated_mr)
                rdsv3_rdma_unuse(rs, rdsv3_rdma_cookie_key(rm->m_rdma_cookie),
                    1);

        if (rm)
                rdsv3_message_put(rm);
        return (ret);
}

/*
 * Reply to a ping packet.
 */
int
rdsv3_send_pong(struct rdsv3_connection *conn, uint16_be_t dport)
{
        struct rdsv3_message *rm;
        int ret = 0;

        RDSV3_DPRINTF4("rdsv3_send_pong", "Enter(conn: %p)", conn);

        rm = rdsv3_message_alloc(0, KM_NOSLEEP);
        if (!rm) {
                ret = -ENOMEM;
                goto out;
        }

        rm->m_daddr = conn->c_faddr;

        /*
         * If the connection is down, trigger a connect. We may
         * have scheduled a delayed reconnect however - in this case
         * we should not interfere.
         */
        if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
            !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
                rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);

        ret = rdsv3_cong_wait(conn->c_fcong, dport, 1, NULL);
        if (ret)
                goto out;

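        /*
         * A pong never belongs to a socket, so queue it directly on the
         * connection instead of going through rdsv3_send_queue_rm().
         */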
        mutex_enter(&conn->c_lock);
        list_insert_tail(&conn->c_send_queue, rm);
        set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
        rdsv3_message_addref(rm);
        rm->m_inc.i_conn = conn;

        rdsv3_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
            conn->c_next_tx_seq);
        conn->c_next_tx_seq++;
        mutex_exit(&conn->c_lock);

        rdsv3_stats_inc(s_send_queued);
        rdsv3_stats_inc(s_send_pong);

        if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
                (void) rdsv3_send_xmit(conn);

        rdsv3_message_put(rm);

        RDSV3_DPRINTF4("rdsv3_send_pong", "Return(conn: %p)", conn);
        return (0);

out:
        if (rm)
                rdsv3_message_put(rm);
        return (ret);
}