1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 
  22 /*
  23  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  24  */
  25 
  26 /*
  27  * FMD Transport Subsystem
  28  *
  29  * A transport module uses some underlying mechanism to transport events.
  30  * This mechanism may use any underlying link-layer protocol and may support
  31  * additional link-layer packets unrelated to FMA.  Some appropriate link-
  32  * layer mechanism to create the underlying connection is expected to be
  33  * called prior to calling fmd_xprt_open() itself.  Alternatively, a transport
  34  * may be created in the suspended state by specifying the FMD_XPRT_SUSPENDED
  35  * flag as part of the call to fmd_xprt_open(), and then may be resumed later.
  36  * The underlying transport mechanism is *required* to provide ordering: that
  37  * is, the sequences of bytes written across the transport must be read by
  38  * the remote peer in the order that they are written, even across separate
  39  * calls to fmdo_send().  As an example, the Internet TCP protocol would be
  40  * a valid transport as it guarantees ordering, whereas the Internet UDP
  41  * protocol would not because UDP datagrams may be delivered in any order
  42  * as a result of delays introduced when datagrams pass through routers.
  43  *
  44  * Similar to sending events, a transport module receives events that are from
  45  * its peer remote endpoint using some transport-specific mechanism that is
  46  * unknown to FMD.  As each event is received, the transport module is
  47  * responsible for constructing a valid nvlist_t object from the data and then
  48  * calling fmd_xprt_post() to post the event to the containing FMD's dispatch
  49  * queue, making it available to all local modules that are not transport
  50  * modules that have subscribed to the event.
  51  *
  52  * The following state machine is used for each transport.  The initial state
  53  * is either SYN, ACK, or RUN, depending on the flags specified to xprt_create.
  54  *
  55  *       FMD_XPRT_ACCEPT   !FMD_XPRT_ACCEPT
  56  *             |                 |
  57  * waiting  +--v--+           +--v--+  waiting
  58  * for syn  | SYN |--+     --+| ACK |  for ack
  59  * event    +-----+   \   /   +-----+  event
  60  *             |       \ /       |
  61  * drop all +--v--+     X     +--v--+  send subscriptions,
  62  * events   | ERR |<---+ +--->| SUB |  recv subscriptions,
  63  *          +-----+           +-----+  wait for run event
  64  *             ^                 |
  65  *             |     +-----+     |
  66  *             +-----| RUN |<----+
  67  *                   +--^--+
  68  *                      |
  69  *               FMD_XPRT_RDONLY
  70  *
  71  * When fmd_xprt_open() is called without FMD_XPRT_ACCEPT, the Common Transport
  72  * Layer enqueues a "syn" event for the module in its event queue and sets the
  73  * state to ACK.  In state ACK, we are waiting for the transport to get an
  74  * "ack" event and call fmd_xprt_post() on this event.  Other events will be
  75  * discarded.  If an "ack" is received, we transition to state SUB.  If a
  76  * configurable timeout occurs or if the "ack" is invalid (e.g. invalid version
  77  * exchange), we transition to state ERR.  Once in state ERR, no further
  78  * operations are valid except fmd_xprt_close() and fmd_xprt_error() will
  79  * return a non-zero value to the caller indicating the transport has failed.
  80  *
  81  * When fmd_xprt_open() is called with FMD_XPRT_ACCEPT, the Common Transport
  82  * Layer assumes this transport is being used to accept a virtual connection
  83  * from a remote peer that is sending a "syn", and sets the initial state to
  84  * SYN.  In this state, the transport waits for a "syn" event, validates it,
  85  * and then transitions to state SUB if it is valid or state ERR if it is not.
  86  *
  87  * Once in state SUB, the transport module is expected to receive a sequence of
  88  * zero or more "subscribe" events from the remote peer, followed by a "run"
  89  * event.  Once in state RUN, the transport is active and any events can be
  90  * sent or received.  The transport module is free to call fmd_xprt_close()
  91  * from any state.  The fmd_xprt_error() function will return zero if the
  92  * transport is not in the ERR state, or non-zero if it is in the ERR state.
  93  *
  94  * Once the state machine reaches RUN, other FMA protocol events can be sent
  95  * and received across the transport in addition to the various control events.
  96  *
  97  * Table of Common Transport Layer Control Events
  98  * ==============================================
  99  *
 100  * FMA Class                     Payload
 101  * ---------                     -------
 102  * resource.fm.xprt.uuclose      string (uuid of case)
 103  * resource.fm.xprt.uuresolved   string (uuid of case)
 104  * resource.fm.xprt.updated      string (uuid of case)
 105  * resource.fm.xprt.subscribe    string (class pattern)
 106  * resource.fm.xprt.unsubscribe  string (class pattern)
 107  * resource.fm.xprt.unsuback     string (class pattern)
 108  * resource.fm.xprt.syn          version information
 109  * resource.fm.xprt.ack          version information
 110  * resource.fm.xprt.run          version information
 111  *
 112  * Control events are used to add and delete proxy subscriptions on the remote
 113  * transport peer module, and to set up connections.  When a "syn" event is
 114  * sent, FMD will include in the payload the highest version of the FMA event
 115  * protocol that is supported by the sender.  When a "syn" event is received,
 116  * the receiving FMD will use the minimum of this version and its version of
 117  * the protocol, and reply with this new minimum version in the "ack" event.
 118  * The receiver will then use this new minimum for subsequent event semantics.
 119  */
 120 
 121 #include <sys/fm/protocol.h>
 122 #include <strings.h>
 123 #include <limits.h>
 124 
 125 #include <fmd_alloc.h>
 126 #include <fmd_error.h>
 127 #include <fmd_conf.h>
 128 #include <fmd_subr.h>
 129 #include <fmd_string.h>
 130 #include <fmd_protocol.h>
 131 #include <fmd_thread.h>
 132 #include <fmd_eventq.h>
 133 #include <fmd_dispq.h>
 134 #include <fmd_ctl.h>
 135 #include <fmd_log.h>
 136 #include <fmd_ustat.h>
 137 #include <fmd_case.h>
 138 #include <fmd_api.h>
 139 #include <fmd_fmri.h>
 140 #include <fmd_asru.h>
 141 #include <fmd_xprt.h>
 142 
 143 #include <fmd.h>
 144 
 145 /*
 146  * The states shown above in the transport state machine diagram are encoded
 147  * using arrays of class patterns and a corresponding action function.  These
 148  * arrays are then passed to fmd_xprt_transition() to change transport states.
 149  */
 150 
/*
 * SYN state rules: a "resource.fm.xprt.syn" event is handed to
 * fmd_xprt_event_syn(); every other class is handed to
 * fmd_xprt_event_error(), which moves the transport to the ERR state.
 *
 * NOTE(review): the code that walks these tables is not in this part of the
 * file; presumably entries are tried in order and { NULL, NULL } marks the
 * end of the table -- confirm against the table consumer.
 */
const fmd_xprt_rule_t _fmd_xprt_state_syn[] = {
{ "resource.fm.xprt.syn", fmd_xprt_event_syn },
{ "*", fmd_xprt_event_error },
{ NULL, NULL }
};
 156 
 157 const fmd_xprt_rule_t _fmd_xprt_state_ack[] = {
 158 { "resource.fm.xprt.ack", fmd_xprt_event_ack },
 159 { "*", fmd_xprt_event_error },
 160 };
 161 
/*
 * ERR state rules: every class is handed to fmd_xprt_event_drop(), which
 * counts the event as discarded and leaves the transport state unchanged.
 */
const fmd_xprt_rule_t _fmd_xprt_state_err[] = {
{ "*", fmd_xprt_event_drop },
{ NULL, NULL }
};
 166 
/*
 * SUB state rules: "subscribe" events go to fmd_xprt_event_sub() and the
 * "run" event goes to fmd_xprt_event_run().  Any other resource.fm.xprt.*
 * control event goes to fmd_xprt_event_error() (transport enters ERR), and
 * every remaining class goes to fmd_xprt_event_drop().
 *
 * NOTE(review): this relies on the more specific patterns being tried before
 * the wildcard ones; the matching code is not shown here -- confirm.
 */
