1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*      Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T     */
  22 /*        All Rights Reserved   */
  23 
  24 /*
  25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  26  * Use is subject to license terms.
  27  */
  28 
  29 #include <sys/types.h>
  30 #include <sys/param.h>
  31 #include <sys/thread.h>
  32 #include <sys/sysmacros.h>
  33 #include <sys/stropts.h>
  34 #include <sys/stream.h>
  35 #include <sys/strsubr.h>
  36 #include <sys/strsun.h>
  37 #include <sys/conf.h>
  38 #include <sys/debug.h>
  39 #include <sys/cmn_err.h>
  40 #include <sys/kmem.h>
  41 #include <sys/atomic.h>
  42 #include <sys/errno.h>
  43 #include <sys/vtrace.h>
  44 #include <sys/ftrace.h>
  45 #include <sys/ontrap.h>
  46 #include <sys/multidata.h>
  47 #include <sys/multidata_impl.h>
  48 #include <sys/sdt.h>
  49 #include <sys/strft.h>
  50 
  51 #ifdef DEBUG
  52 #include <sys/kmem_impl.h>
  53 #endif
  54 
  55 /*
  56  * This file contains all the STREAMS utility routines that may
  57  * be used by modules and drivers.
  58  */
  59 
  60 /*
  61  * STREAMS message allocator: principles of operation
  62  *
  63  * The streams message allocator consists of all the routines that
  64  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
  65  * dupb(), freeb() and freemsg().  What follows is a high-level view
  66  * of how the allocator works.
  67  *
  68  * Every streams message consists of one or more mblks, a dblk, and data.
  69  * All mblks for all types of messages come from a common mblk_cache.
  70  * The dblk and data come in several flavors, depending on how the
  71  * message is allocated:
  72  *
  73  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
  74  *     fixed-size dblk/data caches. For message sizes that are multiples of
  75  *     PAGESIZE, dblks are allocated separately from the buffer.
  76  *     The associated buffer is allocated by the constructor using kmem_alloc().
  77  *     For all other message sizes, dblk and its associated data is allocated
  78  *     as a single contiguous chunk of memory.
  79  *     Objects in these caches consist of a dblk plus its associated data.
  80  *     allocb() determines the nearest-size cache by table lookup:
  81  *     the dblk_cache[] array provides the mapping from size to dblk cache.
  82  *
  83  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
  84  *     kmem_alloc()'ing a buffer for the data and supplying that
  85  *     buffer to gesballoc(), described below.
  86  *
  87  * (3) The four flavors of [d]esballoc[a] are all implemented by a
  88  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
  89  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
  90  *     db_lim and db_frtnp to describe the caller-supplied buffer.
  91  *
  92  * While there are several routines to allocate messages, there is only
  93  * one routine to free messages: freeb().  freeb() simply invokes the
  94  * dblk's free method, dbp->db_free(), which is set at allocation time.
  95  *
  96  * dupb() creates a new reference to a message by allocating a new mblk,
  97  * incrementing the dblk reference count and setting the dblk's free
  98  * method to dblk_decref().  The dblk's original free method is retained
  99  * in db_lastfree.  dblk_decref() decrements the reference count on each
 100  * freeb().  If this is not the last reference it just frees the mblk;
 101  * if this *is* the last reference, it restores db_free to db_lastfree,
 102  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 103  *
 104  * The implementation makes aggressive use of kmem object caching for
 105  * maximum performance.  This makes the code simple and compact, but
 106  * also a bit abstruse in some places.  The invariants that constitute a
 107  * message's constructed state, described below, are more subtle than usual.
 108  *
 109  * Every dblk has an "attached mblk" as part of its constructed state.
 110  * The mblk is allocated by the dblk's constructor and remains attached
 111  * until the message is either dup'ed or pulled up.  In the dupb() case
 112  * the mblk association doesn't matter until the last free, at which time
 113  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 114  * the mblk association because it swaps the leading mblks of two messages,
 115  * so it is responsible for swapping their db_mblk pointers accordingly.
 116  * From a constructed-state viewpoint it doesn't matter that a dblk's
 117  * attached mblk can change while the message is allocated; all that
 118  * matters is that the dblk has *some* attached mblk when it's freed.
 119  *
 120  * The sizes of the allocb() small-message caches are not magical.
 121  * They represent a good trade-off between internal and external
 122  * fragmentation for current workloads.  They should be reevaluated
 123  * periodically, especially if allocations larger than DBLK_MAX_CACHE
 124  * become common.  We use 64-byte alignment so that dblks don't
 125  * straddle cache lines unnecessarily.
 126  */
/*
 * Allocator tuning constants (see the block comment above).
 */
#define DBLK_MAX_CACHE          73728   /* largest cached dblk+data size */
#define DBLK_CACHE_ALIGN        64      /* cache-line alignment for dblks */
#define DBLK_MIN_SIZE           8       /* granularity of dblk_cache[] index */
#define DBLK_SIZE_SHIFT         3       /* log2(DBLK_MIN_SIZE) */

/*
 * DBLK_RTFU_SHIFT(field) is the bit offset of a dblk byte field within
 * the 32-bit word that spans db_ref, db_type, db_flags and db_struioflag,
 * so those four bytes can be read/written as one word (DBLK_RTFU_WORD).
 */
#ifdef _BIG_ENDIAN
#define DBLK_RTFU_SHIFT(field)  \
        (8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define DBLK_RTFU_SHIFT(field)  \
        (8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

/* Compose the ref/type/flags/uioflag word in a single store. */
#define DBLK_RTFU(ref, type, flags, uioflag)    \
        (((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
        ((type) << DBLK_RTFU_SHIFT(db_type)) | \
        (((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
        ((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define DBLK_RTFU_REF_MASK      (DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define DBLK_RTFU_WORD(dbp)     (*((uint32_t *)&(dbp)->db_ref))
#define MBLK_BAND_FLAG_WORD(mp) (*((uint32_t *)&(mp)->b_band))

/*
 * The set of fixed-size dblk caches created by streams_msg_init();
 * zero-terminated.  Non-page-multiple sizes leave room on the page
 * for the dblk itself (see streams_msg_init()).
 */
static size_t dblk_sizes[] = {
#ifdef _LP64
        16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
        8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
        40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
        64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
        8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
        40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
        DBLK_MAX_CACHE, 0
};

/* size-to-cache lookup table, indexed by (size - 1) >> DBLK_SIZE_SHIFT */
static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };  /* no-op free routine for esballoc users */
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;
 183 
/*
 * kmem constructor for the fixed-size dblk caches.  cdrarg is the message
 * size served by the cache.  Establishes the dblk's constructed state: an
 * attached mblk (db_mblk) and a data buffer.  For page-multiple sizes the
 * buffer is kmem_alloc()'ed separately; otherwise the data lives
 * immediately after the dblk in the same cache object.
 * Returns 0 on success, -1 if any allocation fails.
 */
static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
        dblk_t *dbp = buf;
        ssize_t msg_size = (ssize_t)cdrarg;
        size_t index;

        ASSERT(msg_size != 0);

        index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

        ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

        /* every dblk carries an attached mblk as part of constructed state */
        if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
                return (-1);
        if ((msg_size & PAGEOFFSET) == 0) {
                /* page-multiple size: buffer allocated separately */
                dbp->db_base = kmem_alloc(msg_size, kmflags);
                if (dbp->db_base == NULL) {
                        kmem_cache_free(mblk_cache, dbp->db_mblk);
                        return (-1);
                }
        } else {
                /* data follows the dblk in the same contiguous chunk */
                dbp->db_base = (unsigned char *)&dbp[1];
        }

        dbp->db_mblk->b_datap = dbp;
        dbp->db_cache = dblk_cache[index];
        dbp->db_lim = dbp->db_base + msg_size;
        dbp->db_free = dbp->db_lastfree = dblk_lastfree;
        dbp->db_frtnp = NULL;
        dbp->db_fthdr = NULL;
        dbp->db_credp = NULL;
        dbp->db_cpid = -1;
        dbp->db_struioflag = 0;
        dbp->db_struioun.cksum.flags = 0;
        return (0);
}
 221 
/*
 * kmem constructor for dblk_esb_cache, the dblks used by gesballoc()
 * for caller-supplied ([d]esballoc[a]) buffers.  db_base/db_lim/db_free
 * are not set here; gesballoc() fills them in per allocation.
 * Returns 0 on success, -1 if the attached mblk cannot be allocated.
 */
/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
        dblk_t *dbp = buf;

        if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
                return (-1);
        dbp->db_mblk->b_datap = dbp;
        dbp->db_cache = dblk_esb_cache;
        dbp->db_fthdr = NULL;
        dbp->db_credp = NULL;
        dbp->db_cpid = -1;
        dbp->db_struioflag = 0;
        dbp->db_struioun.cksum.flags = 0;
        return (0);
}
 239 
/*
 * kmem constructor for the dblks of a driver-private bcache_t.  cdrarg is
 * the bcache itself; the data buffer comes from the bcache's buffer_cache
 * and db_cache points back at the bcache so bcache_dblk_lastfree() can
 * return the object.  Returns 0 on success, -1 on allocation failure.
 */
static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
        dblk_t *dbp = buf;
        bcache_t *bcp = cdrarg;

        if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
                return (-1);

        dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
        if (dbp->db_base == NULL) {
                kmem_cache_free(mblk_cache, dbp->db_mblk);
                return (-1);
        }

        dbp->db_mblk->b_datap = dbp;
        /* db_cache doubles as the back-pointer to the owning bcache */
        dbp->db_cache = (void *)bcp;
        dbp->db_lim = dbp->db_base + bcp->size;
        dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
        dbp->db_frtnp = NULL;
        dbp->db_fthdr = NULL;
        dbp->db_credp = NULL;
        dbp->db_cpid = -1;
        dbp->db_struioflag = 0;
        dbp->db_struioun.cksum.flags = 0;
        return (0);
}
 267 
/*
 * kmem destructor for the fixed-size dblk caches; undoes dblk_constructor().
 * The data buffer is freed only for page-multiple sizes, where the
 * constructor allocated it separately; otherwise the data is part of the
 * cache object itself.  The attached mblk is always returned.
 */
/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
        dblk_t *dbp = buf;
        ssize_t msg_size = (ssize_t)cdrarg;

        ASSERT(dbp->db_mblk->b_datap == dbp);
        ASSERT(msg_size != 0);
        ASSERT(dbp->db_struioflag == 0);
        ASSERT(dbp->db_struioun.cksum.flags == 0);

        if ((msg_size & PAGEOFFSET) == 0) {
                kmem_free(dbp->db_base, msg_size);
        }

        kmem_cache_free(mblk_cache, dbp->db_mblk);
}
 286 
/*
 * kmem destructor for bcache dblks; returns the data buffer to the
 * bcache's buffer_cache and the attached mblk to mblk_cache.
 */
static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
        dblk_t *dbp = buf;
        bcache_t *bcp = cdrarg;

        kmem_cache_free(bcp->buffer_cache, dbp->db_base);

        ASSERT(dbp->db_mblk->b_datap == dbp);
        ASSERT(dbp->db_struioflag == 0);
        ASSERT(dbp->db_struioun.cksum.flags == 0);

        kmem_cache_free(mblk_cache, dbp->db_mblk);
}
 301 
 302 /* ARGSUSED */
 303 static int
 304 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
 305 {
 306         ftblk_t *fbp = buf;
 307         int i;
 308 
 309         bzero(fbp, sizeof (ftblk_t));
 310         if (str_ftstack != 0) {
 311                 for (i = 0; i < FTBLK_EVNTS; i++)
 312                         fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
 313         }
 314 
 315         return (0);
 316 }
 317 
 318 /* ARGSUSED */
 319 static void
 320 ftblk_destructor(void *buf, void *cdrarg)
 321 {
 322         ftblk_t *fbp = buf;
 323         int i;
 324 
 325         if (str_ftstack != 0) {
 326                 for (i = 0; i < FTBLK_EVNTS; i++) {
 327                         if (fbp->ev[i].stk != NULL) {
 328                                 kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
 329                                 fbp->ev[i].stk = NULL;
 330                         }
 331                 }
 332         }
 333 }
 334 
 335 static int
 336 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
 337 {
 338         fthdr_t *fhp = buf;
 339 
 340         return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
 341 }
 342 
 343 static void
 344 fthdr_destructor(void *buf, void *cdrarg)
 345 {
 346         fthdr_t *fhp = buf;
 347 
 348         ftblk_destructor(&fhp->first, cdrarg);
 349 }
 350 
/*
 * One-time initialization of the STREAMS message allocator: creates the
 * mblk cache, one dblk cache per entry in dblk_sizes[], the esballoc dblk
 * cache and the flow-trace caches, then initializes the Multidata caches
 * and the esballoc throttling queue.
 */
void
streams_msg_init(void)
{
        char name[40];
        size_t size;
        size_t lastsize = DBLK_MIN_SIZE;
        size_t *sizep;
        struct kmem_cache *cp;
        size_t tot_size;
        int offset;

        mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
            NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);

        for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

                if ((offset = (size & PAGEOFFSET)) != 0) {
                        /*
                         * We are in the middle of a page, dblk should
                         * be allocated on the same page
                         */
                        tot_size = size + sizeof (dblk_t);
                        ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
                            < PAGESIZE);
                        ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

                } else {

                        /*
                         * buf size is multiple of page size, dblk and
                         * buffer are allocated separately.
                         */

                        ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
                        tot_size = sizeof (dblk_t);
                }

                /* max size 73728 -> "streams_dblk_73728" fits in name[40] */
                (void) sprintf(name, "streams_dblk_%ld", size);
                cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
                    dblk_constructor, dblk_destructor, NULL, (void *)(size),
                    NULL, dblk_kmem_flags);

                /*
                 * Point every dblk_cache[] slot between the previous cache
                 * size and this one at the new cache, so allocb() finds the
                 * smallest cache that fits by direct table lookup.
                 */
                while (lastsize <= size) {
                        dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
                        lastsize += DBLK_MIN_SIZE;
                }
        }

        dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
            DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
            (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
        fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
            fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
        ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
            ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);

        /* Initialize Multidata caches */
        mmd_init();

        /* initialize throttling queue for esballoc */
        esballoc_queue_init();
}
 413 
/*
 * Allocate a message of at least 'size' bytes (non-blocking).  Sizes up to
 * DBLK_MAX_CACHE are satisfied from the fixed-size dblk caches via table
 * lookup; larger requests go through allocb_oversize().  'pri' is accepted
 * for compatibility but unused.  Returns NULL on allocation failure.
 */
/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
        dblk_t *dbp;
        mblk_t *mp;
        size_t index;

        index =  (size - 1)  >> DBLK_SIZE_SHIFT;

        if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
                if (size != 0) {
                        mp = allocb_oversize(size, KM_NOSLEEP);
                        goto out;
                }
                /* size == 0 wrapped the index; use the smallest cache */
                index = 0;
        }

        if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
                mp = NULL;
                goto out;
        }

        mp = dbp->db_mblk;
        /* one store initializes db_ref, db_type, db_flags and db_struioflag */
        DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
        mp->b_next = mp->b_prev = mp->b_cont = NULL;
        mp->b_rptr = mp->b_wptr = dbp->db_base;
        mp->b_queue = NULL;
        MBLK_BAND_FLAG_WORD(mp) = 0;
        STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
        FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

        return (mp);
}
 449 
 450 /*
 451  * Allocate an mblk taking db_credp and db_cpid from the template.
 452  * Allow the cred to be NULL.
 453  */
 454 mblk_t *
 455 allocb_tmpl(size_t size, const mblk_t *tmpl)
 456 {
 457         mblk_t *mp = allocb(size, 0);
 458 
 459         if (mp != NULL) {
 460                 dblk_t *src = tmpl->b_datap;
 461                 dblk_t *dst = mp->b_datap;
 462                 cred_t *cr;
 463                 pid_t cpid;
 464 
 465                 cr = msg_getcred(tmpl, &cpid);
 466                 if (cr != NULL)
 467                         crhold(dst->db_credp = cr);
 468                 dst->db_cpid = cpid;
 469                 dst->db_type = src->db_type;
 470         }
 471         return (mp);
 472 }
 473 
 474 mblk_t *
 475 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
 476 {
 477         mblk_t *mp = allocb(size, 0);
 478 
 479         ASSERT(cr != NULL);
 480         if (mp != NULL) {
 481                 dblk_t *dbp = mp->b_datap;
 482 
 483                 crhold(dbp->db_credp = cr);
 484                 dbp->db_cpid = cpid;
 485         }
 486         return (mp);
 487 }
 488 
 489 mblk_t *
 490 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
 491 {
 492         mblk_t *mp = allocb_wait(size, 0, flags, error);
 493 
 494         ASSERT(cr != NULL);
 495         if (mp != NULL) {
 496                 dblk_t *dbp = mp->b_datap;
 497 
 498                 crhold(dbp->db_credp = cr);
 499                 dbp->db_cpid = cpid;
 500         }
 501 
 502         return (mp);
 503 }
 504 
 505 /*
 506  * Extract the db_cred (and optionally db_cpid) from a message.
 507  * We find the first mblk which has a non-NULL db_cred and use that.
 508  * If none found we return NULL.
 509  * Does NOT get a hold on the cred.
 510  */
 511 cred_t *
 512 msg_getcred(const mblk_t *mp, pid_t *cpidp)
 513 {
 514         cred_t *cr = NULL;
 515         cred_t *cr2;
 516         mblk_t *mp2;
 517 
 518         while (mp != NULL) {
 519                 dblk_t *dbp = mp->b_datap;
 520 
 521                 cr = dbp->db_credp;
 522                 if (cr == NULL) {
 523                         mp = mp->b_cont;
 524                         continue;
 525                 }
 526                 if (cpidp != NULL)
 527                         *cpidp = dbp->db_cpid;
 528 
 529 #ifdef DEBUG
 530                 /*
 531                  * Normally there should at most one db_credp in a message.
 532                  * But if there are multiple (as in the case of some M_IOC*
 533                  * and some internal messages in TCP/IP bind logic) then
 534                  * they must be identical in the normal case.
 535                  * However, a socket can be shared between different uids
 536                  * in which case data queued in TCP would be from different
 537                  * creds. Thus we can only assert for the zoneid being the
 538                  * same. Due to Multi-level Level Ports for TX, some
 539                  * cred_t can have a NULL cr_zone, and we skip the comparison
 540                  * in that case.
 541                  */
 542                 mp2 = mp->b_cont;
 543                 while (mp2 != NULL) {
 544                         cr2 = DB_CRED(mp2);
 545                         if (cr2 != NULL) {
 546                                 DTRACE_PROBE2(msg__getcred,
 547                                     cred_t *, cr, cred_t *, cr2);
 548                                 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
 549                                     crgetzone(cr) == NULL ||
 550                                     crgetzone(cr2) == NULL);
 551                         }
 552                         mp2 = mp2->b_cont;
 553                 }
 554 #endif
 555                 return (cr);
 556         }
 557         if (cpidp != NULL)
 558                 *cpidp = NOPID;
 559         return (NULL);
 560 }
 561 
 562 /*
 563  * Variant of msg_getcred which, when a cred is found
 564  * 1. Returns with a hold on the cred
 565  * 2. Clears the first cred in the mblk.
 566  * This is more efficient to use than a msg_getcred() + crhold() when
 567  * the message is freed after the cred has been extracted.
 568  *
 569  * The caller is responsible for ensuring that there is no other reference
 570  * on the message since db_credp can not be cleared when there are other
 571  * references.
 572  */
 573 cred_t *
 574 msg_extractcred(mblk_t *mp, pid_t *cpidp)
 575 {
 576         cred_t *cr = NULL;
 577         cred_t *cr2;
 578         mblk_t *mp2;
 579 
 580         while (mp != NULL) {
 581                 dblk_t *dbp = mp->b_datap;
 582 
 583                 cr = dbp->db_credp;
 584                 if (cr == NULL) {
 585                         mp = mp->b_cont;
 586                         continue;
 587                 }
 588                 ASSERT(dbp->db_ref == 1);
 589                 dbp->db_credp = NULL;
 590                 if (cpidp != NULL)
 591                         *cpidp = dbp->db_cpid;
 592 #ifdef DEBUG
 593                 /*
 594                  * Normally there should at most one db_credp in a message.
 595                  * But if there are multiple (as in the case of some M_IOC*
 596                  * and some internal messages in TCP/IP bind logic) then
 597                  * they must be identical in the normal case.
 598                  * However, a socket can be shared between different uids
 599                  * in which case data queued in TCP would be from different
 600                  * creds. Thus we can only assert for the zoneid being the
 601                  * same. Due to Multi-level Level Ports for TX, some
 602                  * cred_t can have a NULL cr_zone, and we skip the comparison
 603                  * in that case.
 604                  */
 605                 mp2 = mp->b_cont;
 606                 while (mp2 != NULL) {
 607                         cr2 = DB_CRED(mp2);
 608                         if (cr2 != NULL) {
 609                                 DTRACE_PROBE2(msg__extractcred,
 610                                     cred_t *, cr, cred_t *, cr2);
 611                                 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
 612                                     crgetzone(cr) == NULL ||
 613                                     crgetzone(cr2) == NULL);
 614                         }
 615                         mp2 = mp2->b_cont;
 616                 }
 617 #endif
 618                 return (cr);
 619         }
 620         return (NULL);
 621 }
 622 /*
 623  * Get the label for a message. Uses the first mblk in the message
 624  * which has a non-NULL db_credp.
 625  * Returns NULL if there is no credp.
 626  */
 627 extern struct ts_label_s *
 628 msg_getlabel(const mblk_t *mp)
 629 {
 630         cred_t *cr = msg_getcred(mp, NULL);
 631 
 632         if (cr == NULL)
 633                 return (NULL);
 634 
 635         return (crgetlabel(cr));
 636 }
 637 
