Print this page
6141 use kmem_zalloc instead of kmem_alloc + bzero/memset
Split |
Close |
Expand all |
Collapse all |
--- old/usr/src/uts/common/io/pckt.c
+++ new/usr/src/uts/common/io/pckt.c
1 1 /*
2 2 * CDDL HEADER START
3 3 *
4 4 * The contents of this file are subject to the terms of the
5 5 * Common Development and Distribution License, Version 1.0 only
6 6 * (the "License"). You may not use this file except in compliance
7 7 * with the License.
8 8 *
9 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 10 * or http://www.opensolaris.org/os/licensing.
11 11 * See the License for the specific language governing permissions
12 12 * and limitations under the License.
13 13 *
14 14 * When distributing Covered Code, include this CDDL HEADER in each
15 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 16 * If applicable, add the following below this CDDL HEADER, with the
17 17 * fields enclosed by brackets "[]" replaced with your own identifying
18 18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 19 *
20 20 * CDDL HEADER END
↓ open down ↓ |
20 lines elided |
↑ open up ↑ |
21 21 */
22 22 /*
23 23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
24 24 * Use is subject to license terms.
25 25 */
26 26
27 27 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
28 28 /* All Rights Reserved */
29 29
30 30
31 -#pragma ident "%Z%%M% %I% %E% SMI" /* from S5R4 1.10 */
32 -
33 31 /*
34 32 * Description: The pckt module packetizes messages on
35 33 * its read queue by pre-fixing an M_PROTO
36 34 * message type to certain incoming messages.
37 35 */
38 36
39 37 #include <sys/types.h>
40 38 #include <sys/param.h>
41 39 #include <sys/stream.h>
42 40 #include <sys/stropts.h>
43 41 #include <sys/kmem.h>
44 42 #include <sys/errno.h>
45 43 #include <sys/ddi.h>
46 44 #include <sys/sunddi.h>
47 45 #include <sys/debug.h>
48 46
49 47 /*
50 48 * This is the loadable module wrapper.
51 49 */
52 50 #include <sys/conf.h>
53 51 #include <sys/modctl.h>
54 52
55 53 static struct streamtab pcktinfo;
56 54
57 55 /*
58 56 * Per queue instances are single-threaded since the q_ptr
59 57 * field of queues need to be shared among threads.
60 58 */
61 59 static struct fmodsw fsw = {
62 60 "pckt",
63 61 &pcktinfo,
64 62 D_NEW | D_MTPERQ | D_MP
65 63 };
66 64
67 65 /*
68 66 * Module linkage information for the kernel.
69 67 */
70 68
71 69 static struct modlstrmod modlstrmod = {
72 70 &mod_strmodops,
73 71 "pckt module",
74 72 &fsw
75 73 };
76 74
77 75 static struct modlinkage modlinkage = {
78 76 MODREV_1, &modlstrmod, NULL
79 77 };
80 78
81 79
82 80 int
83 81 _init(void)
84 82 {
85 83 return (mod_install(&modlinkage));
86 84 }
87 85
88 86 int
89 87 _fini(void)
90 88 {
91 89 return (mod_remove(&modlinkage));
92 90 }
93 91
94 92 int
95 93 _info(struct modinfo *modinfop)
96 94 {
97 95 return (mod_info(&modlinkage, modinfop));
98 96 }
99 97
100 98 static int pcktopen(queue_t *, dev_t *, int, int, cred_t *);
101 99 static int pcktclose(queue_t *, int, cred_t *);
102 100 static void pcktrput(queue_t *, mblk_t *);
103 101 static void pcktrsrv(queue_t *);
104 102 static void pcktwput(queue_t *, mblk_t *);
105 103 static mblk_t *add_ctl_info(queue_t *, mblk_t *);
106 104 static void add_ctl_wkup(void *);
107 105
108 106
109 107 /*
110 108 * Stream module data structure definitions.
111 109 * Sits over the ptm module generally.
112 110 *
113 111 * Read side flow control strategy: Since we may be putting messages on
114 112 * the read q due to allocb failures, these failures must get
115 113 * reflected fairly quickly to the module below us.
116 114 * No sense in piling on messages in times of memory shortage.
117 115 * Further, for the case of upper level flow control, there is no
118 116 * compelling reason to have more buffering in this module.
119 117 * Thus use a hi-water mark of one.
120 118 * This module imposes no max packet size, there is no inherent reason
121 119 * in the code to do so.
122 120 */
123 121 static struct module_info pcktiinfo = {
124 122 0x9898, /* module id number */
125 123 "pckt", /* module name */
126 124 0, /* minimum packet size */
127 125 INFPSZ, /* maximum packet size */
128 126 1, /* hi-water mark */
129 127 0 /* lo-water mark */
130 128 };
131 129
132 130 /*
133 131 * Write side flow control strategy: There is no write service procedure.
134 132 * The write put function is pass thru, thus there is no reason to have any
135 133 * limits on the maximum packet size.
136 134 */
137 135 static struct module_info pcktoinfo = {
138 136 0x9898, /* module id number */
139 137 "pckt", /* module name */
140 138 0, /* minimum packet size */
141 139 INFPSZ, /* maximum packet size */
142 140 0, /* hi-water mark */
143 141 0 /* lo-water mark */
144 142 };
145 143
146 144 static struct qinit pcktrinit = {
147 145 (int (*)())pcktrput,
148 146 (int (*)())pcktrsrv,
149 147 pcktopen,
150 148 pcktclose,
151 149 NULL,
152 150 &pcktiinfo,
153 151 NULL
154 152 };
155 153
156 154 static struct qinit pcktwinit = {
157 155 (int (*)())pcktwput,
158 156 NULL,
159 157 NULL,
160 158 NULL,
161 159 NULL,
162 160 &pcktoinfo,
163 161 NULL
164 162 };
165 163
166 164 static struct streamtab pcktinfo = {
167 165 &pcktrinit,
168 166 &pcktwinit,
169 167 NULL,
170 168 NULL
171 169 };
172 170
173 171
174 172 /*
175 173 * Per-instance state struct for the pckt module.
176 174 */
177 175 struct pckt_info {
178 176 queue_t *pi_qptr; /* back pointer to q */
179 177 bufcall_id_t pi_bufcall_id;
180 178 #ifdef _MULTI_DATAMODEL
181 179 model_t model;
182 180 #endif /* _MULTI_DATAMODEL */
183 181 };
184 182
185 183 /*
186 184 * Dummy qbufcall callback routine used by open and close.
187 185 * The framework will wake up qwait_sig when we return from
188 186 * this routine (as part of leaving the perimeters.)
189 187 * (The framework enters the perimeters before calling the qbufcall() callback
190 188 * and leaves the perimeters after the callback routine has executed. The
191 189 * framework performs an implicit wakeup of any thread in qwait/qwait_sig
192 190 * when it leaves the perimeter. See qwait(9E).)
193 191 */
194 192 /* ARGSUSED */
195 193 static void
196 194 dummy_callback(void *arg)
197 195 {}
198 196
199 197 /*
200 198 * pcktopen - open routine gets called when the
201 199 * module gets pushed onto the stream.
202 200 */
203 201 /*ARGSUSED*/
204 202 static int
205 203 pcktopen(
206 204 queue_t *q, /* pointer to the read side queue */
207 205 dev_t *devp, /* pointer to stream tail's dev */
208 206 int oflag, /* the user open(2) supplied flags */
209 207 int sflag, /* open state flag */
210 208 cred_t *credp) /* credentials */
211 209 {
212 210 struct pckt_info *pip;
213 211 mblk_t *mop; /* ptr to a setopts msg block */
214 212 struct stroptions *sop;
215 213
216 214 if (sflag != MODOPEN)
↓ open down ↓ |
174 lines elided |
↑ open up ↑ |
217 215 return (EINVAL);
218 216
219 217 if (q->q_ptr != NULL) {
220 218 /* It's already attached. */
221 219 return (0);
222 220 }
223 221
224 222 /*
225 223 * Allocate state structure.
226 224 */
227 - pip = kmem_alloc(sizeof (*pip), KM_SLEEP);
228 - bzero(pip, sizeof (*pip));
225 + pip = kmem_zalloc(sizeof (*pip), KM_SLEEP);
229 226
230 227 #ifdef _MULTI_DATAMODEL
231 228 pip->model = ddi_model_convert_from(get_udatamodel());
232 229 #endif /* _MULTI_DATAMODEL */
233 230
234 231 /*
235 232 * Cross-link.
236 233 */
237 234 pip->pi_qptr = q;
238 235 q->q_ptr = pip;
239 236 WR(q)->q_ptr = pip;
240 237
241 238 qprocson(q);
242 239
243 240 /*
244 241 * Initialize an M_SETOPTS message to set up hi/lo water marks on
245 242 * stream head read queue.
246 243 */
247 244
248 245 while ((mop = allocb(sizeof (struct stroptions), BPRI_MED)) == NULL) {
249 246 bufcall_id_t id = qbufcall(q, sizeof (struct stroptions),
250 247 BPRI_MED, dummy_callback, NULL);
251 248 if (!qwait_sig(q)) {
252 249 qunbufcall(q, id);
253 250 kmem_free(pip, sizeof (*pip));
254 251 qprocsoff(q);
255 252 return (EINTR);
256 253 }
257 254 qunbufcall(q, id);
258 255 }
259 256
260 257
261 258 /*
262 259 * XXX: Should this module really control the hi/low water marks?
263 260 * Is there any reason in this code to do so?
264 261 */
265 262 mop->b_datap->db_type = M_SETOPTS;
266 263 mop->b_wptr += sizeof (struct stroptions);
267 264 sop = (struct stroptions *)mop->b_rptr;
268 265 sop->so_flags = SO_HIWAT | SO_LOWAT;
269 266 sop->so_hiwat = 512;
270 267 sop->so_lowat = 256;
271 268
272 269 /*
273 270 * Commit to the open and send the M_SETOPTS off to the stream head.
274 271 */
275 272 putnext(q, mop);
276 273
277 274 return (0);
278 275 }
279 276
280 277
281 278 /*
282 279 * pcktclose - This routine gets called when the module
283 280 * gets popped off of the stream.
284 281 */
285 282
286 283 /*ARGSUSED*/
287 284 static int
288 285 pcktclose(
289 286 queue_t *q, /* Pointer to the read queue */
290 287 int flag,
291 288 cred_t *credp)
292 289 {
293 290 struct pckt_info *pip = (struct pckt_info *)q->q_ptr;
294 291
295 292 qprocsoff(q);
296 293 /*
297 294 * Cancel outstanding qbufcall
298 295 */
299 296 if (pip->pi_bufcall_id) {
300 297 qunbufcall(q, pip->pi_bufcall_id);
301 298 pip->pi_bufcall_id = 0;
302 299 }
303 300 /*
304 301 * Do not worry about msgs queued on the q, the framework
305 302 * will free them up.
306 303 */
307 304 kmem_free(q->q_ptr, sizeof (struct pckt_info));
308 305 q->q_ptr = WR(q)->q_ptr = NULL;
309 306 return (0);
310 307 }
311 308
312 309 /*
313 310 * pcktrput - Module read queue put procedure.
314 311 * This is called from the module or
315 312 * driver downstream.
316 313 */
317 314 static void
318 315 pcktrput(
319 316 queue_t *q, /* Pointer to the read queue */
320 317 mblk_t *mp) /* Pointer to the current message block */
321 318 {
322 319 mblk_t *pckt_msgp;
323 320
324 321
325 322 switch (mp->b_datap->db_type) {
326 323 case M_FLUSH:
327 324 /*
328 325 * The PTS driver swaps the FLUSHR and FLUSHW flags
329 326 * we need to swap them back to reflect the actual
330 327 * slave side FLUSH mode.
331 328 */
332 329 if ((*mp->b_rptr & FLUSHRW) != FLUSHRW)
333 330 if ((*mp->b_rptr & FLUSHRW) == FLUSHR)
334 331 *mp->b_rptr = FLUSHW;
335 332 else if ((*mp->b_rptr & FLUSHRW) == FLUSHW)
336 333 *mp->b_rptr = FLUSHR;
337 334
338 335 pckt_msgp = copymsg(mp);
339 336 if (*mp->b_rptr & FLUSHW) {
340 337 /*
341 338 * In the packet model we are not allowing
342 339 * flushes of the master's stream head read
343 340 * side queue. This is because all packet
344 341 * state information is stored there and
345 342 * a flush could destroy this data before
346 343 * it is read.
347 344 */
348 345 *mp->b_rptr = FLUSHW;
349 346 putnext(q, mp);
350 347 } else {
351 348 /*
352 349 * Free messages that only flush the
353 350 * master's read queue.
354 351 */
355 352 freemsg(mp);
356 353 }
357 354
358 355 if (pckt_msgp == NULL)
359 356 break;
360 357
361 358 mp = pckt_msgp;
362 359 /*
363 360 * Prefix M_PROTO and putnext.
364 361 */
365 362 goto prefix_head;
366 363
367 364 case M_DATA:
368 365 case M_IOCTL:
369 366 case M_PROTO:
370 367 /*
371 368 * For non-priority messages, follow flow-control rules.
372 369 * Also, if there are messages on the q already, keep
373 370 * queueing them since they need to be processed in order.
374 371 */
375 372 if (!canputnext(q) || (qsize(q) > 0)) {
376 373 (void) putq(q, mp);
377 374 break;
378 375 }
379 376 /* FALLTHROUGH */
380 377
381 378 /*
382 379 * For high priority messages, skip flow control checks.
383 380 */
384 381 case M_PCPROTO:
385 382 case M_READ:
386 383 case M_STOP:
387 384 case M_START:
388 385 case M_STARTI:
389 386 case M_STOPI:
390 387 prefix_head:
391 388 /*
392 389 * Prefix an M_PROTO header to message and pass upstream.
393 390 */
394 391 if ((mp = add_ctl_info(q, mp)) != NULL)
395 392 putnext(q, mp);
396 393 break;
397 394
398 395 default:
399 396 /*
400 397 * For data messages, queue them back on the queue if
401 398 * there are messages on the queue already. This is
402 399 * done to preserve the order of messages.
403 400 * For high priority messages or for no messages on the
404 401 * q, simply putnext() and pass it on.
405 402 */
406 403 if ((datamsg(mp->b_datap->db_type)) && (qsize(q) > 0))
407 404 (void) putq(q, mp);
408 405 else
409 406 putnext(q, mp);
410 407 break;
411 408 }
412 409 }
413 410
414 411 /*
415 412 * pcktrsrv - module read service procedure
416 413 * This function deals with messages left in the queue due to
417 414 * (a) not enough memory to allocate the header M_PROTO message
418 415 * (b) flow control reasons
419 416 * The function will attempt to get the messages off the queue and
420 417 * process them.
421 418 */
422 419 static void
423 420 pcktrsrv(queue_t *q)
424 421 {
425 422 mblk_t *mp;
426 423
427 424 while ((mp = getq(q)) != NULL) {
428 425 if (!canputnext(q)) {
429 426 /*
430 427 * For high priority messages, make sure there is no
431 428 * infinite loop. Disable the queue for this case.
432 429 * High priority messages get here only for buffer
433 430 * allocation failures. Thus the bufcall callout
434 431 * will reenable the q.
435 432 * XXX bug alert - nooenable will *not* prevent
436 433 * putbq of a hipri messages frm enabling the queue.
437 434 */
438 435 if (!datamsg(mp->b_datap->db_type))
439 436 noenable(q);
440 437 (void) putbq(q, mp);
441 438 return;
442 439 }
443 440
444 441 /*
445 442 * M_FLUSH msgs may also be here if there was a memory
446 443 * failure.
447 444 */
448 445 switch (mp->b_datap->db_type) {
449 446 case M_FLUSH:
450 447 case M_PROTO:
451 448 case M_PCPROTO:
452 449 case M_STOP:
453 450 case M_START:
454 451 case M_IOCTL:
455 452 case M_DATA:
456 453 case M_READ:
457 454 case M_STARTI:
458 455 case M_STOPI:
459 456 /*
460 457 * Prefix an M_PROTO header to msg and pass upstream.
461 458 */
462 459 if ((mp = add_ctl_info(q, mp)) == NULL) {
463 460 /*
464 461 * Running into memory or flow ctl problems.
465 462 */
466 463 return;
467 464 }
468 465 /* FALL THROUGH */
469 466
470 467 default:
471 468 putnext(q, mp);
472 469 break;
473 470 }
474 471 }
475 472 }
476 473
477 474 /*
478 475 * pcktwput - Module write queue put procedure.
479 476 * All messages are send downstream unchanged
480 477 */
481 478
482 479 static void
483 480 pcktwput(
484 481 queue_t *q, /* Pointer to the read queue */
485 482 mblk_t *mp) /* Pointer to current message block */
486 483 {
487 484 putnext(q, mp);
488 485 }
489 486
490 487 #ifdef _MULTI_DATAMODEL
491 488 /*
492 489 * reallocb - copy the data block from the given message block into a new block.
493 490 * This function is used in case data block had another message block
494 491 * pointing to it (and hence we just copy this one data block).
495 492 *
496 493 * Returns new message block if successful. On failure it returns NULL.
497 494 * It also tries to do a qbufcall and if that also fails,
498 495 * it frees the message block.
499 496 */
500 497 static mblk_t *
501 498 pckt_reallocb(
502 499 queue_t *q, /* Pointer to the read queue */
503 500 mblk_t *mp /* Pointer to the message block to be changed */
504 501 )
505 502 {
506 503 mblk_t *nmp;
507 504
508 505 ASSERT(mp->b_datap->db_ref >= 1);
509 506
510 507 /*
511 508 * No reallocation is needed if there is only one reference
512 509 * to this data block.
513 510 */
514 511 if (mp->b_datap->db_ref == 1)
515 512 return (mp);
516 513
517 514 if ((nmp = copyb(mp)) == NULL) {
518 515 struct pckt_info *pip = (struct pckt_info *)q->q_ptr;
519 516
520 517 noenable(q);
521 518 if (pip->pi_bufcall_id = qbufcall(q, mp->b_wptr - mp->b_rptr,
522 519 BPRI_MED, add_ctl_wkup, q)) {
523 520 /*
524 521 * Put the message back onto the q.
525 522 */
526 523 (void) putq(q, mp);
527 524 } else {
528 525 /*
529 526 * Things are pretty bad and serious if bufcall fails!
530 527 * Drop the message in this case.
531 528 */
532 529 freemsg(mp);
533 530 }
534 531 return ((mblk_t *)0);
535 532 }
536 533
537 534 nmp->b_cont = mp->b_cont;
538 535 freeb(mp);
539 536 return (nmp);
540 537 }
541 538 #endif /* _MULTI_DATAMODEL */
542 539
543 540 /*
544 541 * add_ctl_info: add message control information to in coming
545 542 * message.
546 543 */
547 544 static mblk_t *
548 545 add_ctl_info(
549 546 queue_t *q, /* pointer to the read queue */
550 547 mblk_t *mp) /* pointer to the raw data input message */
551 548 {
552 549 struct pckt_info *pip = (struct pckt_info *)q->q_ptr;
553 550 mblk_t *bp; /* pointer to the unmodified message block */
554 551
555 552 /*
556 553 * Waiting on space for previous message?
557 554 */
558 555 if (pip->pi_bufcall_id) {
559 556 /*
560 557 * Chain this message on to q for later processing.
561 558 */
562 559 (void) putq(q, mp);
563 560 return (NULL);
564 561 }
565 562
566 563 /*
567 564 * Need to add the message block header as
568 565 * an M_PROTO type message.
569 566 */
570 567 if ((bp = allocb(sizeof (char), BPRI_MED)) == (mblk_t *)NULL) {
571 568
572 569 /*
573 570 * There are two reasons to disable the q:
574 571 * (1) Flow control reasons should not wake up the q.
575 572 * (2) High priority messages will wakeup the q
576 573 * immediately. Disallow this.
577 574 */
578 575 noenable(q);
579 576 if (pip->pi_bufcall_id = qbufcall(q, sizeof (char), BPRI_MED,
580 577 add_ctl_wkup, q)) {
581 578 /*
582 579 * Add the message to the q.
583 580 */
584 581 (void) putq(q, mp);
585 582 } else {
586 583 /*
587 584 * Things are pretty bad and serious if bufcall fails!
588 585 * Drop the message in this case.
589 586 */
590 587 freemsg(mp);
591 588 }
592 589
593 590 return (NULL);
594 591 }
595 592
596 593 /*
597 594 * Copy the message type information to this message.
598 595 */
599 596 bp->b_datap->db_type = M_PROTO;
600 597 *(unsigned char *)bp->b_rptr = mp->b_datap->db_type;
601 598 bp->b_wptr++;
602 599
603 600 #ifdef _MULTI_DATAMODEL
604 601 /*
605 602 * Check the datamodel and if the calling program is
606 603 * an ILP32 application then we covert the M_IOCTLs and M_READs
607 604 * into the native ILP32 format before passing them upstream
608 605 * to user mode.
609 606 */
610 607 switch (pip->model) {
611 608 case DDI_MODEL_ILP32:
612 609 switch (mp->b_datap->db_type) {
613 610 /*
614 611 * This structure must have the same shape as
615 612 * the * ILP32 compilation of `struct iocblk'
616 613 * from <sys/stream.h>.
617 614 */
618 615 struct iocblk32 {
619 616 int32_t ioc_cmd;
620 617 caddr32_t ioc_cr;
621 618 uint32_t ioc_id;
622 619 int32_t ioc_count;
623 620 int32_t ioc_error;
624 621 int32_t ioc_rval;
625 622 int32_t ioc_fill1;
626 623 uint32_t ioc_flag;
627 624 int32_t ioc_filler[2];
628 625 } niocblk_32;
629 626 struct iocblk *iocblk_64;
630 627
631 628 case M_IOCTL:
632 629 if ((mp = pckt_reallocb(q, mp)) == (mblk_t *)0)
633 630 return ((mblk_t *)0);
634 631
635 632 bzero(&niocblk_32, sizeof (niocblk_32));
636 633 iocblk_64 = (struct iocblk *)mp->b_rptr;
637 634
638 635 /* Leave the pointer to cred_t structure as it is. */
639 636 niocblk_32.ioc_cmd = iocblk_64->ioc_cmd;
640 637 niocblk_32.ioc_cr = (caddr32_t)(uintptr_t)
641 638 iocblk_64->ioc_cr;
642 639 niocblk_32.ioc_id = iocblk_64->ioc_id;
643 640 niocblk_32.ioc_count = iocblk_64->ioc_count;
644 641 niocblk_32.ioc_error = iocblk_64->ioc_error;
645 642 niocblk_32.ioc_rval = iocblk_64->ioc_rval;
646 643 niocblk_32.ioc_flag = iocblk_64->ioc_flag;
647 644
648 645 /* Copy the iocblk structure for ILP32 back */
649 646 *(struct iocblk32 *)mp->b_rptr = niocblk_32;
650 647 mp->b_wptr = mp->b_rptr + sizeof (struct iocblk32);
651 648 break;
652 649
653 650 case M_READ:
654 651 if ((mp = pckt_reallocb(q, mp)) == (mblk_t *)0)
655 652 return ((mblk_t *)0);
656 653
657 654 /* change the size_t to size32_t for ILP32 */
658 655 *(size32_t *)mp->b_rptr = *(size_t *)mp->b_rptr;
659 656 mp->b_wptr = mp->b_rptr + sizeof (size32_t);
660 657 break;
661 658 }
662 659 break;
663 660
664 661 case DATAMODEL_NONE:
665 662 break;
666 663 }
667 664 #endif /* _MULTI_DATAMODEL */
668 665
669 666 /*
670 667 * Now change the orginal message type to M_DATA and tie them up.
671 668 */
672 669 mp->b_datap->db_type = M_DATA;
673 670 bp->b_cont = mp;
674 671
675 672 return (bp);
676 673 }
677 674
678 675 static void
679 676 add_ctl_wkup(void *arg)
680 677 {
681 678 queue_t *q = arg; /* ptr to the read queue */
682 679 struct pckt_info *pip = (struct pckt_info *)q->q_ptr;
683 680
684 681 pip->pi_bufcall_id = 0;
685 682 /*
686 683 * Allow enabling of the q to allow the service
687 684 * function to do its job.
688 685 *
689 686 * Also, qenable() to schedule the q immediately.
690 687 * This is to ensure timely processing of high priority
691 688 * messages if they are on the q.
692 689 */
693 690 enableok(q);
694 691 qenable(q);
695 692 }
↓ open down ↓ |
457 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX