9proc.c (14629B)
1 #include "stdinc.h" 2 3 #include "9.h" 4 #include "dat.h" 5 #include "fns.h" 6 7 enum { 8 NConInit = 128, 9 NMsgInit = 384, 10 NMsgProcInit = 64, 11 NMsizeInit = 8192+IOHDRSZ, 12 }; 13 14 static struct { 15 QLock alock; /* alloc */ 16 Msg* ahead; 17 Rendez arendez; 18 19 int maxmsg; 20 int nmsg; 21 int nmsgstarve; 22 23 QLock rlock; /* read */ 24 Msg* rhead; 25 Msg* rtail; 26 Rendez rrendez; 27 28 int maxproc; 29 int nproc; 30 int nprocstarve; 31 32 u32int msize; /* immutable */ 33 } mbox; 34 35 static struct { 36 QLock alock; /* alloc */ 37 Con* ahead; 38 Rendez arendez; 39 40 RWLock clock; 41 Con* chead; 42 Con* ctail; 43 44 int maxcon; 45 int ncon; 46 int nconstarve; 47 48 u32int msize; 49 } cbox; 50 51 static void 52 conFree(Con* con) 53 { 54 assert(con->version == nil); 55 assert(con->mhead == nil); 56 assert(con->whead == nil); 57 assert(con->nfid == 0); 58 assert(con->state == ConMoribund); 59 60 if(con->fd >= 0){ 61 close(con->fd); 62 con->fd = -1; 63 } 64 con->state = ConDead; 65 con->aok = 0; 66 con->flags = 0; 67 con->isconsole = 0; 68 69 qlock(&cbox.alock); 70 if(con->cprev != nil) 71 con->cprev->cnext = con->cnext; 72 else 73 cbox.chead = con->cnext; 74 if(con->cnext != nil) 75 con->cnext->cprev = con->cprev; 76 else 77 cbox.ctail = con->cprev; 78 con->cprev = con->cnext = nil; 79 80 if(cbox.ncon > cbox.maxcon){ 81 if(con->name != nil) 82 vtfree(con->name); 83 vtfree(con->data); 84 vtfree(con); 85 cbox.ncon--; 86 qunlock(&cbox.alock); 87 return; 88 } 89 con->anext = cbox.ahead; 90 cbox.ahead = con; 91 if(con->anext == nil) 92 rwakeup(&cbox.arendez); 93 qunlock(&cbox.alock); 94 } 95 96 static void 97 msgFree(Msg* m) 98 { 99 assert(m->rwnext == nil); 100 assert(m->flush == nil); 101 102 qlock(&mbox.alock); 103 if(mbox.nmsg > mbox.maxmsg){ 104 vtfree(m->data); 105 vtfree(m); 106 mbox.nmsg--; 107 qunlock(&mbox.alock); 108 return; 109 } 110 m->anext = mbox.ahead; 111 mbox.ahead = m; 112 if(m->anext == nil) 113 rwakeup(&mbox.arendez); 114 qunlock(&mbox.alock); 115 } 116 117 static Msg* 118 msgAlloc(Con* con) 119 { 120 Msg *m; 121 122 qlock(&mbox.alock); 123 while(mbox.ahead == nil){ 124 if(mbox.nmsg >= mbox.maxmsg){ 125 mbox.nmsgstarve++; 126 rsleep(&mbox.arendez); 127 continue; 128 } 129 m = vtmallocz(sizeof(Msg)); 130 m->data = vtmalloc(mbox.msize); 131 m->msize = mbox.msize; 132 mbox.nmsg++; 133 mbox.ahead = m; 134 break; 135 } 136 m = mbox.ahead; 137 mbox.ahead = m->anext; 138 m->anext = nil; 139 qunlock(&mbox.alock); 140 141 m->con = con; 142 m->state = MsgR; 143 m->nowq = 0; 144 145 return m; 146 } 147 148 static void 149 msgMunlink(Msg* m) 150 { 151 Con *con; 152 153 con = m->con; 154 155 if(m->mprev != nil) 156 m->mprev->mnext = m->mnext; 157 else 158 con->mhead = m->mnext; 159 if(m->mnext != nil) 160 m->mnext->mprev = m->mprev; 161 else 162 con->mtail = m->mprev; 163 m->mprev = m->mnext = nil; 164 } 165 166 void 167 msgFlush(Msg* m) 168 { 169 Con *con; 170 Msg *flush, *old; 171 172 con = m->con; 173 174 if(Dflag) 175 fprint(2, "msgFlush %F\n", &m->t); 176 177 /* 178 * If this Tflush has been flushed, nothing to do. 179 * Look for the message to be flushed in the 180 * queue of all messages still on this connection. 181 * If it's not found must assume Elvis has already 182 * left the building and reply normally. 183 */ 184 qlock(&con->mlock); 185 if(m->state == MsgF){ 186 qunlock(&con->mlock); 187 return; 188 } 189 for(old = con->mhead; old != nil; old = old->mnext) 190 if(old->t.tag == m->t.oldtag) 191 break; 192 if(old == nil){ 193 if(Dflag) 194 fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag); 195 qunlock(&con->mlock); 196 return; 197 } 198 199 if(Dflag) 200 fprint(2, "\tmsgFlush found %F\n", &old->t); 201 202 /* 203 * Found it. 204 * There are two cases where the old message can be 205 * truly flushed and no reply to the original message given. 206 * The first is when the old message is in MsgR state; no 207 * processing has been done yet and it is still on the read 208 * queue. The second is if old is a Tflush, which doesn't 209 * affect the server state. In both cases, put the old 210 * message into MsgF state and let MsgWrite toss it after 211 * pulling it off the queue. 212 */ 213 if(old->state == MsgR || old->t.type == Tflush){ 214 old->state = MsgF; 215 if(Dflag) 216 fprint(2, "msgFlush: change %d from MsgR to MsgF\n", 217 m->t.oldtag); 218 } 219 220 /* 221 * Link this flush message and the old message 222 * so multiple flushes can be coalesced (if there are 223 * multiple Tflush messages for a particular pending 224 * request, it is only necessary to respond to the last 225 * one, so any previous can be removed) and to be 226 * sure flushes wait for their corresponding old 227 * message to go out first. 228 * Waiting flush messages do not go on the write queue, 229 * they are processed after the old message is dealt 230 * with. There's no real need to protect the setting of 231 * Msg.nowq, the only code to check it runs in this 232 * process after this routine returns. 233 */ 234 if((flush = old->flush) != nil){ 235 if(Dflag) 236 fprint(2, "msgFlush: remove %d from %d list\n", 237 old->flush->t.tag, old->t.tag); 238 m->flush = flush->flush; 239 flush->flush = nil; 240 msgMunlink(flush); 241 msgFree(flush); 242 } 243 old->flush = m; 244 m->nowq = 1; 245 246 if(Dflag) 247 fprint(2, "msgFlush: add %d to %d queue\n", 248 m->t.tag, old->t.tag); 249 qunlock(&con->mlock); 250 } 251 252 static void 253 msgProc(void* v) 254 { 255 Msg *m; 256 char e[ERRMAX]; 257 Con *con; 258 259 USED(v); 260 threadsetname("msgProc"); 261 262 for(;;){ 263 /* 264 * If surplus to requirements, exit. 265 * If not, wait for and pull a message off 266 * the read queue. 267 */ 268 qlock(&mbox.rlock); 269 if(mbox.nproc > mbox.maxproc){ 270 mbox.nproc--; 271 qunlock(&mbox.rlock); 272 break; 273 } 274 while(mbox.rhead == nil) 275 rsleep(&mbox.rrendez); 276 m = mbox.rhead; 277 mbox.rhead = m->rwnext; 278 m->rwnext = nil; 279 qunlock(&mbox.rlock); 280 281 con = m->con; 282 *e = 0; 283 284 /* 285 * If the message has been flushed before 286 * any 9P processing has started, mark it so 287 * none will be attempted. 288 */ 289 qlock(&con->mlock); 290 if(m->state == MsgF) 291 strcpy(e, "flushed"); 292 else 293 m->state = Msg9; 294 qunlock(&con->mlock); 295 296 if(*e == 0){ 297 /* 298 * explain this 299 */ 300 qlock(&con->lock); 301 if(m->t.type == Tversion){ 302 con->version = m; 303 con->state = ConDown; 304 while(con->mhead != m) 305 rsleep(&con->rendez); 306 assert(con->state == ConDown); 307 if(con->version == m){ 308 con->version = nil; 309 con->state = ConInit; 310 } 311 else 312 strcpy(e, "Tversion aborted"); 313 } 314 else if(con->state != ConUp) 315 strcpy(e, "connection not ready"); 316 qunlock(&con->lock); 317 } 318 319 /* 320 * Dispatch if not error already. 321 */ 322 m->r.tag = m->t.tag; 323 if(*e == 0 && !(*rFcall[m->t.type])(m)) 324 rerrstr(e, sizeof e); 325 if(*e != 0){ 326 m->r.type = Rerror; 327 m->r.ename = e; 328 } 329 else 330 m->r.type = m->t.type+1; 331 332 /* 333 * Put the message (with reply) on the 334 * write queue and wakeup the write process. 335 */ 336 if(!m->nowq){ 337 qlock(&con->wlock); 338 if(con->whead == nil) 339 con->whead = m; 340 else 341 con->wtail->rwnext = m; 342 con->wtail = m; 343 rwakeup(&con->wrendez); 344 qunlock(&con->wlock); 345 } 346 } 347 } 348 349 static void 350 msgRead(void* v) 351 { 352 Msg *m; 353 Con *con; 354 int eof, fd, n; 355 356 threadsetname("msgRead"); 357 358 con = v; 359 fd = con->fd; 360 eof = 0; 361 362 while(!eof){ 363 m = msgAlloc(con); 364 365 while((n = read9pmsg(fd, m->data, con->msize)) == 0) 366 ; 367 if(n < 0){ 368 m->t.type = Tversion; 369 m->t.fid = NOFID; 370 m->t.tag = NOTAG; 371 m->t.msize = con->msize; 372 m->t.version = "9PEoF"; 373 eof = 1; 374 } 375 else if(convM2S(m->data, n, &m->t) != n){ 376 if(Dflag) 377 fprint(2, "msgRead: convM2S error: %s\n", 378 con->name); 379 msgFree(m); 380 continue; 381 } 382 if(Dflag) 383 fprint(2, "msgRead %p: t %F\n", con, &m->t); 384 385 qlock(&con->mlock); 386 if(con->mtail != nil){ 387 m->mprev = con->mtail; 388 con->mtail->mnext = m; 389 } 390 else{ 391 con->mhead = m; 392 m->mprev = nil; 393 } 394 con->mtail = m; 395 qunlock(&con->mlock); 396 397 qlock(&mbox.rlock); 398 if(mbox.rhead == nil){ 399 mbox.rhead = m; 400 if(!rwakeup(&mbox.rrendez)){ 401 if(mbox.nproc < mbox.maxproc){ 402 if(proccreate(msgProc, nil, STACK) > 0) 403 mbox.nproc++; 404 } 405 else 406 mbox.nprocstarve++; 407 } 408 /* 409 * don't need this surely? 410 rwakeup(&mbox.rrendez); 411 */ 412 } 413 else 414 mbox.rtail->rwnext = m; 415 mbox.rtail = m; 416 qunlock(&mbox.rlock); 417 } 418 } 419 420 static void 421 msgWrite(void* v) 422 { 423 Con *con; 424 int eof, n; 425 Msg *flush, *m; 426 427 threadsetname("msgWrite"); 428 429 con = v; 430 if(proccreate(msgRead, con, STACK) < 0){ 431 conFree(con); 432 return; 433 } 434 435 for(;;){ 436 /* 437 * Wait for and pull a message off the write queue. 438 */ 439 qlock(&con->wlock); 440 while(con->whead == nil) 441 rsleep(&con->wrendez); 442 m = con->whead; 443 con->whead = m->rwnext; 444 m->rwnext = nil; 445 assert(!m->nowq); 446 qunlock(&con->wlock); 447 448 eof = 0; 449 450 /* 451 * Write each message (if it hasn't been flushed) 452 * followed by any messages waiting for it to complete. 453 */ 454 qlock(&con->mlock); 455 while(m != nil){ 456 msgMunlink(m); 457 458 if(Dflag) 459 fprint(2, "msgWrite %d: r %F\n", 460 m->state, &m->r); 461 462 if(m->state != MsgF){ 463 m->state = MsgW; 464 qunlock(&con->mlock); 465 466 n = convS2M(&m->r, con->data, con->msize); 467 if(write(con->fd, con->data, n) != n) 468 eof = 1; 469 470 qlock(&con->mlock); 471 } 472 473 if((flush = m->flush) != nil){ 474 assert(flush->nowq); 475 m->flush = nil; 476 } 477 msgFree(m); 478 m = flush; 479 } 480 qunlock(&con->mlock); 481 482 qlock(&con->lock); 483 if(eof && con->fd >= 0){ 484 close(con->fd); 485 con->fd = -1; 486 } 487 if(con->state == ConDown) 488 rwakeup(&con->rendez); 489 if(con->state == ConMoribund && con->mhead == nil){ 490 qunlock(&con->lock); 491 conFree(con); 492 break; 493 } 494 qunlock(&con->lock); 495 } 496 } 497 498 Con* 499 conAlloc(int fd, char* name, int flags) 500 { 501 Con *con; 502 char buf[128], *p; 503 int rfd, n; 504 505 qlock(&cbox.alock); 506 while(cbox.ahead == nil){ 507 if(cbox.ncon >= cbox.maxcon){ 508 cbox.nconstarve++; 509 rsleep(&cbox.arendez); 510 continue; 511 } 512 con = vtmallocz(sizeof(Con)); 513 con->rendez.l = &con->lock; 514 con->data = vtmalloc(cbox.msize); 515 con->msize = cbox.msize; 516 con->mrendez.l = &con->mlock; 517 con->wrendez.l = &con->wlock; 518 519 cbox.ncon++; 520 cbox.ahead = con; 521 break; 522 } 523 con = cbox.ahead; 524 cbox.ahead = con->anext; 525 con->anext = nil; 526 527 if(cbox.ctail != nil){ 528 con->cprev = cbox.ctail; 529 cbox.ctail->cnext = con; 530 } 531 else{ 532 cbox.chead = con; 533 con->cprev = nil; 534 } 535 cbox.ctail = con; 536 537 assert(con->mhead == nil); 538 assert(con->whead == nil); 539 assert(con->fhead == nil); 540 assert(con->nfid == 0); 541 542 con->state = ConNew; 543 con->fd = fd; 544 if(con->name != nil){ 545 vtfree(con->name); 546 con->name = nil; 547 } 548 if(name != nil) 549 con->name = vtstrdup(name); 550 else 551 con->name = vtstrdup("unknown"); 552 con->remote[0] = 0; 553 snprint(buf, sizeof buf, "%s/remote", con->name); 554 if((rfd = open(buf, OREAD)) >= 0){ 555 n = read(rfd, buf, sizeof buf-1); 556 close(rfd); 557 if(n > 0){ 558 buf[n] = 0; 559 if((p = strchr(buf, '\n')) != nil) 560 *p = 0; 561 strecpy(con->remote, con->remote+sizeof con->remote, buf); 562 } 563 } 564 con->flags = flags; 565 con->isconsole = 0; 566 qunlock(&cbox.alock); 567 568 if(proccreate(msgWrite, con, STACK) < 0){ 569 conFree(con); 570 return nil; 571 } 572 573 return con; 574 } 575 576 static int 577 cmdMsg(int argc, char* argv[]) 578 { 579 char *p; 580 char *usage = "usage: msg [-m nmsg] [-p nproc]"; 581 int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve; 582 583 maxmsg = maxproc = 0; 584 585 ARGBEGIN{ 586 default: 587 return cliError(usage); 588 case 'm': 589 p = ARGF(); 590 if(p == nil) 591 return cliError(usage); 592 maxmsg = strtol(argv[0], &p, 0); 593 if(maxmsg <= 0 || p == argv[0] || *p != '\0') 594 return cliError(usage); 595 break; 596 case 'p': 597 p = ARGF(); 598 if(p == nil) 599 return cliError(usage); 600 maxproc = strtol(argv[0], &p, 0); 601 if(maxproc <= 0 || p == argv[0] || *p != '\0') 602 return cliError(usage); 603 break; 604 }ARGEND 605 if(argc) 606 return cliError(usage); 607 608 qlock(&mbox.alock); 609 if(maxmsg) 610 mbox.maxmsg = maxmsg; 611 maxmsg = mbox.maxmsg; 612 nmsg = mbox.nmsg; 613 nmsgstarve = mbox.nmsgstarve; 614 qunlock(&mbox.alock); 615 616 qlock(&mbox.rlock); 617 if(maxproc) 618 mbox.maxproc = maxproc; 619 maxproc = mbox.maxproc; 620 nproc = mbox.nproc; 621 nprocstarve = mbox.nprocstarve; 622 qunlock(&mbox.rlock); 623 624 consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc); 625 consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n", 626 nmsg, nmsgstarve, nproc, nprocstarve); 627 628 return 1; 629 } 630 631 static int 632 scmp(Fid *a, Fid *b) 633 { 634 if(a == 0) 635 return 1; 636 if(b == 0) 637 return -1; 638 return strcmp(a->uname, b->uname); 639 } 640 641 static Fid* 642 fidMerge(Fid *a, Fid *b) 643 { 644 Fid *s, **l; 645 646 l = &s; 647 while(a || b){ 648 if(scmp(a, b) < 0){ 649 *l = a; 650 l = &a->sort; 651 a = a->sort; 652 }else{ 653 *l = b; 654 l = &b->sort; 655 b = b->sort; 656 } 657 } 658 *l = 0; 659 return s; 660 } 661 662 static Fid* 663 fidMergeSort(Fid *f) 664 { 665 int delay; 666 Fid *a, *b; 667 668 if(f == nil) 669 return nil; 670 if(f->sort == nil) 671 return f; 672 673 a = b = f; 674 delay = 1; 675 while(a && b){ 676 if(delay) /* easy way to handle 2-element list */ 677 delay = 0; 678 else 679 a = a->sort; 680 if(b = b->sort) 681 b = b->sort; 682 } 683 684 b = a->sort; 685 a->sort = nil; 686 687 a = fidMergeSort(f); 688 b = fidMergeSort(b); 689 690 return fidMerge(a, b); 691 } 692 693 static int 694 cmdWho(int argc, char* argv[]) 695 { 696 char *usage = "usage: who"; 697 int i, l1, l2, l; 698 Con *con; 699 Fid *fid, *last; 700 701 ARGBEGIN{ 702 default: 703 return cliError(usage); 704 }ARGEND 705 706 if(argc > 0) 707 return cliError(usage); 708 709 rlock(&cbox.clock); 710 l1 = 0; 711 l2 = 0; 712 for(con=cbox.chead; con; con=con->cnext){ 713 if((l = strlen(con->name)) > l1) 714 l1 = l; 715 if((l = strlen(con->remote)) > l2) 716 l2 = l; 717 } 718 for(con=cbox.chead; con; con=con->cnext){ 719 consPrint("\t%-*s %-*s", l1, con->name, l2, con->remote); 720 qlock(&con->fidlock); 721 last = nil; 722 for(i=0; i<NFidHash; i++) 723 for(fid=con->fidhash[i]; fid; fid=fid->hash) 724 if(fid->fidno != NOFID && fid->uname){ 725 fid->sort = last; 726 last = fid; 727 } 728 fid = fidMergeSort(last); 729 last = nil; 730 for(; fid; last=fid, fid=fid->sort) 731 if(last==nil || strcmp(fid->uname, last->uname) != 0) 732 consPrint(" %q", fid->uname); 733 qunlock(&con->fidlock); 734 consPrint("\n"); 735 } 736 runlock(&cbox.clock); 737 return 1; 738 } 739 740 void 741 msgInit(void) 742 { 743 mbox.arendez.l = &mbox.alock; 744 745 mbox.rrendez.l = &mbox.rlock; 746 747 mbox.maxmsg = NMsgInit; 748 mbox.maxproc = NMsgProcInit; 749 mbox.msize = NMsizeInit; 750 751 cliAddCmd("msg", cmdMsg); 752 } 753 754 static int 755 cmdCon(int argc, char* argv[]) 756 { 757 char *p; 758 Con *con; 759 char *usage = "usage: con [-m ncon]"; 760 int maxcon, ncon, nconstarve; 761 762 maxcon = 0; 763 764 ARGBEGIN{ 765 default: 766 return cliError(usage); 767 case 'm': 768 p = ARGF(); 769 if(p == nil) 770 return cliError(usage); 771 maxcon = strtol(argv[0], &p, 0); 772 if(maxcon <= 0 || p == argv[0] || *p != '\0') 773 return cliError(usage); 774 break; 775 }ARGEND 776 if(argc) 777 return cliError(usage); 778 779 wlock(&cbox.clock); 780 if(maxcon) 781 cbox.maxcon = maxcon; 782 maxcon = cbox.maxcon; 783 ncon = cbox.ncon; 784 nconstarve = cbox.nconstarve; 785 wunlock(&cbox.clock); 786 787 consPrint("\tcon -m %d\n", maxcon); 788 consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve); 789 790 rlock(&cbox.clock); 791 for(con = cbox.chead; con != nil; con = con->cnext){ 792 consPrint("\t%s\n", con->name); 793 } 794 runlock(&cbox.clock); 795 796 return 1; 797 } 798 799 void 800 conInit(void) 801 { 802 cbox.arendez.l = &cbox.alock; 803 804 cbox.maxcon = NConInit; 805 cbox.msize = NMsizeInit; 806 807 cliAddCmd("con", cmdCon); 808 cliAddCmd("who", cmdWho); 809 }