/*
 * Free a single mblk.  All free paths funnel through the dblk's db_free
 * method, which was set at allocation (dblk_lastfree, an esballoc
 * lastfree, or dblk_decref for dup'ed messages).
 */
void
freeb(mblk_t *mp)
{
        dblk_t *dbp = mp->b_datap;

        ASSERT(dbp->db_ref > 0);
        ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
        FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

        STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

        dbp->db_free(mp, dbp);
}
 651 
 652 void
 653 freemsg(mblk_t *mp)
 654 {
 655         FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
 656         while (mp) {
 657                 dblk_t *dbp = mp->b_datap;
 658                 mblk_t *mp_cont = mp->b_cont;
 659 
 660                 ASSERT(dbp->db_ref > 0);
 661                 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
 662 
 663                 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
 664 
 665                 dbp->db_free(mp, dbp);
 666                 mp = mp_cont;
 667         }
 668 }
 669 
/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t  *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
        mblk_t          *mp1;
        unsigned char   *old_rptr;
        ptrdiff_t       cur_size;

        if (mp == NULL)
                return (allocb(size, BPRI_HI));

        cur_size = mp->b_wptr - mp->b_rptr;
        old_rptr = mp->b_rptr;

        ASSERT(mp->b_datap->db_ref != 0);

        /* reuse in place only if we hold the sole reference and it fits */
        if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
                /*
                 * If the data is wanted and it will fit where it is, no
                 * work is required.
                 * (db_lim - b_rptr is non-negative, so the signed/unsigned
                 * comparison against size is safe here.)
                 */
                if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
                        return (mp);

                mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
                mp1 = mp;
        } else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
                /* XXX other mp state could be copied too, db_flags ... ? */
                mp1->b_cont = mp->b_cont;
        } else {
                return (NULL);
        }

        /* old_rptr stays valid: either mp1 == mp, or mp is freed below */
        if (copy) {
                bcopy(old_rptr, mp1->b_rptr, cur_size);
                mp1->b_wptr = mp1->b_rptr + cur_size;
        }

        if (mp != mp1)
                freeb(mp);

        return (mp1);
}
 719 
/*
 * Last-reference free for cache-allocated dblks: restore the fields that
 * allocation may have dirtied back to constructed state, then return the
 * dblk (with its attached mblk) to its kmem cache.
 */
static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
        ASSERT(dbp->db_mblk == mp);
        if (dbp->db_fthdr != NULL)
                str_ftfree(dbp);

        /* set credp and projid to be 'unspecified' before returning to cache */
        if (dbp->db_credp != NULL) {
                crfree(dbp->db_credp);
                dbp->db_credp = NULL;
        }
        dbp->db_cpid = -1;

        /* Reset the struioflag and the checksum flag fields */
        dbp->db_struioflag = 0;
        dbp->db_struioun.cksum.flags = 0;

        /* and the COOKED and/or UIOA flag(s) */
        dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

        kmem_cache_free(dbp->db_cache, dbp);
}
 743 
/*
 * db_free method installed by dupb(): drop one reference to the dblk.
 * A non-last reference just returns the mblk to its cache; the last
 * reference re-attaches the mblk to the dblk, restores db_free to the
 * original free method (db_lastfree) and invokes it.
 */
static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
        if (dbp->db_ref != 1) {
                uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
                    -(1 << DBLK_RTFU_SHIFT(db_ref)));
                /*
                 * atomic_add_32_nv() just decremented db_ref, so we no longer
                 * have a reference to the dblk, which means another thread
                 * could free it.  Therefore we cannot examine the dblk to
                 * determine whether ours was the last reference.  Instead,
                 * we extract the new and minimum reference counts from rtfu.
                 * Note that all we're really saying is "if (ref != refmin)".
                 */
                if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
                    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
                        kmem_cache_free(mblk_cache, mp);
                        return;
                }
        }
        /* last reference: this mblk becomes the dblk's attached mblk */
        dbp->db_mblk = mp;
        dbp->db_free = dbp->db_lastfree;
        dbp->db_lastfree(mp, dbp);
}
 768 
/*
 * Duplicate an mblk: allocate a new mblk sharing mp's dblk and data,
 * bump the dblk reference count, and switch db_free to dblk_decref so
 * each freeb() drops one reference.  Returns NULL if no mblk can be
 * allocated or db_ref is already at DBLK_REFMAX.
 */
mblk_t *
dupb(mblk_t *mp)
{
        dblk_t *dbp = mp->b_datap;
        mblk_t *new_mp;
        uint32_t oldrtfu, newrtfu;

        if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
                goto out;

        new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
        new_mp->b_rptr = mp->b_rptr;
        new_mp->b_wptr = mp->b_wptr;
        new_mp->b_datap = dbp;
        new_mp->b_queue = NULL;
        MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

        STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

        /* must be set before the increment is visible to other threads */
        dbp->db_free = dblk_decref;
        do {
                ASSERT(dbp->db_ref > 0);
                oldrtfu = DBLK_RTFU_WORD(dbp);
                newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
                /*
                 * If db_ref is maxed out we can't dup this message anymore.
                 */
                if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
                        kmem_cache_free(mblk_cache, new_mp);
                        new_mp = NULL;
                        goto out;
                }
        } while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
        FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
        return (new_mp);
}
 807 
/*
 * Last-reference free for desballoc'ed dblks: invoke the caller's free
 * routine for the data buffer (synchronously, before any other teardown),
 * then reset the dblk to constructed state and return it to the esb cache.
 */
static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
        frtn_t *frp = dbp->db_frtnp;

        ASSERT(dbp->db_mblk == mp);
        frp->free_func(frp->free_arg);
        if (dbp->db_fthdr != NULL)
                str_ftfree(dbp);

        /* set credp and projid to be 'unspecified' before returning to cache */
        if (dbp->db_credp != NULL) {
                crfree(dbp->db_credp);
                dbp->db_credp = NULL;
        }
        dbp->db_cpid = -1;
        dbp->db_struioflag = 0;
        dbp->db_struioun.cksum.flags = 0;

        kmem_cache_free(dbp->db_cache, dbp);
}
 829 
/*
 * No-op buffer free routine, used (via the global frnop frtn_t) by
 * esballoc callers whose buffers need no callback on free.
 */
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}
 835 
/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 *
 * Allocates a dblk from dblk_esb_cache and points it at the
 * caller-supplied buffer [base, base + size).  db_rtfu initializes the
 * ref/type/flags/uioflag word in one store; 'lastfree' becomes both
 * db_free and db_lastfree (e.g. freebs_enqueue or a desb variant).
 * Returns NULL if the dblk cannot be allocated.
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
        void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
        dblk_t *dbp;
        mblk_t *mp;

        ASSERT(base != NULL && frp != NULL);

        if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
                mp = NULL;
                goto out;
        }

        mp = dbp->db_mblk;
        dbp->db_base = base;
        dbp->db_lim = base + size;
        dbp->db_free = dbp->db_lastfree = lastfree;
        dbp->db_frtnp = frp;
        DBLK_RTFU_WORD(dbp) = db_rtfu;
        mp->b_next = mp->b_prev = mp->b_cont = NULL;
        mp->b_rptr = mp->b_wptr = base;
        mp->b_queue = NULL;
        MBLK_BAND_FLAG_WORD(mp) = 0;

out:
        FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
        return (mp);
}
 868 
/*
 * Allocate a message with a caller-supplied data buffer.  frp is the
 * caller's free-routine descriptor; the last free goes through
 * freebs_enqueue.  May return NULL (KM_NOSLEEP).  pri is unused
 * (retained for compatibility; see ARGSUSED).
 */
/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}
 892 
 893 /*
 894  * Same as esballoc() but sleeps waiting for memory.
 895  */
 896 /*ARGSUSED*/
 897 mblk_t *
 898 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
 899 {
 900         mblk_t *mp;
 901 
 902         /*
 903          * Note that this is structured to allow the common case (i.e.
 904          * STREAMS flowtracing disabled) to call gesballoc() with tail
 905          * call optimization.
 906          */
 907         if (!str_ftnever) {
 908                 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
 909                     frp, freebs_enqueue, KM_SLEEP);
 910 
 911                 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
 912                 return (mp);
 913         }
 914 
 915         return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
 916             frp, freebs_enqueue, KM_SLEEP));
 917 }
 918 
/*
 * Like esballoc(), but the caller's free routine runs directly from
 * the final free (via dblk_lastfree_desb) rather than being deferred
 * through freebs_enqueue.
 */
/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}
 942 
/*
 * Like esballoc(), but the dblk starts with db_ref of 2 rather than 1
 * (note DBLK_RTFU(2, ...)); the extra reference is presumably held on
 * behalf of the caller -- TODO confirm intended semantics against
 * esballoca(9F).
 */
/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}
 966 
