1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
  23  */
  24 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
  25 /* All Rights Reserved */
  26 /*
  27  * Portions of this source code were derived from Berkeley
  28  * 4.3 BSD under license from the Regents of the University of
  29  * California.
  30  */
  31 
  32 /*
  33  * Server side of RPC over RDMA in the kernel.
  34  */
  35 
  36 #include <sys/param.h>
  37 #include <sys/types.h>
  38 #include <sys/user.h>
  39 #include <sys/sysmacros.h>
  40 #include <sys/proc.h>
  41 #include <sys/file.h>
  42 #include <sys/errno.h>
  43 #include <sys/kmem.h>
  44 #include <sys/debug.h>
  45 #include <sys/systm.h>
  46 #include <sys/cmn_err.h>
  47 #include <sys/kstat.h>
  48 #include <sys/vtrace.h>
  49 #include <sys/debug.h>
  50 
  51 #include <rpc/types.h>
  52 #include <rpc/xdr.h>
  53 #include <rpc/auth.h>
  54 #include <rpc/clnt.h>
  55 #include <rpc/rpc_msg.h>
  56 #include <rpc/svc.h>
  57 #include <rpc/rpc_rdma.h>
  58 #include <sys/ddi.h>
  59 #include <sys/sunddi.h>
  60 
  61 #include <inet/common.h>
  62 #include <inet/ip.h>
  63 #include <inet/ip6.h>
  64 
  65 #include <nfs/nfs.h>
  66 #include <sys/sdt.h>
  67 
/* Return codes used by the local reply-composition helpers below. */
#define SVC_RDMA_SUCCESS 0
#define SVC_RDMA_FAIL -1

/* Credit scaling factor (see rdma_bufs_granted below) */
#define SVC_CREDIT_FACTOR (0.5)