const fmd_xprt_rule_t _fmd_xprt_state_sub[] = {
{ "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
{ "resource.fm.xprt.run", fmd_xprt_event_run },
{ "resource.fm.xprt.*", fmd_xprt_event_error },
{ "*", fmd_xprt_event_drop },
{ NULL, NULL }
};
 174 
/*
 * RUN state rules: each supported control event is routed to its handler
 * (subscribe, unsubscribe, unsuback, uuclose, uuresolved, updated).  Any
 * other resource.fm.xprt.* class goes to fmd_xprt_event_error(), which moves
 * the transport to ERR.
 *
 * Unlike the other tables there is no "*" entry; what happens to classes
 * outside resource.fm.xprt.* is decided by code not shown in this part of
 * the file.
 */
const fmd_xprt_rule_t _fmd_xprt_state_run[] = {
{ "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
{ "resource.fm.xprt.unsubscribe", fmd_xprt_event_unsub },
{ "resource.fm.xprt.unsuback", fmd_xprt_event_unsuback },
{ "resource.fm.xprt.uuclose", fmd_xprt_event_uuclose },
{ "resource.fm.xprt.uuresolved", fmd_xprt_event_uuresolved },
{ "resource.fm.xprt.updated", fmd_xprt_event_updated },
{ "resource.fm.xprt.*", fmd_xprt_event_error },
{ NULL, NULL }
};
 185 
/*
 * Template for the per-transport statistics that fmd installs on behalf of
 * each transport.  It is used to initialize the per-transport xi_stats.  The
 * name of each statistic is prefixed with "fmd.xprt.%u", where %u is the
 * transport ID (xi_id), and the statistics are then inserted into the
 * per-module stats hash.
 *
 * The initializer is positional: the entries below must stay in the same
 * order as the members of fmd_xprt_stat_t in <fmd_xprt.h>.
 */
static const fmd_xprt_stat_t _fmd_xprt_stat_tmpl = {
/*
 * First member: a nested group of queue-related statistics.
 * NOTE(review): presumably the embedded event queue statistics structure --
 * confirm against fmd_xprt_stat_t.
 */
{
{ "dispatched", FMD_TYPE_UINT64, "total events dispatched to transport" },
{ "dequeued", FMD_TYPE_UINT64, "total events dequeued by transport" },
{ "prdequeued", FMD_TYPE_UINT64, "protocol events dequeued by transport" },
{ "dropped", FMD_TYPE_UINT64, "total events dropped on queue overflow" },
{ "wcnt", FMD_TYPE_UINT32, "count of events waiting on queue" },
{ "wtime", FMD_TYPE_TIME, "total wait time on queue" },
{ "wlentime", FMD_TYPE_TIME, "total wait length * time product" },
{ "wlastupdate", FMD_TYPE_TIME, "hrtime of last wait queue update" },
{ "dtime", FMD_TYPE_TIME, "total processing time after dequeue" },
{ "dlastupdate", FMD_TYPE_TIME, "hrtime of last event dequeue completion" },
},
/* Transport-level statistics (strings first, then counters). */
{ "module", FMD_TYPE_STRING, "module that owns this transport" },
{ "authority", FMD_TYPE_STRING, "authority associated with this transport" },
{ "state", FMD_TYPE_STRING, "current transport state" },
{ "received", FMD_TYPE_UINT64, "events received by transport" },
{ "discarded", FMD_TYPE_UINT64, "bad events discarded by transport" },
{ "retried", FMD_TYPE_UINT64, "retries requested of transport" },
{ "replayed", FMD_TYPE_UINT64, "events replayed by transport" },
{ "lost", FMD_TYPE_UINT64, "events lost by transport" },
{ "timeouts", FMD_TYPE_UINT64, "events received by transport with ttl=0" },
{ "subscriptions", FMD_TYPE_UINT64, "subscriptions registered to transport" },
};
 217 
 218 static void
 219 fmd_xprt_class_hash_create(fmd_xprt_class_hash_t *xch, fmd_eventq_t *eq)
 220 {
 221         uint_t hashlen = fmd.d_str_buckets;
 222 
 223         xch->xch_queue = eq;
 224         xch->xch_hashlen = hashlen;
 225         xch->xch_hash = fmd_zalloc(sizeof (void *) * hashlen, FMD_SLEEP);
 226 }
 227 
 228 static void
 229 fmd_xprt_class_hash_destroy(fmd_xprt_class_hash_t *xch)
 230 {
 231         fmd_eventq_t *eq = xch->xch_queue;
 232         fmd_xprt_class_t *xcp, *ncp;
 233         uint_t i;
 234 
 235         for (i = 0; i < xch->xch_hashlen; i++) {
 236                 for (xcp = xch->xch_hash[i]; xcp != NULL; xcp = ncp) {
 237                         ncp = xcp->xc_next;
 238 
 239                         if (eq != NULL)
 240                                 fmd_dispq_delete(fmd.d_disp, eq, xcp->xc_class);
 241 
 242                         fmd_strfree(xcp->xc_class);
 243                         fmd_free(xcp, sizeof (fmd_xprt_class_t));
 244                 }
 245         }
 246 
 247         fmd_free(xch->xch_hash, sizeof (void *) * xch->xch_hashlen);
 248 }
 249 
 250 /*
 251  * Insert the specified class into the specified class hash, and return the
 252  * reference count.  A return value of one indicates this is the first insert.
 253  * If an eventq is associated with the hash, insert a dispq subscription for it.
 254  */
 255 static uint_t
 256 fmd_xprt_class_hash_insert(fmd_xprt_impl_t *xip,
 257     fmd_xprt_class_hash_t *xch, const char *class)
 258 {
 259         uint_t h = fmd_strhash(class) % xch->xch_hashlen;
 260         fmd_xprt_class_t *xcp;
 261 
 262         ASSERT(MUTEX_HELD(&xip->xi_lock));
 263 
 264         for (xcp = xch->xch_hash[h]; xcp != NULL; xcp = xcp->xc_next) {
 265                 if (strcmp(class, xcp->xc_class) == 0)
 266                         return (++xcp->xc_refs);
 267         }
 268 
 269         xcp = fmd_alloc(sizeof (fmd_xprt_class_t), FMD_SLEEP);
 270         xcp->xc_class = fmd_strdup(class, FMD_SLEEP);
 271         xcp->xc_next = xch->xch_hash[h];
 272         xcp->xc_refs = 1;
 273         xch->xch_hash[h] = xcp;
 274 
 275         if (xch->xch_queue != NULL)
 276                 fmd_dispq_insert(fmd.d_disp, xch->xch_queue, class);
 277 
 278         return (xcp->xc_refs);
 279 }
 280 
 281 /*
 282  * Delete the specified class from the specified class hash, and return the
 283  * reference count.  A return value of zero indicates the class was deleted.
 284  * If an eventq is associated with the hash, delete the dispq subscription.
 285  */
 286 static uint_t
 287 fmd_xprt_class_hash_delete(fmd_xprt_impl_t *xip,
 288     fmd_xprt_class_hash_t *xch, const char *class)
 289 {
 290         uint_t h = fmd_strhash(class) % xch->xch_hashlen;
 291         fmd_xprt_class_t *xcp, **pp;
 292 
 293         ASSERT(MUTEX_HELD(&xip->xi_lock));
 294         pp = &xch->xch_hash[h];
 295 
 296         for (xcp = *pp; xcp != NULL; xcp = xcp->xc_next) {
 297                 if (strcmp(class, xcp->xc_class) == 0)
 298                         break;
 299                 else
 300                         pp = &xcp->xc_next;
 301         }
 302 
 303         if (xcp == NULL)
 304                 return (-1U); /* explicitly permit an invalid delete */
 305 
 306         if (--xcp->xc_refs != 0)
 307                 return (xcp->xc_refs);
 308 
 309         ASSERT(xcp->xc_refs == 0);
 310         *pp = xcp->xc_next;
 311 
 312         fmd_strfree(xcp->xc_class);
 313         fmd_free(xcp, sizeof (fmd_xprt_class_t));
 314 
 315         if (xch->xch_queue != NULL)
 316                 fmd_dispq_delete(fmd.d_disp, xch->xch_queue, class);
 317 
 318         return (0);
 319 }
 320 
 321 /*
 322  * Queue subscribe events for the specified transport corresponding to all of
 323  * the active module subscriptions.  This is an extremely heavyweight operation
 324  * that we expect to take place rarely (i.e. when loading a transport module
 325  * or when it establishes a connection).  We lock all of the known modules to
 326  * prevent them from adding or deleting subscriptions, then snapshot their
 327  * subscriptions, and then unlock all of the modules.  We hold the modhash
 328  * lock for the duration of this operation to prevent new modules from loading.
 329  */
 330 static void
 331 fmd_xprt_subscribe_modhash(fmd_xprt_impl_t *xip, fmd_modhash_t *mhp)
 332 {
 333         fmd_xprt_t *xp = (fmd_xprt_t *)xip;
 334         const fmd_conf_path_t *pap;
 335         fmd_module_t *mp;
 336         uint_t i, j;
 337 
 338         (void) pthread_rwlock_rdlock(&mhp->mh_lock);
 339 
 340         for (i = 0; i < mhp->mh_hashlen; i++) {
 341                 for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
 342                         fmd_module_lock(mp);
 343         }
 344 
 345         (void) pthread_mutex_lock(&xip->xi_lock);
 346         ASSERT(!(xip->xi_flags & FMD_XPRT_SUBSCRIBER));
 347         xip->xi_flags |= FMD_XPRT_SUBSCRIBER;
 348         (void) pthread_mutex_unlock(&xip->xi_lock);
 349 
 350         for (i = 0; i < mhp->mh_hashlen; i++) {
 351                 for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next) {
 352                         (void) fmd_conf_getprop(mp->mod_conf,
 353                             FMD_PROP_SUBSCRIPTIONS, &pap);
 354                         for (j = 0; j < pap->cpa_argc; j++)
 355                                 fmd_xprt_subscribe(xp, pap->cpa_argv[j]);
 356                 }
 357         }
 358 
 359         for (i = 0; i < mhp->mh_hashlen; i++) {
 360                 for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
 361                         fmd_module_unlock(mp);
 362         }
 363 
 364         (void) pthread_rwlock_unlock(&mhp->mh_lock);
 365 }
 366 
 367 static void
 368 fmd_xprt_transition(fmd_xprt_impl_t *xip,
 369     const fmd_xprt_rule_t *state, const char *tag)
 370 {
 371         fmd_xprt_t *xp = (fmd_xprt_t *)xip;
 372         fmd_event_t *e;
 373         nvlist_t *nvl;
 374         char *s;
 375 
 376         TRACE((FMD_DBG_XPRT, "xprt %u -> %s\n", xip->xi_id, tag));
 377 
 378         xip->xi_state = state;
 379         s = fmd_strdup(tag, FMD_SLEEP);
 380 
 381         (void) pthread_mutex_lock(&xip->xi_stats_lock);
 382         fmd_strfree(xip->xi_stats->xs_state.fmds_value.str);
 383         xip->xi_stats->xs_state.fmds_value.str = s;
 384         (void) pthread_mutex_unlock(&xip->xi_stats_lock);
 385 
 386         /*
 387          * If we've reached the SUB state, take out the big hammer and snapshot
 388          * all of the subscriptions of all of the loaded modules.  Then queue a
 389          * run event for our remote peer indicating that it can enter RUN.
 390          */
 391         if (state == _fmd_xprt_state_sub) {
 392                 fmd_xprt_subscribe_modhash(xip, fmd.d_mod_hash);
 393 
 394                 /*
 395                  * For read-write transports, we always want to set up remote
 396                  * subscriptions to the bultin list.* events, regardless of
 397                  * whether any agents have subscribed to them.
 398                  */
 399                 if (xip->xi_flags & FMD_XPRT_RDWR) {
 400                         fmd_xprt_subscribe(xp, FM_LIST_SUSPECT_CLASS);
 401                         fmd_xprt_subscribe(xp, FM_LIST_ISOLATED_CLASS);
 402                         fmd_xprt_subscribe(xp, FM_LIST_UPDATED_CLASS);
 403                         fmd_xprt_subscribe(xp, FM_LIST_RESOLVED_CLASS);
 404                         fmd_xprt_subscribe(xp, FM_LIST_REPAIRED_CLASS);
 405                 }
 406 
 407                 nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
 408                     "resource.fm.xprt.run", xip->xi_version);
 409 
 410                 (void) nvlist_lookup_string(nvl, FM_CLASS, &s);
 411                 e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
 412                 fmd_eventq_insert_at_time(xip->xi_queue, e);
 413         }
 414 }
 415 
 416 static void
 417 fmd_xprt_authupdate(fmd_xprt_impl_t *xip)
 418 {
 419         char *s = fmd_fmri_auth2str(xip->xi_auth);
 420 
 421         (void) pthread_mutex_lock(&xip->xi_stats_lock);
 422         fmd_strfree(xip->xi_stats->xs_authority.fmds_value.str);
 423         xip->xi_stats->xs_authority.fmds_value.str = s;
 424         (void) pthread_mutex_unlock(&xip->xi_stats_lock);
 425 }
 426 
 427 static int
 428 fmd_xprt_vmismatch(fmd_xprt_impl_t *xip, nvlist_t *nvl, uint_t *rversionp)
 429 {
 430         uint8_t rversion;
 431 
 432         if (nvlist_lookup_uint8(nvl, FM_VERSION, &rversion) != 0) {
 433                 (void) pthread_mutex_lock(&xip->xi_stats_lock);
 434                 xip->xi_stats->xs_discarded.fmds_value.ui64++;
 435                 (void) pthread_mutex_unlock(&xip->xi_stats_lock);
 436 
 437                 fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
 438                 return (1);
 439         }
 440 
 441         if (rversion > xip->xi_version) {
 442                 fmd_dprintf(FMD_DBG_XPRT, "xprt %u protocol mismatch: %u>%u\n",
 443                     xip->xi_id, rversion, xip->xi_version);
 444 
 445                 (void) pthread_mutex_lock(&xip->xi_stats_lock);
 446                 xip->xi_stats->xs_discarded.fmds_value.ui64++;
 447                 (void) pthread_mutex_unlock(&xip->xi_stats_lock);
 448 
 449                 fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
 450                 return (1);
 451         }
 452 
 453         if (rversionp != NULL)
 454                 *rversionp = rversion;
 455 
 456         return (0);
 457 }
 458 
 459 void
 460 fmd_xprt_event_syn(fmd_xprt_impl_t *xip, nvlist_t *nvl)
 461 {
 462         fmd_event_t *e;
 463         uint_t vers;
 464         char *class;
 465 
 466         if (fmd_xprt_vmismatch(xip, nvl, &vers))
 467                 return; /* transitioned to error state */
 468 
 469         /*
 470          * If the transport module didn't specify an authority, extract the
 471          * one that is passed along with the xprt.syn event and use that.
 472          */
 473         if (xip->xi_auth == NULL &&
 474             nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
 475             nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
 476                 (void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
 477                 fmd_xprt_authupdate(xip);
 478         }
 479 
 480         nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
 481             "resource.fm.xprt.ack", xip->xi_version);
 482 
 483         (void) nvlist_lookup_string(nvl, FM_CLASS, &class);
 484         e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
 485         fmd_eventq_insert_at_time(xip->xi_queue, e);
 486 
 487         xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
 488         fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
 489 }
 490 
 491 void
 492 fmd_xprt_event_ack(fmd_xprt_impl_t *xip, nvlist_t *nvl)
 493 {
 494         uint_t vers;
 495 
 496         if (fmd_xprt_vmismatch(xip, nvl, &vers))
 497                 return; /* transitioned to error state */
 498 
 499         /*
 500          * If the transport module didn't specify an authority, extract the
 501          * one that is passed along with the xprt.syn event and use that.
 502          */
 503         if (xip->xi_auth == NULL &&
 504             nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
 505             nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
 506                 (void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
 507                 fmd_xprt_authupdate(xip);
 508         }
 509 
 510         xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
 511         fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
 512 }
 513 
 514 /*
 515  * Upon transition to RUN, we take every solved case and resend a list.suspect
 516  * event for it to our remote peer.  If a case transitions from solved to a
 517  * future state (CLOSE_WAIT, CLOSED, or REPAIRED) while we are iterating over
 518  * the case hash, we will get it as part of examining the resource cache, next.
 519  */
 520 static void
 521 fmd_xprt_send_case(fmd_case_t *cp, void *arg)
 522 {
 523         fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
 524         fmd_xprt_impl_t *xip = arg;
 525 
 526         fmd_event_t *e;
 527         nvlist_t *nvl;
 528         char *class;
 529 
 530         if (cip->ci_state != FMD_CASE_SOLVED)
 531                 return;
 532 
 533         nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
 534         (void) nvlist_lookup_string(nvl, FM_CLASS, &class);
 535         e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
 536 
 537         fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
 538             FM_LIST_SUSPECT_CLASS, cip->ci_uuid, xip->xi_id);
 539 
 540         fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
 541 }
 542 
 543 /*
 544  * Similar to the above function, but for use with readonly transport. Puts
 545  * the event on the module's queue such that it's fmdo_recv function can pick
 546  * it up and send it if appropriate.
 547  */
 548 static void
 549 fmd_xprt_send_case_ro(fmd_case_t *cp, void *arg)
 550 {
 551         fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
 552         fmd_module_t *mp = arg;
 553 
 554         fmd_event_t *e;
 555         nvlist_t *nvl;
 556         char *class;
 557 
 558         if (cip->ci_state != FMD_CASE_SOLVED)
 559                 return;
 560 
 561         nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
 562         (void) nvlist_lookup_string(nvl, FM_CLASS, &class);
 563         e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
 564 
 565         fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to rdonly transport %s\n",
 566             FM_LIST_SUSPECT_CLASS, cip->ci_uuid, mp->mod_name);
 567 
 568         fmd_dispq_dispatch_gid(fmd.d_disp, e, class, mp->mod_queue->eq_sgid);
 569 }
 570 
 571 void
 572 fmd_xprt_event_run(fmd_xprt_impl_t *xip, nvlist_t *nvl)
 573 {
 574         if (!fmd_xprt_vmismatch(xip, nvl, NULL)) {
 575                 fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
 576                 fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case, xip);
 577         }
 578 }
 579 
 580 void
 581 fmd_xprt_event_sub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
 582 {
 583         char *class;
 584 
 585         if (fmd_xprt_vmismatch(xip, nvl, NULL))
 586                 return; /* transitioned to error state */
 587 
 588         if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
 589                 return; /* malformed protocol event */
 590 
 591         (void) pthread_mutex_lock(&xip->xi_lock);
 592         (void) fmd_xprt_class_hash_insert(xip, &xip->xi_lsub, class);
 593         (void) pthread_mutex_unlock(&xip->xi_lock);
 594 
 595         (void) pthread_mutex_lock(&xip->xi_stats_lock);
 596         xip->xi_stats->xs_subscriptions.fmds_value.ui64++;
 597         (void) pthread_mutex_unlock(&xip->xi_stats_lock);
 598 }
 599 
 600 void
 601 fmd_xprt_event_unsub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
 602 {
 603         fmd_event_t *e;
 604         char *class;
 605 
 606         if (fmd_xprt_vmismatch(xip, nvl, NULL))
 607                 return; /* transitioned to error state */
 608 
 609         if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
 610                 return; /* malformed protocol event */
 611 
 612         (void) pthread_mutex_lock(&xip->xi_lock);
 613         (void) fmd_xprt_class_hash_delete(xip, &xip->xi_lsub, class);
 614         (void) pthread_mutex_unlock(&xip->xi_lock);
 615 
 616         (void) pthread_mutex_lock(&xip->xi_stats_lock);
 617         xip->xi_stats->xs_subscriptions.fmds_value.ui64--;
 618         (void) pthread_mutex_unlock(&xip->xi_stats_lock);
 619 
 620         nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
 621             "resource.fm.xprt.unsuback", xip->xi_version, class);
 622 
 623         (void) nvlist_lookup_string(nvl, FM_CLASS, &class);
 624         e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
 625         fmd_eventq_insert_at_time(xip->xi_queue, e);
 626 }
 627 
 628 void
 629 fmd_xprt_event_unsuback(fmd_xprt_impl_t *xip, nvlist_t *nvl)
 630 {
 631         char *class;
 632 
 633         if (fmd_xprt_vmismatch(xip, nvl, NULL))
 634                 return; /* transitioned to error state */
 635 
 636         if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
 637                 return; /* malformed protocol event */
 638 
 639         (void) pthread_mutex_lock(&xip->xi_lock);
 640         (void) fmd_xprt_class_hash_delete(xip, &xip->xi_usub, class);
 641         (void) pthread_mutex_unlock(&xip->xi_lock);
 642 }
 643 
 644 /*
 645  * on diagnosing side, receive a uuclose from the proxy.
 646  */
 647 void
 648 fmd_xprt_event_uuclose(fmd_xprt_impl_t *xip, nvlist_t *nvl)
 649 {
 650         fmd_case_t *cp;
 651         char *uuid;
 652 
 653         if (fmd_xprt_vmismatch(xip, nvl, NULL))
 654                 return; /* transitioned to error state */
 655 
 656         if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
 657             (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
 658                 /*
 659                  * update resource cache status and transition case
 660                  */
 661                 fmd_case_close_status(cp);
 662                 fmd_case_transition(cp, FMD_CASE_CLOSE_WAIT, FMD_CF_ISOLATED);
 663                 fmd_case_rele(cp);
 664         }
 665 }
 666 
 667 /*
 668  * on diagnosing side, receive a uuresolved from the proxy.
 669  */
 670 void
 671 fmd_xprt_event_uuresolved(fmd_xprt_impl_t *xip, nvlist_t *nvl)
 672 {
 673         fmd_case_t *cp;
 674         char *uuid;
 675 
 676         if (fmd_xprt_vmismatch(xip, nvl, NULL))
 677                 return; /* transitioned to error state */
 678 
 679         if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
 680             (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
 681                 fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
 682 
 683                 fmd_case_transition(cp, (cip->ci_state == FMD_CASE_REPAIRED) ?
 684                     FMD_CASE_RESOLVED : (cip->ci_state == FMD_CASE_CLOSED) ?
 685                     FMD_CASE_REPAIRED : FMD_CASE_CLOSE_WAIT, FMD_CF_RESOLVED);
 686                 fmd_case_rele(cp);
 687         }
 688 }
 689 
 690 /*
 691  * on diagnosing side, receive a repair/acquit from the proxy.
 692  */
 693 void
 694 fmd_xprt_event_updated(fmd_xprt_impl_t *xip, nvlist_t *nvl)
 695 {
 696         fmd_case_t *cp;
 697         char *uuid;
 698 
 699         if (fmd_xprt_vmismatch(xip, nvl, NULL))
 700                 return; /* transitioned to error state */
 701 
 702         if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
 703             (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
 704                 uint8_t *statusp, *proxy_asrup = NULL;
 705                 uint_t nelem = 0;
 706 
 707                 /*
 708                  * Only update status with new repairs if "no remote repair"
 709                  * is not set. Do the case_update anyway though (as this will
 710                  * refresh the status on the proxy side).
 711                  */
 712                 if (!(xip->xi_flags & FMD_XPRT_NO_REMOTE_REPAIR)) {
 713                         if (nvlist_lookup_uint8_array(nvl,
 714                             FM_RSRC_XPRT_FAULT_STATUS, &statusp, &nelem) == 0 &&
 715                             nelem != 0) {
 716                                 (void) nvlist_lookup_uint8_array(nvl,
 717                                     FM_RSRC_XPRT_FAULT_HAS_ASRU, &proxy_asrup,
 718                                     &nelem);
 719                                 fmd_case_update_status(cp, statusp,
 720                                     proxy_asrup, NULL);
 721                         }
 722                         fmd_case_update_containees(cp);
 723                 }
 724                 fmd_case_update(cp);
 725                 fmd_case_rele(cp);
 726         }
 727 }
 728 
 729 void
 730 fmd_xprt_event_error(fmd_xprt_impl_t *xip, nvlist_t *nvl)
 731 {
 732         char *class = "<unknown>";
 733 
 734         (void) pthread_mutex_lock(&xip->xi_stats_lock);
 735         xip->xi_stats->xs_discarded.fmds_value.ui64++;
 736         (void) pthread_mutex_unlock(&xip->xi_stats_lock);
 737 
 738         (void) nvlist_lookup_string(nvl, FM_CLASS, &class);
 739         TRACE((FMD_DBG_XPRT, "xprt %u bad event %s\n", xip->xi_id, class));
 740 
 741         fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
 742 }
 743 
 744 void
 745 fmd_xprt_event_drop(fmd_xprt_impl_t *xip, nvlist_t *nvl)
 746 {
 747         char *class = "<unknown>";
 748 
 749         (void) pthread_mutex_lock(&xip->xi_stats_lock);
 750         xip->xi_stats->xs_discarded.fmds_value.ui64++;
 751         (void) pthread_mutex_unlock(&xip->xi_stats_lock);
 752 
 753         (void) nvlist_lookup_string(nvl, FM_CLASS, &class);
 754         TRACE((FMD_DBG_XPRT, "xprt %u drop event %s\n", xip->xi_id, class));
 755 
 756 }
 757 
 758 fmd_xprt_t *
 759 fmd_xprt_create(fmd_module_t *mp, uint_t flags, nvlist_t *auth, void *data)
 760 {
 761         fmd_xprt_impl_t *xip = fmd_zalloc(sizeof (fmd_xprt_impl_t), FMD_SLEEP);
 762         fmd_stat_t *statv;
 763         uint_t i, statc;
 764 
 765         char buf[PATH_MAX];
 766         fmd_event_t *e;
 767         nvlist_t *nvl;
 768         char *s;
 769 
 770         (void) pthread_mutex_init(&xip->xi_lock, NULL);
 771         (void) pthread_cond_init(&xip->xi_cv, NULL);
 772         (void) pthread_mutex_init(&xip->xi_stats_lock, NULL);
 773 
 774         xip->xi_auth = auth;
 775         xip->xi_data = data;
 776         xip->xi_version = FM_RSRC_XPRT_VERSION;
 777         xip->xi_flags = flags;
 778 
 779         /*
 780          * Grab fmd.d_xprt_lock to block fmd_xprt_suspend_all() and then create
 781          * a transport ID and make it visible in fmd.d_xprt_ids.  If transports
 782          * were previously suspended, set the FMD_XPRT_DSUSPENDED flag on us to
 783          * ensure that this transport will not run until fmd_xprt_resume_all().
 784          */
 785         (void) pthread_mutex_lock(&fmd.d_xprt_lock);
 786         xip->xi_id = fmd_idspace_alloc(fmd.d_xprt_ids, xip);
 787 
 788         if (fmd.d_xprt_suspend != 0)
 789                 xip->xi_flags |= FMD_XPRT_DSUSPENDED;
 790 
 791         (void) pthread_mutex_unlock(&fmd.d_xprt_lock);
 792 
 793         /*
 794          * If the module has not yet finished _fmd_init(), set the ISUSPENDED
 795          * bit so that fmdo_send() is not called until _fmd_init() completes.
 796          */
 797         if (!(mp->mod_flags & FMD_MOD_INIT))
 798                 xip->xi_flags |= FMD_XPRT_ISUSPENDED;
 799 
 800         /*
 801          * Initialize the transport statistics that we keep on behalf of fmd.
 802          * These are set up using a template defined at the top of this file.
 803          * We rename each statistic with a prefix ensuring its uniqueness.
 804          */
 805         statc = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
 806         statv = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
 807         bcopy(&_fmd_xprt_stat_tmpl, statv, sizeof (_fmd_xprt_stat_tmpl));
 808 
 809         for (i = 0; i < statc; i++) {
 810                 (void) snprintf(statv[i].fmds_name,
 811                     sizeof (statv[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
 812                     ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
 813         }
 814 
 815         xip->xi_stats = (fmd_xprt_stat_t *)fmd_ustat_insert(
 816             mp->mod_ustat, FMD_USTAT_NOALLOC, statc, statv, NULL);
 817 
 818         if (xip->xi_stats == NULL)
 819                 fmd_panic("failed to create xi_stats (%p)\n", (void *)statv);
 820 
 821         xip->xi_stats->xs_module.fmds_value.str =
 822             fmd_strdup(mp->mod_name, FMD_SLEEP);
 823 
 824         if (xip->xi_auth != NULL)
 825                 fmd_xprt_authupdate(xip);
 826 
 827         /*
 828          * Create the outbound eventq for this transport and link to its stats.
 829          * If any suspend bits were set above, suspend the eventq immediately.
 830          */
 831         xip->xi_queue = fmd_eventq_create(mp, &xip->xi_stats->xs_evqstat,
 832             &xip->xi_stats_lock, mp->mod_stats->ms_xprtqlimit.fmds_value.ui32);
 833 
 834         if (xip->xi_flags & FMD_XPRT_SMASK)
 835                 fmd_eventq_suspend(xip->xi_queue);
 836 
 837         /*
 838          * Create our subscription hashes: local subscriptions go to xi_queue,
 839          * remote subscriptions are tracked only for protocol requests, and
 840          * pending unsubscriptions are associated with the /dev/null eventq.
 841          */
 842         fmd_xprt_class_hash_create(&xip->xi_lsub, xip->xi_queue);
 843         fmd_xprt_class_hash_create(&xip->xi_rsub, NULL);
 844         fmd_xprt_class_hash_create(&xip->xi_usub, fmd.d_rmod->mod_queue);
 845 
 846         /*
 847          * Determine our initial state based upon the creation flags.  If we're
 848          * read-only, go directly to RUN.  If we're accepting a new connection,
 849          * wait for a SYN.  Otherwise send a SYN and wait for an ACK.
 850          */
 851         if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY) {
 852                 /*
 853                  * Send the list.suspects across here for readonly transports.
 854                  * For read-write transport they will be sent on transition to
 855                  * RUN state in fmd_xprt_event_run().
 856                  */
 857                 fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case_ro, mp);
 858                 fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
 859         } else if (flags & FMD_XPRT_ACCEPT)
 860                 fmd_xprt_transition(xip, _fmd_xprt_state_syn, "SYN");
 861         else
 862                 fmd_xprt_transition(xip, _fmd_xprt_state_ack, "ACK");
 863 
 864         /*
 865          * If client.xprtlog is set to TRUE, create a debugging log for the
 866          * events received by the transport in var/fm/fmd/xprt/.
 867          */
 868         (void) fmd_conf_getprop(fmd.d_conf, "client.xprtlog", &i);
 869         (void) fmd_conf_getprop(fmd.d_conf, "log.xprt", &s);
 870 
 871         if (i) {
 872                 (void) snprintf(buf, sizeof (buf), "%s/%u.log", s, xip->xi_id);
 873                 xip->xi_log = fmd_log_open(fmd.d_rootdir, buf, FMD_LOG_XPRT);
 874         }
 875 
 876         ASSERT(fmd_module_locked(mp));
 877         fmd_list_append(&mp->mod_transports, xip);
 878 
 879         (void) pthread_mutex_lock(&mp->mod_stats_lock);
 880         mp->mod_stats->ms_xprtopen.fmds_value.ui32++;
 881         (void) pthread_mutex_unlock(&mp->mod_stats_lock);
 882 
 883         /*
 884          * If this is a read-only transport, return without creating a send
 885          * queue thread and setting up any connection events in our queue.
 886          */
 887         if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
 888                 goto out;
 889 
 890         /*
 891          * Once the transport is fully initialized, create a send queue thread
 892          * and start any connect events flowing to complete our initialization.
 893          */
 894         if ((xip->xi_thread = fmd_thread_create(mp,
 895             (fmd_thread_f *)fmd_xprt_send, xip)) == NULL) {
 896 
 897                 fmd_error(EFMD_XPRT_THR,
 898                     "failed to create thread for transport %u", xip->xi_id);
 899 
 900                 fmd_xprt_destroy((fmd_xprt_t *)xip);
 901                 (void) fmd_set_errno(EFMD_XPRT_THR);
 902                 return (NULL);
 903         }
 904 
 905         /*
 906          * If the transport is not being opened to accept an inbound connect,
 907          * start an outbound connection by enqueuing a SYN event for our peer.
 908          */
 909         if (!(flags & FMD_XPRT_ACCEPT)) {
 910                 nvl = fmd_protocol_xprt_ctl(mp,
 911                     "resource.fm.xprt.syn", FM_RSRC_XPRT_VERSION);
 912 
 913                 (void) nvlist_lookup_string(nvl, FM_CLASS, &s);
 914                 e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
 915                 fmd_eventq_insert_at_time(xip->xi_queue, e);
 916         }
 917 out:
 918         fmd_dprintf(FMD_DBG_XPRT, "opened transport %u\n", xip->xi_id);
 919         return ((fmd_xprt_t *)xip);
 920 }
 921 
 922 void
 923 fmd_xprt_destroy(fmd_xprt_t *xp)
 924 {
 925         fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
 926         fmd_module_t *mp = xip->xi_queue->eq_mod;
 927         uint_t id = xip->xi_id;
 928 
 929         fmd_case_impl_t *cip, *nip;
 930         fmd_stat_t *sp;
 931         uint_t i, n;
 932 
 933         ASSERT(fmd_module_locked(mp));
 934         fmd_list_delete(&mp->mod_transports, xip);
 935 
 936         (void) pthread_mutex_lock(&mp->mod_stats_lock);
 937         mp->mod_stats->ms_xprtopen.fmds_value.ui32--;
 938         (void) pthread_mutex_unlock(&mp->mod_stats_lock);
 939 
 940         (void) pthread_mutex_lock(&xip->xi_lock);
 941 
 942         while (xip->xi_busy != 0)
 943                 (void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
 944 
 945         /*
 946          * Remove the transport from global visibility, cancel its send-side
 947          * thread, join with it, and then remove the transport from module
 948          * visibility.  Once all this is done, destroy and free the transport.
 949          */
 950         (void) fmd_idspace_free(fmd.d_xprt_ids, xip->xi_id);
 951 
 952         if (xip->xi_thread != NULL) {
 953                 fmd_eventq_abort(xip->xi_queue);
 954                 fmd_module_unlock(mp);
 955                 fmd_thread_destroy(xip->xi_thread, FMD_THREAD_JOIN);
 956                 fmd_module_lock(mp);
 957         }
 958 
 959         if (xip->xi_log != NULL)
 960                 fmd_log_rele(xip->xi_log);
 961 
 962         /*
 963          * Release every case handle in the module that was cached by this
 964          * transport.  This will result in these cases disappearing from the
 965          * local case hash so that fmd_case_uuclose() and fmd_case_repaired()
 966          * etc can no longer be used.
 967          */
 968         for (cip = fmd_list_next(&mp->mod_cases); cip != NULL; cip = nip) {
 969                 nip = fmd_list_next(cip);
 970                 if (cip->ci_xprt == xp)
 971                         fmd_case_discard((fmd_case_t *)cip, B_TRUE);
 972         }
 973 
 974         /*
 975          * Destroy every class in the various subscription hashes and remove
 976          * any corresponding subscriptions from the event dispatch queue.
 977          */
 978         fmd_xprt_class_hash_destroy(&xip->xi_lsub);
 979         fmd_xprt_class_hash_destroy(&xip->xi_rsub);
 980         fmd_xprt_class_hash_destroy(&xip->xi_usub);
 981 
 982         /*
 983          * Uniquify the stat names exactly as was done in fmd_xprt_create()
 984          * before calling fmd_ustat_insert(), otherwise fmd_ustat_delete()
 985          * won't find the entries in the hash table.
 986          */
 987         n = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
 988         sp = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
 989         bcopy(&_fmd_xprt_stat_tmpl, sp, sizeof (_fmd_xprt_stat_tmpl));
 990         for (i = 0; i < n; i++) {
 991                 (void) snprintf(sp[i].fmds_name,
 992                     sizeof (sp[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
 993                     ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
 994         }
 995         fmd_ustat_delete(mp->mod_ustat, n, sp);
 996         fmd_free(sp, sizeof (_fmd_xprt_stat_tmpl));
 997 
 998         fmd_free(xip->xi_stats, sizeof (fmd_xprt_stat_t));
 999         fmd_eventq_destroy(xip->xi_queue);
1000         nvlist_free(xip->xi_auth);
1001         fmd_free(xip, sizeof (fmd_xprt_impl_t));
1002 
1003         fmd_dprintf(FMD_DBG_XPRT, "closed transport %u\n", id);
1004 }
1005 
1006 void
1007 fmd_xprt_xsuspend(fmd_xprt_t *xp, uint_t flags)
1008 {
1009         fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1010         uint_t oflags;
1011 
1012         ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
1013         (void) pthread_mutex_lock(&xip->xi_lock);
1014 
1015         oflags = xip->xi_flags;
1016         xip->xi_flags |= flags;
1017 
1018         if (!(oflags & FMD_XPRT_SMASK) && (xip->xi_flags & FMD_XPRT_SMASK) != 0)
1019                 fmd_eventq_suspend(xip->xi_queue);
1020 
1021         (void) pthread_cond_broadcast(&xip->xi_cv);
1022 
1023         while (xip->xi_busy != 0)
1024                 (void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
1025 
1026         (void) pthread_mutex_unlock(&xip->xi_lock);
1027 }
1028 
1029 void
1030 fmd_xprt_xresume(fmd_xprt_t *xp, uint_t flags)
1031 {
1032         fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1033         uint_t oflags;
1034 
1035         ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
1036         (void) pthread_mutex_lock(&xip->xi_lock);
1037 
1038         oflags = xip->xi_flags;
1039         xip->xi_flags &= ~flags;
1040 
1041         if ((oflags & FMD_XPRT_SMASK) != 0 && !(xip->xi_flags & FMD_XPRT_SMASK))
1042                 fmd_eventq_resume(xip->xi_queue);
1043 
1044         (void) pthread_cond_broadcast(&xip->xi_cv);
1045         (void) pthread_mutex_unlock(&xip->xi_lock);
1046 }
1047 
1048 void
1049 fmd_xprt_send(fmd_xprt_t *xp)
1050 {
1051         fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1052         fmd_module_t *mp = xip->xi_queue->eq_mod;
1053         fmd_event_t *ep;
1054         int err;
1055 
1056         while ((ep = fmd_eventq_delete(xip->xi_queue)) != NULL) {
1057                 if (FMD_EVENT_TTL(ep) == 0) {
1058                         fmd_event_rele(ep);
1059                         continue;
1060                 }
1061 
1062                 fmd_dprintf(FMD_DBG_XPRT, "xprt %u sending %s\n",
1063                     xip->xi_id, (char *)FMD_EVENT_DATA(ep));
1064 
1065                 err = mp->mod_ops->mop_transport(mp, xp, ep);
1066                 fmd_eventq_done(xip->xi_queue);
1067 
1068                 if (err == FMD_SEND_RETRY) {
1069                         fmd_eventq_insert_at_time(xip->xi_queue, ep);
1070                         (void) pthread_mutex_lock(&xip->xi_stats_lock);
1071                         xip->xi_stats->xs_retried.fmds_value.ui64++;
1072                         (void) pthread_mutex_unlock(&xip->xi_stats_lock);
1073                 }
1074 
1075                 if (err != FMD_SEND_SUCCESS && err != FMD_SEND_RETRY) {
1076                         (void) pthread_mutex_lock(&xip->xi_stats_lock);
1077                         xip->xi_stats->xs_lost.fmds_value.ui64++;
1078                         (void) pthread_mutex_unlock(&xip->xi_stats_lock);
1079                 }
1080 
1081                 fmd_event_rele(ep);
1082         }
1083 }
1084 
1085 /*
1086  * This function creates a local suspect list. This is used when a suspect list
1087  * is created directly by an external source like fminject.
1088  */
1089 static void
1090 fmd_xprt_list_suspect_local(fmd_xprt_t *xp, nvlist_t *nvl)
1091 {
1092         nvlist_t **nvlp;
1093         nvlist_t *de_fmri, *de_fmri_dup = NULL;
1094         int64_t *diag_time;
1095         char *code = NULL;
1096         fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1097         fmd_case_t *cp;
1098         uint_t nelem = 0, nelem2 = 0, i;
1099         boolean_t injected;
1100 
1101         fmd_module_lock(xip->xi_queue->eq_mod);
1102         cp = fmd_case_create(xip->xi_queue->eq_mod, NULL, NULL);
1103         if (cp == NULL) {
1104                 fmd_module_unlock(xip->xi_queue->eq_mod);
1105                 return;
1106         }
1107 
1108         /*
1109          * copy diag_code if present
1110          */
1111         (void) nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code);
1112         if (code != NULL) {
1113                 fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1114 
1115                 cip->ci_precanned = 1;
1116                 fmd_case_setcode(cp, code);
1117         }
1118 
1119         /*
1120          * copy suspects
1121          */
1122         (void) nvlist_lookup_nvlist_array(nvl, FM_SUSPECT_FAULT_LIST, &nvlp,
1123             &nelem);
1124         for (i = 0; i < nelem; i++) {
1125                 nvlist_t *flt_copy, *asru = NULL, *fru = NULL, *rsrc = NULL;
1126                 topo_hdl_t *thp;
1127                 char *loc = NULL;
1128                 int err;
1129 
1130                 thp = fmd_fmri_topo_hold(TOPO_VERSION);
1131                 (void) nvlist_xdup(nvlp[i], &flt_copy, &fmd.d_nva);
1132                 (void) nvlist_lookup_nvlist(nvlp[i], FM_FAULT_RESOURCE, &rsrc);
1133 
1134                 /*
1135                  * If no fru specified, get it from topo
1136                  */
1137                 if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_FRU, &fru) != 0 &&
1138                     rsrc && topo_fmri_fru(thp, rsrc, &fru, &err) == 0)
1139                         (void) nvlist_add_nvlist(flt_copy, FM_FAULT_FRU, fru);
1140                 /*
1141                  * If no asru specified, get it from topo
1142                  */
1143                 if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU, &asru) != 0 &&
1144                     rsrc && topo_fmri_asru(thp, rsrc, &asru, &err) == 0)
1145                         (void) nvlist_add_nvlist(flt_copy, FM_FAULT_ASRU, asru);
1146                 /*
1147                  * If no location specified, get it from topo
1148                  */
1149                 if (nvlist_lookup_string(nvlp[i], FM_FAULT_LOCATION,
1150                     &loc) != 0) {
1151                         if (fru && topo_fmri_label(thp, fru, &loc, &err) == 0)
1152                                 (void) nvlist_add_string(flt_copy,
1153                                     FM_FAULT_LOCATION, loc);
1154                         else if (rsrc && topo_fmri_label(thp, rsrc, &loc,
1155                             &err) == 0)
1156                                 (void) nvlist_add_string(flt_copy,
1157                                     FM_FAULT_LOCATION, loc);
1158                         if (loc)
1159                                 topo_hdl_strfree(thp, loc);
1160                 }
1161                 if (fru)
1162                         nvlist_free(fru);
1163                 if (asru)
1164                         nvlist_free(asru);
1165                 if (rsrc)
1166                         nvlist_free(rsrc);
1167                 fmd_fmri_topo_rele(thp);
1168                 fmd_case_insert_suspect(cp, flt_copy);
1169         }
1170 
1171         /*
1172          * copy diag_time if present
1173          */
1174         if (nvlist_lookup_int64_array(nvl, FM_SUSPECT_DIAG_TIME, &diag_time,
1175             &nelem2) == 0 && nelem2 >= 2)
1176                 fmd_case_settime(cp, diag_time[0], diag_time[1]);
1177 
1178         /*
1179          * copy DE fmri if present
1180          */
1181         if (nvlist_lookup_nvlist(nvl, FM_SUSPECT_DE, &de_fmri) == 0) {
1182                 (void) nvlist_xdup(de_fmri, &de_fmri_dup, &fmd.d_nva);
1183                 fmd_case_set_de_fmri(cp, de_fmri_dup);
1184         }
1185 
1186         /*
1187          * copy injected if present
1188          */
1189         if (nvlist_lookup_boolean_value(nvl, FM_SUSPECT_INJECTED,
1190             &injected) == 0 && injected)
1191                 fmd_case_set_injected(cp);
1192 
1193         fmd_case_transition(cp, FMD_CASE_SOLVED, FMD_CF_SOLVED);
1194         fmd_module_unlock(xip->xi_queue->eq_mod);
1195 }
1196 
1197 /*
1198  * This function is called to create a proxy case on receipt of a list.suspect
1199  * from the diagnosing side of the transport.
1200  */
1201 static void
1202 fmd_xprt_list_suspect(fmd_xprt_t *xp, nvlist_t *nvl)
1203 {
1204         fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1205         nvlist_t **nvlp;
1206         uint_t nelem = 0, nelem2 = 0, i;
1207         int64_t *diag_time;
1208         topo_hdl_t *thp;
1209         char *class;
1210         nvlist_t *rsrc, *asru, *de_fmri, *de_fmri_dup = NULL;
1211         nvlist_t *flt_copy;
1212         int err;
1213         nvlist_t **asrua;
1214         uint8_t *proxy_asru = NULL;
1215         int got_proxy_asru = 0;
1216         int got_hc_rsrc = 0;
1217         int got_hc_asru = 0;
1218         int got_present_rsrc = 0;
1219         uint8_t *diag_asru = NULL;
1220         char *scheme;
1221         uint8_t *statusp;
1222         char *uuid, *code;
1223         fmd_case_t *cp;
1224         fmd_case_impl_t *cip;
1225         int need_update = 0;
1226         boolean_t injected;
1227 
1228         if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) != 0)
1229                 return;
1230         if (nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code) != 0)
1231                 return;
1232         (void) nvlist_lookup_nvlist_array(nvl, FM_SUSPECT_FAULT_LIST, &nvlp,
1233             &nelem);
1234 
1235         /*
1236          * In order to implement FMD_XPRT_HCONLY and FMD_XPRT_HC_PRESENT_ONLY
1237          * etc we first scan the suspects to see if
1238          * - there was an asru in the received fault
1239          * - there was an hc-scheme resource in the received fault
1240          * - any hc-scheme resource in the received fault is present in the
1241          *   local topology
1242          * - any hc-scheme resource in the received fault has an asru in the
1243          *   local topology
1244          */
1245         if (nelem > 0) {
1246                 asrua = fmd_zalloc(sizeof (nvlist_t *) * nelem, FMD_SLEEP);
1247                 proxy_asru = fmd_zalloc(sizeof (uint8_t) * nelem, FMD_SLEEP);
1248                 diag_asru = fmd_zalloc(sizeof (uint8_t) * nelem, FMD_SLEEP);
1249                 thp = fmd_fmri_topo_hold(TOPO_VERSION);
1250                 for (i = 0; i < nelem; i++) {
1251                         if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU,
1252                             &asru) == 0 && asru != NULL)
1253                                 diag_asru[i] = 1;
1254                         if (nvlist_lookup_string(nvlp[i], FM_CLASS,
1255                             &class) != 0 || strncmp(class, "fault", 5) != 0)
1256                                 continue;
1257                         /*
1258                          * If there is an hc-scheme asru, use that to find the
1259                          * real asru. Otherwise if there is an hc-scheme
1260                          * resource, work out the old asru from that.
1261                          * This order is to allow a two stage evaluation
1262                          * of the asru where a fault in the diagnosing side
1263                          * is in a component not visible to the proxy side,
1264                          * but prevents a component that is visible from
1265                          * working. So the diagnosing side sets the asru to
1266                          * the latter component (in hc-scheme as the diagnosing
1267                          * side doesn't know about the proxy side's virtual
1268                          * schemes), and then the proxy side can convert that
1269                          * to a suitable virtual scheme asru.
1270                          */
1271                         if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU,
1272                             &asru) == 0 && asru != NULL &&
1273                             nvlist_lookup_string(asru, FM_FMRI_SCHEME,
1274                             &scheme) == 0 &&
1275                             strcmp(scheme, FM_FMRI_SCHEME_HC) == 0) {
1276                                 got_hc_asru = 1;
1277                                 if (xip->xi_flags & FMD_XPRT_EXTERNAL)
1278                                         continue;
1279                                 if (topo_fmri_present(thp, asru, &err) != 0)
1280                                         got_present_rsrc = 1;
1281                                 if (topo_fmri_asru(thp, asru, &asrua[i],
1282                                     &err) == 0) {
1283                                         proxy_asru[i] =
1284                                             FMD_PROXY_ASRU_FROM_ASRU;
1285                                         got_proxy_asru = 1;
1286                                 }
1287                         } else if (nvlist_lookup_nvlist(nvlp[i],
1288                             FM_FAULT_RESOURCE, &rsrc) == 0 && rsrc != NULL &&
1289                             nvlist_lookup_string(rsrc, FM_FMRI_SCHEME,
1290                             &scheme) == 0 &&
1291                             strcmp(scheme, FM_FMRI_SCHEME_HC) == 0) {
1292                                 got_hc_rsrc = 1;
1293                                 if (xip->xi_flags & FMD_XPRT_EXTERNAL)
1294                                         continue;
1295                                 if (topo_fmri_present(thp, rsrc, &err) != 0)
1296                                         got_present_rsrc = 1;
1297                                 if (topo_fmri_asru(thp, rsrc, &asrua[i],
1298                                     &err) == 0) {
1299                                         proxy_asru[i] =
1300                                             FMD_PROXY_ASRU_FROM_RSRC;
1301                                         got_proxy_asru = 1;
1302                                 }
1303                         }
1304                 }
1305                 fmd_fmri_topo_rele(thp);
1306         }
1307 
1308         /*
1309          * If we're set up only to report hc-scheme faults, and
1310          * there aren't any, then just drop the event.
1311          */
1312         if (got_hc_rsrc == 0 && got_hc_asru == 0 &&
1313             (xip->xi_flags & FMD_XPRT_HCONLY)) {
1314                 if (nelem > 0) {
1315                         fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1316                         fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1317                         fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1318                 }
1319                 return;
1320         }
1321 
1322         /*
1323          * If we're set up only to report locally present hc-scheme
1324          * faults, and there aren't any, then just drop the event.
1325          */
1326         if (got_present_rsrc == 0 &&
1327             (xip->xi_flags & FMD_XPRT_HC_PRESENT_ONLY)) {
1328                 if (nelem > 0) {
1329                         for (i = 0; i < nelem; i++)
1330                                 if (asrua[i])
1331                                         nvlist_free(asrua[i]);
1332                         fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1333                         fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1334                         fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1335                 }
1336                 return;
1337         }
1338 
1339         /*
1340          * If fmd_case_recreate() returns NULL, UUID is already known.
1341          */
1342         fmd_module_lock(xip->xi_queue->eq_mod);
1343         if ((cp = fmd_case_recreate(xip->xi_queue->eq_mod, xp,
1344             FMD_CASE_UNSOLVED, uuid, code)) == NULL) {
1345                 if (nelem > 0) {
1346                         for (i = 0; i < nelem; i++)
1347                                 if (asrua[i])
1348                                         nvlist_free(asrua[i]);
1349                         fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1350                         fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1351                         fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1352                 }
1353                 fmd_module_unlock(xip->xi_queue->eq_mod);
1354                 return;
1355         }
1356 
1357         cip = (fmd_case_impl_t *)cp;
1358         cip->ci_diag_asru = diag_asru;
1359         cip->ci_proxy_asru = proxy_asru;
1360         for (i = 0; i < nelem; i++) {
1361                 (void) nvlist_xdup(nvlp[i], &flt_copy, &fmd.d_nva);
1362                 if (proxy_asru[i] != FMD_PROXY_ASRU_NOT_NEEDED) {
1363                         /*
1364                          * Copy suspects, but remove/replace asru first. Also if
1365                          * the original asru was hc-scheme use that as resource.
1366                          */
1367                         if (proxy_asru[i] == FMD_PROXY_ASRU_FROM_ASRU) {
1368                                 (void) nvlist_remove(flt_copy,
1369                                     FM_FAULT_RESOURCE, DATA_TYPE_NVLIST);
1370                                 (void) nvlist_lookup_nvlist(flt_copy,
1371                                     FM_FAULT_ASRU, &asru);
1372                                 (void) nvlist_add_nvlist(flt_copy,
1373                                     FM_FAULT_RESOURCE, asru);
1374                         }
1375                         (void) nvlist_remove(flt_copy, FM_FAULT_ASRU,
1376                             DATA_TYPE_NVLIST);
1377                         (void) nvlist_add_nvlist(flt_copy, FM_FAULT_ASRU,
1378                             asrua[i]);
1379                         nvlist_free(asrua[i]);
1380                 } else if (got_hc_asru == 0 &&
1381                     nvlist_lookup_nvlist(flt_copy, FM_FAULT_ASRU,
1382                     &asru) == 0 && asru != NULL) {
1383                         /*
1384                          * If we have an asru from diag side, but it's not
1385                          * in hc scheme, then we can't be sure what it
1386                          * represents, so mark as no retire.
1387                          */
1388                         (void) nvlist_add_boolean_value(flt_copy,
1389                             FM_SUSPECT_RETIRE, B_FALSE);
1390                 }
1391                 fmd_case_insert_suspect(cp, flt_copy);
1392         }
1393         /*
1394          * copy diag_time
1395          */
1396         if (nvlist_lookup_int64_array(nvl, FM_SUSPECT_DIAG_TIME, &diag_time,
1397             &nelem2) == 0 && nelem2 >= 2)
1398                 fmd_case_settime(cp, diag_time[0], diag_time[1]);
1399         /*
1400          * copy DE fmri
1401          */
1402         if (nvlist_lookup_nvlist(nvl, FM_SUSPECT_DE, &de_fmri) == 0) {
1403                 (void) nvlist_xdup(de_fmri, &de_fmri_dup, &fmd.d_nva);
1404                 fmd_case_set_de_fmri(cp, de_fmri_dup);
1405         }
1406 
1407         /*
1408          * copy injected if present
1409          */
1410         if (nvlist_lookup_boolean_value(nvl, FM_SUSPECT_INJECTED,
1411             &injected) == 0 && injected)
1412                 fmd_case_set_injected(cp);
1413 
1414         /*
1415          * Transition to solved. This will log the suspect list and create
1416          * the resource cache entries.
1417          */
1418         fmd_case_transition(cp, FMD_CASE_SOLVED, FMD_CF_SOLVED);
1419 
1420         /*
1421          * Update status if it is not simply "all faulty" (can happen if
1422          * list.suspects are being re-sent when the transport has reconnected).
1423          */
1424         (void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS, &statusp,
1425             &nelem);
1426         for (i = 0; i < nelem; i++) {
1427                 if ((statusp[i] & (FM_SUSPECT_FAULTY | FM_SUSPECT_UNUSABLE |
1428                     FM_SUSPECT_NOT_PRESENT | FM_SUSPECT_DEGRADED)) !=
1429                     FM_SUSPECT_FAULTY)
1430                         need_update = 1;
1431         }
1432         if (need_update) {
1433                 fmd_case_update_status(cp, statusp, cip->ci_proxy_asru,
1434                     cip->ci_diag_asru);
1435                 fmd_case_update_containees(cp);
1436                 fmd_case_update(cp);
1437         }
1438 
1439         /*
1440          * if asru on proxy side, send an update back to the diagnosing side to
1441          * update UNUSABLE/DEGRADED.
1442          */
1443         if (got_proxy_asru)
1444                 fmd_case_xprt_updated(cp);
1445 
1446         if (nelem > 0)
1447                 fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1448         fmd_module_unlock(xip->xi_queue->eq_mod);
1449 }
1450 
1451 void
1452 fmd_xprt_recv(fmd_xprt_t *xp, nvlist_t *nvl, hrtime_t hrt, boolean_t logonly)
1453 {
1454         fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1455         const fmd_xprt_rule_t *xrp;
1456         fmd_t *dp = &fmd;
1457 
1458         fmd_event_t *e;
1459         char *class, *uuid;
1460         boolean_t isproto, isereport, isireport, ishvireport, issysevent;
1461 
1462         uint64_t *tod;
1463         uint8_t ttl;
1464         uint_t n;
1465         fmd_case_t *cp;
1466 
1467         /*
1468          * Grab the transport lock and set the busy flag to indicate we are
1469          * busy receiving an event.  If [DI]SUSPEND is pending, wait until fmd
1470          * resumes the transport before continuing on with the receive.
1471          */
1472         (void) pthread_mutex_lock(&xip->xi_lock);
1473 
1474         while (xip->xi_flags & (FMD_XPRT_DSUSPENDED | FMD_XPRT_ISUSPENDED)) {
1475 
1476                 if (fmd.d_signal != 0) {
1477                         (void) pthread_mutex_unlock(&xip->xi_lock);
1478                         return; /* fmd_destroy() is in progress */
1479                 }
1480 
1481                 (void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
1482         }
1483 
1484         xip->xi_busy++;
1485         ASSERT(xip->xi_busy != 0);
1486 
1487         (void) pthread_mutex_unlock(&xip->xi_lock);
1488 
1489         (void) pthread_mutex_lock(&xip->xi_stats_lock);
1490         xip->xi_stats->xs_received.fmds_value.ui64++;
1491         (void) pthread_mutex_unlock(&xip->xi_stats_lock);
1492 
1493         if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) {
1494                 fmd_error(EFMD_XPRT_PAYLOAD, "discarding nvlist %p: missing "
1495                     "required \"%s\" payload element", (void *)nvl, FM_CLASS);
1496 
1497                 (void) pthread_mutex_lock(&xip->xi_stats_lock);
1498                 xip->xi_stats->xs_discarded.fmds_value.ui64++;
1499                 (void) pthread_mutex_unlock(&xip->xi_stats_lock);
1500 
1501                 nvlist_free(nvl);
1502                 goto done;
1503         }
1504 
1505         fmd_dprintf(FMD_DBG_XPRT, "xprt %u %s %s\n", xip->xi_id,
1506             ((logonly == FMD_B_TRUE) ? "logging" : "posting"), class);
1507 
1508         isereport = (strncmp(class, FM_EREPORT_CLASS ".",
1509             sizeof (FM_EREPORT_CLASS)) == 0) ? FMD_B_TRUE : FMD_B_FALSE;
1510 
1511         isireport = (strncmp(class, FM_IREPORT_CLASS ".",
1512             sizeof (FM_IREPORT_CLASS)) == 0) ?  FMD_B_TRUE : FMD_B_FALSE;
1513 
1514         issysevent = (strncmp(class, SYSEVENT_RSRC_CLASS,
1515             sizeof (SYSEVENT_RSRC_CLASS) - 1)) == 0 ? FMD_B_TRUE : FMD_B_FALSE;
1516 
1517         if (isireport) {
1518                 char *pri;
1519 
1520                 if (nvlist_lookup_string(nvl, FM_IREPORT_PRIORITY, &pri) == 0 &&
1521                     strncmp(pri, "high", 5) == 0) {
1522                         ishvireport = 1;
1523                 } else {
1524                         ishvireport = 0;
1525                 }
1526         }
1527 
1528         /*
1529          * The logonly flag should only be set for ereports.
1530          */
1531         if (logonly == FMD_B_TRUE && isereport == FMD_B_FALSE) {
1532                 fmd_error(EFMD_XPRT_INVAL, "discarding nvlist %p: "
1533                     "logonly flag is not valid for class %s",
1534                     (void *)nvl, class);
1535 
1536                 (void) pthread_mutex_lock(&xip->xi_stats_lock);
1537                 xip->xi_stats->xs_discarded.fmds_value.ui64++;
1538                 (void) pthread_mutex_unlock(&xip->xi_stats_lock);
1539 
1540                 nvlist_free(nvl);
1541                 goto done;
1542         }
1543 
1544         /*
1545          * If a time-to-live value is present in the event and is zero, drop
1546          * the event and bump xs_timeouts.  Otherwise decrement the TTL value.
1547          */
1548         if (nvlist_lookup_uint8(nvl, FMD_EVN_TTL, &ttl) == 0) {
1549                 if (ttl == 0) {
1550                         fmd_dprintf(FMD_DBG_XPRT, "xprt %u nvlist %p (%s) "
1551                             "timeout: event received with ttl=0\n",
1552                             xip->xi_id, (void *)nvl, class);
1553 
1554                         (void) pthread_mutex_lock(&xip->xi_stats_lock);
1555                         xip->xi_stats->xs_timeouts.fmds_value.ui64++;
1556                         (void) pthread_mutex_unlock(&xip->xi_stats_lock);
1557 
1558                         nvlist_free(nvl);
1559                         goto done;
1560                 }
1561                 (void) nvlist_remove(nvl, FMD_EVN_TTL, DATA_TYPE_UINT8);
1562                 (void) nvlist_add_uint8(nvl, FMD_EVN_TTL, ttl - 1);
1563         }
1564 
1565         /*
1566          * If we are using the native system clock, the underlying transport
1567          * code can provide a tighter event time bound by telling us when the
1568          * event was enqueued.  If we're using simulated clocks, this time
1569          * has no meaning to us, so just reset the value to use HRT_NOW.
1570          */
1571         if (dp->d_clockops != &fmd_timeops_native)
1572                 hrt = FMD_HRT_NOW;
1573 
1574         /*
1575          * If an event's class is in the FMD_CTL_CLASS family, then create a
1576          * control event.  If a FMD_EVN_TOD member is found, create a protocol
1577          * event using this time.  Otherwise create a protocol event using hrt.
1578          */
1579         isproto = (strncmp(class, FMD_CTL_CLASS, FMD_CTL_CLASS_LEN) == 0) ?
1580             FMD_B_FALSE : FMD_B_TRUE;
1581         if (isproto == FMD_B_FALSE)
1582                 e = fmd_event_create(FMD_EVT_CTL, hrt, nvl, fmd_ctl_init(nvl));
1583         else if (nvlist_lookup_uint64_array(nvl, FMD_EVN_TOD, &tod, &n) != 0)
1584                 e = fmd_event_create(FMD_EVT_PROTOCOL, hrt, nvl, class);
1585         else {
1586                 e = fmd_event_recreate(FMD_EVT_PROTOCOL,
1587                     NULL, nvl, class, NULL, 0, 0);
1588         }
1589 
1590         /*
1591          * If the debug log is enabled, create a temporary event, log it to the
1592          * debug log, and then reset the underlying state of the event.
1593          */
1594         if (xip->xi_log != NULL) {
1595                 fmd_event_impl_t *ep = (fmd_event_impl_t *)e;
1596 
1597                 fmd_log_append(xip->xi_log, e, NULL);
1598 
1599                 ep->ev_flags |= FMD_EVF_VOLATILE;
1600                 ep->ev_off = 0;
1601                 ep->ev_len = 0;
1602 
1603                 if (ep->ev_log != NULL) {
1604                         fmd_log_rele(ep->ev_log);
1605                         ep->ev_log = NULL;
1606                 }
1607         }
1608 
1609         /*
1610          * Iterate over the rules for the current state trying to match the
1611          * event class to one of our special rules.  If a rule is matched, the
1612          * event is consumed and not dispatched to other modules.  If the rule
1613          * set ends without matching an event, we fall through to dispatching.
1614          */
1615         for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
1616                 if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
1617                         fmd_event_hold(e);
1618                         xrp->xr_func(xip, nvl);
1619                         fmd_event_rele(e);
1620                         goto done;
1621                 }
1622         }
1623 
1624         /*
1625          * Record ereports and ireports in the log.  This code will
1626          * be replaced later with a per-transport intent log instead.
1627          */
1628         if (isereport == FMD_B_TRUE || isireport == FMD_B_TRUE ||
1629             issysevent == B_TRUE) {
1630                 pthread_rwlock_t *lockp;
1631                 fmd_log_t *lp;
1632 
1633                 if (isereport == FMD_B_TRUE) {
1634                         lp = fmd.d_errlog;
1635                         lockp = &fmd.d_log_lock;
1636                 } else {
1637                         if (ishvireport || issysevent) {
1638                                 lp = fmd.d_hvilog;
1639                                 lockp = &fmd.d_hvilog_lock;
1640                         } else {
1641                                 lp = fmd.d_ilog;
1642                                 lockp = &fmd.d_ilog_lock;
1643                         }
1644                 }
1645 
1646                 (void) pthread_rwlock_rdlock(lockp);
1647                 fmd_log_append(lp, e, NULL);
1648                 (void) pthread_rwlock_unlock(lockp);
1649         }
1650 
1651         /*
1652          * If a list.suspect event is received, create a case for the specified
1653          * UUID in the case hash, with the transport module as its owner.
1654          */
1655         if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_SUSPECT_CLASS)) {
1656                 if (xip->xi_flags & FMD_XPRT_CACHE_AS_LOCAL)
1657                         fmd_xprt_list_suspect_local(xp, nvl);
1658                 else
1659                         fmd_xprt_list_suspect(xp, nvl);
1660                 fmd_event_hold(e);
1661                 fmd_event_rele(e);
1662                 goto done;
1663         }
1664 
1665         /*
1666          * If a list.updated or list.repaired event is received, update the
1667          * resource cache status and the local case.
1668          */
1669         if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_REPAIRED_CLASS) ||
1670             fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_UPDATED_CLASS)) {
1671                 uint8_t *statusp;
1672                 uint_t nelem = 0;
1673 
1674                 (void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS,
1675                     &statusp, &nelem);
1676                 fmd_module_lock(xip->xi_queue->eq_mod);
1677                 if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1678                     (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1679                         fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1680                         if (cip->ci_xprt != NULL) {
1681                                 fmd_case_update_status(cp, statusp,
1682                                     cip->ci_proxy_asru, cip->ci_diag_asru);
1683                                 fmd_case_update_containees(cp);
1684                                 fmd_case_update(cp);
1685                         }
1686                         fmd_case_rele(cp);
1687                 }
1688                 fmd_module_unlock(xip->xi_queue->eq_mod);
1689                 fmd_event_hold(e);
1690                 fmd_event_rele(e);
1691                 goto done;
1692         }
1693 
1694         /*
1695          * If a list.isolated event is received, update resource cache status
1696          */
1697         if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_ISOLATED_CLASS)) {
1698                 uint8_t *statusp;
1699                 uint_t nelem = 0;
1700 
1701                 (void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS,
1702                     &statusp, &nelem);
1703                 fmd_module_lock(xip->xi_queue->eq_mod);
1704                 if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1705                     (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1706                         fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1707                         if (cip->ci_xprt != NULL)
1708                                 fmd_case_update_status(cp, statusp,
1709                                     cip->ci_proxy_asru, cip->ci_diag_asru);
1710                         fmd_case_rele(cp);
1711                 }
1712                 fmd_module_unlock(xip->xi_queue->eq_mod);
1713                 fmd_event_hold(e);
1714                 fmd_event_rele(e);
1715                 goto done;
1716         }
1717 
1718         /*
1719          * If a list.resolved event is received, resolve the local case.
1720          */
1721         if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_RESOLVED_CLASS)) {
1722                 fmd_module_lock(xip->xi_queue->eq_mod);
1723                 if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1724                     (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1725                         fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1726                         if (cip->ci_xprt != NULL)
1727                                 fmd_case_transition(cp, (cip->ci_state ==
1728                                     FMD_CASE_REPAIRED) ? FMD_CASE_RESOLVED :
1729                                     (cip->ci_state == FMD_CASE_CLOSED) ?
1730                                     FMD_CASE_REPAIRED : FMD_CASE_CLOSE_WAIT,
1731                                     FMD_CF_RESOLVED);
1732                         fmd_case_rele(cp);
1733                 }
1734                 fmd_module_unlock(xip->xi_queue->eq_mod);
1735                 fmd_event_hold(e);
1736                 fmd_event_rele(e);
1737                 goto done;
1738         }
1739 
1740         if (logonly == FMD_B_TRUE || (xip->xi_flags & FMD_XPRT_EXTERNAL)) {
1741                 /*
1742                  * Don't proxy ereports on an EXTERNAL transport - we won't
1743                  * know how to diagnose them with the wrong topology. Note
1744                  * that here (and above) we have to hold/release the event in
1745                  * order for it to be freed.
1746                  */
1747                 fmd_event_hold(e);
1748                 fmd_event_rele(e);
1749         } else if (isproto == FMD_B_TRUE)
1750                 fmd_dispq_dispatch(dp->d_disp, e, class);
1751         else
1752                 fmd_modhash_dispatch(dp->d_mod_hash, e);
1753 done:
1754         (void) pthread_mutex_lock(&xip->xi_lock);
1755 
1756         ASSERT(xip->xi_busy != 0);
1757         xip->xi_busy--;
1758 
1759         (void) pthread_cond_broadcast(&xip->xi_cv);
1760         (void) pthread_mutex_unlock(&xip->xi_lock);
1761 }
1762 
1763 void
1764 fmd_xprt_uuclose(fmd_xprt_t *xp, const char *uuid)
1765 {
1766         fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1767 
1768         fmd_event_t *e;
1769         nvlist_t *nvl;
1770         char *s;
1771 
1772         if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1773                 return; /* read-only transports do not proxy uuclose */
1774 
1775         TRACE((FMD_DBG_XPRT, "xprt %u closing case %s\n", xip->xi_id, uuid));
1776 
1777         nvl = fmd_protocol_xprt_uuclose(xip->xi_queue->eq_mod,
1778             "resource.fm.xprt.uuclose", xip->xi_version, uuid);
1779 
1780         (void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1781         e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1782         fmd_eventq_insert_at_time(xip->xi_queue, e);
1783 }
1784 
1785 /*
1786  * On proxy side, send back uuresolved request to diagnosing side
1787  */
1788 void
1789 fmd_xprt_uuresolved(fmd_xprt_t *xp, const char *uuid)
1790 {
1791         fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1792 
1793         fmd_event_t *e;
1794         nvlist_t *nvl;
1795         char *s;
1796 
1797         if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1798                 return; /* read-only transports do not proxy uuresolved */
1799 
1800         TRACE((FMD_DBG_XPRT, "xprt %u resolving case %s\n", xip->xi_id, uuid));
1801 
1802         nvl = fmd_protocol_xprt_uuresolved(xip->xi_queue->eq_mod,
1803             "resource.fm.xprt.uuresolved", xip->xi_version, uuid);
1804 
1805         (void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1806         e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1807         fmd_eventq_insert_at_time(xip->xi_queue, e);
1808 }
1809 
1810 /*
1811  * On proxy side, send back repair/acquit/etc request to diagnosing side
1812  */
1813 void
1814 fmd_xprt_updated(fmd_xprt_t *xp, const char *uuid, uint8_t *statusp,
1815         uint8_t *has_asrup, uint_t nelem)
1816 {
1817         fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1818 
1819         fmd_event_t *e;
1820         nvlist_t *nvl;
1821         char *s;
1822 
1823         if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1824                 return; /* read-only transports do not support remote repairs */
1825 
1826         TRACE((FMD_DBG_XPRT, "xprt %u updating case %s\n", xip->xi_id, uuid));
1827 
1828         nvl = fmd_protocol_xprt_updated(xip->xi_queue->eq_mod,
1829             "resource.fm.xprt.updated", xip->xi_version, uuid, statusp,
1830             has_asrup, nelem);
1831 
1832         (void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1833         e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1834         fmd_eventq_insert_at_time(xip->xi_queue, e);
1835 }
1836 
1837 /*
1838  * Insert the specified class into our remote subscription hash.  If the class
1839  * is already present, bump the reference count; otherwise add it to the hash
1840  * and then enqueue an event for our remote peer to proxy our subscription.
1841  */
1842 void
1843 fmd_xprt_subscribe(fmd_xprt_t *xp, const char *class)
1844 {
1845         fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1846 
1847         uint_t refs;
1848         nvlist_t *nvl;
1849         fmd_event_t *e;
1850         char *s;
1851 
1852         if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1853                 return; /* read-only transports do not proxy subscriptions */
1854 
1855         if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1856                 return; /* transport is not yet an active subscriber */
1857 
1858         (void) pthread_mutex_lock(&xip->xi_lock);
1859         refs = fmd_xprt_class_hash_insert(xip, &xip->xi_rsub, class);
1860         (void) pthread_mutex_unlock(&xip->xi_lock);
1861 
1862         if (refs > 1)
1863                 return; /* we've already asked our peer for this subscription */
1864 
1865         fmd_dprintf(FMD_DBG_XPRT,
1866             "xprt %u subscribing to %s\n", xip->xi_id, class);
1867 
1868         nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1869             "resource.fm.xprt.subscribe", xip->xi_version, class);
1870 
1871         (void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1872         e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1873         fmd_eventq_insert_at_time(xip->xi_queue, e);
1874 }
1875 
1876 /*
1877  * Delete the specified class from the remote subscription hash.  If the
1878  * reference count drops to zero, ask our remote peer to unsubscribe by proxy.
1879  */
1880 void
1881 fmd_xprt_unsubscribe(fmd_xprt_t *xp, const char *class)
1882 {
1883         fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1884 
1885         uint_t refs;
1886         nvlist_t *nvl;
1887         fmd_event_t *e;
1888         char *s;
1889 
1890         if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1891                 return; /* read-only transports do not proxy subscriptions */
1892 
1893         if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1894                 return; /* transport is not yet an active subscriber */
1895 
1896         /*
1897          * If the subscription reference count drops to zero in xi_rsub, insert
1898          * an entry into the xi_usub hash indicating we await an unsuback event.
1899          */
1900         (void) pthread_mutex_lock(&xip->xi_lock);
1901 
1902         if ((refs = fmd_xprt_class_hash_delete(xip, &xip->xi_rsub, class)) == 0)
1903                 (void) fmd_xprt_class_hash_insert(xip, &xip->xi_usub, class);
1904 
1905         (void) pthread_mutex_unlock(&xip->xi_lock);
1906 
1907         if (refs != 0)
1908                 return; /* other subscriptions for this class still active */
1909 
1910         fmd_dprintf(FMD_DBG_XPRT,
1911             "xprt %u unsubscribing from %s\n", xip->xi_id, class);
1912 
1913         nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1914             "resource.fm.xprt.unsubscribe", xip->xi_version, class);
1915 
1916         (void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1917         e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1918         fmd_eventq_insert_at_time(xip->xi_queue, e);
1919 }
1920 
1921 static void
1922 fmd_xprt_subscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1923 {
1924         fmd_xprt_t *xp;
1925 
1926         if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1927                 fmd_xprt_subscribe(xp, class);
1928                 fmd_idspace_rele(ids, id);
1929         }
1930 }
1931 
1932 void
1933 fmd_xprt_subscribe_all(const char *class)
1934 {
1935         fmd_idspace_t *ids = fmd.d_xprt_ids;
1936 
1937         if (ids->ids_count != 0)
1938                 fmd_idspace_apply(ids, fmd_xprt_subscribe_xid, (void *)class);
1939 }
1940 
1941 static void
1942 fmd_xprt_unsubscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1943 {
1944         fmd_xprt_t *xp;
1945 
1946         if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1947                 fmd_xprt_unsubscribe(xp, class);
1948                 fmd_idspace_rele(ids, id);
1949         }
1950 }
1951 
1952 void
1953 fmd_xprt_unsubscribe_all(const char *class)
1954 {
1955         fmd_idspace_t *ids = fmd.d_xprt_ids;
1956 
1957         if (ids->ids_count != 0)
1958                 fmd_idspace_apply(ids, fmd_xprt_unsubscribe_xid, (void *)class);
1959 }
1960 
1961 /*ARGSUSED*/
1962 static void
1963 fmd_xprt_suspend_xid(fmd_idspace_t *ids, id_t id, void *arg)
1964 {
1965         fmd_xprt_t *xp;
1966 
1967         if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1968                 fmd_xprt_xsuspend(xp, FMD_XPRT_DSUSPENDED);
1969                 fmd_idspace_rele(ids, id);
1970         }
1971 }
1972 
1973 void
1974 fmd_xprt_suspend_all(void)
1975 {
1976         fmd_idspace_t *ids = fmd.d_xprt_ids;
1977 
1978         (void) pthread_mutex_lock(&fmd.d_xprt_lock);
1979 
1980         if (fmd.d_xprt_suspend++ != 0) {
1981                 (void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1982                 return; /* already suspended */
1983         }
1984 
1985         if (ids->ids_count != 0)
1986                 fmd_idspace_apply(ids, fmd_xprt_suspend_xid, NULL);
1987 
1988         (void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1989 }
1990 
1991 /*ARGSUSED*/
1992 static void
1993 fmd_xprt_resume_xid(fmd_idspace_t *ids, id_t id, void *arg)
1994 {
1995         fmd_xprt_t *xp;
1996 
1997         if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1998                 fmd_xprt_xresume(xp, FMD_XPRT_DSUSPENDED);
1999                 fmd_idspace_rele(ids, id);
2000         }
2001 }
2002 
2003 void
2004 fmd_xprt_resume_all(void)
2005 {
2006         fmd_idspace_t *ids = fmd.d_xprt_ids;
2007 
2008         (void) pthread_mutex_lock(&fmd.d_xprt_lock);
2009 
2010         if (fmd.d_xprt_suspend == 0)
2011                 fmd_panic("fmd_xprt_suspend/resume_all mismatch\n");
2012 
2013         if (--fmd.d_xprt_suspend != 0) {
2014                 (void) pthread_mutex_unlock(&fmd.d_xprt_lock);
2015                 return; /* not ready to be resumed */
2016         }
2017 
2018         if (ids->ids_count != 0)
2019                 fmd_idspace_apply(ids, fmd_xprt_resume_xid, NULL);
2020 
2021         (void) pthread_mutex_unlock(&fmd.d_xprt_lock);
2022 }