/*
 * Combination of desballoc() (free routine runs directly via
 * dblk_lastfree_desb) and esballoca() (initial db_ref of 2).
 */
/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}
 990 
/*
 * Last-reference free routine for dblks owned by a bcache_t: scrub
 * the dblk, return it to the bcache's dblk cache, and -- if this was
 * the final outstanding allocation of a bcache that bcache_destroy()
 * has already marked for destruction -- tear the whole bcache down.
 */
static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		/*
		 * bcache_destroy() deferred teardown because allocations
		 * were outstanding; this was the last one, so finish the
		 * job here.  Drop the mutex before destroying it.
		 */
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}
1023 
/*
 * Create a bcache_t: a pair of kmem caches (one for data buffers of
 * `size' bytes aligned to `align', one for their dblks), plus an
 * allocation count and destroy flag used for deferred destruction
 * (see bcache_destroy() / bcache_dblk_lastfree()).  Returns NULL only
 * if the bcache_t itself cannot be allocated.
 */
bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	/* align must be a power of two */
	ASSERT((align & (align - 1)) == 0);

	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
		return (NULL);

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	/*
	 * NOTE(review): unbounded sprintf into buffer[255]; a caller-
	 * supplied name longer than ~240 chars would overflow.  Callers
	 * are presumably trusted kernel code -- confirm, or bound this
	 * with snprintf.
	 */
	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
	    NULL, (void *)bcp, NULL, 0);

	return (bcp);
}
1052 
/*
 * Destroy a bcache created by bcache_create().  If blocks are still
 * outstanding, only mark the bcache for destruction; the actual
 * teardown then happens in bcache_dblk_lastfree() when the final
 * outstanding dblk is freed.
 */
void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		/* defer: bcache_dblk_lastfree() finishes the teardown */
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}
1070 
1071 /*ARGSUSED*/
1072 mblk_t *
1073 bcache_allocb(bcache_t *bcp, uint_t pri)
1074 {
1075         dblk_t *dbp;
1076         mblk_t *mp = NULL;
1077 
1078         ASSERT(bcp != NULL);
1079 
1080         mutex_enter(&bcp->mutex);
1081         if (bcp->destroy != 0) {
1082                 mutex_exit(&bcp->mutex);
1083                 goto out;
1084         }
1085 
1086         if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1087                 mutex_exit(&bcp->mutex);
1088                 goto out;
1089         }
1090         bcp->alloc++;
1091         mutex_exit(&bcp->mutex);
1092 
1093         ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1094 
1095         mp = dbp->db_mblk;
1096         DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1097         mp->b_next = mp->b_prev = mp->b_cont = NULL;
1098         mp->b_rptr = mp->b_wptr = dbp->db_base;
1099         mp->b_queue = NULL;
1100         MBLK_BAND_FLAG_WORD(mp) = 0;
1101         STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1102 out:
1103         FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1104 
1105         return (mp);
1106 }
1107 
/*
 * Last-reference free routine for oversize blocks (allocb_oversize()):
 * the data buffer was kmem_alloc'd separately from the dblk, so both
 * pieces are freed individually.
 */
static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* buffer and dblk were allocated separately; free both */
	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}
1127 
1128 static mblk_t *
1129 allocb_oversize(size_t size, int kmflags)
1130 {
1131         mblk_t *mp;
1132         void *buf;
1133 
1134         size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1135         if ((buf = kmem_alloc(size, kmflags)) == NULL)
1136                 return (NULL);
1137         if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1138             &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1139                 kmem_free(buf, size);
1140 
1141         if (mp != NULL)
1142                 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1143 
1144         return (mp);
1145 }
1146 
1147 mblk_t *
1148 allocb_tryhard(size_t target_size)
1149 {
1150         size_t size;
1151         mblk_t *bp;
1152 
1153         for (size = target_size; size < target_size + 512;
1154             size += DBLK_CACHE_ALIGN)
1155                 if ((bp = allocb(size, BPRI_HI)) != NULL)
1156                         return (bp);
1157         allocb_tryhard_fails++;
1158         return (NULL);
1159 }
1160 
1161 /*
1162  * This routine is consolidation private for STREAMS internal use
1163  * This routine may only be called from sync routines (i.e., not
1164  * from put or service procedures).  It is located here (rather
1165  * than strsubr.c) so that we don't have to expose all of the
1166  * allocb() implementation details in header files.
1167  */
1168 mblk_t *
1169 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1170 {
1171         dblk_t *dbp;
1172         mblk_t *mp;
1173         size_t index;
1174 
1175         index = (size -1) >> DBLK_SIZE_SHIFT;
1176 
1177         if (flags & STR_NOSIG) {
1178                 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1179                         if (size != 0) {
1180                                 mp = allocb_oversize(size, KM_SLEEP);
1181                                 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1182                                     (uintptr_t)mp);
1183                                 return (mp);
1184                         }
1185                         index = 0;
1186                 }
1187 
1188                 dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1189                 mp = dbp->db_mblk;
1190                 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1191                 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1192                 mp->b_rptr = mp->b_wptr = dbp->db_base;
1193                 mp->b_queue = NULL;
1194                 MBLK_BAND_FLAG_WORD(mp) = 0;
1195                 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1196 
1197                 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1198 
1199         } else {
1200                 while ((mp = allocb(size, pri)) == NULL) {
1201                         if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1202                                 return (NULL);
1203                 }
1204         }
1205 
1206         return (mp);
1207 }
1208 
1209 /*
1210  * Call function 'func' with 'arg' when a class zero block can
1211  * be allocated with priority 'pri'.
1212  */
1213 bufcall_id_t
1214 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1215 {
1216         return (bufcall(1, pri, func, arg));
1217 }
1218 
1219 /*
1220  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1221  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1222  * This provides consistency for all internal allocators of ioctl.
1223  */
1224 mblk_t *
1225 mkiocb(uint_t cmd)
1226 {
1227         struct iocblk   *ioc;
1228         mblk_t          *mp;
1229 
1230         /*
1231          * Allocate enough space for any of the ioctl related messages.
1232          */
1233         if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1234                 return (NULL);
1235 
1236         bzero(mp->b_rptr, sizeof (union ioctypes));
1237 
1238         /*
1239          * Set the mblk_t information and ptrs correctly.
1240          */
1241         mp->b_wptr += sizeof (struct iocblk);
1242         mp->b_datap->db_type = M_IOCTL;
1243 
1244         /*
1245          * Fill in the fields.
1246          */
1247         ioc             = (struct iocblk *)mp->b_rptr;
1248         ioc->ioc_cmd = cmd;
1249         ioc->ioc_cr  = kcred;
1250         ioc->ioc_id  = getiocseqno();
1251         ioc->ioc_flag        = IOC_NATIVE;
1252         return (mp);
1253 }
1254 
1255 /*
1256  * test if block of given size can be allocated with a request of
1257  * the given priority.
1258  * 'pri' is no longer used, but is retained for compatibility.
1259  */
1260 /* ARGSUSED */
1261 int
1262 testb(size_t size, uint_t pri)
1263 {
1264         return ((size + sizeof (dblk_t)) <= kmem_avail());
1265 }
1266 
1267 /*
1268  * Call function 'func' with argument 'arg' when there is a reasonably
1269  * good chance that a block of size 'size' can be allocated.
1270  * 'pri' is no longer used, but is retained for compatibility.
1271  */
1272 /* ARGSUSED */
1273 bufcall_id_t
1274 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1275 {
1276         static long bid = 1;    /* always odd to save checking for zero */
1277         bufcall_id_t bc_id;
1278         struct strbufcall *bcp;
1279 
1280         if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1281                 return (0);
1282 
1283         bcp->bc_func = func;
1284         bcp->bc_arg = arg;
1285         bcp->bc_size = size;
1286         bcp->bc_next = NULL;
1287         bcp->bc_executor = NULL;
1288 
1289         mutex_enter(&strbcall_lock);
1290         /*
1291          * After bcp is linked into strbcalls and strbcall_lock is dropped there
1292          * should be no references to bcp since it may be freed by
1293          * runbufcalls(). Since bcp_id field is returned, we save its value in
1294          * the local var.
1295          */
1296         bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);       /* keep it odd */
1297 
1298         /*
1299          * add newly allocated stream event to existing
1300          * linked list of events.
1301          */
1302         if (strbcalls.bc_head == NULL) {
1303                 strbcalls.bc_head = strbcalls.bc_tail = bcp;
1304         } else {
1305                 strbcalls.bc_tail->bc_next = bcp;
1306                 strbcalls.bc_tail = bcp;
1307         }
1308 
1309         cv_signal(&strbcall_cv);
1310         mutex_exit(&strbcall_lock);
1311         return (bc_id);
1312 }
1313 
1314 /*
1315  * Cancel a bufcall request.
1316  */
1317 void
1318 unbufcall(bufcall_id_t id)
1319 {
1320         strbufcall_t *bcp, *pbcp;
1321 
1322         mutex_enter(&strbcall_lock);
1323 again:
1324         pbcp = NULL;
1325         for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1326                 if (id == bcp->bc_id)
1327                         break;
1328                 pbcp = bcp;
1329         }
1330         if (bcp) {
1331                 if (bcp->bc_executor != NULL) {
1332                         if (bcp->bc_executor != curthread) {
1333                                 cv_wait(&bcall_cv, &strbcall_lock);
1334                                 goto again;
1335                         }
1336                 } else {
1337                         if (pbcp)
1338                                 pbcp->bc_next = bcp->bc_next;
1339                         else
1340                                 strbcalls.bc_head = bcp->bc_next;
1341                         if (bcp == strbcalls.bc_tail)
1342                                 strbcalls.bc_tail = pbcp;
1343                         kmem_free(bcp, sizeof (strbufcall_t));
1344                 }
1345         }
1346         mutex_exit(&strbcall_lock);
1347 }
1348 
1349 /*
1350  * Duplicate a message block by block (uses dupb), returning
1351  * a pointer to the duplicate message.
1352  * Returns a non-NULL value only if the entire message
1353  * was dup'd.
1354  */
1355 mblk_t *
1356 dupmsg(mblk_t *bp)
1357 {
1358         mblk_t *head, *nbp;
1359 
1360         if (!bp || !(nbp = head = dupb(bp)))
1361                 return (NULL);
1362 
1363         while (bp->b_cont) {
1364                 if (!(nbp->b_cont = dupb(bp->b_cont))) {
1365                         freemsg(head);
1366                         return (NULL);
1367                 }
1368                 nbp = nbp->b_cont;
1369                 bp = bp->b_cont;
1370         }
1371         return (head);
1372 }
1373 
/*
 * Dup a single block, except that blocks marked STRUIO_ZC (zero-copy)
 * are copied rather than dup'd, so the result never shares such a
 * buffer with the original.
 */