/* TRUE when the reply's verifier flavor is RPCSEC_GSS */
#define MSG_IS_RPCSEC_GSS(msg)          \
        ((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS)


/* Number of receive buffers granted to the peer as RDMA credits */
uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT;
  78 
  79 /*
  80  * RDMA transport specific data associated with SVCMASTERXPRT
  81  */
  82 struct rdma_data {
  83         SVCMASTERXPRT   *rd_xprt;       /* back ptr to SVCMASTERXPRT */
  84         struct rdma_svc_data rd_data;   /* rdma data */
  85         rdma_mod_t      *r_mod;         /* RDMA module containing ops ptr */
  86 };
  87 
  88 /*
  89  * Plugin connection specific data stashed away in clone SVCXPRT
  90  */
  91 struct clone_rdma_data {
  92         bool_t          cloned;         /* xprt cloned for thread processing */
  93         CONN            *conn;          /* RDMA connection */
  94         rdma_buf_t      rpcbuf;         /* RPC req/resp buffer */
  95         struct clist    *cl_reply;      /* reply chunk buffer info */
  96         struct clist    *cl_wlist;              /* write list clist */
  97 };
  98 
  99 
 100 #define MAXADDRLEN      128     /* max length for address mask */
 101 
 102 /*
 103  * Routines exported through ops vector.
 104  */
 105 static bool_t           svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
 106 static bool_t           svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
 107 static bool_t           svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
 108 static bool_t           svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
 109 void                    svc_rdma_kdestroy(SVCMASTERXPRT *);
 110 static int              svc_rdma_kdup(struct svc_req *, caddr_t, int,
 111                                 struct dupreq **, bool_t *);
 112 static void             svc_rdma_kdupdone(struct dupreq *, caddr_t,
 113                                 void (*)(), int, int);
 114 static int32_t          *svc_rdma_kgetres(SVCXPRT *, int);
 115 static void             svc_rdma_kfreeres(SVCXPRT *);
 116 static void             svc_rdma_kclone_destroy(SVCXPRT *);
 117 static void             svc_rdma_kstart(SVCMASTERXPRT *);
 118 void                    svc_rdma_kstop(SVCMASTERXPRT *);
 119 static void             svc_rdma_kclone_xprt(SVCXPRT *, SVCXPRT *);
 120 static void             svc_rdma_ktattrs(SVCXPRT *, int, void **);
 121 
 122 static int      svc_process_long_reply(SVCXPRT *, xdrproc_t,
 123                         caddr_t, struct rpc_msg *, bool_t, int *,
 124                         int *, int *, unsigned int *);
 125 
 126 static int      svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t,
 127                         caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *,
 128                         bool_t, uint_t *);
 129 static bool_t rpcmsg_length(xdrproc_t,
 130                 caddr_t,
 131                 struct rpc_msg *, bool_t, int);
 132 
 133 /*
 134  * Server transport operations vector.
 135  */
 136 struct svc_ops rdma_svc_ops = {
 137         svc_rdma_krecv,         /* Get requests */
 138         svc_rdma_kgetargs,      /* Deserialize arguments */
 139         svc_rdma_ksend,         /* Send reply */
 140         svc_rdma_kfreeargs,     /* Free argument data space */
 141         svc_rdma_kdestroy,      /* Destroy transport handle */
 142         svc_rdma_kdup,          /* Check entry in dup req cache */
 143         svc_rdma_kdupdone,      /* Mark entry in dup req cache as done */
 144         svc_rdma_kgetres,       /* Get pointer to response buffer */
 145         svc_rdma_kfreeres,      /* Destroy pre-serialized response header */
 146         svc_rdma_kclone_destroy,        /* Destroy a clone xprt */
 147         svc_rdma_kstart,        /* Tell `ready-to-receive' to rpcmod */
 148         svc_rdma_kclone_xprt,   /* Transport specific clone xprt */
 149         svc_rdma_ktattrs        /* Get Transport Attributes */
 150 };
 151 
 152 /*
 153  * Server statistics
 154  * NOTE: This structure type is duplicated in the NFS fast path.
 155  */
 156 struct {
 157         kstat_named_t   rscalls;
 158         kstat_named_t   rsbadcalls;
 159         kstat_named_t   rsnullrecv;
 160         kstat_named_t   rsbadlen;
 161         kstat_named_t   rsxdrcall;
 162         kstat_named_t   rsdupchecks;
 163         kstat_named_t   rsdupreqs;
 164         kstat_named_t   rslongrpcs;
 165         kstat_named_t   rstotalreplies;
 166         kstat_named_t   rstotallongreplies;
 167         kstat_named_t   rstotalinlinereplies;
 168 } rdmarsstat = {
 169         { "calls",      KSTAT_DATA_UINT64 },
 170         { "badcalls",   KSTAT_DATA_UINT64 },
 171         { "nullrecv",   KSTAT_DATA_UINT64 },
 172         { "badlen",     KSTAT_DATA_UINT64 },
 173         { "xdrcall",    KSTAT_DATA_UINT64 },
 174         { "dupchecks",  KSTAT_DATA_UINT64 },
 175         { "dupreqs",    KSTAT_DATA_UINT64 },
 176         { "longrpcs",   KSTAT_DATA_UINT64 },
 177         { "totalreplies",       KSTAT_DATA_UINT64 },
 178         { "totallongreplies",   KSTAT_DATA_UINT64 },
 179         { "totalinlinereplies", KSTAT_DATA_UINT64 },
 180 };
 181 
 182 kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
 183 uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);
 184 
 185 #define RSSTAT_INCR(x)  atomic_inc_64(&rdmarsstat.x.value.ui64)
 186 /*
 187  * Create a transport record.
 188  * The transport record, output buffer, and private data structure
 189  * are allocated.  The output buffer is serialized into using xdrmem.
 190  * There is one transport record per user process which implements a
 191  * set of services.
 192  */
 193 /* ARGSUSED */
 194 int
 195 svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
 196     rdma_xprt_group_t *started_xprts)
 197 {
 198         int error;
 199         SVCMASTERXPRT *xprt;
 200         struct rdma_data *rd;
 201         rdma_registry_t *rmod;
 202         rdma_xprt_record_t *xprt_rec;
 203         queue_t *q;
 204         /*
 205          * modload the RDMA plugins is not already done.
 206          */
 207         if (!rdma_modloaded) {
 208                 /*CONSTANTCONDITION*/
 209                 ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN);
 210 
 211                 mutex_enter(&rdma_modload_lock);
 212                 if (!rdma_modloaded) {
 213                         error = rdma_modload();
 214                 }
 215                 mutex_exit(&rdma_modload_lock);
 216 
 217                 if (error)
 218                         return (error);
 219         }
 220 
 221         /*
 222          * master_xprt_count is the count of master transport handles
 223          * that were successfully created and are ready to recieve for
 224          * RDMA based access.
 225          */
 226         error = 0;
 227         xprt_rec = NULL;
 228         rw_enter(&rdma_lock, RW_READER);
 229         if (rdma_mod_head == NULL) {
 230                 started_xprts->rtg_count = 0;
 231                 rw_exit(&rdma_lock);
 232                 if (rdma_dev_available)
 233                         return (EPROTONOSUPPORT);
 234                 else
 235                         return (ENODEV);
 236         }
 237 
 238         /*
 239          * If we have reached here, then atleast one RDMA plugin has loaded.
 240          * Create a master_xprt, make it start listenining on the device,
 241          * if an error is generated, record it, we might need to shut
 242          * the master_xprt.
 243          * SVC_START() calls svc_rdma_kstart which calls plugin binding
 244          * routines.
 245          */
 246         for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {
 247 
 248                 /*
 249                  * One SVCMASTERXPRT per RDMA plugin.
 250                  */
 251                 xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
 252                 xprt->xp_ops = &rdma_svc_ops;
 253                 xprt->xp_sct = sct;
 254                 xprt->xp_type = T_RDMA;
 255                 mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
 256                 mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
 257                 xprt->xp_req_head = (mblk_t *)0;
 258                 xprt->xp_req_tail = (mblk_t *)0;
 259                 xprt->xp_threads = 0;
 260                 xprt->xp_detached_threads = 0;
 261 
 262                 rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
 263                 xprt->xp_p2 = (caddr_t)rd;
 264                 rd->rd_xprt = xprt;
 265                 rd->r_mod = rmod->r_mod;
 266 
 267                 q = &rd->rd_data.q;
 268                 xprt->xp_wq = q;
 269                 q->q_ptr = &rd->rd_xprt;
 270                 xprt->xp_netid = NULL;
 271 
 272                 /*
 273                  * Each of the plugins will have their own Service ID
 274                  * to listener specific mapping, like port number for VI
 275                  * and service name for IB.
 276                  */
 277                 rd->rd_data.svcid = id;
 278                 error = svc_xprt_register(xprt, id);
 279                 if (error) {
 280                         DTRACE_PROBE(krpc__e__svcrdma__xprt__reg);
 281                         goto cleanup;
 282                 }
 283 
 284                 SVC_START(xprt);
 285                 if (!rd->rd_data.active) {
 286                         svc_xprt_unregister(xprt);
 287                         error = rd->rd_data.err_code;
 288                         goto cleanup;
 289                 }
 290 
 291                 /*
 292                  * This is set only when there is atleast one or more
 293                  * transports successfully created. We insert the pointer
 294                  * to the created RDMA master xprt into a separately maintained
 295                  * list. This way we can easily reference it later to cleanup,
 296                  * when NFS kRPC service pool is going away/unregistered.
 297                  */
 298                 started_xprts->rtg_count ++;
 299                 xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
 300                 xprt_rec->rtr_xprt_ptr = xprt;
 301                 xprt_rec->rtr_next = started_xprts->rtg_listhead;
 302                 started_xprts->rtg_listhead = xprt_rec;
 303                 continue;
 304 cleanup:
 305                 SVC_DESTROY(xprt);
 306                 if (error == RDMA_FAILED)
 307                         error = EPROTONOSUPPORT;
 308         }
 309 
 310         rw_exit(&rdma_lock);
 311 
 312         /*
 313          * Don't return any error even if a single plugin was started
 314          * successfully.
 315          */
 316         if (started_xprts->rtg_count == 0)
 317                 return (error);
 318         return (0);
 319 }
 320 
 321 /*
 322  * Cleanup routine for freeing up memory allocated by
 323  * svc_rdma_kcreate()
 324  */
 325 void
 326 svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
 327 {
 328         struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;
 329 
 330 
 331         mutex_destroy(&xprt->xp_req_lock);
 332         mutex_destroy(&xprt->xp_thread_lock);
 333         kmem_free(rd, sizeof (*rd));
 334         kmem_free(xprt, sizeof (*xprt));
 335 }
 336 
 337 
 338 static void
 339 svc_rdma_kstart(SVCMASTERXPRT *xprt)
 340 {
 341         struct rdma_svc_data *svcdata;
 342         rdma_mod_t *rmod;
 343 
 344         svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
 345         rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
 346 
 347         /*
 348          * Create a listener for  module at this port
 349          */
 350 
 351         if (rmod->rdma_count != 0)
 352                 (*rmod->rdma_ops->rdma_svc_listen)(svcdata);
 353         else
 354                 svcdata->err_code = RDMA_FAILED;
 355 }
 356 
 357 void
 358 svc_rdma_kstop(SVCMASTERXPRT *xprt)
 359 {
 360         struct rdma_svc_data *svcdata;
 361         rdma_mod_t *rmod;
 362 
 363         svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
 364         rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
 365 
 366         /*
 367          * Call the stop listener routine for each plugin. If rdma_count is
 368          * already zero set active to zero.
 369          */
 370         if (rmod->rdma_count != 0)
 371                 (*rmod->rdma_ops->rdma_svc_stop)(svcdata);
 372         else
 373                 svcdata->active = 0;
 374         if (svcdata->active)
 375                 DTRACE_PROBE(krpc__e__svcrdma__kstop);
 376 }
 377 
 378 /* ARGSUSED */
 379 static void
 380 svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
 381 {
 382 
 383         struct clone_rdma_data *cdrp;
 384         cdrp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 385 
 386         /*
 387          * Only free buffers and release connection when cloned is set.
 388          */
 389         if (cdrp->cloned != TRUE)
 390                 return;
 391 
 392         rdma_buf_free(cdrp->conn, &cdrp->rpcbuf);
 393         if (cdrp->cl_reply) {
 394                 clist_free(cdrp->cl_reply);
 395                 cdrp->cl_reply = NULL;
 396         }
 397         RDMA_REL_CONN(cdrp->conn);
 398 
 399         cdrp->cloned = 0;
 400 }
 401 
 402 /*
 403  * Clone the xprt specific information.  It will be freed by
 404  * SVC_CLONE_DESTROY.
 405  */
 406 static void
 407 svc_rdma_kclone_xprt(SVCXPRT *src_xprt, SVCXPRT *dst_xprt)
 408 {
 409         struct clone_rdma_data *srcp2;
 410         struct clone_rdma_data *dstp2;
 411 
 412         srcp2 = (struct clone_rdma_data *)src_xprt->xp_p2buf;
 413         dstp2 = (struct clone_rdma_data *)dst_xprt->xp_p2buf;
 414 
 415         if (srcp2->conn != NULL) {
 416                 srcp2->cloned = TRUE;
 417                 *dstp2 = *srcp2;
 418         }
 419 }
 420 
 421 static void
 422 svc_rdma_ktattrs(SVCXPRT *clone_xprt, int attrflag, void **tattr)
 423 {
 424         CONN    *conn;
 425         *tattr = NULL;
 426 
 427         switch (attrflag) {
 428         case SVC_TATTR_ADDRMASK:
 429                 conn = ((struct clone_rdma_data *)clone_xprt->xp_p2buf)->conn;
 430                 ASSERT(conn != NULL);
 431                 if (conn)
 432                         *tattr = (void *)&conn->c_addrmask;
 433         }
 434 }
 435 
