9pserve.c (25691B)
1 #include <u.h> 2 #include <libc.h> 3 #include <fcall.h> 4 #include <thread.h> 5 #include <errno.h> 6 7 #define err err9pserve /* Darwin x86 */ 8 9 enum 10 { 11 STACK = 32768, 12 NHASH = 31, 13 MAXMSG = 64, /* per connection */ 14 MAXMSGSIZE = 4*1024*1024 15 }; 16 17 typedef struct Hash Hash; 18 typedef struct Fid Fid; 19 typedef struct Msg Msg; 20 typedef struct Conn Conn; 21 typedef struct Queue Queue; 22 23 struct Hash 24 { 25 Hash *next; 26 uint n; 27 void *v; 28 }; 29 30 struct Fid 31 { 32 int fid; 33 int ref; 34 int cfid; 35 int openfd; 36 int offset; 37 int coffset; 38 int isdir; 39 Fid *next; 40 }; 41 42 struct Msg 43 { 44 Conn *c; 45 int internal; 46 int sync; 47 int ref; 48 int ctag; 49 int tag; 50 int isopenfd; 51 Fcall tx; 52 Fcall rx; 53 Fid *fid; 54 Fid *newfid; 55 Fid *afid; 56 Msg *oldm; 57 Msg *next; 58 uchar *tpkt; 59 uchar *rpkt; 60 }; 61 62 struct Conn 63 { 64 int fd; 65 int fdmode; 66 Fid *fdfid; 67 int nmsg; 68 int nfid; 69 Channel *inc; 70 Channel *internal; 71 int inputstalled; 72 char dir[40]; 73 Hash *tag[NHASH]; 74 Hash *fid[NHASH]; 75 Queue *outq; 76 Queue *inq; 77 Channel *outqdead; 78 }; 79 80 char *xaname; 81 char *addr; 82 int afd; 83 char adir[40]; 84 int isunix; 85 Queue *outq; 86 Queue *inq; 87 int verbose = 0; 88 int logging = 0; 89 int msize = 8192; 90 u32int xafid = NOFID; 91 int attached; 92 int versioned; 93 int noauth; 94 95 void *gethash(Hash**, uint); 96 int puthash(Hash**, uint, void*); 97 int delhash(Hash**, uint, void*); 98 Msg *mread9p(Ioproc*, int); 99 int mwrite9p(Ioproc*, int, uchar*); 100 uchar *read9ppkt(Ioproc*, int); 101 int write9ppkt(int, uchar*); 102 Msg *msgnew(int); 103 void msgput(Msg*); 104 void msgclear(Msg*); 105 Msg *msgget(int); 106 void msgincref(Msg*); 107 Fid *fidnew(int); 108 void fidput(Fid*); 109 void *emalloc(int); 110 void *erealloc(void*, int); 111 Queue *qalloc(void); 112 int sendq(Queue*, void*); 113 void *recvq(Queue*); 114 void connthread(void*); 115 void connoutthread(void*); 116 void listenthread(void*); 117 void outputthread(void*); 118 void inputthread(void*); 119 void rewritehdr(Fcall*, uchar*); 120 void repack(Fcall*, uchar**); 121 int tlisten(char*, char*); 122 int taccept(int, char*); 123 int iolisten(Ioproc*, char*, char*); 124 int ioaccept(Ioproc*, int, char*); 125 int iorecvfd(Ioproc*, int); 126 int iosendfd(Ioproc*, int, int); 127 void mainproc(void*); 128 int ignorepipe(void*, char*); 129 int timefmt(Fmt*); 130 void dorootstat(void); 131 132 void 133 usage(void) 134 { 135 fprint(2, "usage: 9pserve [-lnv] [-A aname afid] [-c addr] [-M msize] address\n"); 136 fprint(2, "\treads/writes 9P messages on stdin/stdout\n"); 137 threadexitsall("usage"); 138 } 139 140 int 141 threadmaybackground(void) 142 { 143 return 1; 144 } 145 146 uchar vbuf[128]; 147 extern int _threaddebuglevel; 148 void 149 threadmain(int argc, char **argv) 150 { 151 char *file, *x, *addr; 152 int fd; 153 154 rfork(RFNOTEG); 155 x = getenv("verbose9pserve"); 156 if(x){ 157 verbose = atoi(x); 158 fprint(2, "verbose9pserve %s => %d\n", x, verbose); 159 } 160 ARGBEGIN{ 161 default: 162 usage(); 163 case 'A': 164 attached = 1; 165 xaname = EARGF(usage()); 166 xafid = atoi(EARGF(usage())); 167 break; 168 case 'M': 169 versioned = 1; 170 msize = atoi(EARGF(usage())); 171 break; 172 case 'c': 173 addr = netmkaddr(EARGF(usage()), "net", "9fs"); 174 if((fd = dial(addr, nil, nil, nil)) < 0) 175 sysfatal("dial %s: %r", addr); 176 dup(fd, 0); 177 dup(fd, 1); 178 if(fd > 1) 179 close(fd); 180 break; 181 case 'n': 182 noauth = 1; 183 break; 184 case 'v': 185 verbose++; 186 break; 187 case 'u': 188 isunix++; 189 break; 190 case 'l': 191 logging++; 192 break; 193 }ARGEND 194 195 if(attached && !versioned){ 196 fprint(2, "-A must be used with -M\n"); 197 usage(); 198 } 199 200 if(argc != 1) 201 usage(); 202 addr = argv[0]; 203 204 fmtinstall('T', timefmt); 205 206 if((afd = announce(addr, adir)) < 0) 207 sysfatal("announce %s: %r", addr); 208 if(logging){ 209 if(strncmp(addr, "unix!", 5) == 0) 210 addr += 5; 211 file = smprint("%s.log", addr); 212 if(file == nil) 213 sysfatal("smprint log: %r"); 214 if((fd = create(file, OWRITE, 0666)) < 0) 215 sysfatal("create %s: %r", file); 216 dup(fd, 2); 217 if(fd > 2) 218 close(fd); 219 } 220 if(verbose) fprint(2, "%T 9pserve running\n"); 221 proccreate(mainproc, nil, STACK); 222 } 223 224 void 225 mainproc(void *v) 226 { 227 int n, nn; 228 Fcall f; 229 USED(v); 230 231 atnotify(ignorepipe, 1); 232 fmtinstall('D', dirfmt); 233 fmtinstall('M', dirmodefmt); 234 fmtinstall('F', fcallfmt); 235 fmtinstall('H', encodefmt); 236 237 outq = qalloc(); 238 inq = qalloc(); 239 240 if(!versioned){ 241 f.type = Tversion; 242 f.version = "9P2000"; 243 f.msize = msize; 244 f.tag = NOTAG; 245 n = convS2M(&f, vbuf, sizeof vbuf); 246 if(n <= BIT16SZ) 247 sysfatal("convS2M conversion error"); 248 if(verbose > 1) fprint(2, "%T * <- %F\n", &f); 249 nn = write(1, vbuf, n); 250 if(n != nn) 251 sysfatal("error writing Tversion: %r\n"); 252 n = read9pmsg(0, vbuf, sizeof vbuf); 253 if(n < 0) 254 sysfatal("read9pmsg failure"); 255 if(convM2S(vbuf, n, &f) != n) 256 sysfatal("convM2S failure"); 257 if(f.msize < msize) 258 msize = f.msize; 259 if(verbose > 1) fprint(2, "%T * -> %F\n", &f); 260 } 261 262 threadcreate(inputthread, nil, STACK); 263 threadcreate(outputthread, nil, STACK); 264 265 /* if(rootfid) */ 266 /* dorootstat(); */ 267 268 threadcreate(listenthread, nil, STACK); 269 threadexits(0); 270 } 271 272 int 273 ignorepipe(void *v, char *s) 274 { 275 USED(v); 276 if(strcmp(s, "sys: write on closed pipe") == 0) 277 return 1; 278 if(strcmp(s, "sys: tstp") == 0) 279 return 1; 280 if(strcmp(s, "sys: window size change") == 0) 281 return 1; 282 fprint(2, "9pserve %s: %T note: %s\n", addr, s); 283 return 0; 284 } 285 286 void 287 listenthread(void *arg) 288 { 289 Conn *c; 290 Ioproc *io; 291 292 io = ioproc(); 293 USED(arg); 294 threadsetname("listen %s", adir); 295 for(;;){ 296 c = emalloc(sizeof(Conn)); 297 c->fd = iolisten(io, adir, c->dir); 298 if(c->fd < 0){ 299 if(verbose) fprint(2, "%T listen: %r\n"); 300 close(afd); 301 free(c); 302 return; 303 } 304 c->inc = chancreate(sizeof(void*), 0); 305 c->internal = chancreate(sizeof(void*), 0); 306 c->inq = qalloc(); 307 c->outq = qalloc(); 308 c->outqdead = chancreate(sizeof(void*), 0); 309 if(verbose) fprint(2, "%T incoming call on %s\n", c->dir); 310 threadcreate(connthread, c, STACK); 311 } 312 } 313 314 void 315 send9pmsg(Msg *m) 316 { 317 int n, nn; 318 319 n = sizeS2M(&m->rx); 320 m->rpkt = emalloc(n); 321 nn = convS2M(&m->rx, m->rpkt, n); 322 if(nn <= BIT16SZ) 323 sysfatal("convS2M conversion error"); 324 if(nn != n) 325 sysfatal("sizeS2M and convS2M disagree"); 326 sendq(m->c->outq, m); 327 } 328 329 void 330 sendomsg(Msg *m) 331 { 332 int n, nn; 333 334 n = sizeS2M(&m->tx); 335 m->tpkt = emalloc(n); 336 nn = convS2M(&m->tx, m->tpkt, n); 337 if(nn <= BIT16SZ) 338 sysfatal("convS2M conversion error"); 339 if(nn != n) 340 sysfatal("sizeS2M and convS2M disagree"); 341 sendq(outq, m); 342 } 343 344 void 345 err(Msg *m, char *ename) 346 { 347 m->rx.type = Rerror; 348 m->rx.ename = ename; 349 m->rx.tag = m->tx.tag; 350 send9pmsg(m); 351 } 352 353 char* 354 estrdup(char *s) 355 { 356 char *t; 357 358 t = emalloc(strlen(s)+1); 359 strcpy(t, s); 360 return t; 361 } 362 363 void 364 connthread(void *arg) 365 { 366 int i, fd; 367 Conn *c; 368 Hash *h, *hnext; 369 Msg *m, *om, *mm, sync; 370 Fid *f; 371 Ioproc *io; 372 373 c = arg; 374 threadsetname("conn %s", c->dir); 375 io = ioproc(); 376 fd = ioaccept(io, c->fd, c->dir); 377 if(fd < 0){ 378 if(verbose) fprint(2, "%T accept %s: %r\n", c->dir); 379 goto out; 380 } 381 close(c->fd); 382 c->fd = fd; 383 threadcreate(connoutthread, c, STACK); 384 while((m = mread9p(io, c->fd)) != nil){ 385 if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx); 386 m->c = c; 387 m->ctag = m->tx.tag; 388 c->nmsg++; 389 if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m); 390 if(puthash(c->tag, m->tx.tag, m) < 0){ 391 err(m, "duplicate tag"); 392 continue; 393 } 394 msgincref(m); 395 switch(m->tx.type){ 396 case Tversion: 397 m->rx.tag = m->tx.tag; 398 m->rx.msize = m->tx.msize; 399 if(m->rx.msize > msize) 400 m->rx.msize = msize; 401 m->rx.version = "9P2000"; 402 m->rx.type = Rversion; 403 send9pmsg(m); 404 continue; 405 case Tflush: 406 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){ 407 m->rx.tag = m->tx.tag; 408 m->rx.type = Rflush; 409 send9pmsg(m); 410 continue; 411 } 412 msgincref(m->oldm); 413 break; 414 case Tattach: 415 m->afid = nil; 416 if(m->tx.afid != NOFID 417 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){ 418 err(m, "unknown fid"); 419 continue; 420 } 421 if(m->afid) 422 m->afid->ref++; 423 m->fid = fidnew(m->tx.fid); 424 if(puthash(c->fid, m->tx.fid, m->fid) < 0){ 425 err(m, "duplicate fid"); 426 continue; 427 } 428 m->fid->ref++; 429 if(attached && m->afid==nil){ 430 if(m->tx.aname[0] && strcmp(xaname, m->tx.aname) != 0){ 431 err(m, "invalid attach name"); 432 continue; 433 } 434 m->tx.afid = xafid; 435 m->tx.aname = xaname; 436 m->tx.uname = getuser(); /* what srv.c used */ 437 repack(&m->tx, &m->tpkt); 438 } 439 break; 440 case Twalk: 441 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){ 442 err(m, "unknown fid"); 443 continue; 444 } 445 m->fid->ref++; 446 if(m->tx.newfid == m->tx.fid){ 447 m->fid->ref++; 448 m->newfid = m->fid; 449 }else{ 450 m->newfid = fidnew(m->tx.newfid); 451 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){ 452 err(m, "duplicate fid"); 453 continue; 454 } 455 m->newfid->ref++; 456 } 457 break; 458 case Tauth: 459 if(attached){ 460 err(m, "authentication not required"); 461 continue; 462 } 463 if(noauth){ 464 err(m, "authentication rejected"); 465 continue; 466 } 467 m->afid = fidnew(m->tx.afid); 468 if(puthash(c->fid, m->tx.afid, m->afid) < 0){ 469 err(m, "duplicate fid"); 470 continue; 471 } 472 m->afid->ref++; 473 break; 474 case Tcreate: 475 if(m->tx.perm&(DMSYMLINK|DMDEVICE|DMNAMEDPIPE|DMSOCKET)){ 476 err(m, "unsupported file type"); 477 continue; 478 } 479 goto caseTopen; 480 case Topenfd: 481 if(m->tx.mode&~(OTRUNC|3)){ 482 err(m, "bad openfd mode"); 483 continue; 484 } 485 m->isopenfd = 1; 486 m->tx.type = Topen; 487 m->tpkt[4] = Topen; 488 /* fall through */ 489 caseTopen: 490 case Topen: 491 case Tclunk: 492 case Tread: 493 case Twrite: 494 case Tremove: 495 case Tstat: 496 case Twstat: 497 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){ 498 err(m, "unknown fid"); 499 continue; 500 } 501 m->fid->ref++; 502 break; 503 } 504 505 /* have everything - translate and send */ 506 m->c = c; 507 m->ctag = m->tx.tag; 508 m->tx.tag = m->tag; 509 if(m->fid) 510 m->tx.fid = m->fid->fid; 511 if(m->newfid) 512 m->tx.newfid = m->newfid->fid; 513 if(m->afid) 514 m->tx.afid = m->afid->fid; 515 if(m->oldm) 516 m->tx.oldtag = m->oldm->tag; 517 /* reference passes to outq */ 518 sendq(outq, m); 519 while(c->nmsg >= MAXMSG){ 520 c->inputstalled = 1; 521 recvp(c->inc); 522 } 523 } 524 525 if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd); 526 527 /* flush all outstanding messages */ 528 for(i=0; i<NHASH; i++){ 529 while((h = c->tag[i]) != nil){ 530 om = h->v; 531 msgincref(om); /* for us */ 532 m = msgnew(0); 533 m->internal = 1; 534 m->c = c; 535 c->nmsg++; 536 m->tx.type = Tflush; 537 m->tx.tag = m->tag; 538 m->tx.oldtag = om->tag; 539 m->oldm = om; 540 msgincref(om); 541 msgincref(m); /* for outq */ 542 sendomsg(m); 543 mm = recvp(c->internal); 544 assert(mm == m); 545 msgput(m); /* got from recvp */ 546 msgput(m); /* got from msgnew */ 547 if(delhash(c->tag, om->ctag, om) == 0) 548 msgput(om); /* got from hash table */ 549 msgput(om); /* got from msgincref */ 550 } 551 } 552 553 /* 554 * outputthread has written all its messages 555 * to the remote connection (because we've gotten all the replies!), 556 * but it might not have gotten a chance to msgput 557 * the very last one. sync up to make sure. 558 */ 559 memset(&sync, 0, sizeof sync); 560 sync.sync = 1; 561 sync.c = c; 562 sendq(outq, &sync); 563 recvp(c->outqdead); 564 565 /* everything is quiet; can close the local output queue. */ 566 sendq(c->outq, nil); 567 recvp(c->outqdead); 568 569 /* should be no messages left anywhere. */ 570 assert(c->nmsg == 0); 571 572 /* clunk all outstanding fids */ 573 for(i=0; i<NHASH; i++){ 574 for(h=c->fid[i]; h; h=hnext){ 575 f = h->v; 576 m = msgnew(0); 577 m->internal = 1; 578 m->c = c; 579 c->nmsg++; 580 m->tx.type = Tclunk; 581 m->tx.tag = m->tag; 582 m->tx.fid = f->fid; 583 m->fid = f; 584 f->ref++; 585 msgincref(m); 586 sendomsg(m); 587 mm = recvp(c->internal); 588 assert(mm == m); 589 msgclear(m); 590 msgput(m); /* got from recvp */ 591 msgput(m); /* got from msgnew */ 592 fidput(f); /* got from hash table */ 593 hnext = h->next; 594 free(h); 595 } 596 } 597 598 out: 599 closeioproc(io); 600 assert(c->nmsg == 0); 601 assert(c->nfid == 0); 602 close(c->fd); 603 chanfree(c->internal); 604 c->internal = 0; 605 chanfree(c->inc); 606 c->inc = 0; 607 free(c->inq); 608 c->inq = 0; 609 free(c); 610 } 611 612 static void 613 openfdthread(void *v) 614 { 615 Conn *c; 616 Fid *fid; 617 Msg *m; 618 int n; 619 vlong tot; 620 Ioproc *io; 621 char buf[1024]; 622 623 c = v; 624 fid = c->fdfid; 625 io = ioproc(); 626 threadsetname("openfd %s", c->fdfid); 627 tot = 0; 628 m = nil; 629 if(c->fdmode == OREAD){ 630 for(;;){ 631 if(verbose) fprint(2, "%T tread..."); 632 m = msgnew(0); 633 m->internal = 1; 634 m->c = c; 635 m->tx.type = Tread; 636 m->tx.count = msize - IOHDRSZ; 637 m->tx.fid = fid->fid; 638 m->tx.tag = m->tag; 639 m->tx.offset = tot; 640 m->fid = fid; 641 fid->ref++; 642 msgincref(m); 643 sendomsg(m); 644 recvp(c->internal); 645 if(m->rx.type == Rerror){ 646 /* fprint(2, "%T read error: %s\n", m->rx.ename); */ 647 break; 648 } 649 if(m->rx.count == 0) 650 break; 651 tot += m->rx.count; 652 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){ 653 /* fprint(2, "%T pipe write error: %r\n"); */ 654 break; 655 } 656 msgput(m); 657 msgput(m); 658 m = nil; 659 } 660 }else{ 661 for(;;){ 662 if(verbose) fprint(2, "%T twrite..."); 663 n = sizeof buf; 664 if(n > msize) 665 n = msize; 666 if((n=ioread(io, c->fd, buf, n)) <= 0){ 667 if(n < 0) 668 fprint(2, "%T pipe read error: %r\n"); 669 break; 670 } 671 m = msgnew(0); 672 m->internal = 1; 673 m->c = c; 674 m->tx.type = Twrite; 675 m->tx.fid = fid->fid; 676 m->tx.data = buf; 677 m->tx.count = n; 678 m->tx.tag = m->tag; 679 m->tx.offset = tot; 680 m->fid = fid; 681 fid->ref++; 682 msgincref(m); 683 sendomsg(m); 684 recvp(c->internal); 685 if(m->rx.type == Rerror){ 686 /* fprint(2, "%T write error: %s\n", m->rx.ename); */ 687 } 688 tot += n; 689 msgput(m); 690 msgput(m); 691 m = nil; 692 } 693 } 694 if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid); 695 close(c->fd); 696 closeioproc(io); 697 if(m){ 698 msgput(m); 699 msgput(m); 700 } 701 if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref); 702 if(--fid->openfd == 0){ 703 m = msgnew(0); 704 m->internal = 1; 705 m->c = c; 706 m->tx.type = Tclunk; 707 m->tx.tag = m->tag; 708 m->tx.fid = fid->fid; 709 m->fid = fid; 710 fid->ref++; 711 msgincref(m); 712 sendomsg(m); 713 recvp(c->internal); 714 msgput(m); 715 msgput(m); 716 } 717 fidput(fid); 718 c->fdfid = nil; 719 chanfree(c->internal); 720 c->internal = 0; 721 free(c); 722 } 723 724 int 725 xopenfd(Msg *m) 726 { 727 char errs[ERRMAX]; 728 int n, p[2]; 729 Conn *nc; 730 731 if(pipe(p) < 0){ 732 rerrstr(errs, sizeof errs); 733 err(m, errs); 734 /* XXX return here? */ 735 } 736 if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]); 737 738 /* now we're committed. */ 739 740 /* a new connection for this fid */ 741 nc = emalloc(sizeof(Conn)); 742 nc->internal = chancreate(sizeof(void*), 0); 743 744 /* a ref for us */ 745 nc->fdfid = m->fid; 746 m->fid->ref++; 747 nc->fdfid->openfd++; 748 nc->fdmode = m->tx.mode; 749 nc->fd = p[0]; 750 751 /* a thread to tend the pipe */ 752 threadcreate(openfdthread, nc, STACK); 753 754 /* if mode is ORDWR, that openfdthread will write; start a reader */ 755 if((m->tx.mode&3) == ORDWR){ 756 nc = emalloc(sizeof(Conn)); 757 nc->internal = chancreate(sizeof(void*), 0); 758 nc->fdfid = m->fid; 759 m->fid->ref++; 760 nc->fdfid->openfd++; 761 nc->fdmode = OREAD; 762 nc->fd = dup(p[0], -1); 763 threadcreate(openfdthread, nc, STACK); 764 } 765 766 /* steal fid from other connection */ 767 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0) 768 fidput(m->fid); 769 770 /* rewrite as Ropenfd */ 771 m->rx.type = Ropenfd; 772 n = GBIT32(m->rpkt); 773 m->rpkt = erealloc(m->rpkt, n+4); 774 PBIT32(m->rpkt+n, p[1]); 775 n += 4; 776 PBIT32(m->rpkt, n); 777 m->rpkt[4] = Ropenfd; 778 m->rx.unixfd = p[1]; 779 return 0; 780 } 781 782 void 783 connoutthread(void *arg) 784 { 785 int err; 786 Conn *c; 787 Msg *m, *om; 788 Ioproc *io; 789 790 c = arg; 791 io = ioproc(); 792 threadsetname("connout %s", c->dir); 793 while((m = recvq(c->outq)) != nil){ 794 err = m->tx.type+1 != m->rx.type; 795 if(!err && m->isopenfd) 796 if(xopenfd(m) < 0) 797 continue; 798 switch(m->tx.type){ 799 case Tflush: 800 om = m->oldm; 801 if(om) 802 if(delhash(om->c->tag, om->ctag, om) == 0) 803 msgput(om); 804 break; 805 case Tclunk: 806 case Tremove: 807 if(m->fid) 808 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0) 809 fidput(m->fid); 810 break; 811 case Tauth: 812 if(err && m->afid){ 813 if(verbose) fprint(2, "%T auth error\n"); 814 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0) 815 fidput(m->afid); 816 } 817 break; 818 case Tattach: 819 if(err && m->fid) 820 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0) 821 fidput(m->fid); 822 break; 823 case Twalk: 824 if(err || m->rx.nwqid < m->tx.nwname) 825 if(m->tx.fid != m->tx.newfid && m->newfid) 826 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0) 827 fidput(m->newfid); 828 break; 829 case Tread: 830 break; 831 case Tstat: 832 break; 833 case Topen: 834 case Tcreate: 835 m->fid->isdir = (m->rx.qid.type & QTDIR); 836 break; 837 } 838 if(delhash(m->c->tag, m->ctag, m) == 0) 839 msgput(m); 840 if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx); 841 rewritehdr(&m->rx, m->rpkt); 842 if(mwrite9p(io, c->fd, m->rpkt) < 0) 843 if(verbose) fprint(2, "%T write error: %r\n"); 844 msgput(m); 845 if(c->inputstalled && c->nmsg < MAXMSG) 846 nbsendp(c->inc, 0); 847 } 848 closeioproc(io); 849 free(c->outq); 850 c->outq = nil; 851 sendp(c->outqdead, nil); 852 } 853 854 void 855 outputthread(void *arg) 856 { 857 Msg *m; 858 Ioproc *io; 859 860 USED(arg); 861 io = ioproc(); 862 threadsetname("output"); 863 while((m = recvq(outq)) != nil){ 864 if(m->sync){ 865 sendp(m->c->outqdead, nil); 866 continue; 867 } 868 if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx); 869 rewritehdr(&m->tx, m->tpkt); 870 if(mwrite9p(io, 1, m->tpkt) < 0) 871 sysfatal("output error: %r"); 872 msgput(m); 873 } 874 closeioproc(io); 875 fprint(2, "%T output eof\n"); 876 threadexitsall(0); 877 } 878 879 void 880 inputthread(void *arg) 881 { 882 uchar *pkt; 883 int n, nn, tag; 884 Msg *m; 885 Ioproc *io; 886 887 threadsetname("input"); 888 if(verbose) fprint(2, "%T input thread\n"); 889 io = ioproc(); 890 USED(arg); 891 while((pkt = read9ppkt(io, 0)) != nil){ 892 n = GBIT32(pkt); 893 if(n < 7){ 894 fprint(2, "%T short 9P packet from server\n"); 895 free(pkt); 896 continue; 897 } 898 if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt); 899 tag = GBIT16(pkt+5); 900 if((m = msgget(tag)) == nil){ 901 fprint(2, "%T unexpected 9P response tag %d\n", tag); 902 free(pkt); 903 continue; 904 } 905 if((nn = convM2S(pkt, n, &m->rx)) != n){ 906 fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n); 907 free(pkt); 908 msgput(m); 909 continue; 910 } 911 if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx, 912 m->internal ? " (internal)" : ""); 913 m->rpkt = pkt; 914 m->rx.tag = m->ctag; 915 if(m->internal) 916 sendp(m->c->internal, m); 917 else if(m->c->outq) 918 sendq(m->c->outq, m); 919 else 920 msgput(m); 921 } 922 closeioproc(io); 923 /*fprint(2, "%T input eof\n"); */ 924 threadexitsall(0); 925 } 926 927 void* 928 gethash(Hash **ht, uint n) 929 { 930 Hash *h; 931 932 for(h=ht[n%NHASH]; h; h=h->next) 933 if(h->n == n) 934 return h->v; 935 return nil; 936 } 937 938 int 939 delhash(Hash **ht, uint n, void *v) 940 { 941 Hash *h, **l; 942 943 for(l=&ht[n%NHASH]; h=*l; l=&h->next) 944 if(h->n == n){ 945 if(h->v != v){ 946 if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v); 947 return -1; 948 } 949 *l = h->next; 950 free(h); 951 return 0; 952 } 953 return -1; 954 } 955 956 int 957 puthash(Hash **ht, uint n, void *v) 958 { 959 Hash *h; 960 961 if(gethash(ht, n)) 962 return -1; 963 h = emalloc(sizeof(Hash)); 964 h->next = ht[n%NHASH]; 965 h->n = n; 966 h->v = v; 967 ht[n%NHASH] = h; 968 return 0; 969 } 970 971 Fid **fidtab; 972 int nfidtab; 973 Fid *freefid; 974 975 Fid* 976 fidnew(int cfid) 977 { 978 Fid *f; 979 980 if(freefid == nil){ 981 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0])); 982 if(nfidtab == xafid){ 983 fidtab[nfidtab++] = nil; 984 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0])); 985 } 986 fidtab[nfidtab] = emalloc(sizeof(Fid)); 987 freefid = fidtab[nfidtab]; 988 freefid->fid = nfidtab++; 989 } 990 f = freefid; 991 freefid = f->next; 992 f->cfid = cfid; 993 f->ref = 1; 994 f->offset = 0; 995 f->coffset = 0; 996 f->isdir = -1; 997 return f; 998 } 999 1000 void 1001 fidput(Fid *f) 1002 { 1003 if(f == nil) 1004 return; 1005 assert(f->ref > 0); 1006 if(--f->ref > 0) 1007 return; 1008 f->next = freefid; 1009 f->cfid = -1; 1010 freefid = f; 1011 } 1012 1013 Msg **msgtab; 1014 int nmsgtab; 1015 int nmsg; 1016 Msg *freemsg; 1017 1018 void 1019 msgincref(Msg *m) 1020 { 1021 if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n", 1022 getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1); 1023 m->ref++; 1024 } 1025 1026 Msg* 1027 msgnew(int x) 1028 { 1029 Msg *m; 1030 1031 if(freemsg == nil){ 1032 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0])); 1033 msgtab[nmsgtab] = emalloc(sizeof(Msg)); 1034 freemsg = msgtab[nmsgtab]; 1035 freemsg->tag = nmsgtab++; 1036 } 1037 m = freemsg; 1038 freemsg = m->next; 1039 m->ref = 1; 1040 if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n", 1041 getcallerpc(&x), m, m->tag, m->ref); 1042 nmsg++; 1043 return m; 1044 } 1045 1046 /* 1047 * Clear data associated with connections, so that 1048 * if all msgs have been msgcleared, the connection 1049 * can be freed. Note that this does *not* free the tpkt 1050 * and rpkt; they are freed in msgput with the msg itself. 1051 * The io write thread might still be holding a ref to msg 1052 * even once the connection has finished with it. 1053 */ 1054 void 1055 msgclear(Msg *m) 1056 { 1057 if(m->c){ 1058 m->c->nmsg--; 1059 m->c = nil; 1060 } 1061 if(m->oldm){ 1062 msgput(m->oldm); 1063 m->oldm = nil; 1064 } 1065 if(m->fid){ 1066 fidput(m->fid); 1067 m->fid = nil; 1068 } 1069 if(m->afid){ 1070 fidput(m->afid); 1071 m->afid = nil; 1072 } 1073 if(m->newfid){ 1074 fidput(m->newfid); 1075 m->newfid = nil; 1076 } 1077 if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){ 1078 close(m->rx.unixfd); 1079 m->rx.unixfd = -1; 1080 } 1081 } 1082 1083 void 1084 msgput(Msg *m) 1085 { 1086 if(m == nil) 1087 return; 1088 1089 if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n", 1090 getcallerpc(&m), m, m->tag, m->ctag, m->ref); 1091 assert(m->ref > 0); 1092 if(--m->ref > 0) 1093 return; 1094 nmsg--; 1095 msgclear(m); 1096 if(m->tpkt){ 1097 free(m->tpkt); 1098 m->tpkt = nil; 1099 } 1100 if(m->rpkt){ 1101 free(m->rpkt); 1102 m->rpkt = nil; 1103 } 1104 m->isopenfd = 0; 1105 m->internal = 0; 1106 m->next = freemsg; 1107 freemsg = m; 1108 } 1109 1110 Msg* 1111 msgget(int n) 1112 { 1113 Msg *m; 1114 1115 if(n < 0 || n >= nmsgtab) 1116 return nil; 1117 m = msgtab[n]; 1118 if(m->ref == 0) 1119 return nil; 1120 if(verbose) fprint(2, "%T msgget %d = %p\n", n, m); 1121 msgincref(m); 1122 return m; 1123 } 1124 1125 1126 void* 1127 emalloc(int n) 1128 { 1129 void *v; 1130 1131 v = mallocz(n, 1); 1132 if(v == nil){ 1133 abort(); 1134 sysfatal("out of memory allocating %d", n); 1135 } 1136 return v; 1137 } 1138 1139 void* 1140 erealloc(void *v, int n) 1141 { 1142 v = realloc(v, n); 1143 if(v == nil){ 1144 abort(); 1145 sysfatal("out of memory reallocating %d", n); 1146 } 1147 return v; 1148 } 1149 1150 typedef struct Qel Qel; 1151 struct Qel 1152 { 1153 Qel *next; 1154 void *p; 1155 }; 1156 1157 struct Queue 1158 { 1159 QLock lk; 1160 Rendez r; 1161 Qel *head; 1162 Qel *tail; 1163 }; 1164 1165 Queue* 1166 qalloc(void) 1167 { 1168 Queue *q; 1169 1170 q = mallocz(sizeof(Queue), 1); 1171 if(q == nil) 1172 return nil; 1173 q->r.l = &q->lk; 1174 return q; 1175 } 1176 1177 int 1178 sendq(Queue *q, void *p) 1179 { 1180 Qel *e; 1181 1182 e = emalloc(sizeof(Qel)); 1183 qlock(&q->lk); 1184 e->p = p; 1185 e->next = nil; 1186 if(q->head == nil) 1187 q->head = e; 1188 else 1189 q->tail->next = e; 1190 q->tail = e; 1191 rwakeup(&q->r); 1192 qunlock(&q->lk); 1193 return 0; 1194 } 1195 1196 void* 1197 recvq(Queue *q) 1198 { 1199 void *p; 1200 Qel *e; 1201 1202 qlock(&q->lk); 1203 while(q->head == nil) 1204 rsleep(&q->r); 1205 e = q->head; 1206 q->head = e->next; 1207 qunlock(&q->lk); 1208 p = e->p; 1209 free(e); 1210 return p; 1211 } 1212 1213 uchar* 1214 read9ppkt(Ioproc *io, int fd) 1215 { 1216 uchar buf[4], *pkt; 1217 int n, nn; 1218 1219 n = ioreadn(io, fd, buf, 4); 1220 if(n != 4) 1221 return nil; 1222 n = GBIT32(buf); 1223 if(n > MAXMSGSIZE) 1224 return nil; 1225 pkt = emalloc(n); 1226 PBIT32(pkt, n); 1227 nn = ioreadn(io, fd, pkt+4, n-4); 1228 if(nn != n-4){ 1229 free(pkt); 1230 return nil; 1231 } 1232 /* would do this if we ever got one of these, but we only generate them 1233 if(pkt[4] == Ropenfd){ 1234 newfd = iorecvfd(io, fd); 1235 PBIT32(pkt+n-4, newfd); 1236 } 1237 */ 1238 return pkt; 1239 } 1240 1241 Msg* 1242 mread9p(Ioproc *io, int fd) 1243 { 1244 int n, nn; 1245 uchar *pkt; 1246 Msg *m; 1247 1248 if((pkt = read9ppkt(io, fd)) == nil) 1249 return nil; 1250 1251 m = msgnew(0); 1252 m->tpkt = pkt; 1253 n = GBIT32(pkt); 1254 nn = convM2S(pkt, n, &m->tx); 1255 if(nn != n){ 1256 fprint(2, "%T read bad packet from %d\n", fd); 1257 free(m->tpkt); 1258 free(m); 1259 return nil; 1260 } 1261 return m; 1262 } 1263 1264 int 1265 mwrite9p(Ioproc *io, int fd, uchar *pkt) 1266 { 1267 int n, nfd; 1268 1269 n = GBIT32(pkt); 1270 if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt); 1271 if(verbose > 1) fprint(2, "%T before iowrite\n"); 1272 if(iowrite(io, fd, pkt, n) != n){ 1273 fprint(2, "%T write error: %r\n"); 1274 return -1; 1275 } 1276 if(verbose > 1) fprint(2, "%T after iowrite\n"); 1277 if(pkt[4] == Ropenfd){ 1278 nfd = GBIT32(pkt+n-4); 1279 if(iosendfd(io, fd, nfd) < 0){ 1280 fprint(2, "%T send fd error: %r\n"); 1281 return -1; 1282 } 1283 } 1284 return 0; 1285 } 1286 1287 void 1288 restring(uchar *pkt, int pn, char *s) 1289 { 1290 int n; 1291 1292 if(s < (char*)pkt || s >= (char*)pkt+pn) 1293 return; 1294 1295 n = strlen(s); 1296 memmove(s+1, s, n); 1297 PBIT16((uchar*)s-1, n); 1298 } 1299 1300 void 1301 repack(Fcall *f, uchar **ppkt) 1302 { 1303 uint n, nn; 1304 uchar *pkt; 1305 1306 pkt = *ppkt; 1307 n = GBIT32(pkt); 1308 nn = sizeS2M(f); 1309 if(nn > n){ 1310 free(pkt); 1311 pkt = emalloc(nn); 1312 *ppkt = pkt; 1313 } 1314 n = convS2M(f, pkt, nn); 1315 if(n <= BIT16SZ) 1316 sysfatal("convS2M conversion error"); 1317 if(n != nn) 1318 sysfatal("convS2M and sizeS2M disagree"); 1319 } 1320 1321 void 1322 rewritehdr(Fcall *f, uchar *pkt) 1323 { 1324 int i, n; 1325 1326 n = GBIT32(pkt); 1327 PBIT16(pkt+5, f->tag); 1328 switch(f->type){ 1329 case Tversion: 1330 case Rversion: 1331 restring(pkt, n, f->version); 1332 break; 1333 case Tauth: 1334 PBIT32(pkt+7, f->afid); 1335 restring(pkt, n, f->uname); 1336 restring(pkt, n, f->aname); 1337 break; 1338 case Tflush: 1339 PBIT16(pkt+7, f->oldtag); 1340 break; 1341 case Tattach: 1342 restring(pkt, n, f->uname); 1343 restring(pkt, n, f->aname); 1344 PBIT32(pkt+7, f->fid); 1345 PBIT32(pkt+11, f->afid); 1346 break; 1347 case Twalk: 1348 PBIT32(pkt+7, f->fid); 1349 PBIT32(pkt+11, f->newfid); 1350 for(i=0; i<f->nwname; i++) 1351 restring(pkt, n, f->wname[i]); 1352 break; 1353 case Tcreate: 1354 restring(pkt, n, f->name); 1355 /* fall through */ 1356 case Topen: 1357 case Tclunk: 1358 case Tremove: 1359 case Tstat: 1360 case Twstat: 1361 case Twrite: 1362 PBIT32(pkt+7, f->fid); 1363 break; 1364 case Tread: 1365 PBIT32(pkt+7, f->fid); 1366 PBIT64(pkt+11, f->offset); 1367 break; 1368 case Rerror: 1369 restring(pkt, n, f->ename); 1370 break; 1371 } 1372 } 1373 1374 static long 1375 _iolisten(va_list *arg) 1376 { 1377 char *a, *b; 1378 1379 a = va_arg(*arg, char*); 1380 b = va_arg(*arg, char*); 1381 return listen(a, b); 1382 } 1383 1384 int 1385 iolisten(Ioproc *io, char *a, char *b) 1386 { 1387 return iocall(io, _iolisten, a, b); 1388 } 1389 1390 static long 1391 _ioaccept(va_list *arg) 1392 { 1393 int fd; 1394 char *dir; 1395 1396 fd = va_arg(*arg, int); 1397 dir = va_arg(*arg, char*); 1398 return accept(fd, dir); 1399 } 1400 1401 int 1402 ioaccept(Ioproc *io, int fd, char *dir) 1403 { 1404 return iocall(io, _ioaccept, fd, dir); 1405 } 1406 1407 int 1408 timefmt(Fmt *fmt) 1409 { 1410 static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 1411 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 1412 vlong ns; 1413 Tm tm; 1414 ns = nsec(); 1415 tm = *localtime(time(0)); 1416 return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d", 1417 mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec, 1418 (int)(ns%1000000000)/1000000); 1419 }