#define DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))
1377 
1378 mblk_t *
1379 dupmsg_noloan(mblk_t *bp)
1380 {
1381         mblk_t *head, *nbp;
1382 
1383         if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1384             ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1385                 return (NULL);
1386 
1387         while (bp->b_cont) {
1388                 if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1389                         freemsg(head);
1390                         return (NULL);
1391                 }
1392                 nbp = nbp->b_cont;
1393                 bp = bp->b_cont;
1394         }
1395         return (head);
1396 }
1397 
1398 /*
1399  * Copy data from message and data block to newly allocated message and
1400  * data block. Returns new message block pointer, or NULL if error.
1401  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1402  * as in the original even when db_base is not word aligned. (bug 1052877)
1403  */
1404 mblk_t *
1405 copyb(mblk_t *bp)
1406 {
1407         mblk_t  *nbp;
1408         dblk_t  *dp, *ndp;
1409         uchar_t *base;
1410         size_t  size;
1411         size_t  unaligned;
1412 
1413         ASSERT(bp->b_wptr >= bp->b_rptr);
1414 
1415         dp = bp->b_datap;
1416         if (dp->db_fthdr != NULL)
1417                 STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1418 
1419         /*
1420          * Special handling for Multidata message; this should be
1421          * removed once a copy-callback routine is made available.
1422          */
1423         if (dp->db_type == M_MULTIDATA) {
1424                 cred_t *cr;
1425 
1426                 if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1427                         return (NULL);
1428 
1429                 nbp->b_flag = bp->b_flag;
1430                 nbp->b_band = bp->b_band;
1431                 ndp = nbp->b_datap;
1432 
1433                 /* See comments below on potential issues. */
1434                 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1435 
1436                 ASSERT(ndp->db_type == dp->db_type);
1437                 cr = dp->db_credp;
1438                 if (cr != NULL)
1439                         crhold(ndp->db_credp = cr);
1440                 ndp->db_cpid = dp->db_cpid;
1441                 return (nbp);
1442         }
1443 
1444         size = dp->db_lim - dp->db_base;
1445         unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1446         if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1447                 return (NULL);
1448         nbp->b_flag = bp->b_flag;
1449         nbp->b_band = bp->b_band;
1450         ndp = nbp->b_datap;
1451 
1452         /*
1453          * Well, here is a potential issue.  If we are trying to
1454          * trace a flow, and we copy the message, we might lose
1455          * information about where this message might have been.
1456          * So we should inherit the FT data.  On the other hand,
1457          * a user might be interested only in alloc to free data.
1458          * So I guess the real answer is to provide a tunable.
1459          */
1460         STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1461 
1462         base = ndp->db_base + unaligned;
1463         bcopy(dp->db_base, ndp->db_base + unaligned, size);
1464 
1465         nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1466         nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1467 
1468         return (nbp);
1469 }
1470 
1471 /*
1472  * Copy data from message to newly allocated message using new
1473  * data blocks.  Returns a pointer to the new message, or NULL if error.
1474  */
1475 mblk_t *
1476 copymsg(mblk_t *bp)
1477 {
1478         mblk_t *head, *nbp;
1479 
1480         if (!bp || !(nbp = head = copyb(bp)))
1481                 return (NULL);
1482 
1483         while (bp->b_cont) {
1484                 if (!(nbp->b_cont = copyb(bp->b_cont))) {
1485                         freemsg(head);
1486                         return (NULL);
1487                 }
1488                 nbp = nbp->b_cont;
1489                 bp = bp->b_cont;
1490         }
1491         return (head);
1492 }
1493 
1494 /*
1495  * link a message block to tail of message
1496  */
1497 void
1498 linkb(mblk_t *mp, mblk_t *bp)
1499 {
1500         ASSERT(mp && bp);
1501 
1502         for (; mp->b_cont; mp = mp->b_cont)
1503                 ;
1504         mp->b_cont = bp;
1505 }
1506 
1507 /*
1508  * unlink a message block from head of message
1509  * return pointer to new message.
1510  * NULL if message becomes empty.
1511  */
1512 mblk_t *
1513 unlinkb(mblk_t *bp)
1514 {
1515         mblk_t *bp1;
1516 
1517         bp1 = bp->b_cont;
1518         bp->b_cont = NULL;
1519         return (bp1);
1520 }
1521 
1522 /*
1523  * remove a message block "bp" from message "mp"
1524  *
1525  * Return pointer to new message or NULL if no message remains.
1526  * Return -1 if bp is not found in message.
1527  */
1528 mblk_t *
1529 rmvb(mblk_t *mp, mblk_t *bp)
1530 {
1531         mblk_t *tmp;
1532         mblk_t *lastp = NULL;
1533 
1534         ASSERT(mp && bp);
1535         for (tmp = mp; tmp; tmp = tmp->b_cont) {
1536                 if (tmp == bp) {
1537                         if (lastp)
1538                                 lastp->b_cont = tmp->b_cont;
1539                         else
1540                                 mp = tmp->b_cont;
1541                         tmp->b_cont = NULL;
1542                         return (mp);
1543                 }
1544                 lastp = tmp;
1545         }
1546         return ((mblk_t *)-1);
1547 }
1548 
1549 /*
1550  * Concatenate and align first len bytes of common
1551  * message type.  Len == -1, means concat everything.
1552  * Returns 1 on success, 0 on failure
1553  * After the pullup, mp points to the pulled up data.
1554  */
1555 int
1556 pullupmsg(mblk_t *mp, ssize_t len)
1557 {
1558         mblk_t *bp, *b_cont;
1559         dblk_t *dbp;
1560         ssize_t n;
1561 
1562         ASSERT(mp->b_datap->db_ref > 0);
1563         ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1564 
1565         /*
1566          * We won't handle Multidata message, since it contains
1567          * metadata which this function has no knowledge of; we
1568          * assert on DEBUG, and return failure otherwise.
1569          */
1570         ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1571         if (mp->b_datap->db_type == M_MULTIDATA)
1572                 return (0);
1573 
1574         if (len == -1) {
1575                 if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1576                         return (1);
1577                 len = xmsgsize(mp);
1578         } else {
1579                 ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1580                 ASSERT(first_mblk_len >= 0);
1581                 /*
1582                  * If the length is less than that of the first mblk,
1583                  * we want to pull up the message into an aligned mblk.
1584                  * Though not part of the spec, some callers assume it.
1585                  */
1586                 if (len <= first_mblk_len) {
1587                         if (str_aligned(mp->b_rptr))
1588                                 return (1);
1589                         len = first_mblk_len;
1590                 } else if (xmsgsize(mp) < len)
1591                         return (0);
1592         }
1593 
1594         if ((bp = allocb_tmpl(len, mp)) == NULL)
1595                 return (0);
1596 
1597         dbp = bp->b_datap;
1598         *bp = *mp;              /* swap mblks so bp heads the old msg... */
1599         mp->b_datap = dbp;   /* ... and mp heads the new message */
1600         mp->b_datap->db_mblk = mp;
1601         bp->b_datap->db_mblk = bp;
1602         mp->b_rptr = mp->b_wptr = dbp->db_base;
1603 
1604         do {
1605                 ASSERT(bp->b_datap->db_ref > 0);
1606                 ASSERT(bp->b_wptr >= bp->b_rptr);
1607                 n = MIN(bp->b_wptr - bp->b_rptr, len);
1608                 ASSERT(n >= 0);              /* allow zero-length mblk_t's */
1609                 if (n > 0)
1610                         bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1611                 mp->b_wptr += n;
1612                 bp->b_rptr += n;
1613                 len -= n;
1614                 if (bp->b_rptr != bp->b_wptr)
1615                         break;
1616                 b_cont = bp->b_cont;
1617                 freeb(bp);
1618                 bp = b_cont;
1619         } while (len && bp);
1620 
1621         mp->b_cont = bp;     /* tack on whatever wasn't pulled up */
1622 
1623         return (1);
1624 }
1625 
1626 /*
1627  * Concatenate and align at least the first len bytes of common message
1628  * type.  Len == -1 means concatenate everything.  The original message is
1629  * unaltered.  Returns a pointer to a new message on success, otherwise
1630  * returns NULL.
1631  */
1632 mblk_t *
1633 msgpullup(mblk_t *mp, ssize_t len)
1634 {
1635         mblk_t  *newmp;
1636         ssize_t totlen;
1637         ssize_t n;
1638 
1639         /*
1640          * We won't handle Multidata message, since it contains
1641          * metadata which this function has no knowledge of; we
1642          * assert on DEBUG, and return failure otherwise.
1643          */
1644         ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1645         if (mp->b_datap->db_type == M_MULTIDATA)
1646                 return (NULL);
1647 
1648         totlen = xmsgsize(mp);
1649 
1650         if ((len > 0) && (len > totlen))
1651                 return (NULL);
1652 
1653         /*
1654          * Copy all of the first msg type into one new mblk, then dupmsg
1655          * and link the rest onto this.
1656          */
1657 
1658         len = totlen;
1659 
1660         if ((newmp = allocb_tmpl(len, mp)) == NULL)
1661                 return (NULL);
1662 
1663         newmp->b_flag = mp->b_flag;
1664         newmp->b_band = mp->b_band;
1665 
1666         while (len > 0) {
1667                 n = mp->b_wptr - mp->b_rptr;
1668                 ASSERT(n >= 0);              /* allow zero-length mblk_t's */
1669                 if (n > 0)
1670                         bcopy(mp->b_rptr, newmp->b_wptr, n);
1671                 newmp->b_wptr += n;
1672                 len -= n;
1673                 mp = mp->b_cont;
1674         }
1675 
1676         if (mp != NULL) {
1677                 newmp->b_cont = dupmsg(mp);
1678                 if (newmp->b_cont == NULL) {
1679                         freemsg(newmp);
1680                         return (NULL);
1681                 }
1682         }
1683 
1684         return (newmp);
1685 }
1686 
1687 /*
1688  * Trim bytes from message
1689  *  len > 0, trim from head
1690  *  len < 0, trim from tail
1691  * Returns 1 on success, 0 on failure.
1692  */
1693 int
1694 adjmsg(mblk_t *mp, ssize_t len)
1695 {
1696         mblk_t *bp;
1697         mblk_t *save_bp = NULL;
1698         mblk_t *prev_bp;
1699         mblk_t *bcont;
1700         unsigned char type;
1701         ssize_t n;
1702         int fromhead;
1703         int first;
1704 
1705         ASSERT(mp != NULL);
1706         /*
1707          * We won't handle Multidata message, since it contains
1708          * metadata which this function has no knowledge of; we
1709          * assert on DEBUG, and return failure otherwise.
1710          */
1711         ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1712         if (mp->b_datap->db_type == M_MULTIDATA)
1713                 return (0);
1714 
1715         if (len < 0) {
1716                 fromhead = 0;
1717                 len = -len;
1718         } else {
1719                 fromhead = 1;
1720         }
1721 
1722         if (xmsgsize(mp) < len)
1723                 return (0);
1724 
1725         if (fromhead) {
1726                 first = 1;
1727                 while (len) {
1728                         ASSERT(mp->b_wptr >= mp->b_rptr);
1729                         n = MIN(mp->b_wptr - mp->b_rptr, len);
1730                         mp->b_rptr += n;
1731                         len -= n;
1732 
1733                         /*
1734                          * If this is not the first zero length
1735                          * message remove it
1736                          */
1737                         if (!first && (mp->b_wptr == mp->b_rptr)) {
1738                                 bcont = mp->b_cont;
1739                                 freeb(mp);
1740                                 mp = save_bp->b_cont = bcont;
1741                         } else {
1742                                 save_bp = mp;
1743                                 mp = mp->b_cont;
1744                         }
1745                         first = 0;
1746                 }
1747         } else {
1748                 type = mp->b_datap->db_type;
1749                 while (len) {
1750                         bp = mp;
1751                         save_bp = NULL;
1752 
1753                         /*
1754                          * Find the last message of same type
1755                          */
1756                         while (bp && bp->b_datap->db_type == type) {
1757                                 ASSERT(bp->b_wptr >= bp->b_rptr);
1758                                 prev_bp = save_bp;
1759                                 save_bp = bp;
1760                                 bp = bp->b_cont;
1761                         }
1762                         if (save_bp == NULL)
1763                                 break;
1764                         n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1765                         save_bp->b_wptr -= n;
1766                         len -= n;
1767 
1768                         /*
1769                          * If this is not the first message
1770                          * and we have taken away everything
1771                          * from this message, remove it
1772                          */
1773 
1774                         if ((save_bp != mp) &&
1775                             (save_bp->b_wptr == save_bp->b_rptr)) {
1776                                 bcont = save_bp->b_cont;
1777                                 freeb(save_bp);
1778                                 prev_bp->b_cont = bcont;
1779                         }
1780                 }
1781         }
1782         return (1);
1783 }
1784 
1785 /*
1786  * get number of data bytes in message
1787  */
1788 size_t
1789 msgdsize(mblk_t *bp)
1790 {
1791         size_t count = 0;
1792 
1793         for (; bp; bp = bp->b_cont)
1794                 if (bp->b_datap->db_type == M_DATA) {
1795                         ASSERT(bp->b_wptr >= bp->b_rptr);
1796                         count += bp->b_wptr - bp->b_rptr;
1797                 }
1798         return (count);
1799 }
1800 
1801 /*
1802  * Get a message off head of queue
1803  *
1804  * If queue has no buffers then mark queue
1805  * with QWANTR. (queue wants to be read by
1806  * someone when data becomes available)
1807  *
1808  * If there is something to take off then do so.
1809  * If queue falls below hi water mark turn off QFULL
1810  * flag.  Decrement weighted count of queue.
1811  * Also turn off QWANTR because queue is being read.
1812  *
1813  * The queue count is maintained on a per-band basis.
1814  * Priority band 0 (normal messages) uses q_count,
1815  * q_lowat, etc.  Non-zero priority bands use the
1816  * fields in their respective qband structures
1817  * (qb_count, qb_lowat, etc.)  All messages appear
1818  * on the same list, linked via their b_next pointers.
1819  * q_first is the head of the list.  q_count does
1820  * not reflect the size of all the messages on the
1821  * queue.  It only reflects those messages in the
1822  * normal band of flow.  The one exception to this
1823  * deals with high priority messages.  They are in
1824  * their own conceptual "band", but are accounted
1825  * against q_count.
1826  *
1827  * If queue count is below the lo water mark and QWANTW
1828  * is set, enable the closest backq which has a service
1829  * procedure and turn off the QWANTW flag.
1830  *
1831  * getq could be built on top of rmvq, but isn't because
1832  * of performance considerations.
1833  *
1834  * A note on the use of q_count and q_mblkcnt:
1835  *   q_count is the traditional byte count for messages that
1836  *   have been put on a queue.  Documentation tells us that
1837  *   we shouldn't rely on that count, but some drivers/modules
1838  *   do.  What was needed, however, is a mechanism to prevent
1839  *   runaway streams from consuming all of the resources,
1840  *   and particularly be able to flow control zero-length
1841  *   messages.  q_mblkcnt is used for this purpose.  It
1842  *   counts the number of mblk's that are being put on
1843  *   the queue.  The intention here, is that each mblk should
1844  *   contain one byte of data and, for the purpose of
1845  *   flow-control, logically does.  A queue will become
1846  *   full when EITHER of these values (q_count and q_mblkcnt)
1847  *   reach the highwater mark.  It will clear when BOTH
1848  *   of them drop below the highwater mark.  And it will
1849  *   backenable when BOTH of them drop below the lowwater
1850  *   mark.
1851  *   With this algorithm, a driver/module might be able
1852  *   to find a reasonably accurate q_count, and the
1853  *   framework can still try and limit resource usage.
1854  */
1855 mblk_t *
1856 getq(queue_t *q)
1857 {
1858         mblk_t *bp;
1859         uchar_t band = 0;
1860 
1861         bp = getq_noenab(q, 0);
1862         if (bp != NULL)
1863                 band = bp->b_band;
1864 
1865         /*
1866          * Inlined from qbackenable().
1867          * Quick check without holding the lock.
1868          */
1869         if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1870                 return (bp);
1871 
1872         qbackenable(q, band);
1873         return (bp);
1874 }
1875 
1876 /*
1877  * Calculate number of data bytes in a single data message block taking
1878  * multidata messages into account.
1879  */
1880 
1881 #define ADD_MBLK_SIZE(mp, size)                                         \
1882         if (DB_TYPE(mp) != M_MULTIDATA) {                               \
1883                 (size) += MBLKL(mp);                                    \
1884         } else {                                                        \
1885                 uint_t  pinuse;                                         \
1886                                                                         \
1887                 mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);   \
1888                 (size) += pinuse;                                       \
1889         }
1890 
1891 /*
1892  * Returns the number of bytes in a message (a message is defined as a
1893  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1894  * also return the number of distinct mblks in the message.
1895  */
1896 int
1897 mp_cont_len(mblk_t *bp, int *mblkcnt)
1898 {
1899         mblk_t  *mp;
1900         int     mblks = 0;
1901         int     bytes = 0;
1902 
1903         for (mp = bp; mp != NULL; mp = mp->b_cont) {
1904                 ADD_MBLK_SIZE(mp, bytes);
1905                 mblks++;
1906         }
1907 
1908         if (mblkcnt != NULL)
1909                 *mblkcnt = mblks;
1910 
1911         return (bytes);
1912 }
1913 
1914 /*
1915  * Like getq() but does not backenable.  This is used by the stream
1916  * head when a putback() is likely.  The caller must call qbackenable()
1917  * after it is done with accessing the queue.
1918  * The rbytes arguments to getq_noneab() allows callers to specify a
1919  * the maximum number of bytes to return. If the current amount on the
1920  * queue is less than this then the entire message will be returned.
1921  * A value of 0 returns the entire message and is equivalent to the old
1922  * default behaviour prior to the addition of the rbytes argument.
1923  */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;	/* tail portion to put back on the queue */
	qband_t *qbp;
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		/* Queue is empty; record that a reader is waiting. */
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up
		 * the message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				ADD_MBLK_SIZE(mp1, bytecnt);
				if (bytecnt  >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue. Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * the new write offset (b_wptr) of what we
				 * are taking. However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the beginning (b_rptr) of the
				 * mblk represents some arbitrary point within
				 * the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					/*
					 * Can't split: fall back to taking
					 * the whole first message; reset the
					 * counters for mp_cont_len() below.
					 */
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with. If mp1 is NULL, we are taking the
				 * whole message. No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head of
				 * the queue.
				 */
				if (mp1 != NULL) {
					mp2 = mp1->b_cont;
					mp1->b_cont = NULL;
				}
			}
			/*
			 * If mp2 is not NULL then we have part of the message
			 * to put back onto the queue.
			 */
			if (mp2 != NULL) {
				/* Splice mp2 in at the head of the queue. */
				if ((mp2->b_next = bp->b_next) == NULL)
					q->q_last = mp2;
				else
					bp->b_next->b_prev = mp2;
				q->q_first = mp2;
			} else {
				/* Taking the whole message off the queue. */
				if ((q->q_first = bp->b_next) == NULL)
					q->q_last = NULL;
				else
					q->q_first->b_prev = NULL;
			}
		} else {
			/*
			 * Either no byte threshold was supplied, there is
			 * not enough on the queue or we failed to
			 * duplicate/copy a data block. In these cases we
			 * just take the entire first message.
			 */
dup_failed:
			bytecnt = mp_cont_len(bp, &mblkcnt);
			if ((q->q_first = bp->b_next) == NULL)
				q->q_last = NULL;
			else
				q->q_first->b_prev = NULL;
		}
		if (bp->b_band == 0) {
			/*
			 * Normal band: q_count/q_mblkcnt accounting.  QFULL
			 * clears only when BOTH counts drop below q_hiwat
			 * (see the block comment above getq()).
			 */
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat))) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			/* Walk the qband list to this message's band. */
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			/* bp is qb_first; was it the only mblk in the band? */
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if (qbp->qb_mblkcnt == 0 ||
			    ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		/* Queue is being read; clear the reader-waiting flag. */
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	/* Flow-trace event; bp may be NULL if the queue was empty. */
	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);

	return (bp);
}
2095 
2096 /*
2097  * Determine if a backenable is needed after removing a message in the
2098  * specified band.
2099  * NOTE: This routine assumes that something like getq_noenab() has been
2100  * already called.
2101  *
2102  * For the read side it is ok to hold sd_lock across calling this (and the
2103  * stream head often does).
2104  * But for the write side strwakeq might be invoked and it acquires sd_lock.
2105  */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;	/* which WANTW flags need servicing */
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		/*
		 * Normal band: backenable when below the low water mark
		 * (a zero q_lowat always passes) and a writer is waiting.
		 */
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		/* Walk the singly linked qband list to this band. */
		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		/* Nothing to wake; just release the lock if we took it. */
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	/* Lock dropped: strwakeq acquires sd_lock itself. */
	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}