/*
 * Receive an RPC call over RDMA: post a replacement receive buffer,
 * decode the RPC-over-RDMA transport header (xid, version, credits,
 * operation), pull in the read/write/reply chunk lists, fetch the
 * call message via RDMA READ when it was sent as a chunk (RDMA_NOMSG),
 * and finally decode the RPC call header into 'msg'.
 *
 * Returns TRUE when 'msg' holds a decoded call and the clone xprt is
 * set up for this request; FALSE on any failure, in which case the
 * receive buffer, connection reference, and 'mp' are all released.
 */
static bool_t
svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
{
	XDR	*xdrs;
	CONN	*conn;
	rdma_recv_data_t	*rdp = (rdma_recv_data_t *)mp->b_rptr;
	struct clone_rdma_data *crdp;
	struct clist	*cl = NULL;	/* read chunk list from the client */
	struct clist	*wcl = NULL;	/* write chunk list from the client */
	struct clist	*cllong = NULL;	/* chunk holding a long call msg */

	rdma_stat	status;
	uint32_t vers, op, pos, xid;
	uint32_t rdma_credit;
	uint32_t wcl_total_length = 0;
	bool_t	wwl = FALSE;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	RSSTAT_INCR(rscalls);
	conn = rdp->conn;

	/* Replenish the receive queue before consuming this buffer. */
	status = rdma_svc_postrecv(conn);
	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv);
		goto badrpc_call;
	}

	/*
	 * Decode the transport header with xdrmem.  The first word is
	 * the XID; it is read directly and then skipped via XDR_SETPOS.
	 */
	xdrs = &clone_xprt->xp_xdrin;
	xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
	xid = *(uint32_t *)rdp->rpcmsg.addr;
	XDR_SETPOS(xdrs, sizeof (uint32_t));

	if (! xdr_u_int(xdrs, &vers) ||
	    ! xdr_u_int(xdrs, &rdma_credit) ||
	    ! xdr_u_int(xdrs, &op)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__uint);
		goto xdr_err;
	}

	/* Checking if the status of the recv operation was normal */
	if (rdp->status != 0) {
		DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status,
		    int, rdp->status);
		goto badrpc_call;
	}

	/* Read chunk list sent by the client (may be empty). */
	if (! xdr_do_clist(xdrs, &cl)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist);
		goto xdr_err;
	}

	/* Write chunk list for bulk reply data; stash it on the clone. */
	if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist);
		if (cl)
			clist_free(cl);
		goto xdr_err;
	}
	crdp->cl_wlist = wcl;

	/* Optional reply chunk, used for long (non-inline) replies. */
	crdp->cl_reply = NULL;
	(void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply);

	/*
	 * A chunk at 0 offset indicates that the RPC call message
	 * is in a chunk. Get the RPC call message chunk.
	 */
	if (cl != NULL && op == RDMA_NOMSG) {

		/* Remove RPC call message chunk from chunklist */
		cllong = cl;
		cl = cl->c_next;
		cllong->c_next = NULL;


		/* Allocate and register memory for the RPC call msg chunk */
		cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
		cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ?
		    cllong->c_len : LONG_REPLY_LEN;

		if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) {
			clist_free(cllong);
			goto cll_malloc_err;
		}

		cllong->u.c_daddr3 = cllong->rb_longbuf.addr;

		if (cllong->u.c_daddr == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_register(conn, cllong, CLIST_REG_DST);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		/*
		 * Now read the RPC call message in
		 */
		status = RDMA_READ(conn, cllong, WAIT);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
			(void) clist_deregister(conn, cllong);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		/*
		 * NOTE(review): the clist_syncmem() status is captured
		 * but never checked before the buffer is decoded below;
		 * confirm a sync failure is really ignorable here.
		 */
		status = clist_syncmem(conn, cllong, CLIST_REG_DST);
		(void) clist_deregister(conn, cllong);

		xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
		    cllong->c_len, 0, cl, XDR_DECODE, conn);

		/* The long buffer becomes the clone's rpc buffer. */
		crdp->rpcbuf = cllong->rb_longbuf;
		crdp->rpcbuf.len = cllong->c_len;
		clist_free(cllong);
		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	} else {
		/* Inline call message: decode in place past the header. */
		pos = XDR_GETPOS(xdrs);
		xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
		    rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
		crdp->rpcbuf = rdp->rpcmsg;

		/* Use xdrrdmablk_ops to indicate there is a read chunk list */
		if (cl != NULL) {
			int32_t flg = XDR_RDMA_RLIST_REG;

			XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
			xdrs->x_ops = &xdrrdmablk_ops;
		}
	}

	/* Hand the write chunk list to the XDR stream for encoding. */
	if (crdp->cl_wlist) {
		int32_t flg = XDR_RDMA_WLIST_REG;

		XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist);
		XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
	}

	/* Decode the RPC call header proper into 'msg'. */
	if (! xdr_callmsg(xdrs, msg)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg);
		RSSTAT_INCR(rsxdrcall);
		goto callmsg_err;
	}

	/*
	 * Point the remote transport address in the service_transport
	 * handle at the address in the request.
	 */
	clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
	clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
	clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;

	clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf;
	clone_xprt->xp_lcladdr.len = conn->c_laddr.len;
	clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len;

	/*
	 * In case of RDMA, connection management is
	 * entirely done in rpcib module and netid in the
	 * SVCMASTERXPRT is NULL. Initialize the clone netid
	 * from the connection.
	 */

	clone_xprt->xp_netid = conn->c_netid;

	clone_xprt->xp_xid = xid;
	crdp->conn = conn;

	freeb(mp);

	return (TRUE);

