1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  *
  25  * Portions Copyright 2008 Denis Cheng
  26  */
  27 
  28 #include "config.h"
  29 #include <pthread.h>
  30 #ifdef HAVE_LWPS
  31 #include <sys/lwp.h>
  32 #endif
  33 #include <signal.h>
  34 
  35 #include "filebench.h"
  36 #include "threadflow.h"
  37 #include "flowop.h"
  38 #include "ipc.h"
  39 
  40 static threadflow_t *threadflow_define_common(procflow_t *procflow,
  41     char *name, threadflow_t *inherit, int instance);
  42 
  43 /*
  44  * Threadflows are filebench entities which manage operating system
  45  * threads. Each worker threadflow spawns a separate filebench thread,
  46  * with attributes inherited from a FLOW_MASTER threadflow created during
  47  * f model language parsing. This section contains routines to define,
  48  * create, control, and delete threadflows.
  49  *
  50  * Each thread defined in the f model creates a FLOW_MASTER
  51  * threadflow which encapsulates the defined attributes and flowops of
  52  * the f language thread, including the number of instances to create.
  53  * At runtime, a worker threadflow instance with an associated filebench
  54  * thread is created, which runs until told to quit or is specifically
  55  * deleted.
  56  */
  57 
  58 
  59 /*
  60  * Prints information about threadflow syntax.
  61  */
  62 void
  63 threadflow_usage(void)
  64 {
  65         (void) fprintf(stderr, "  thread  name=<name>[,instances=<count>]\n");
  66         (void) fprintf(stderr, "\n");
  67         (void) fprintf(stderr, "  {\n");
  68         (void) fprintf(stderr, "    flowop ...\n");
  69         (void) fprintf(stderr, "    flowop ...\n");
  70         (void) fprintf(stderr, "    flowop ...\n");
  71         (void) fprintf(stderr, "  }\n");
  72         (void) fprintf(stderr, "\n");
  73 }
  74 
  75 /*
  76  * Creates a thread for the supplied threadflow. If interprocess
  77  * shared memory is desired, then increments the amount of shared
  78  * memory needed by the amount specified in the threadflow's
  79  * tf_memsize parameter. The thread starts in routine
  80  * flowop_start() with a poineter to the threadflow supplied
  81  * as the argument.
  82  */
  83 static int
  84 threadflow_createthread(threadflow_t *threadflow)
  85 {
  86         fbint_t memsize;
  87         memsize = avd_get_int(threadflow->tf_memsize);
  88         threadflow->tf_constmemsize = memsize;
  89 
  90         filebench_log(LOG_DEBUG_SCRIPT, "Creating thread %s, memory = %ld",
  91             threadflow->tf_name, memsize);
  92 
  93         if (threadflow->tf_attrs & THREADFLOW_USEISM)
  94                 filebench_shm->shm_required += memsize;
  95 
  96         if (pthread_create(&threadflow->tf_tid, NULL,
  97             (void *(*)(void*))flowop_start, threadflow) != 0) {
  98                 filebench_log(LOG_ERROR, "thread create failed");
  99                 filebench_shutdown(1);
 100                 return (FILEBENCH_ERROR);
 101         }
 102 
 103         return (FILEBENCH_OK);
 104 }
 105 
 106 /*
 107  * Creates threads for the threadflows associated with a procflow.
 108  * The routine iterates through the list of threadflows in the
 109  * supplied procflow's pf_threads list. For each threadflow on
 110  * the list, it defines tf_instances number of cloned
 111  * threadflows, and then calls threadflow_createthread() for
 112  * each to create and start the actual operating system thread.
 113  * Note that each of the newly defined threadflows will be linked
 114  * into the procflows threadflow list, but at the head of the
 115  * list, so they will not become part of the supplied set. After
 116  * all the threads have been created, threadflow_init enters
 117  * a join loop for all the threads in the newly defined
 118  * threadflows. Once all the created threads have exited,
 119  * threadflow_init will return 0. If errors are encountered, it
 120  * will return a non zero value.
 121  */
 122 int
 123 threadflow_init(procflow_t *procflow)
 124 {
 125         threadflow_t *threadflow = procflow->pf_threads;
 126         int ret = 0;
 127 
 128         (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
 129 
 130         while (threadflow) {
 131                 threadflow_t *newthread;
 132                 int instances;
 133                 int i;
 134 
 135                 instances = avd_get_int(threadflow->tf_instances);
 136                 filebench_log(LOG_VERBOSE,
 137                     "Starting %d %s threads",
 138                     instances, threadflow->tf_name);
 139 
 140                 for (i = 1; i < instances; i++) {
 141                         /* Create threads */
 142                         newthread =
 143                             threadflow_define_common(procflow,
 144                             threadflow->tf_name, threadflow, i + 1);
 145                         if (newthread == NULL)
 146                                 return (-1);
 147                         ret |= threadflow_createthread(newthread);
 148                 }
 149 
 150                 newthread = threadflow_define_common(procflow,
 151                     threadflow->tf_name,
 152                     threadflow, 1);
 153 
 154                 if (newthread == NULL)
 155                         return (-1);
 156 
 157                 /* Create each thread */
 158                 ret |= threadflow_createthread(newthread);
 159 
 160                 threadflow = threadflow->tf_next;
 161         }
 162 
 163         threadflow = procflow->pf_threads;
 164 
 165         (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
 166 
 167         while (threadflow) {
 168                 /* wait for all threads to finish */
 169                 if (threadflow->tf_tid) {
 170                         void *status;
 171 
 172                         if (pthread_join(threadflow->tf_tid, &status) == 0)
 173                                 ret += *(int *)status;
 174                 }
 175                 threadflow = threadflow->tf_next;
 176         }
 177 
 178         procflow->pf_running = 0;
 179 
 180         return (ret);
 181 }
 182 
 183 /*
 184  * Tells the threadflow's thread to stop and optionally signals
 185  * its associated process to end the thread.
 186  */
 187 static void
 188 threadflow_kill(threadflow_t *threadflow)
 189 {
 190         int wait_cnt = 2;
 191 
 192         /* Tell thread to finish */
 193         threadflow->tf_abort = 1;
 194 
 195         /* wait a bit for threadflow to stop */
 196         while (wait_cnt && threadflow->tf_running) {
 197                 (void) sleep(1);
 198                 wait_cnt--;
 199         }
 200 
 201         if (threadflow->tf_running) {
 202                 threadflow->tf_running = FALSE;
 203                 (void) pthread_kill(threadflow->tf_tid, SIGKILL);
 204         }
 205 }
 206 
 207 /*
 208  * Deletes the specified threadflow from the specified threadflow
 209  * list after first terminating the threadflow's thread, deleting
 210  * the threadflow's flowops, and finally freeing the threadflow
 211  * entity. It also subtracts the threadflow's shared memory
 212  * requirements from the total amount required, shm_required. If
 213  * the specified threadflow is found, returns 0, otherwise
 214  * returns -1.
 215  */
 216 static int
 217 threadflow_delete(threadflow_t **threadlist, threadflow_t *threadflow)
 218 {
 219         threadflow_t *entry = *threadlist;
 220 
 221         filebench_log(LOG_DEBUG_IMPL, "Deleting thread: (%s-%d)",
 222             threadflow->tf_name,
 223             threadflow->tf_instance);
 224 
 225         if (threadflow->tf_attrs & THREADFLOW_USEISM)
 226                 filebench_shm->shm_required -= threadflow->tf_constmemsize;
 227 
 228         if (threadflow == *threadlist) {
 229                 /* First on list */
 230                 filebench_log(LOG_DEBUG_IMPL, "Deleted thread: (%s-%d)",
 231                     threadflow->tf_name,
 232                     threadflow->tf_instance);
 233 
 234                 threadflow_kill(threadflow);
 235                 flowop_delete_all(&threadflow->tf_thrd_fops);
 236                 *threadlist = threadflow->tf_next;
 237                 (void) pthread_mutex_destroy(&threadflow->tf_lock);
 238                 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow);
 239                 return (0);
 240         }
 241 
 242         while (entry->tf_next) {
 243                 filebench_log(LOG_DEBUG_IMPL,
 244                     "Delete thread: (%s-%d) == (%s-%d)",
 245                     entry->tf_next->tf_name,
 246                     entry->tf_next->tf_instance,
 247                     threadflow->tf_name,
 248                     threadflow->tf_instance);
 249 
 250                 if (threadflow == entry->tf_next) {
 251                         /* Delete */
 252                         filebench_log(LOG_DEBUG_IMPL,
 253                             "Deleted thread: (%s-%d)",
 254                             entry->tf_next->tf_name,
 255                             entry->tf_next->tf_instance);
 256                         threadflow_kill(entry->tf_next);
 257                         flowop_delete_all(&entry->tf_next->tf_thrd_fops);
 258                         (void) pthread_mutex_destroy(&threadflow->tf_lock);
 259                         ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow);
 260                         entry->tf_next = entry->tf_next->tf_next;
 261                         return (0);
 262                 }
 263                 entry = entry->tf_next;
 264         }
 265 
 266         return (-1);
 267 }
 268 
 269 /*
 270  * Given a pointer to the thread list of a procflow, cycles
 271  * through all the threadflows on the list, deleting each one
 272  * except the FLOW_MASTER.
 273  */
 274 void
 275 threadflow_delete_all(threadflow_t **threadlist)
 276 {
 277         threadflow_t *threadflow;
 278 
 279         (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
 280 
 281         threadflow = *threadlist;
 282         filebench_log(LOG_DEBUG_IMPL, "Deleting all threads");
 283 
 284         while (threadflow) {
 285                 if (threadflow->tf_instance &&
 286                     (threadflow->tf_instance == FLOW_MASTER)) {
 287                         threadflow = threadflow->tf_next;
 288                         continue;
 289                 }
 290                 (void) threadflow_delete(threadlist, threadflow);
 291                 threadflow = threadflow->tf_next;
 292         }
 293 
 294         (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
 295 }
 296 
 297 /*
 298  * Waits till all threadflows are started, or a timeout occurs.
 299  * Checks through the list of threadflows, waiting up to 10
 300  * seconds for each one to set its tf_running flag to 1. If not
 301  * set after 10 seconds, continues on to the next threadflow
 302  * anyway.
 303  */
 304 void
 305 threadflow_allstarted(pid_t pid, threadflow_t *threadflow)
 306 {
 307         (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
 308 
 309         while (threadflow) {
 310                 int waits;
 311 
 312                 if ((threadflow->tf_instance == 0) ||
 313                     (threadflow->tf_instance == FLOW_MASTER)) {
 314                         threadflow = threadflow->tf_next;
 315                         continue;
 316                 }
 317 
 318                 filebench_log(LOG_DEBUG_IMPL, "Checking pid %d thread %s-%d",
 319                     pid,
 320                     threadflow->tf_name,
 321                     threadflow->tf_instance);
 322 
 323                 waits = 10;
 324                 while (waits && (threadflow->tf_running == 0) &&
 325                     (filebench_shm->shm_f_abort == 0)) {
 326                         (void) ipc_mutex_unlock(
 327                             &filebench_shm->shm_threadflow_lock);
 328                         if (waits < 3)
 329                                 filebench_log(LOG_INFO,
 330                                     "Waiting for pid %d thread %s-%d",
 331                                     pid,
 332                                     threadflow->tf_name,
 333                                     threadflow->tf_instance);
 334 
 335                         (void) sleep(1);
 336                         (void) ipc_mutex_lock(
 337                             &filebench_shm->shm_threadflow_lock);
 338                         waits--;
 339                 }
 340 
 341                 threadflow = threadflow->tf_next;
 342         }
 343 
 344         (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
 345 }
 346 
 347 /*
 348  * Create an in-memory thread object linked to a parent procflow.
 349  * A threadflow entity is allocated from shared memory and
 350  * initialized from the "inherit" threadflow if supplied,
 351  * otherwise to zeros. The threadflow is assigned a unique
 352  * thread id, the supplied instance number, the supplied name
 353  * and added to the procflow's pf_thread list. If no name is
 354  * supplied or the threadflow can't be allocated, NULL is
 355  * returned Otherwise a pointer to the newly allocated threadflow
 356  * is returned.
 357  *
 358  * The filebench_shm->shm_threadflow_lock must be held by the caller.
 359  */
 360 static threadflow_t *
 361 threadflow_define_common(procflow_t *procflow, char *name,
 362     threadflow_t *inherit, int instance)
 363 {
 364         threadflow_t *threadflow;
 365         threadflow_t **threadlistp = &procflow->pf_threads;
 366 
 367         if (name == NULL)
 368                 return (NULL);
 369 
 370         threadflow = (threadflow_t *)ipc_malloc(FILEBENCH_THREADFLOW);
 371 
 372         if (threadflow == NULL)
 373                 return (NULL);
 374 
 375         if (inherit)
 376                 (void) memcpy(threadflow, inherit, sizeof (threadflow_t));
 377         else
 378                 (void) memset(threadflow, 0, sizeof (threadflow_t));
 379 
 380         threadflow->tf_utid = ++filebench_shm->shm_utid;
 381 
 382         threadflow->tf_instance = instance;
 383         (void) strcpy(threadflow->tf_name, name);
 384         threadflow->tf_process = procflow;
 385         (void) pthread_mutex_init(&threadflow->tf_lock,
 386             ipc_mutexattr(IPC_MUTEX_NORMAL));
 387 
 388         filebench_log(LOG_DEBUG_IMPL, "Defining thread %s-%d",
 389             name, instance);
 390 
 391         /* Add threadflow to list */
 392         if (*threadlistp == NULL) {
 393                 *threadlistp = threadflow;
 394                 threadflow->tf_next = NULL;
 395         } else {
 396                 threadflow->tf_next = *threadlistp;
 397                 *threadlistp = threadflow;
 398         }
 399 
 400         return (threadflow);
 401 }
 402 
 403 /*
 404  * Create an in memory FLOW_MASTER thread object as described
 405  * by the syntax. Acquire the  filebench_shm->shm_threadflow_lock and
 406  * call threadflow_define_common() to create a threadflow entity.
 407  * Set the number of instances to create at runtime,
 408  * tf_instances, to "instances". Return the threadflow pointer
 409  * returned by the threadflow_define_common call.
 410  */
 411 threadflow_t *
 412 threadflow_define(procflow_t *procflow, char *name,
 413     threadflow_t *inherit, avd_t instances)
 414 {
 415         threadflow_t *threadflow;
 416 
 417         (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
 418 
 419         if ((threadflow = threadflow_define_common(procflow, name,
 420             inherit, FLOW_MASTER)) == NULL)
 421                 return (NULL);
 422 
 423         threadflow->tf_instances = instances;
 424 
 425         (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
 426 
 427         return (threadflow);
 428 }
 429 
 430 
 431 /*
 432  * Searches the provided threadflow list for the named threadflow.
 433  * A pointer to the threadflow is returned, or NULL if threadflow
 434  * is not found.
 435  */
 436 threadflow_t *
 437 threadflow_find(threadflow_t *threadlist, char *name)
 438 {
 439         threadflow_t *threadflow = threadlist;
 440 
 441         (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
 442 
 443         while (threadflow) {
 444                 if (strcmp(name, threadflow->tf_name) == 0) {
 445 
 446                         (void) ipc_mutex_unlock(
 447                             &filebench_shm->shm_threadflow_lock);
 448 
 449                         return (threadflow);
 450                 }
 451                 threadflow = threadflow->tf_next;
 452         }
 453 
 454         (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
 455 
 456 
 457         return (NULL);
 458 }