2182 
2183 /*
2184  * Remove a message from a queue.  The queue count and other
2185  * flow control parameters are adjusted and the back queue
2186  * enabled if necessary.
2187  *
2188  * rmvq can be called with the stream frozen, but other utility functions
2189  * holding QLOCK, and by streams modules without any locks/frozen.
2190  */
2191 void
2192 rmvq(queue_t *q, mblk_t *mp)
2193 {
2194         ASSERT(mp != NULL);
2195 
2196         rmvq_noenab(q, mp);
2197         if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2198                 /*
2199                  * qbackenable can handle a frozen stream but not a "random"
2200                  * qlock being held. Drop lock across qbackenable.
2201                  */
2202                 mutex_exit(QLOCK(q));
2203                 qbackenable(q, mp->b_band);
2204                 mutex_enter(QLOCK(q));
2205         } else {
2206                 qbackenable(q, mp->b_band);
2207         }
2208 }
2209 
2210 /*
2211  * Like rmvq() but without any backenabling.
2212  * This exists to handle SR_CONSOL_DATA in strrput().
2213  */
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
	int i;
	qband_t *qbp = NULL;
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	ASSERT(mp->b_band <= q->q_nband);
	if (mp->b_band != 0) {		/* Adjust band pointers */
		ASSERT(q->q_bandp != NULL);
		/* Walk the qband list to this message's band. */
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i > 0)
			qbp = qbp->qb_next;
		/*
		 * Fix up qb_first/qb_last: a neighbor stays in this band's
		 * span only if it carries the same b_band value.
		 */
		if (mp == qbp->qb_first) {
			if (mp->b_next && mp->b_band == mp->b_next->b_band)
				qbp->qb_first = mp->b_next;
			else
				qbp->qb_first = NULL;
		}
		if (mp == qbp->qb_last) {
			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
				qbp->qb_last = mp->b_prev;
			else
				qbp->qb_last = NULL;
		}
	}

	/*
	 * Remove the message from the list.
	 */
	if (mp->b_prev)
		mp->b_prev->b_next = mp->b_next;
	else
		q->q_first = mp->b_next;
	if (mp->b_next)
		mp->b_next->b_prev = mp->b_prev;
	else
		q->q_last = mp->b_prev;
	mp->b_next = NULL;
	mp->b_prev = NULL;

	/* Get the size of the message for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (mp->b_band == 0) {		/* Perform q_count accounting */
		q->q_count -= bytecnt;
		q->q_mblkcnt -= mblkcnt;
		/* QFULL clears when BOTH counts drop below q_hiwat. */
		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
		    (q->q_mblkcnt < q->q_hiwat))) {
			q->q_flag &= ~QFULL;
		}
	} else {			/* Perform qb_count accounting */
		qbp->qb_count -= bytecnt;
		qbp->qb_mblkcnt -= mblkcnt;
		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
			qbp->qb_flag &= ~QB_FULL;
		}
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
}
2290 
2291 /*
2292  * Empty a queue.
2293  * If flag is set, remove all messages.  Otherwise, remove
2294  * only non-control messages.  If queue falls below its low
2295  * water mark, and QWANTW is set, enable the nearest upstream
2296  * service procedure.
2297  *
2298  * Historical note: when merging the M_FLUSH code in strrput with this
2299  * code one difference was discovered. flushq did not have a check
2300  * for q_lowat == 0 in the backenabling test.
2301  *
2302  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2303  * if one exists on the queue.
2304  */
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
	mblk_t *mp, *nmp;
	qband_t *qbp;
	int backenab = 0;
	unsigned char bpri;
	unsigned char	qbf[NBAND];	/* band flushing backenable flags */

	if (q->q_first == NULL)
		return;

	/*
	 * Detach the whole message list and zero all flow-control state
	 * under QLOCK; messages to be kept are put back via putq() below.
	 */
	mutex_enter(QLOCK(q));
	mp = q->q_first;
	q->q_first = NULL;
	q->q_last = NULL;
	q->q_count = 0;
	q->q_mblkcnt = 0;
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		qbp->qb_first = NULL;
		qbp->qb_last = NULL;
		qbp->qb_count = 0;
		qbp->qb_mblkcnt = 0;
		qbp->qb_flag &= ~QB_FULL;
	}
	q->q_flag &= ~QFULL;
	mutex_exit(QLOCK(q));
	/*
	 * Walk the detached list: free data messages (and everything if
	 * flag is set), requeue control messages, and requeue M_PCPROTO
	 * unless pcproto_flag says to flush it too.
	 */
	while (mp) {
		nmp = mp->b_next;
		mp->b_next = mp->b_prev = NULL;

		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);

		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
			(void) putq(q, mp);
		else if (flag || datamsg(mp->b_datap->db_type))
			freemsg(mp);
		else
			(void) putq(q, mp);
		mp = nmp;
	}
	/*
	 * Record, per band (qbf[0] is the normal band), whether a waiting
	 * writer can now be backenabled.  Note the q_lowat == 0 check;
	 * see the historical note in the block comment above.
	 */
	bpri = 1;
	mutex_enter(QLOCK(q));
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		if ((qbp->qb_flag & QB_WANTW) &&
		    (((qbp->qb_count < qbp->qb_lowat) &&
		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
		    qbp->qb_lowat == 0)) {
			qbp->qb_flag &= ~QB_WANTW;
			backenab = 1;
			qbf[bpri] = 1;
		} else
			qbf[bpri] = 0;
		bpri++;
	}
	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
	if ((q->q_flag & QWANTW) &&
	    (((q->q_count < q->q_lowat) &&
	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
		q->q_flag &= ~QWANTW;
		backenab = 1;
		qbf[0] = 1;
	} else
		qbf[0] = 0;

	/*
	 * If any band can now be written to, and there is a writer
	 * for that band, then backenable the closest service procedure.
	 */
	if (backenab) {
		/* backenable() must not be called with QLOCK held. */
		mutex_exit(QLOCK(q));
		for (bpri = q->q_nband; bpri != 0; bpri--)
			if (qbf[bpri])
				backenable(q, bpri);
		if (qbf[0])
			backenable(q, 0);
	} else
		mutex_exit(QLOCK(q));
}
2384 
2385 /*
2386  * The real flushing takes place in flushq_common. This is done so that
2387  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2388  * or not. Currently the only place that uses this flag is the stream head.
2389  */
void
flushq(queue_t *q, int flag)
{
	/* Flush per "flag"; never flush M_PCPROTO (pcproto_flag == 0). */
	flushq_common(q, flag, 0);
}
2395 
2396 /*
2397  * Flush the queue of messages of the given priority band.
2398  * There is some duplication of code between flushq and flushband.
2399  * This is because we want to optimize the code as much as possible.
2400  * The assumption is that there will be more messages in the normal
2401  * (priority 0) band than in any other.
2402  *
2403  * Historical note: when merging the M_FLUSH code in strrput with this
2404  * code one difference was discovered. flushband had an extra check for
 * (mp->b_datap->db_type < QPCTL) in the band 0
2406  * case. That check does not match the man page for flushband and was not
2407  * in the strrput flush code hence it was removed.
2408  */
void
flushband(queue_t *q, unsigned char pri, int flag)
{
	mblk_t *mp;
	mblk_t *nmp;
	mblk_t *last;
	qband_t *qbp;
	int band;

	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
	if (pri > q->q_nband) {
		/* Band was never created, so there is nothing to flush. */
		return;
	}
	mutex_enter(QLOCK(q));
	if (pri == 0) {
		/*
		 * Band 0: detach the entire list and reset all counts,
		 * then requeue (via putq) every message that is not being
		 * flushed; only b_band == 0 messages are candidates for
		 * freeing.
		 */
		mp = q->q_first;
		q->q_first = NULL;
		q->q_last = NULL;
		q->q_count = 0;
		q->q_mblkcnt = 0;
		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
			qbp->qb_first = NULL;
			qbp->qb_last = NULL;
			qbp->qb_count = 0;
			qbp->qb_mblkcnt = 0;
			qbp->qb_flag &= ~QB_FULL;
		}
		q->q_flag &= ~QFULL;
		mutex_exit(QLOCK(q));
		while (mp) {
			nmp = mp->b_next;
			mp->b_next = mp->b_prev = NULL;
			if ((mp->b_band == 0) &&
			    ((flag == FLUSHALL) ||
			    datamsg(mp->b_datap->db_type)))
				freemsg(mp);
			else
				(void) putq(q, mp);
			mp = nmp;
		}
		/* Backenable a waiting writer if now below low water. */
		mutex_enter(QLOCK(q));
		if ((q->q_flag & QWANTW) &&
		    (((q->q_count < q->q_lowat) &&
		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
			q->q_flag &= ~QWANTW;
			mutex_exit(QLOCK(q));

			backenable(q, pri);
		} else
			mutex_exit(QLOCK(q));
	} else {	/* pri != 0 */
		boolean_t flushed = B_FALSE;
		band = pri;

		ASSERT(MUTEX_HELD(QLOCK(q)));
		/* Walk the qband list to the requested band. */
		qbp = q->q_bandp;
		while (--band > 0)
			qbp = qbp->qb_next;
		mp = qbp->qb_first;
		if (mp == NULL) {
			mutex_exit(QLOCK(q));
			return;
		}
		/* One past the last message in this band's span. */
		last = qbp->qb_last->b_next;
		/*
		 * rmvq_noenab() and freemsg() are called for each mblk that
		 * meets the criteria.  The loop is executed until the last
		 * mblk has been processed.
		 */
		while (mp != last) {
			ASSERT(mp->b_band == pri);
			nmp = mp->b_next;
			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
				rmvq_noenab(q, mp);
				freemsg(mp);
				flushed = B_TRUE;
			}
			mp = nmp;
		}
		mutex_exit(QLOCK(q));

		/*
		 * If any mblk(s) has been freed, we know that qbackenable()
		 * will need to be called.
		 */
		if (flushed)
			qbackenable(q, pri);
	}
}
2498 
2499 /*
2500  * Return 1 if the queue is not full.  If the queue is full, return
2501  * 0 (may not put message) and set QWANTW flag (caller wants to write
2502  * to the queue).
2503  */
2504 int
2505 canput(queue_t *q)
2506 {
2507         TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2508 
2509         /* this is for loopback transports, they should not do a canput */
2510         ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2511 
2512         /* Find next forward module that has a service procedure */
2513         q = q->q_nfsrv;
2514 
2515         if (!(q->q_flag & QFULL)) {
2516                 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2517                 return (1);
2518         }
2519         mutex_enter(QLOCK(q));
2520         if (q->q_flag & QFULL) {
2521                 q->q_flag |= QWANTW;
2522                 mutex_exit(QLOCK(q));
2523                 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2524                 return (0);
2525         }
2526         mutex_exit(QLOCK(q));
2527         TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2528         return (1);
2529 }
2530 
2531 /*
2532  * This is the new canput for use with priority bands.  Return 1 if the
2533  * band is not full.  If the band is full, return 0 (may not put message)
2534  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2535  * write to the queue).
2536  */
2537 int
2538 bcanput(queue_t *q, unsigned char pri)
2539 {
2540         qband_t *qbp;
2541 
2542         TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2543         if (!q)
2544                 return (0);
2545 
2546         /* Find next forward module that has a service procedure */
2547         q = q->q_nfsrv;
2548 
2549         mutex_enter(QLOCK(q));
2550         if (pri == 0) {
2551                 if (q->q_flag & QFULL) {
2552                         q->q_flag |= QWANTW;
2553                         mutex_exit(QLOCK(q));
2554                         TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2555                             "bcanput:%p %X %d", q, pri, 0);
2556                         return (0);
2557                 }
2558         } else {        /* pri != 0 */
2559                 if (pri > q->q_nband) {
2560                         /*
2561                          * No band exists yet, so return success.
2562                          */
2563                         mutex_exit(QLOCK(q));
2564                         TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2565                             "bcanput:%p %X %d", q, pri, 1);
2566                         return (1);
2567                 }
2568                 qbp = q->q_bandp;
2569                 while (--pri)
2570                         qbp = qbp->qb_next;
2571                 if (qbp->qb_flag & QB_FULL) {
2572                         qbp->qb_flag |= QB_WANTW;
2573                         mutex_exit(QLOCK(q));
2574                         TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2575                             "bcanput:%p %X %d", q, pri, 0);
2576                         return (0);
2577                 }
2578         }
2579         mutex_exit(QLOCK(q));
2580         TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2581             "bcanput:%p %X %d", q, pri, 1);
2582         return (1);
2583 }
2584 
2585 /*
2586  * Put a message on a queue.
2587  *
2588  * Messages are enqueued on a priority basis.  The priority classes
2589  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2590  * and B_NORMAL (type < QPCTL && band == 0).
2591  *
2592  * Add appropriate weighted data block sizes to queue count.
2593  * If queue hits high water mark then set QFULL flag.
2594  *
2595  * If QNOENAB is not set (putq is allowed to enable the queue),
2596  * enable the queue only if the message is PRIORITY,
2597  * or the QWANTR flag is set (indicating that the service procedure
2598  * is ready to read the queue.  This implies that a service
2599  * procedure must NEVER put a high priority message back on its own
2600  * queue, as this would result in an infinite loop (!).
2601  */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);	/* QPCTL (high-pri) or QNORM */
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/*
	 * If this thread has the stream frozen, QLOCK is already held
	 * and must remain held on return; otherwise acquire it here.
	 */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				/* Allocation failure: message NOT queued. */
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				/* New bands inherit the queue's watermarks. */
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		/* Bands are 1-indexed: walk to the qband for b_band. */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/*
			 * High-priority message behind normal ones: skip
			 * past all messages of class >= ours and insert
			 * just before the first lower-class message.
			 */
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			/*
			 * Band exists but is empty on the queue: find the
			 * right position by class, then by band number.
			 */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/* Skip high-priority, then higher bands. */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		/*
		 * NOTE(review): the mblk count is compared against the byte
		 * high-water mark; presumably intentional so that a flood
		 * of small messages also triggers flow control — confirm.
		 */
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);

	/*
	 * Schedule the service procedure for high-priority messages, or
	 * for banded/wanted normal ones, unless enabling is suppressed
	 * (canenable() checks QNOENB).
	 */
	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