callmsg_err:
	rdma_buf_free(conn, &crdp->rpcbuf);

cll_malloc_err:
	if (cl)
		clist_free(cl);
xdr_err:
	XDR_DESTROY(xdrs);

badrpc_call:
	RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	RDMA_REL_CONN(conn);
	freeb(mp);
	RSSTAT_INCR(rsbadcalls);
	return (FALSE);
}
 631 
/*
 * Send a reply that does not fit inline: encode the full reply into a
 * long buffer, map it onto the client-provided reply chunk list, and
 * RDMA WRITE it into the client's memory.
 *
 * On success *final_len holds the encoded reply length and *numchunks
 * the number of reply chunks actually used; *freelen is always set to
 * zero.  Returns SVC_RDMA_SUCCESS or SVC_RDMA_FAIL; all intermediate
 * allocations/registrations are released on either path.
 *
 * NOTE(review): crdp->cl_reply is dereferenced without a NULL check
 * below; presumably the caller only invokes this when the client
 * supplied a reply chunk — confirm against svc_rdma_ksend().
 */
static int
svc_process_long_reply(SVCXPRT * clone_xprt,
    xdrproc_t xdr_results, caddr_t xdr_location,
    struct rpc_msg *msg, bool_t has_args, int *msglen,
    int *freelen, int *numchunks, unsigned int *final_len)
{
	int status;
	XDR xdrslong;
	struct clist *wcl = NULL;
	int count = 0;
	int alloc_len;
	char  *memp;
	rdma_buf_t long_rpc = {0};
	struct clone_rdma_data *crdp;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	bzero(&xdrslong, sizeof (xdrslong));

	/* Choose a size for the long rpc response */
	if (MSG_IS_RPCSEC_GSS(msg)) {
		/* Leave headroom for the GSS verifier/wrap overhead. */
		alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
	} else {
		alloc_len = RNDUP(*msglen);
	}

	/* Round the allocation up to 16K, 32K, or 64K buckets. */
	if (alloc_len <= 64 * 1024) {
		if (alloc_len > 32 * 1024) {
			alloc_len = 64 * 1024;
		} else {
			if (alloc_len > 16 * 1024) {
				alloc_len = 32 * 1024;
			} else {
				alloc_len = 16 * 1024;
			}
		}
	}

	long_rpc.type = RDMA_LONG_BUFFER;
	long_rpc.len = alloc_len;
	if (rdma_buf_alloc(crdp->conn, &long_rpc)) {
		return (SVC_RDMA_FAIL);
	}

	/* Encode the reply header and (wrapped) results into the buffer. */
	memp = long_rpc.addr;
	xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE);

	msg->rm_xid = clone_xprt->xp_xid;

	if (!(xdr_replymsg(&xdrslong, msg) &&
	    (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong,
	    xdr_results, xdr_location)))) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap);
		return (SVC_RDMA_FAIL);
	}

	*final_len = XDR_GETPOS(&xdrslong);

	DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len);
	*numchunks = 0;
	*freelen = 0;

	wcl = crdp->cl_reply;
	wcl->rb_longbuf = long_rpc;

	/*
	 * Walk the reply chunk list, carving the encoded reply across
	 * the chunks; each chunk's source address points into memp.
	 */
	count = *final_len;
	while ((wcl != NULL) && (count > 0)) {

		/* A zero rmr marks the end of usable chunks. */
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count,
		    uint32_t, wcl->c_len);

		if (wcl->c_len > count) {
			wcl->c_len = count;
		}
		wcl->w.c_saddr3 = (caddr_t)memp;

		count -= wcl->c_len;
		*numchunks +=  1;
		memp += wcl->c_len;
		wcl = wcl->c_next;
	}

	/*
	 * Make rest of the chunks 0-len
	 */
	while (wcl != NULL) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;
		wcl->c_len = 0;
		wcl = wcl->c_next;
	}

	wcl = crdp->cl_reply;

	/*
	 * MUST fail if there are still more data
	 */
	if (count > 0) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist);
		return (SVC_RDMA_FAIL);
	}

	/* Register, sync, and RDMA WRITE the reply into client memory. */
	if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg);
		return (SVC_RDMA_FAIL);
	}

	status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE);

	if (status) {
		(void) clist_deregister(crdp->conn, wcl);
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem);
		return (SVC_RDMA_FAIL);
	}

	status = RDMA_WRITE(crdp->conn, wcl, WAIT);

	(void) clist_deregister(crdp->conn, wcl);
	rdma_buf_free(crdp->conn, &wcl->rb_longbuf);

	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__longrep__write);
		return (SVC_RDMA_FAIL);
	}

	return (SVC_RDMA_SUCCESS);
}
 766 
 767 
 768 static int
 769 svc_compose_rpcmsg(SVCXPRT * clone_xprt, CONN * conn, xdrproc_t xdr_results,
 770     caddr_t xdr_location, rdma_buf_t *rpcreply, XDR ** xdrs,
 771     struct rpc_msg *msg, bool_t has_args, uint_t *len)
 772 {
 773         /*
 774          * Get a pre-allocated buffer for rpc reply
 775          */
 776         rpcreply->type = SEND_BUFFER;
 777         if (rdma_buf_alloc(conn, rpcreply)) {
 778                 DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs);
 779                 return (SVC_RDMA_FAIL);
 780         }
 781 
 782         xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len,
 783             0, NULL, XDR_ENCODE, conn);
 784 
 785         msg->rm_xid = clone_xprt->xp_xid;
 786 
 787         if (has_args) {
 788                 if (!(xdr_replymsg(*xdrs, msg) &&
 789                     (!has_args ||
 790                     SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs,
 791                     xdr_results, xdr_location)))) {
 792                         rdma_buf_free(conn, rpcreply);
 793                         DTRACE_PROBE(
 794                             krpc__e__svcrdma__rpcmsg__reply__authwrap1);
 795                         return (SVC_RDMA_FAIL);
 796                 }
 797         } else {
 798                 if (!xdr_replymsg(*xdrs, msg)) {
 799                         rdma_buf_free(conn, rpcreply);
 800                         DTRACE_PROBE(
 801                             krpc__e__svcrdma__rpcmsg__reply__authwrap2);
 802                         return (SVC_RDMA_FAIL);
 803                 }
 804         }
 805 
 806         *len = XDR_GETPOS(*xdrs);
 807 
 808         return (SVC_RDMA_SUCCESS);
 809 }
 810 
 811 /*
 812  * Send rpc reply.
 813  */
 814 static bool_t
 815 svc_rdma_ksend(SVCXPRT * clone_xprt, struct rpc_msg *msg)
 816 {
 817         XDR *xdrs_rpc = &(clone_xprt->xp_xdrout);
 818         XDR xdrs_rhdr;
 819         CONN *conn = NULL;
 820         rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0};
 821 
 822         struct clone_rdma_data *crdp;
 823         struct clist *cl_read = NULL;
 824         struct clist *cl_send = NULL;
 825         struct clist *cl_write = NULL;
 826         xdrproc_t xdr_results;          /* results XDR encoding function */
 827         caddr_t xdr_location;           /* response results pointer */
 828 
 829         int retval = FALSE;
 830         int status, msglen, num_wreply_segments = 0;
 831         uint32_t rdma_credit = 0;
 832         int freelen = 0;
 833         bool_t has_args;
 834         uint_t  final_resp_len, rdma_response_op, vers;
 835 
 836         bzero(&xdrs_rhdr, sizeof (XDR));
 837         crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 838         conn = crdp->conn;
 839 
 840         /*
 841          * If there is a result procedure specified in the reply message,
 842          * it will be processed in the xdr_replymsg and SVCAUTH_WRAP.
 843          * We need to make sure it won't be processed twice, so we null
 844          * it for xdr_replymsg here.
 845          */
 846         has_args = FALSE;
 847         if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
 848             msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
 849                 if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
 850                         has_args = TRUE;
 851                         xdr_location = msg->acpted_rply.ar_results.where;
 852                         msg->acpted_rply.ar_results.proc = xdr_void;
 853                         msg->acpted_rply.ar_results.where = NULL;
 854                 }
 855         }
 856 
 857         /*
 858          * Given the limit on the inline response size (RPC_MSG_SZ),
 859          * there is a need to make a guess as to the overall size of
 860          * the response.  If the resultant size is beyond the inline
 861          * size, then the server needs to use the "reply chunk list"
 862          * provided by the client (if the client provided one).  An
 863          * example of this type of response would be a READDIR
 864          * response (e.g. a small directory read would fit in RPC_MSG_SZ
 865          * and that is the preference but it may not fit)
 866          *
 867          * Combine the encoded size and the size of the true results
 868          * and then make the decision about where to encode and send results.
 869          *
 870          * One important note, this calculation is ignoring the size
 871          * of the encoding of the authentication overhead.  The reason
 872          * for this is rooted in the complexities of access to the
 873          * encoded size of RPCSEC_GSS related authentiation,
 874          * integrity, and privacy.
 875          *
 876          * If it turns out that the encoded authentication bumps the
 877          * response over the RPC_MSG_SZ limit, then it may need to
 878          * attempt to encode for the reply chunk list.
 879          */
 880 
 881         /*
 882          * Calculating the "sizeof" the RPC response header and the
 883          * encoded results.
 884          */
 885         msglen = xdr_sizeof(xdr_replymsg, msg);
 886 
 887         if (msglen > 0) {
 888                 RSSTAT_INCR(rstotalreplies);
 889         }
 890         if (has_args)
 891                 msglen += xdrrdma_sizeof(xdr_results, xdr_location,
 892                     rdma_minchunk, NULL, NULL);
 893 
 894         DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen);
 895 
 896         status = SVC_RDMA_SUCCESS;
 897 
 898         if (msglen < RPC_MSG_SZ) {
 899                 /*
 900                  * Looks like the response will fit in the inline
 901                  * response; let's try
 902                  */
 903                 RSSTAT_INCR(rstotalinlinereplies);
 904 
 905                 rdma_response_op = RDMA_MSG;
 906 
 907                 status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results,
 908                     xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg,
 909                     has_args, &final_resp_len);
 910 
 911                 DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status,
 912                     int, status);
 913                 DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len,
 914                     int, final_resp_len);
 915 
 916                 if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) {
 917                         clist_free(crdp->cl_reply);
 918                         crdp->cl_reply = NULL;
 919                 }
 920         }
 921 
 922         /*
 923          * If the encode failed (size?) or the message really is
 924          * larger than what is allowed, try the response chunk list.
 925          */
 926         if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) {
 927                 /*
 928                  * attempting to use a reply chunk list when there
 929                  * isn't one won't get very far...
 930                  */
 931                 if (crdp->cl_reply == NULL) {
 932                         DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl);
 933                         goto out;
 934                 }
 935 
 936                 RSSTAT_INCR(rstotallongreplies);
 937 
 938                 msglen = xdr_sizeof(xdr_replymsg, msg);
 939                 msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0,
 940                     NULL, NULL);
 941 
 942                 status = svc_process_long_reply(clone_xprt, xdr_results,
 943                     xdr_location, msg, has_args, &msglen, &freelen,
 944                     &num_wreply_segments, &final_resp_len);
 945 
 946                 DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen,
 947                     int, final_resp_len);
 948 
 949                 if (status != SVC_RDMA_SUCCESS) {
 950                         DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed);
 951                         goto out;
 952                 }
 953 
 954                 rdma_response_op = RDMA_NOMSG;
 955         }
 956 
 957         DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len,
 958             int, final_resp_len);
 959 
 960         rbuf_resp.type = SEND_BUFFER;
 961         if (rdma_buf_alloc(conn, &rbuf_resp)) {
 962                 rdma_buf_free(conn, &rbuf_rpc_resp);
 963                 DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs);
 964                 goto out;
 965         }
 966 
 967         rdma_credit = rdma_bufs_granted;
 968 
 969         vers = RPCRDMA_VERS;
 970         xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE);
 971         (*(uint32_t *)rbuf_resp.addr) = msg->rm_xid;
 972         /* Skip xid and set the xdr position accordingly. */
 973         XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t));
 974         if (!xdr_u_int(&xdrs_rhdr, &vers) ||
 975             !xdr_u_int(&xdrs_rhdr, &rdma_credit) ||
 976             !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) {
 977                 rdma_buf_free(conn, &rbuf_rpc_resp);
 978                 rdma_buf_free(conn, &rbuf_resp);
 979                 DTRACE_PROBE(krpc__e__svcrdma__ksend__uint);
 980                 goto out;
 981         }
 982 
 983         /*
 984          * Now XDR the read chunk list, actually always NULL
 985          */
 986         (void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read);
 987 
 988         /*
 989          * encode write list -- we already drove RDMA_WRITEs
 990          */
 991         cl_write = crdp->cl_wlist;
 992         if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) {
 993                 DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist);
 994                 rdma_buf_free(conn, &rbuf_rpc_resp);
 995                 rdma_buf_free(conn, &rbuf_resp);
 996                 goto out;
 997         }
 998 
 999         /*
1000          * XDR encode the RDMA_REPLY write chunk
1001          */
1002         if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply,
1003             num_wreply_segments)) {
1004                 rdma_buf_free(conn, &rbuf_rpc_resp);
1005                 rdma_buf_free(conn, &rbuf_resp);
1006                 goto out;
1007         }
1008 
1009         clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle,
1010             rbuf_resp.addr, NULL, NULL);
1011 
1012         if (rdma_response_op == RDMA_MSG) {
1013                 clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle,
1014                     rbuf_rpc_resp.addr, NULL, NULL);
1015         }
1016 
1017         status = RDMA_SEND(conn, cl_send, msg->rm_xid);
1018 
1019         if (status == RDMA_SUCCESS) {
1020                 retval = TRUE;
1021         }
1022 
1023 out:
1024         /*
1025          * Free up sendlist chunks
1026          */
1027         if (cl_send != NULL)
1028                 clist_free(cl_send);
1029 
1030         /*
1031          * Destroy private data for xdr rdma
1032          */
1033         if (clone_xprt->xp_xdrout.x_ops != NULL) {
1034                 XDR_DESTROY(&(clone_xprt->xp_xdrout));
1035         }
1036 
1037         if (crdp->cl_reply) {
1038                 clist_free(crdp->cl_reply);
1039                 crdp->cl_reply = NULL;
1040         }
1041 
1042         /*
1043          * This is completely disgusting.  If public is set it is
1044          * a pointer to a structure whose first field is the address
1045          * of the function to free that structure and any related
1046          * stuff.  (see rrokfree in nfs_xdr.c).
1047          */
1048         if (xdrs_rpc->x_public) {
1049                 /* LINTED pointer alignment */
1050                 (**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public);
1051         }
1052 
1053         if (xdrs_rhdr.x_ops != NULL) {
1054                 XDR_DESTROY(&xdrs_rhdr);
1055         }
1056 
1057         return (retval);
1058 }
1059 
1060 /*
1061  * Deserialize arguments.
1062  */
1063 static bool_t
1064 svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
1065 {
1066         if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
1067             xdr_args, args_ptr)) != TRUE)
1068                 return (FALSE);
1069         return (TRUE);
1070 }
1071 
1072 static bool_t
1073 svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
1074     caddr_t args_ptr)
1075 {
1076         struct clone_rdma_data *crdp;
1077         bool_t retval;
1078 
1079         /*
1080          * If the cloned bit is true, then this transport specific
1081          * rmda data has been duplicated into another cloned xprt. Do
1082          * not free, or release the connection, it is still in use.  The
1083          * buffers will be freed and the connection released later by
1084          * SVC_CLONE_DESTROY().
1085          */
1086         crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
1087         if (crdp->cloned == TRUE) {
1088                 crdp->cloned = 0;
1089                 return (TRUE);
1090         }
1091 
1092         /*
1093          * Free the args if needed then XDR_DESTROY
1094          */
1095         if (args_ptr) {
1096                 XDR     *xdrs = &clone_xprt->xp_xdrin;
1097 
1098                 xdrs->x_op = XDR_FREE;
1099                 retval = (*xdr_args)(xdrs, args_ptr);
1100         }
1101 
1102         XDR_DESTROY(&(clone_xprt->xp_xdrin));
1103         rdma_buf_free(crdp->conn, &crdp->rpcbuf);
1104         if (crdp->cl_reply) {
1105                 clist_free(crdp->cl_reply);
1106                 crdp->cl_reply = NULL;
1107         }
1108         RDMA_REL_CONN(crdp->conn);
1109 
1110         return (retval);
1111 }
1112 
/* ARGSUSED */
/*
 * Reserve space for results in the output buffer.  Not supported for
 * the RDMA transport; always returns NULL.
 */