2786 
2787 /*
2788  * Put stuff back at beginning of Q according to priority order.
2789  * See comment on putq above for details.
2790  */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);	/* QPCTL (high-pri) or QNORM */
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);	/* must not already be linked */
	/*
	 * If this thread has the stream frozen, QLOCK is already held
	 * and must remain held on return; otherwise acquire it here.
	 */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {
			/* Allocate missing qbands up to b_band on the fly. */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				/* Allocation failure: message NOT queued. */
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Bands are 1-indexed: walk to the qband for b_band. */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			/*
			 * Band exists but is empty: locate position by
			 * class, then by band number.
			 */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/* Skip high-priority, then higher bands. */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/* Skip high-priority, then higher bands. */
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		/*
		 * NOTE(review): mblk count is compared against the byte
		 * high-water mark, as in putq() — presumably intentional.
		 */
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);

	/* Enable for high-priority, or when the service proc wants data. */
	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
2967 
2968 /*
2969  * Insert a message before an existing message on the queue.  If the
2970  * existing message is NULL, the new messages is placed on the end of
2971  * the queue.  The queue class of the new message is ignored.  However,
2972  * the priority band of the new message must adhere to the following
2973  * ordering:
2974  *
2975  *      emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2976  *
2977  * All flow control parameters are updated.
2978  *
2979  * insq can be called with the stream frozen, but other utility functions
2980  * holding QLOCK, and by streams modules without any locks/frozen.
2981  */
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(mp);	/* QPCTL (high-pri) or QNORM */
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/*
	 * Three lock states are possible: stream frozen by this thread,
	 * QLOCK already held by a utility caller, or neither.  In the
	 * first two cases the lock must not be dropped on return.
	 */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	if (mcls == QPCTL) {
		if (mp->b_band != 0)
			mp->b_band = 0;		/* force to be correct */
		/* High-priority msgs may not follow a normal-priority one. */
		if (emp && emp->b_prev &&
		    (emp->b_prev->b_datap->db_type < QPCTL))
			goto badord;
	}
	if (emp) {
		/*
		 * Enforce band ordering around the insertion point:
		 * emp->b_prev->b_band >= mp->b_band >= emp->b_band.
		 */
		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
		    (emp->b_prev->b_band < mp->b_band))) {
			goto badord;
		}
	} else {
		/* Appending: the last message bounds the allowed band. */
		tmp = q->q_last;
		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
			cmn_err(CE_WARN,
			    "insq: attempt to insert message out of order "
			    "on q %p", (void *)q);
			if (freezer != curthread)
				mutex_exit(QLOCK(q));
			return (0);
		}
	}

	if (mp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (mp->b_band > q->q_nband) {
			/* Allocate missing qbands up to b_band on the fly. */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (mp->b_band > q->q_nband) {
				/* Allocation failure: message NOT queued. */
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Bands are 1-indexed: walk to the qband for b_band. */
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/* Link mp before emp, or append when emp is NULL. */
	if ((mp->b_next = emp) != NULL) {
		if ((mp->b_prev = emp->b_prev) != NULL)
			emp->b_prev->b_next = mp;
		else
			q->q_first = mp;
		emp->b_prev = mp;
	} else {
		if ((mp->b_prev = q->q_last) != NULL)
			q->q_last->b_next = mp;
		else
			q->q_first = mp;
		q->q_last = mp;
	}

	/* Get mblk and byte count for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (qbp) {	/* adjust qband pointers and count */
		if (!qbp->qb_first) {
			qbp->qb_first = mp;
			qbp->qb_last = mp;
		} else {
			/* mp is the new band head/tail if its neighbor's
			 * band differs (or it has no neighbor). */
			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
			    (mp->b_prev->b_band != mp->b_band)))
				qbp->qb_first = mp;
			else if (mp->b_next == NULL || (mp->b_next != NULL &&
			    (mp->b_next->b_band != mp->b_band)))
				qbp->qb_last = mp;
		}
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		/*
		 * NOTE(review): mblk count is compared against the byte
		 * high-water mark, as in putq() — presumably intentional.
		 */
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);

	if (canenable(q) && (q->q_flag & QWANTR))
		qenable_locked(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
3108 
3109 /*
3110  * Create and put a control message on queue.
3111  */
3112 int
3113 putctl(queue_t *q, int type)
3114 {
3115         mblk_t *bp;
3116 
3117         if ((datamsg(type) && (type != M_DELAY)) ||
3118             (bp = allocb_tryhard(0)) == NULL)
3119                 return (0);
3120         bp->b_datap->db_type = (unsigned char) type;
3121 
3122         put(q, bp);
3123 
3124         return (1);
3125 }
3126 
3127 /*
3128  * Control message with a single-byte parameter
3129  */
3130 int
3131 putctl1(queue_t *q, int type, int param)
3132 {
3133         mblk_t *bp;
3134 
3135         if ((datamsg(type) && (type != M_DELAY)) ||
3136             (bp = allocb_tryhard(1)) == NULL)
3137                 return (0);
3138         bp->b_datap->db_type = (unsigned char)type;
3139         *bp->b_wptr++ = (unsigned char)param;
3140 
3141         put(q, bp);
3142 
3143         return (1);
3144 }
3145 
3146 int
3147 putnextctl1(queue_t *q, int type, int param)
3148 {
3149         mblk_t *bp;
3150 
3151         if ((datamsg(type) && (type != M_DELAY)) ||
3152             ((bp = allocb_tryhard(1)) == NULL))
3153                 return (0);
3154 
3155         bp->b_datap->db_type = (unsigned char)type;
3156         *bp->b_wptr++ = (unsigned char)param;
3157 
3158         putnext(q, bp);
3159 
3160         return (1);
3161 }
3162 
3163 int
3164 putnextctl(queue_t *q, int type)
3165 {
3166         mblk_t *bp;
3167 
3168         if ((datamsg(type) && (type != M_DELAY)) ||
3169             ((bp = allocb_tryhard(0)) == NULL))
3170                 return (0);
3171         bp->b_datap->db_type = (unsigned char)type;
3172 
3173         putnext(q, bp);
3174 
3175         return (1);
3176 }
3177 
3178 /*
3179  * Return the queue upstream from this one
3180  */
3181 queue_t *
3182 backq(queue_t *q)
3183 {
3184         q = _OTHERQ(q);
3185         if (q->q_next) {
3186                 q = q->q_next;
3187                 return (_OTHERQ(q));
3188         }
3189         return (NULL);
3190 }
3191 
3192 /*
3193  * Send a block back up the queue in reverse from this
3194  * one (e.g. to respond to ioctls)
3195  */
/*
 * qreply: reply on the partner (opposite-direction) queue by passing
 * bp to the module past it, e.g. to answer an M_IOCTL with M_IOCACK.
 */
void
qreply(queue_t *q, mblk_t *bp)
{
	ASSERT(q && bp);

	putnext(_OTHERQ(q), bp);
}
3203 
3204 /*
3205  * Streams Queue Scheduling
3206  *
3207  * Queues are enabled through qenable() when they have messages to
3208  * process.  They are serviced by queuerun(), which runs each enabled
3209  * queue's service procedure.  The call to queuerun() is processor
3210  * dependent - the general principle is that it be run whenever a queue
3211  * is enabled but before returning to user level.  For system calls,
3212  * the function runqueues() is called if their action causes a queue
3213  * to be enabled.  For device interrupts, queuerun() should be
3214  * called before returning from the last level of interrupt.  Beyond
3215  * this, no timing assumptions should be made about queue scheduling.
3216  */
3217 
3218 /*
3219  * Enable a queue: put it on list of those whose service procedures are
3220  * ready to run and set up the scheduling mechanism.
3221  * The broadcast is done outside the mutex -> to avoid the woken thread
3222  * from contending with the mutex. This is OK 'cos the queue has been
3223  * enqueued on the runlist and flagged safely at this point.
3224  */
void
qenable(queue_t *q)
{
	/* Locked wrapper: the real work happens in qenable_locked(). */
	mutex_enter(QLOCK(q));
	qenable_locked(q);
	mutex_exit(QLOCK(q));
}
3232 /*
3233  * Return number of messages on queue
3234  */
3235 int
3236 qsize(queue_t *qp)
3237 {
3238         int count = 0;
3239         mblk_t *mp;
3240 
3241         mutex_enter(QLOCK(qp));
3242         for (mp = qp->q_first; mp; mp = mp->b_next)
3243                 count++;
3244         mutex_exit(QLOCK(qp));
3245         return (count);
3246 }
3247 
3248 /*
3249  * noenable - set queue so that putq() will not enable it.
3250  * enableok - set queue so that putq() can enable it.
3251  */
void
noenable(queue_t *q)
{
	/* Set QNOENB so canenable()/putq() will not schedule this queue. */
	mutex_enter(QLOCK(q));
	q->q_flag |= QNOENB;
	mutex_exit(QLOCK(q));
}
3259 
void
enableok(queue_t *q)
{
	/* Clear QNOENB: putq() may schedule this queue again. */
	mutex_enter(QLOCK(q));
	q->q_flag &= ~QNOENB;
	mutex_exit(QLOCK(q));
}
3267 
3268 /*
3269  * Set queue fields.
3270  */
3271 int
3272 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3273 {
3274         qband_t *qbp = NULL;
3275         queue_t *wrq;
3276         int error = 0;
3277         kthread_id_t freezer;
3278 
3279         freezer = STREAM(q)->sd_freezer;
3280         if (freezer == curthread) {
3281                 ASSERT(frozenstr(q));
3282                 ASSERT(MUTEX_HELD(QLOCK(q)));
3283         } else
3284                 mutex_enter(QLOCK(q));
3285 
3286         if (what >= QBAD) {
3287                 error = EINVAL;
3288                 goto done;
3289         }
3290         if (pri != 0) {
3291                 int i;
3292                 qband_t **qbpp;
3293 
3294                 if (pri > q->q_nband) {
3295                         qbpp = &q->q_bandp;
3296                         while (*qbpp)
3297                                 qbpp = &(*qbpp)->qb_next;
3298                         while (pri > q->q_nband) {
3299                                 if ((*qbpp = allocband()) == NULL) {
3300                                         error = EAGAIN;
3301                                         goto done;
3302                                 }
3303                                 (*qbpp)->qb_hiwat = q->q_hiwat;
3304                                 (*qbpp)->qb_lowat = q->q_lowat;
3305                                 q->q_nband++;
3306                                 qbpp = &(*qbpp)->qb_next;
3307                         }
3308                 }
3309                 qbp = q->q_bandp;
3310                 i = pri;
3311                 while (--i)
3312                         qbp = qbp->qb_next;
3313         }
3314         switch (what) {
3315 
3316         case QHIWAT:
3317                 if (qbp)
3318                         qbp->qb_hiwat = (size_t)val;
3319                 else
3320                         q->q_hiwat = (size_t)val;
3321                 break;
3322 
3323         case QLOWAT:
3324                 if (qbp)
3325                         qbp->qb_lowat = (size_t)val;
3326                 else
3327                         q->q_lowat = (size_t)val;
3328                 break;
3329 
3330         case QMAXPSZ:
3331                 if (qbp)
3332                         error = EINVAL;
3333                 else
3334                         q->q_maxpsz = (ssize_t)val;
3335 
3336                 /*
3337                  * Performance concern, strwrite looks at the module below
3338                  * the stream head for the maxpsz each time it does a write
3339                  * we now cache it at the stream head.  Check to see if this
3340                  * queue is sitting directly below the stream head.
3341                  */
3342                 wrq = STREAM(q)->sd_wrq;
3343                 if (q != wrq->q_next)
3344                         break;
3345 
3346                 /*
3347                  * If the stream is not frozen drop the current QLOCK and
3348                  * acquire the sd_wrq QLOCK which protects sd_qn_*
3349                  */
3350                 if (freezer != curthread) {
3351                         mutex_exit(QLOCK(q));
3352                         mutex_enter(QLOCK(wrq));
3353                 }
3354                 ASSERT(MUTEX_HELD(QLOCK(wrq)));
3355 
3356                 if (strmsgsz != 0) {
3357                         if (val == INFPSZ)
3358                                 val = strmsgsz;
3359                         else  {
3360                                 if (STREAM(q)->sd_vnode->v_type == VFIFO)
3361                                         val = MIN(PIPE_BUF, val);
3362                                 else
3363                                         val = MIN(strmsgsz, val);
3364                         }
3365                 }
3366                 STREAM(q)->sd_qn_maxpsz = val;
3367                 if (freezer != curthread) {
3368                         mutex_exit(QLOCK(wrq));
3369                         mutex_enter(QLOCK(q));
3370                 }
3371                 break;
3372 
3373         case QMINPSZ:
3374                 if (qbp)
3375                         error = EINVAL;
3376                 else
3377                         q->q_minpsz = (ssize_t)val;
3378 
3379                 /*
3380                  * Performance concern, strwrite looks at the module below
3381                  * the stream head for the maxpsz each time it does a write
3382                  * we now cache it at the stream head.  Check to see if this
3383                  * queue is sitting directly below the stream head.
3384                  */
3385                 wrq = STREAM(q)->sd_wrq;
3386                 if (q != wrq->q_next)
3387                         break;
3388 
3389                 /*
3390                  * If the stream is not frozen drop the current QLOCK and
3391                  * acquire the sd_wrq QLOCK which protects sd_qn_*
3392                  */
3393                 if (freezer != curthread) {
3394                         mutex_exit(QLOCK(q));
3395                         mutex_enter(QLOCK(wrq));
3396                 }
3397                 STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3398 
3399                 if (freezer != curthread) {
3400                         mutex_exit(QLOCK(wrq));
3401                         mutex_enter(QLOCK(q));
3402                 }
3403                 break;
3404 
3405         case QSTRUIOT:
3406                 if (qbp)
3407                         error = EINVAL;
3408                 else
3409                         q->q_struiot = (ushort_t)val;
3410                 break;
3411 
3412         case QCOUNT:
3413         case QFIRST:
3414         case QLAST:
3415         case QFLAG:
3416                 error = EPERM;
3417                 break;
3418 
3419         default:
3420                 error = EINVAL;
3421                 break;
3422         }
3423 done:
3424         if (freezer != curthread)
3425                 mutex_exit(QLOCK(q));
3426         return (error);
3427 }
3428 
3429 /*
3430  * Get queue fields.
3431  */
3432 int
3433 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3434 {
3435         qband_t         *qbp = NULL;
3436         int             error = 0;
3437         kthread_id_t    freezer;
3438 
3439         freezer = STREAM(q)->sd_freezer;
3440         if (freezer == curthread) {
3441                 ASSERT(frozenstr(q));
3442                 ASSERT(MUTEX_HELD(QLOCK(q)));
3443         } else
3444                 mutex_enter(QLOCK(q));
3445         if (what >= QBAD) {
3446                 error = EINVAL;
3447                 goto done;
3448         }
3449         if (pri != 0) {
3450                 int i;
3451                 qband_t **qbpp;
3452 
3453                 if (pri > q->q_nband) {
3454                         qbpp = &q->q_bandp;
3455                         while (*qbpp)
3456                                 qbpp = &(*qbpp)->qb_next;
3457                         while (pri > q->q_nband) {
3458                                 if ((*qbpp = allocband()) == NULL) {
3459                                         error = EAGAIN;
3460                                         goto done;
3461                                 }
3462                                 (*qbpp)->qb_hiwat = q->q_hiwat;
3463                                 (*qbpp)->qb_lowat = q->q_lowat;
3464                                 q->q_nband++;
3465                                 qbpp = &(*qbpp)->qb_next;
3466                         }
3467                 }
3468                 qbp = q->q_bandp;
3469                 i = pri;
3470                 while (--i)
3471                         qbp = qbp->qb_next;
3472         }
3473         switch (what) {
3474         case QHIWAT:
3475                 if (qbp)
3476                         *(size_t *)valp = qbp->qb_hiwat;
3477                 else
3478                         *(size_t *)valp = q->q_hiwat;
3479                 break;
3480 
3481         case QLOWAT:
3482                 if (qbp)
3483                         *(size_t *)valp = qbp->qb_lowat;
3484                 else
3485                         *(size_t *)valp = q->q_lowat;
3486                 break;
3487 
3488         case QMAXPSZ:
3489                 if (qbp)
3490                         error = EINVAL;
3491                 else
3492                         *(ssize_t *)valp = q->q_maxpsz;
3493                 break;
3494 
3495         case QMINPSZ:
3496                 if (qbp)
3497                         error = EINVAL;
3498                 else
3499                         *(ssize_t *)valp = q->q_minpsz;
3500                 break;
3501 
3502         case QCOUNT:
3503                 if (qbp)
3504                         *(size_t *)valp = qbp->qb_count;
3505                 else
3506                         *(size_t *)valp = q->q_count;
3507                 break;
3508 
3509         case QFIRST:
3510                 if (qbp)
3511                         *(mblk_t **)valp = qbp->qb_first;
3512                 else
3513                         *(mblk_t **)valp = q->q_first;
3514                 break;
3515 
3516         case QLAST:
3517                 if (qbp)
3518                         *(mblk_t **)valp = qbp->qb_last;
3519                 else
3520                         *(mblk_t **)valp = q->q_last;
3521                 break;
3522 
3523         case QFLAG:
3524                 if (qbp)
3525                         *(uint_t *)valp = qbp->qb_flag;
3526                 else
3527                         *(uint_t *)valp = q->q_flag;
3528                 break;
3529 
3530         case QSTRUIOT:
3531                 if (qbp)
3532                         error = EINVAL;
3533                 else
3534                         *(short *)valp = q->q_struiot;
3535                 break;
3536 
3537         default:
3538                 error = EINVAL;
3539                 break;
3540         }
3541 done:
3542         if (freezer != curthread)
3543                 mutex_exit(QLOCK(q));
3544         return (error);
3545 }
3546 
3547 /*
3548  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3549  *      QWANTWSYNC or QWANTR or QWANTW,
3550  *
3551  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3552  *       deferred wakeup will be done. Also if strpoll() in progress then a
3553  *       deferred pollwakeup will be done.
3554  */
3555 void
3556 strwakeq(queue_t *q, int flag)
3557 {
3558         stdata_t        *stp = STREAM(q);
3559         pollhead_t      *pl;
3560 
3561         mutex_enter(&stp->sd_lock);
3562         pl = &stp->sd_pollist;
3563         if (flag & QWANTWSYNC) {
3564                 ASSERT(!(q->q_flag & QREADR));
3565                 if (stp->sd_flag & WSLEEP) {
3566                         stp->sd_flag &= ~WSLEEP;
3567                         cv_broadcast(&stp->sd_wrq->q_wait);
3568                 } else {
3569                         stp->sd_wakeq |= WSLEEP;
3570                 }
3571 
3572                 mutex_exit(&stp->sd_lock);
3573                 pollwakeup(pl, POLLWRNORM);
3574                 mutex_enter(&stp->sd_lock);
3575 
3576                 if (stp->sd_sigflags & S_WRNORM)
3577                         strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3578         } else if (flag & QWANTR) {
3579                 if (stp->sd_flag & RSLEEP) {
3580                         stp->sd_flag &= ~RSLEEP;
3581                         cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3582                 } else {
3583                         stp->sd_wakeq |= RSLEEP;
3584                 }
3585 
3586                 mutex_exit(&stp->sd_lock);
3587                 pollwakeup(pl, POLLIN | POLLRDNORM);
3588                 mutex_enter(&stp->sd_lock);
3589 
3590                 {
3591                         int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3592 
3593                         if (events)
3594                                 strsendsig(stp->sd_siglist, events, 0, 0);
3595                 }
3596         } else {
3597                 if (stp->sd_flag & WSLEEP) {
3598                         stp->sd_flag &= ~WSLEEP;
3599                         cv_broadcast(&stp->sd_wrq->q_wait);
3600                 }
3601 
3602                 mutex_exit(&stp->sd_lock);
3603                 pollwakeup(pl, POLLWRNORM);
3604                 mutex_enter(&stp->sd_lock);
3605 
3606                 if (stp->sd_sigflags & S_WRNORM)
3607                         strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3608         }
3609         mutex_exit(&stp->sd_lock);
3610 }
3611 
3612 int
3613 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3614 {
3615         stdata_t *stp = STREAM(q);
3616         int typ  = STRUIOT_STANDARD;
3617         uio_t    *uiop = &dp->d_uio;
3618         dblk_t   *dbp;
3619         ssize_t  uiocnt;
3620         ssize_t  cnt;
3621         unsigned char *ptr;
3622         ssize_t  resid;
3623         int      error = 0;
3624         on_trap_data_t otd;
3625         queue_t *stwrq;
3626 
3627         /*
3628          * Plumbing may change while taking the type so store the
3629          * queue in a temporary variable. It doesn't matter even
3630          * if the we take the type from the previous plumbing,
3631          * that's because if the plumbing has changed when we were
3632          * holding the queue in a temporary variable, we can continue
3633          * processing the message the way it would have been processed
3634          * in the old plumbing, without any side effects but a bit
3635          * extra processing for partial ip header checksum.
3636          *
3637          * This has been done to avoid holding the sd_lock which is
3638          * very hot.
3639          */
3640 
3641         stwrq = stp->sd_struiowrq;
3642         if (stwrq)
3643                 typ = stwrq->q_struiot;
3644 
3645         for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3646                 dbp = mp->b_datap;
3647                 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3648                 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3649                 cnt = MIN(uiocnt, uiop->uio_resid);
3650                 if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3651                     (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3652                         /*
3653                          * Either this mblk has already been processed
3654                          * or there is no more room in this mblk (?).
3655                          */
3656                         continue;
3657                 }
3658                 switch (typ) {
3659                 case STRUIOT_STANDARD:
3660                         if (noblock) {
3661                                 if (on_trap(&otd, OT_DATA_ACCESS)) {
3662                                         no_trap();
3663                                         error = EWOULDBLOCK;
3664                                         goto out;
3665                                 }
3666                         }
3667                         if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3668                                 if (noblock)
3669                                         no_trap();
3670                                 goto out;
3671                         }
3672                         if (noblock)
3673                                 no_trap();
3674                         break;
3675 
3676                 default:
3677                         error = EIO;
3678                         goto out;
3679                 }
3680                 dbp->db_struioflag |= STRUIO_DONE;
3681                 dbp->db_cksumstuff += cnt;
3682         }
3683 out:
3684         if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3685                 /*
3686                  * A fault has occured and some bytes were moved to the
3687                  * current mblk, the uio_t has already been updated by
3688                  * the appropriate uio routine, so also update the mblk
3689                  * to reflect this in case this same mblk chain is used
3690                  * again (after the fault has been handled).
3691                  */
3692                 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3693                 if (uiocnt >= resid)
3694                         dbp->db_cksumstuff += resid;
3695         }
3696         return (error);
3697 }
3698 
3699 /*
3700  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3701  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3702  * that removeq() will not try to close the queue while a thread is inside the
3703  * queue.
3704  */
3705 static boolean_t
3706 rwnext_enter(queue_t *qp)
3707 {
3708         mutex_enter(QLOCK(qp));
3709         if (qp->q_flag & QWCLOSE) {
3710                 mutex_exit(QLOCK(qp));
3711                 return (B_FALSE);
3712         }
3713         qp->q_rwcnt++;
3714         ASSERT(qp->q_rwcnt != 0);
3715         mutex_exit(QLOCK(qp));
3716         return (B_TRUE);
3717 }
3718 
3719 /*
3720  * Decrease the count of threads running in sync stream queue and wake up any
3721  * threads blocked in removeq().
3722  */
3723 static void
3724 rwnext_exit(queue_t *qp)
3725 {
3726         mutex_enter(QLOCK(qp));
3727         qp->q_rwcnt--;
3728         if (qp->q_flag & QWANTRMQSYNC) {
3729                 qp->q_flag &= ~QWANTRMQSYNC;
3730                 cv_broadcast(&qp->q_wait);
3731         }
3732         mutex_exit(QLOCK(qp));
3733 }
3734 
3735 /*
3736  * The purpose of rwnext() is to call the rw procedure of the next
3737  * (downstream) modules queue.
3738  *
3739  * treated as put entrypoint for perimeter syncronization.
3740  *
3741  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3742  * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3743  * not matter if any regular put entrypoints have been already entered. We
3744  * can't increment one of the sq_putcounts (instead of sq_count) because
3745  * qwait_rw won't know which counter to decrement.
3746  *
3747  * It would be reasonable to add the lockless FASTPUT logic.
3748  */
3749 int
3750 rwnext(queue_t *qp, struiod_t *dp)
3751 {
3752         queue_t         *nqp;
3753         syncq_t         *sq;
3754         uint16_t        count;
3755         uint16_t        flags;
3756         struct qinit    *qi;
3757         int             (*proc)();
3758         struct stdata   *stp;
3759         int             isread;
3760         int             rval;
3761 
3762         stp = STREAM(qp);
3763         /*
3764          * Prevent q_next from changing by holding sd_lock until acquiring
3765          * SQLOCK. Note that a read-side rwnext from the streamhead will
3766          * already have sd_lock acquired. In either case sd_lock is always
3767          * released after acquiring SQLOCK.
3768          *
3769          * The streamhead read-side holding sd_lock when calling rwnext is
3770          * required to prevent a race condition were M_DATA mblks flowing
3771          * up the read-side of the stream could be bypassed by a rwnext()
3772          * down-call. In this case sd_lock acts as the streamhead perimeter.
3773          */
3774         if ((nqp = _WR(qp)) == qp) {
3775                 isread = 0;
3776                 mutex_enter(&stp->sd_lock);
3777                 qp = nqp->q_next;
3778         } else {
3779                 isread = 1;
3780                 if (nqp != stp->sd_wrq)
3781                         /* Not streamhead */
3782                         mutex_enter(&stp->sd_lock);
3783                 qp = _RD(nqp->q_next);
3784         }
3785         qi = qp->q_qinfo;
3786         if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3787                 /*
3788                  * Not a synchronous module or no r/w procedure for this
3789                  * queue, so just return EINVAL and let the caller handle it.
3790                  */
3791                 mutex_exit(&stp->sd_lock);
3792                 return (EINVAL);
3793         }
3794 
3795         if (rwnext_enter(qp) == B_FALSE) {
3796                 mutex_exit(&stp->sd_lock);
3797                 return (EINVAL);
3798         }
3799 
3800         sq = qp->q_syncq;
3801         mutex_enter(SQLOCK(sq));
3802         mutex_exit(&stp->sd_lock);
3803         count = sq->sq_count;
3804         flags = sq->sq_flags;
3805         ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3806 
3807         while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3808                 /*
3809                  * if this queue is being closed, return.
3810                  */
3811                 if (qp->q_flag & QWCLOSE) {
3812                         mutex_exit(SQLOCK(sq));
3813                         rwnext_exit(qp);
3814                         return (EINVAL);
3815                 }
3816 
3817                 /*
3818                  * Wait until we can enter the inner perimeter.
3819                  */
3820                 sq->sq_flags = flags | SQ_WANTWAKEUP;
3821                 cv_wait(&sq->sq_wait, SQLOCK(sq));
3822                 count = sq->sq_count;
3823                 flags = sq->sq_flags;
3824         }
3825 
3826         if (isread == 0 && stp->sd_struiowrq == NULL ||
3827             isread == 1 && stp->sd_struiordq == NULL) {
3828                 /*
3829                  * Stream plumbing changed while waiting for inner perimeter
3830                  * so just return EINVAL and let the caller handle it.
3831                  */
3832                 mutex_exit(SQLOCK(sq));
3833                 rwnext_exit(qp);
3834                 return (EINVAL);
3835         }
3836         if (!(flags & SQ_CIPUT))
3837                 sq->sq_flags = flags | SQ_EXCL;
3838         sq->sq_count = count + 1;
3839         ASSERT(sq->sq_count != 0);           /* Wraparound */
3840         /*
3841          * Note: The only message ordering guarantee that rwnext() makes is
3842          *       for the write queue flow-control case. All others (r/w queue
3843          *       with q_count > 0 (or q_first != 0)) are the resposibilty of
3844          *       the queue's rw procedure. This could be genralized here buy
3845          *       running the queue's service procedure, but that wouldn't be
3846          *       the most efficent for all cases.
3847          */
3848         mutex_exit(SQLOCK(sq));
3849         if (! isread && (qp->q_flag & QFULL)) {
3850                 /*
3851                  * Write queue may be flow controlled. If so,
3852                  * mark the queue for wakeup when it's not.
3853                  */
3854                 mutex_enter(QLOCK(qp));
3855                 if (qp->q_flag & QFULL) {
3856                         qp->q_flag |= QWANTWSYNC;
3857                         mutex_exit(QLOCK(qp));
3858                         rval = EWOULDBLOCK;
3859                         goto out;
3860                 }
3861                 mutex_exit(QLOCK(qp));
3862         }
3863 
3864         if (! isread && dp->d_mp)
3865                 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3866                     dp->d_mp->b_datap->db_base);
3867 
3868         rval = (*proc)(qp, dp);
3869 
3870         if (isread && dp->d_mp)
3871                 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3872                     dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3873 out:
3874         /*
3875          * The queue is protected from being freed by sq_count, so it is
3876          * safe to call rwnext_exit and reacquire SQLOCK(sq).
3877          */
3878         rwnext_exit(qp);
3879 
3880         mutex_enter(SQLOCK(sq));
3881         flags = sq->sq_flags;
3882         ASSERT(sq->sq_count != 0);
3883         sq->sq_count--;
3884         if (flags & SQ_TAIL) {
3885                 putnext_tail(sq, qp, flags);
3886                 /*
3887                  * The only purpose of this ASSERT is to preserve calling stack
3888                  * in DEBUG kernel.
3889                  */
3890                 ASSERT(flags & SQ_TAIL);
3891                 return (rval);
3892         }
3893         ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3894         /*
3895          * Safe to always drop SQ_EXCL:
3896          *      Not SQ_CIPUT means we set SQ_EXCL above
3897          *      For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3898          *      did a qwriter(INNER) in which case nobody else
3899          *      is in the inner perimeter and we are exiting.
3900          *
3901          * I would like to make the following assertion:
3902          *
3903          * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3904          *      sq->sq_count == 0);
3905          *
3906          * which indicates that if we are both putshared and exclusive,
3907          * we became exclusive while executing the putproc, and the only
3908          * claim on the syncq was the one we dropped a few lines above.
3909          * But other threads that enter putnext while the syncq is exclusive
3910          * need to make a claim as they may need to drop SQLOCK in the
3911          * has_writers case to avoid deadlocks.  If these threads are
3912          * delayed or preempted, it is possible that the writer thread can
3913          * find out that there are other claims making the (sq_count == 0)
3914          * test invalid.
3915          */
3916 
3917         sq->sq_flags = flags & ~SQ_EXCL;
3918         if (sq->sq_flags & SQ_WANTWAKEUP) {
3919                 sq->sq_flags &= ~SQ_WANTWAKEUP;
3920                 cv_broadcast(&sq->sq_wait);
3921         }
3922         mutex_exit(SQLOCK(sq));
3923         return (rval);
3924 }
3925 
3926 /*
3927  * The purpose of infonext() is to call the info procedure of the next
3928  * (downstream) modules queue.
3929  *
3930  * treated as put entrypoint for perimeter syncronization.
3931  *
3932  * There's no need to grab sq_putlocks here (which only exist for CIPUT
3933  * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3934  * it does not matter if any regular put entrypoints have been already
3935  * entered.
3936  */
3937 int
3938 infonext(queue_t *qp, infod_t *idp)
3939 {
3940         queue_t         *nqp;
3941         syncq_t         *sq;
3942         uint16_t        count;
3943         uint16_t        flags;
3944         struct qinit    *qi;
3945         int             (*proc)();
3946         struct stdata   *stp;
3947         int             rval;
3948 
3949         stp = STREAM(qp);
3950         /*
3951          * Prevent q_next from changing by holding sd_lock until
3952          * acquiring SQLOCK.
3953          */
3954         mutex_enter(&stp->sd_lock);
3955         if ((nqp = _WR(qp)) == qp) {
3956                 qp = nqp->q_next;
3957         } else {
3958                 qp = _RD(nqp->q_next);
3959         }
3960         qi = qp->q_qinfo;
3961         if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3962                 mutex_exit(&stp->sd_lock);
3963                 return (EINVAL);
3964         }
3965         sq = qp->q_syncq;
3966         mutex_enter(SQLOCK(sq));
3967         mutex_exit(&stp->sd_lock);
3968         count = sq->sq_count;
3969         flags = sq->sq_flags;
3970         ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3971 
3972         while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3973                 /*
3974                  * Wait until we can enter the inner perimeter.
3975                  */
3976                 sq->sq_flags = flags | SQ_WANTWAKEUP;
3977                 cv_wait(&sq->sq_wait, SQLOCK(sq));
3978                 count = sq->sq_count;
3979                 flags = sq->sq_flags;
3980         }
3981 
3982         if (! (flags & SQ_CIPUT))
3983                 sq->sq_flags = flags | SQ_EXCL;
3984         sq->sq_count = count + 1;
3985         ASSERT(sq->sq_count != 0);           /* Wraparound */
3986         mutex_exit(SQLOCK(sq));
3987 
3988         rval = (*proc)(qp, idp);
3989 
3990         mutex_enter(SQLOCK(sq));
3991         flags = sq->sq_flags;
3992         ASSERT(sq->sq_count != 0);
3993         sq->sq_count--;
3994         if (flags & SQ_TAIL) {
3995                 putnext_tail(sq, qp, flags);
3996                 /*
3997                  * The only purpose of this ASSERT is to preserve calling stack
3998                  * in DEBUG kernel.
3999                  */
4000                 ASSERT(flags & SQ_TAIL);
4001                 return (rval);
4002         }
4003         ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
4004 /*
4005  * XXXX
4006  * I am not certain the next comment is correct here.  I need to consider
4007  * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
4008  * might cause other problems.  It just might be safer to drop it if
4009  * !SQ_CIPUT because that is when we set it.
4010  */
4011         /*
4012          * Safe to always drop SQ_EXCL:
4013          *      Not SQ_CIPUT means we set SQ_EXCL above
4014          *      For SQ_CIPUT SQ_EXCL will only be set if the put procedure
4015          *      did a qwriter(INNER) in which case nobody else
4016          *      is in the inner perimeter and we are exiting.
4017          *
4018          * I would like to make the following assertion:
4019          *
4020          * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4021          *      sq->sq_count == 0);
4022          *
4023          * which indicates that if we are both putshared and exclusive,
4024          * we became exclusive while executing the putproc, and the only
4025          * claim on the syncq was the one we dropped a few lines above.
4026          * But other threads that enter putnext while the syncq is exclusive
4027          * need to make a claim as they may need to drop SQLOCK in the
4028          * has_writers case to avoid deadlocks.  If these threads are
4029          * delayed or preempted, it is possible that the writer thread can
4030          * find out that there are other claims making the (sq_count == 0)
4031          * test invalid.
4032          */
4033 
4034         sq->sq_flags = flags & ~SQ_EXCL;
4035         mutex_exit(SQLOCK(sq));
4036         return (rval);
4037 }
4038 
4039 /*
4040  * Return nonzero if the queue is responsible for struio(), else return 0.
4041  */
4042 int
4043 isuioq(queue_t *q)
4044 {
4045         if (q->q_flag & QREADR)
4046                 return (STREAM(q)->sd_struiordq == q);
4047         else
4048                 return (STREAM(q)->sd_struiowrq == q);
4049 }
4050 
/*
 * Tunable: when non-zero, per-CPU syncq putlocks/putcounts are never
 * created (see create_putlocks() below).  Enabled only on sparc by
 * default; other platforms disable the optimization.
 * NOTE(review): the platform split is historical — presumably tied to
 * large-CPU-count sparc servers; confirm before changing the defaults.
 */