static int32_t *
svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
{
	return (NULL);
}
1119 
/* ARGSUSED */
/*
 * Free results reserved by svc_rdma_kgetres().  A no-op, since
 * svc_rdma_kgetres() never reserves anything.
 */
static void
svc_rdma_kfreeres(SVCXPRT *clone_xprt)
{
}
1125 
1126 /*
1127  * the dup cacheing routines below provide a cache of non-failure
1128  * transaction id's.  rpc service routines can use this to detect
1129  * retransmissions and re-send a non-failure response.
1130  */
1131 
1132 /*
1133  * MAXDUPREQS is the number of cached items.  It should be adjusted
1134  * to the service load so that there is likely to be a response entry
1135  * when the first retransmission comes in.
1136  */
1137 #define MAXDUPREQS      1024
1138 
1139 /*
1140  * This should be appropriately scaled to MAXDUPREQS.
1141  */
1142 #define DRHASHSZ        257
1143 
1144 #if ((DRHASHSZ & (DRHASHSZ - 1)) == 0)
1145 #define XIDHASH(xid)    ((xid) & (DRHASHSZ - 1))
1146 #else
1147 #define XIDHASH(xid)    ((xid) % DRHASHSZ)
1148 #endif
1149 #define DRHASH(dr)      XIDHASH((dr)->dr_xid)
1150 #define REQTOXID(req)   ((req)->rq_xprt->xp_xid)
1151 
1152 static int      rdmandupreqs = 0;
1153 int     rdmamaxdupreqs = MAXDUPREQS;
1154 static kmutex_t rdmadupreq_lock;
1155 static struct dupreq *rdmadrhashtbl[DRHASHSZ];
1156 static int      rdmadrhashstat[DRHASHSZ];
1157 
1158 static void unhash(struct dupreq *);
1159 
1160 /*
1161  * rdmadrmru points to the head of a circular linked list in lru order.
1162  * rdmadrmru->dr_next == drlru
1163  */
1164 struct dupreq *rdmadrmru;
1165 
1166 /*
1167  * svc_rdma_kdup searches the request cache and returns 0 if the
1168  * request is not found in the cache.  If it is found, then it
1169  * returns the state of the request (in progress or done) and
1170  * the status or attributes that were part of the original reply.
1171  */
1172 static int
1173 svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
1174         bool_t *dupcachedp)
1175 {
1176         struct dupreq *dr;
1177         uint32_t xid;
1178         uint32_t drhash;
1179         int status;
1180 
1181         xid = REQTOXID(req);
1182         mutex_enter(&rdmadupreq_lock);
1183         RSSTAT_INCR(rsdupchecks);
1184         /*
1185          * Check to see whether an entry already exists in the cache.
1186          */
1187         dr = rdmadrhashtbl[XIDHASH(xid)];
1188         while (dr != NULL) {
1189                 if (dr->dr_xid == xid &&
1190                     dr->dr_proc == req->rq_proc &&
1191                     dr->dr_prog == req->rq_prog &&
1192                     dr->dr_vers == req->rq_vers &&
1193                     dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len &&
1194                     bcmp((caddr_t)dr->dr_addr.buf,
1195                     (caddr_t)req->rq_xprt->xp_rtaddr.buf,
1196                     dr->dr_addr.len) == 0) {
1197                         status = dr->dr_status;
1198                         if (status == DUP_DONE) {
1199                                 bcopy(dr->dr_resp.buf, res, size);
1200                                 if (dupcachedp != NULL)
1201                                         *dupcachedp = (dr->dr_resfree != NULL);
1202                         } else {
1203                                 dr->dr_status = DUP_INPROGRESS;
1204                                 *drpp = dr;
1205                         }
1206                         RSSTAT_INCR(rsdupreqs);
1207                         mutex_exit(&rdmadupreq_lock);
1208                         return (status);
1209                 }
1210                 dr = dr->dr_chain;
1211         }
1212 
1213         /*
1214          * There wasn't an entry, either allocate a new one or recycle
1215          * an old one.
1216          */
1217         if (rdmandupreqs < rdmamaxdupreqs) {
1218                 dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP);
1219                 if (dr == NULL) {
1220                         mutex_exit(&rdmadupreq_lock);
1221                         return (DUP_ERROR);
1222                 }
1223                 dr->dr_resp.buf = NULL;
1224                 dr->dr_resp.maxlen = 0;
1225                 dr->dr_addr.buf = NULL;
1226                 dr->dr_addr.maxlen = 0;
1227                 if (rdmadrmru) {
1228                         dr->dr_next = rdmadrmru->dr_next;
1229                         rdmadrmru->dr_next = dr;
1230                 } else {
1231                         dr->dr_next = dr;
1232                 }
1233                 rdmandupreqs++;
1234         } else {
1235                 dr = rdmadrmru->dr_next;
1236                 while (dr->dr_status == DUP_INPROGRESS) {
1237                         dr = dr->dr_next;
1238                         if (dr == rdmadrmru->dr_next) {
1239                                 mutex_exit(&rdmadupreq_lock);
1240                                 return (DUP_ERROR);
1241                         }
1242                 }
1243                 unhash(dr);
1244                 if (dr->dr_resfree) {
1245                         (*dr->dr_resfree)(dr->dr_resp.buf);
1246                 }
1247         }
1248         dr->dr_resfree = NULL;
1249         rdmadrmru = dr;
1250 
1251         dr->dr_xid = REQTOXID(req);
1252         dr->dr_prog = req->rq_prog;
1253         dr->dr_vers = req->rq_vers;
1254         dr->dr_proc = req->rq_proc;
1255         if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
1256                 if (dr->dr_addr.buf != NULL)
1257                         kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
1258                 dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
1259                 dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
1260                 if (dr->dr_addr.buf == NULL) {
1261                         dr->dr_addr.maxlen = 0;
1262                         dr->dr_status = DUP_DROP;
1263                         mutex_exit(&rdmadupreq_lock);
1264                         return (DUP_ERROR);
1265                 }
1266         }
1267         dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
1268         bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len);
1269         if (dr->dr_resp.maxlen < size) {
1270                 if (dr->dr_resp.buf != NULL)
1271                         kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
1272                 dr->dr_resp.maxlen = (unsigned int)size;
1273                 dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP);
1274                 if (dr->dr_resp.buf == NULL) {
1275                         dr->dr_resp.maxlen = 0;
1276                         dr->dr_status = DUP_DROP;
1277                         mutex_exit(&rdmadupreq_lock);
1278                         return (DUP_ERROR);
1279                 }
1280         }
1281         dr->dr_status = DUP_INPROGRESS;
1282 
1283         drhash = (uint32_t)DRHASH(dr);
1284         dr->dr_chain = rdmadrhashtbl[drhash];
1285         rdmadrhashtbl[drhash] = dr;
1286         rdmadrhashstat[drhash]++;
1287         mutex_exit(&rdmadupreq_lock);
1288         *drpp = dr;
1289         return (DUP_NEW);
1290 }
1291 
1292 /*
1293  * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP)
1294  * and stores the response.
1295  */
1296 static void
1297 svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
1298         int size, int status)
1299 {
1300         ASSERT(dr->dr_resfree == NULL);
1301         if (status == DUP_DONE) {
1302                 bcopy(res, dr->dr_resp.buf, size);
1303                 dr->dr_resfree = dis_resfree;
1304         }
1305         dr->dr_status = status;
1306 }
1307 
1308 /*
1309  * This routine expects that the mutex, rdmadupreq_lock, is already held.
1310  */
1311 static void
1312 unhash(struct dupreq *dr)
1313 {
1314         struct dupreq *drt;
1315         struct dupreq *drtprev = NULL;
1316         uint32_t drhash;
1317 
1318         ASSERT(MUTEX_HELD(&rdmadupreq_lock));
1319 
1320         drhash = (uint32_t)DRHASH(dr);
1321         drt = rdmadrhashtbl[drhash];
1322         while (drt != NULL) {
1323                 if (drt == dr) {
1324                         rdmadrhashstat[drhash]--;
1325                         if (drtprev == NULL) {
1326                                 rdmadrhashtbl[drhash] = drt->dr_chain;
1327                         } else {
1328                                 drtprev->dr_chain = drt->dr_chain;
1329                         }
1330                         return;
1331                 }
1332                 drtprev = drt;
1333                 drt = drt->dr_chain;
1334         }
1335 }
1336 
1337 bool_t
1338 rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist)
1339 {
1340         struct clist    *clist;
1341         uint32_t        tlen;
1342 
1343         if (req->rq_xprt->xp_type != T_RDMA) {
1344                 return (FALSE);
1345         }
1346 
1347         tlen = 0;
1348         clist = wlist;
1349         while (clist) {
1350                 tlen += clist->c_len;
1351                 clist = clist->c_next;
1352         }
1353 
1354         /*
1355          * set iov to addr+len of first segment of first wchunk of
1356          * wlist sent by client.  krecv() already malloc'd a buffer
1357          * large enough, but registration is deferred until we write
1358          * the buffer back to (NFS) client using RDMA_WRITE.
1359          */
1360         iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr;
1361         iov->iov_len = tlen;
1362 
1363         return (TRUE);
1364 }
1365 
1366 /*
1367  * routine to setup the read chunk lists
1368  */
1369 
1370 int
1371 rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len)
1372 {
1373         int             data_len, avail_len;
1374         uint_t          round_len;
1375 
1376         data_len = avail_len = 0;
1377 
1378         while (wcl != NULL && count > 0) {
1379                 if (wcl->c_dmemhandle.mrc_rmr == 0)
1380                         break;
1381 
1382                 if (wcl->c_len < count) {
1383                         data_len += wcl->c_len;
1384                         avail_len = 0;
1385                 } else {
1386                         data_len += count;
1387                         avail_len = wcl->c_len - count;
1388                         wcl->c_len = count;
1389                 }
1390                 count -= wcl->c_len;
1391 
1392                 if (count == 0)
1393                         break;
1394 
1395                 wcl = wcl->c_next;
1396         }
1397 
1398         /*
1399          * MUST fail if there are still more data
1400          */
1401         if (count > 0) {
1402                 DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len,
1403                     int, data_len, int, count);
1404                 return (FALSE);
1405         }
1406 
1407         /*
1408          * Round up the last chunk to 4-byte boundary
1409          */
1410         *wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);
1411         round_len = *wcl_len - data_len;
1412 
1413         if (round_len) {
1414 
1415                 /*
1416                  * If there is space in the current chunk,
1417                  * add the roundup to the chunk.
1418                  */
1419                 if (avail_len >= round_len) {
1420                         wcl->c_len += round_len;
1421                 } else  {
1422                         /*
1423                          * try the next one.
1424                          */
1425                         wcl = wcl->c_next;
1426                         if ((wcl == NULL) || (wcl->c_len < round_len)) {
1427                                 DTRACE_PROBE1(
1428                                     krpc__e__rdma_setup_read_chunks_rndup,
1429                                     int, round_len);
1430                                 return (FALSE);
1431                         }
1432                         wcl->c_len = round_len;
1433                 }
1434         }
1435 
1436         wcl = wcl->c_next;
1437 
1438         /*
1439          * Make rest of the chunks 0-len
1440          */
1441 
1442         clist_zero_len(wcl);
1443 
1444         return (TRUE);
1445 }