#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif
4056 
4057 /*
4058  * called by create_putlock.
4059  */
4060 static void
4061 create_syncq_putlocks(queue_t *q)
4062 {
4063         syncq_t *sq = q->q_syncq;
4064         ciputctrl_t *cip;
4065         int i;
4066 
4067         ASSERT(sq != NULL);
4068 
4069         ASSERT(disable_putlocks == 0);
4070         ASSERT(n_ciputctrl >= min_n_ciputctrl);
4071         ASSERT(ciputctrl_cache != NULL);
4072 
4073         if (!(sq->sq_type & SQ_CIPUT))
4074                 return;
4075 
4076         for (i = 0; i <= 1; i++) {
4077                 if (sq->sq_ciputctrl == NULL) {
4078                         cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4079                         SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4080                         mutex_enter(SQLOCK(sq));
4081                         if (sq->sq_ciputctrl != NULL) {
4082                                 mutex_exit(SQLOCK(sq));
4083                                 kmem_cache_free(ciputctrl_cache, cip);
4084                         } else {
4085                                 ASSERT(sq->sq_nciputctrl == 0);
4086                                 sq->sq_nciputctrl = n_ciputctrl - 1;
4087                                 /*
4088                                  * putnext checks sq_ciputctrl without holding
4089                                  * SQLOCK. if it is not NULL putnext assumes
4090                                  * sq_nciputctrl is initialized. membar below
4091                                  * insures that.
4092                                  */
4093                                 membar_producer();
4094                                 sq->sq_ciputctrl = cip;
4095                                 mutex_exit(SQLOCK(sq));
4096                         }
4097                 }
4098                 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4099                 if (i == 1)
4100                         break;
4101                 q = _OTHERQ(q);
4102                 if (!(q->q_flag & QPERQ)) {
4103                         ASSERT(sq == q->q_syncq);
4104                         break;
4105                 }
4106                 ASSERT(q->q_syncq != NULL);
4107                 ASSERT(sq != q->q_syncq);
4108                 sq = q->q_syncq;
4109                 ASSERT(sq->sq_type & SQ_CIPUT);
4110         }
4111 }
4112 
4113 /*
4114  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
4115  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
4116  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4117  * starting from q and down to the driver.
4118  *
4119  * This should be called after the affected queues are part of stream
4120  * geometry. It should be called from driver/module open routine after
4121  * qprocson() call. It is also called from nfs syscall where it is known that
4122  * stream is configured and won't change its geometry during create_putlock
4123  * call.
4124  *
4125  * caller normally uses 0 value for the stream argument to speed up MT putnext
4126  * into the perimeter of q for example because its perimeter is per module
4127  * (e.g. IP).
4128  *
4129  * caller normally uses non 0 value for the stream argument to hint the system
4130  * that the stream of q is a very contended global system stream
4131  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4132  * particularly MT hot.
4133  *
4134  * Caller insures stream plumbing won't happen while we are here and therefore
4135  * q_next can be safely used.
4136  */
4137 
4138 void
4139 create_putlocks(queue_t *q, int stream)
4140 {
4141         ciputctrl_t     *cip;
4142         struct stdata   *stp = STREAM(q);
4143 
4144         q = _WR(q);
4145         ASSERT(stp != NULL);
4146 
4147         if (disable_putlocks != 0)
4148                 return;
4149 
4150         if (n_ciputctrl < min_n_ciputctrl)
4151                 return;
4152 
4153         ASSERT(ciputctrl_cache != NULL);
4154 
4155         if (stream != 0 && stp->sd_ciputctrl == NULL) {
4156                 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4157                 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4158                 mutex_enter(&stp->sd_lock);
4159                 if (stp->sd_ciputctrl != NULL) {
4160                         mutex_exit(&stp->sd_lock);
4161                         kmem_cache_free(ciputctrl_cache, cip);
4162                 } else {
4163                         ASSERT(stp->sd_nciputctrl == 0);
4164                         stp->sd_nciputctrl = n_ciputctrl - 1;
4165                         /*
4166                          * putnext checks sd_ciputctrl without holding
4167                          * sd_lock. if it is not NULL putnext assumes
4168                          * sd_nciputctrl is initialized. membar below
4169                          * insures that.
4170                          */
4171                         membar_producer();
4172                         stp->sd_ciputctrl = cip;
4173                         mutex_exit(&stp->sd_lock);
4174                 }
4175         }
4176 
4177         ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4178 
4179         while (_SAMESTR(q)) {
4180                 create_syncq_putlocks(q);
4181                 if (stream == 0)
4182                         return;
4183                 q = q->q_next;
4184         }
4185         ASSERT(q != NULL);
4186         create_syncq_putlocks(q);
4187 }
4188 
4189 /*
4190  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4191  * through a stream.
4192  *
 * Data currently recorded per event is a timestamp, module/driver name,
4194  * downstream module/driver name, optional callstack, event type and a per
4195  * type datum.  Much of the STREAMS framework is instrumented for automatic
4196  * flow tracing (when enabled).  Events can be defined and used by STREAMS
4197  * modules and drivers.
4198  *
4199  * Global objects:
4200  *
4201  *      str_ftevent() - Add a flow-trace event to a dblk.
4202  *      str_ftfree() - Free flow-trace data
4203  *
4204  * Local objects:
4205  *
4206  *      fthdr_cache - pointer to the kmem cache for trace header.
4207  *      ftblk_cache - pointer to the kmem cache for trace data blocks.
4208  */
4209 
/*
 * Flow-trace tunables.  str_ftnever is also bumped by str_ftevent() on
 * allocation failure, permanently disabling tracing for the boot session.
 */
int str_ftnever = 1;	/* Don't do STREAMS flow tracing */
int str_ftstack = 0;	/* Don't record event call stacks */
4212 
4213 void
4214 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4215 {
4216         ftblk_t *bp = hp->tail;
4217         ftblk_t *nbp;
4218         ftevnt_t *ep;
4219         int ix, nix;
4220 
4221         ASSERT(hp != NULL);
4222 
4223         for (;;) {
4224                 if ((ix = bp->ix) == FTBLK_EVNTS) {
4225                         /*
4226                          * Tail doesn't have room, so need a new tail.
4227                          *
4228                          * To make this MT safe, first, allocate a new
4229                          * ftblk, and initialize it.  To make life a
4230                          * little easier, reserve the first slot (mostly
4231                          * by making ix = 1).  When we are finished with
4232                          * the initialization, CAS this pointer to the
4233                          * tail.  If this succeeds, this is the new
4234                          * "next" block.  Otherwise, another thread
4235                          * got here first, so free the block and start
4236                          * again.
4237                          */
4238                         nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4239                         if (nbp == NULL) {
4240                                 /* no mem, so punt */
4241                                 str_ftnever++;
4242                                 /* free up all flow data? */
4243                                 return;
4244                         }
4245                         nbp->nxt = NULL;
4246                         nbp->ix = 1;
4247                         /*
4248                          * Just in case there is another thread about
4249                          * to get the next index, we need to make sure
4250                          * the value is there for it.
4251                          */
4252                         membar_producer();
4253                         if (casptr(&hp->tail, bp, nbp) == bp) {
4254                                 /* CAS was successful */
4255                                 bp->nxt = nbp;
4256                                 membar_producer();
4257                                 bp = nbp;
4258                                 ix = 0;
4259                                 goto cas_good;
4260                         } else {
4261                                 kmem_cache_free(ftblk_cache, nbp);
4262                                 bp = hp->tail;
4263                                 continue;
4264                         }
4265                 }
4266                 nix = ix + 1;
4267                 if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
4268                 cas_good:
4269                         if (curthread != hp->thread) {
4270                                 hp->thread = curthread;
4271                                 evnt |= FTEV_CS;
4272                         }
4273                         if (CPU->cpu_seqid != hp->cpu_seqid) {
4274                                 hp->cpu_seqid = CPU->cpu_seqid;
4275                                 evnt |= FTEV_PS;
4276                         }
4277                         ep = &bp->ev[ix];
4278                         break;
4279                 }
4280         }
4281 
4282         if (evnt & FTEV_QMASK) {
4283                 queue_t *qp = p;
4284 
4285                 if (!(qp->q_flag & QREADR))
4286                         evnt |= FTEV_ISWR;
4287 
4288                 ep->mid = Q2NAME(qp);
4289 
4290                 /*
4291                  * We only record the next queue name for FTEV_PUTNEXT since
4292                  * that's the only time we *really* need it, and the putnext()
4293                  * code ensures that qp->q_next won't vanish.  (We could use
4294                  * claimstr()/releasestr() but at a performance cost.)
4295                  */
4296                 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4297                         ep->midnext = Q2NAME(qp->q_next);
4298                 else
4299                         ep->midnext = NULL;
4300         } else {
4301                 ep->mid = p;
4302                 ep->midnext = NULL;
4303         }
4304 
4305         if (ep->stk != NULL)
4306                 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4307 
4308         ep->ts = gethrtime();
4309         ep->evnt = evnt;
4310         ep->data = data;
4311         hp->hash = (hp->hash << 9) + hp->hash;
4312         hp->hash += (evnt << 16) | data;
4313         hp->hash += (uintptr_t)ep->mid;
4314 }
4315 
4316 /*
4317  * Free flow-trace data.
4318  */
4319 void
4320 str_ftfree(dblk_t *dbp)
4321 {
4322         fthdr_t *hp = dbp->db_fthdr;
4323         ftblk_t *bp = &hp->first;
4324         ftblk_t *nbp;
4325 
4326         if (bp != hp->tail || bp->ix != 0) {
4327                 /*
4328                  * Clear out the hash, have the tail point to itself, and free
4329                  * any continuation blocks.
4330                  */
4331                 bp = hp->first.nxt;
4332                 hp->tail = &hp->first;
4333                 hp->hash = 0;
4334                 hp->first.nxt = NULL;
4335                 hp->first.ix = 0;
4336                 while (bp != NULL) {
4337                         nbp = bp->nxt;
4338                         kmem_cache_free(ftblk_cache, bp);
4339                         bp = nbp;
4340                 }
4341         }
4342         kmem_cache_free(fthdr_cache, hp);
4343         dbp->db_fthdr = NULL;
4344 }