plan9port

[fork] Plan 9 from user space
git clone git://src.adamsgaard.dk/plan9port # fast
git clone https://src.adamsgaard.dk/plan9port.git # slow
Log | Files | Refs | README | LICENSE Back to index

9pserve.c (25691B)


      1 #include <u.h>
      2 #include <libc.h>
      3 #include <fcall.h>
      4 #include <thread.h>
      5 #include <errno.h>
      6 
      7 #define err err9pserve /* Darwin x86 */
      8 
        /*
         * Fixed limits used throughout this file.
         */
      9 enum
     10 {
     11 	STACK = 32768,	/* stack size handed to threadcreate/proccreate */
     12 	NHASH = 31,	/* number of buckets in each Conn tag/fid hash table */
     13 	MAXMSG = 64,	/* per connection */
     14 	MAXMSGSIZE = 4*1024*1024	/* read9ppkt refuses packets longer than this */
     15 };
     16 
        /* forward declarations so the structures below can refer to one another */
     17 typedef struct Hash Hash;
     18 typedef struct Fid Fid;
     19 typedef struct Msg Msg;
     20 typedef struct Conn Conn;
     21 typedef struct Queue Queue;
     22 
        /*
         * One entry in a chained hash table; see gethash, puthash, delhash.
         */
     23 struct Hash
     24 {
     25 	Hash *next;	/* next entry in the same bucket */
     26 	uint n;	/* key; the bucket index is n%NHASH */
     27 	void *v;	/* value stored under the key */
     28 };
     29 
        /*
         * A fid as tracked by 9pserve.  Fids are allocated by fidnew and
         * recycled through a free list by fidput.
         */
     30 struct Fid
     31 {
     32 	int fid;	/* index in fidtab; the number placed in messages sent to the server */
     33 	int ref;	/* reference count; fidput recycles the Fid when it reaches zero */
     34 	int cfid;	/* fid number given to fidnew (the client's number); -1 once freed */
     35 	int openfd;	/* count of openfdthreads attached to this fid */
     36 	int offset;	/* zeroed by fidnew; not otherwise used in the code visible here */
     37 	int coffset;	/* zeroed by fidnew; not otherwise used in the code visible here */
     38 	int isdir;	/* -1 from fidnew; set from the reply qid type after Topen/Tcreate */
     39 	Fid *next;	/* free-list link */
     40 };
     41 
        /*
         * One request and its reply.  Msgs are allocated by msgnew,
         * reference counted, and recycled by msgput.
         */
     42 struct Msg
     43 {
     44 	Conn *c;	/* connection the message belongs to; cleared by msgclear */
     45 	int internal;	/* generated by 9pserve itself; the reply is delivered on c->internal */
     46 	int sync;	/* marker message: outputthread answers on c->outqdead instead of writing */
     47 	int ref;	/* reference count */
     48 	int ctag;	/* tag as sent by the client */
     49 	int tag;	/* index in msgtab; used as the tag on the server side */
     50 	int isopenfd;	/* request arrived as Topenfd and was rewritten to Topen */
     51 	Fcall tx;	/* the request */
     52 	Fcall rx;	/* the reply */
     53 	Fid *fid;	/* fids referenced by the request; released by msgclear */
     54 	Fid *newfid;
     55 	Fid *afid;
     56 	Msg *oldm;	/* for Tflush: the message being flushed */
     57 	Msg *next;	/* free-list link */
     58 	uchar *tpkt;	/* marshaled request; freed by msgput */
     59 	uchar *rpkt;	/* marshaled reply; freed by msgput */
     60 };
     61 
        /*
         * Per-connection state.  listenthread creates one for every incoming
         * call; xopenfd also creates a small one for each openfd pipe thread.
         */
     62 struct Conn
     63 {
     64 	int fd;	/* connection (or pipe) file descriptor */
     65 	int fdmode;	/* openfd only: open mode chosen in xopenfd */
     66 	Fid *fdfid;	/* openfd only: fid served through the pipe */
     67 	int nmsg;	/* messages currently charged to this connection */
     68 	int nfid;	/* only asserted to be zero at teardown in the code visible here */
     69 	Channel *inc;	/* connoutthread pokes this to resume a stalled connthread */
     70 	Channel *internal;	/* replies to internally generated messages arrive here */
     71 	int inputstalled;	/* set by connthread when it stops reading at MAXMSG */
     72 	char dir[40];	/* directory name filled in by iolisten/ioaccept */
     73 	Hash *tag[NHASH];	/* client tag -> Msg */
     74 	Hash *fid[NHASH];	/* client fid -> Fid */
     75 	Queue *outq;	/* replies waiting for connoutthread; freed there */
     76 	Queue *inq;	/* allocated in listenthread, freed in connthread */
     77 	Channel *outqdead;	/* handshake channel used during connection teardown */
     78 };
     79 
        /*
         * Program-wide state, mostly set from the command line in threadmain.
         */
     80 char *xaname;	/* -A: attach name forced onto Tattach */
     81 char *addr;	/* printed by ignorepipe */
     82 int afd;	/* descriptor returned by announce */
     83 char adir[40];	/* directory filled in by announce; passed to iolisten */
     84 int isunix;	/* -u count; not otherwise used in the code visible here */
     85 Queue *outq;	/* messages bound for the server; drained by outputthread */
     86 Queue *inq;	/* allocated in mainproc; not otherwise used in the code visible here */
     87 int verbose = 0;	/* -v count or $verbose9pserve; gates the debugging prints */
     88 int logging = 0;	/* -l: redirect fd 2 to <address>.log */
     89 int msize = 8192;	/* maximum 9P message size; -M or the Tversion exchange may change it */
     90 u32int xafid = NOFID;	/* -A: afid forced onto Tattach; fidnew never hands out this number */
     91 int attached;	/* -A was given */
     92 int versioned;	/* -M was given; mainproc then skips its own Tversion */
     93 int noauth;	/* -n: reject Tauth */
     94 
        /*
         * Forward declarations.  Not every function declared here is
         * defined in the part of the file visible to this review.
         */
     95 void *gethash(Hash**, uint);
     96 int puthash(Hash**, uint, void*);
     97 int delhash(Hash**, uint, void*);
     98 Msg *mread9p(Ioproc*, int);
     99 int mwrite9p(Ioproc*, int, uchar*);
    100 uchar *read9ppkt(Ioproc*, int);
    101 int write9ppkt(int, uchar*);
    102 Msg *msgnew(int);
    103 void msgput(Msg*);
    104 void msgclear(Msg*);
    105 Msg *msgget(int);
    106 void msgincref(Msg*);
    107 Fid *fidnew(int);
    108 void fidput(Fid*);
    109 void *emalloc(int);
    110 void *erealloc(void*, int);
    111 Queue *qalloc(void);
    112 int sendq(Queue*, void*);
    113 void *recvq(Queue*);
    114 void connthread(void*);
    115 void connoutthread(void*);
    116 void listenthread(void*);
    117 void outputthread(void*);
    118 void inputthread(void*);
    119 void rewritehdr(Fcall*, uchar*);
    120 void repack(Fcall*, uchar**);
    121 int tlisten(char*, char*);
    122 int taccept(int, char*);
    123 int iolisten(Ioproc*, char*, char*);
    124 int ioaccept(Ioproc*, int, char*);
    125 int iorecvfd(Ioproc*, int);
    126 int iosendfd(Ioproc*, int, int);
    127 void mainproc(void*);
    128 int ignorepipe(void*, char*);
    129 int timefmt(Fmt*);
    130 void dorootstat(void);
    131 
    132 void
    133 usage(void)
    134 {
    135 	fprint(2, "usage: 9pserve [-lnv] [-A aname afid] [-c addr] [-M msize] address\n");
    136 	fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
    137 	threadexitsall("usage");
    138 }
    139 
    140 int
    141 threadmaybackground(void)
    142 {
    143 	return 1;
    144 }
    145 
    146 uchar vbuf[128];
    147 extern int _threaddebuglevel;
    148 void
    149 threadmain(int argc, char **argv)
    150 {
    151 	char *file, *x, *addr;
    152 	int fd;
    153 
    154 	rfork(RFNOTEG);
    155 	x = getenv("verbose9pserve");
    156 	if(x){
    157 		verbose = atoi(x);
    158 		fprint(2, "verbose9pserve %s => %d\n", x, verbose);
    159 	}
    160 	ARGBEGIN{
    161 	default:
    162 		usage();
    163 	case 'A':
    164 		attached = 1;
    165 		xaname = EARGF(usage());
    166 		xafid = atoi(EARGF(usage()));
    167 		break;
    168 	case 'M':
    169 		versioned = 1;
    170 		msize = atoi(EARGF(usage()));
    171 		break;
    172 	case 'c':
    173 		addr = netmkaddr(EARGF(usage()), "net", "9fs");
    174 		if((fd = dial(addr, nil, nil, nil)) < 0)
    175 			sysfatal("dial %s: %r", addr);
    176 		dup(fd, 0);
    177 		dup(fd, 1);
    178 		if(fd > 1)
    179 			close(fd);
    180 		break;
    181 	case 'n':
    182 		noauth = 1;
    183 		break;
    184 	case 'v':
    185 		verbose++;
    186 		break;
    187 	case 'u':
    188 		isunix++;
    189 		break;
    190 	case 'l':
    191 		logging++;
    192 		break;
    193 	}ARGEND
    194 
    195 	if(attached && !versioned){
    196 		fprint(2, "-A must be used with -M\n");
    197 		usage();
    198 	}
    199 
    200 	if(argc != 1)
    201 		usage();
    202 	addr = argv[0];
    203 
    204 	fmtinstall('T', timefmt);
    205 
    206 	if((afd = announce(addr, adir)) < 0)
    207 		sysfatal("announce %s: %r", addr);
    208 	if(logging){
    209 		if(strncmp(addr, "unix!", 5) == 0)
    210 			addr += 5;
    211 		file = smprint("%s.log", addr);
    212 		if(file == nil)
    213 			sysfatal("smprint log: %r");
    214 		if((fd = create(file, OWRITE, 0666)) < 0)
    215 			sysfatal("create %s: %r", file);
    216 		dup(fd, 2);
    217 		if(fd > 2)
    218 			close(fd);
    219 	}
    220 	if(verbose) fprint(2, "%T 9pserve running\n");
    221 	proccreate(mainproc, nil, STACK);
    222 }
    223 
    224 void
    225 mainproc(void *v)
    226 {
    227 	int n, nn;
    228 	Fcall f;
    229 	USED(v);
    230 
    231 	atnotify(ignorepipe, 1);
    232 	fmtinstall('D', dirfmt);
    233 	fmtinstall('M', dirmodefmt);
    234 	fmtinstall('F', fcallfmt);
    235 	fmtinstall('H', encodefmt);
    236 
    237 	outq = qalloc();
    238 	inq = qalloc();
    239 
    240 	if(!versioned){
    241 		f.type = Tversion;
    242 		f.version = "9P2000";
    243 		f.msize = msize;
    244 		f.tag = NOTAG;
    245 		n = convS2M(&f, vbuf, sizeof vbuf);
    246 		if(n <= BIT16SZ)
    247 			sysfatal("convS2M conversion error");
    248 		if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
    249 		nn = write(1, vbuf, n);
    250 		if(n != nn)
    251 			sysfatal("error writing Tversion: %r\n");
    252 		n = read9pmsg(0, vbuf, sizeof vbuf);
    253 		if(n < 0)
    254 			sysfatal("read9pmsg failure");
    255 		if(convM2S(vbuf, n, &f) != n)
    256 			sysfatal("convM2S failure");
    257 		if(f.msize < msize)
    258 			msize = f.msize;
    259 		if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
    260 	}
    261 
    262 	threadcreate(inputthread, nil, STACK);
    263 	threadcreate(outputthread, nil, STACK);
    264 
    265 /*	if(rootfid) */
    266 /*		dorootstat(); */
    267 
    268 	threadcreate(listenthread, nil, STACK);
    269 	threadexits(0);
    270 }
    271 
    272 int
    273 ignorepipe(void *v, char *s)
    274 {
    275 	USED(v);
    276 	if(strcmp(s, "sys: write on closed pipe") == 0)
    277 		return 1;
    278 	if(strcmp(s, "sys: tstp") == 0)
    279 		return 1;
    280 	if(strcmp(s, "sys: window size change") == 0)
    281 		return 1;
    282 	fprint(2, "9pserve %s: %T note: %s\n", addr, s);
    283 	return 0;
    284 }
    285 
    286 void
    287 listenthread(void *arg)
    288 {
    289 	Conn *c;
    290 	Ioproc *io;
    291 
    292 	io = ioproc();
    293 	USED(arg);
    294 	threadsetname("listen %s", adir);
    295 	for(;;){
    296 		c = emalloc(sizeof(Conn));
    297 		c->fd = iolisten(io, adir, c->dir);
    298 		if(c->fd < 0){
    299 			if(verbose) fprint(2, "%T listen: %r\n");
    300 			close(afd);
    301 			free(c);
    302 			return;
    303 		}
    304 		c->inc = chancreate(sizeof(void*), 0);
    305 		c->internal = chancreate(sizeof(void*), 0);
    306 		c->inq = qalloc();
    307 		c->outq = qalloc();
    308 		c->outqdead = chancreate(sizeof(void*), 0);
    309 		if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
    310 		threadcreate(connthread, c, STACK);
    311 	}
    312 }
    313 
    314 void
    315 send9pmsg(Msg *m)
    316 {
    317 	int n, nn;
    318 
    319 	n = sizeS2M(&m->rx);
    320 	m->rpkt = emalloc(n);
    321 	nn = convS2M(&m->rx, m->rpkt, n);
    322 	if(nn <= BIT16SZ)
    323 		sysfatal("convS2M conversion error");
    324 	if(nn != n)
    325 		sysfatal("sizeS2M and convS2M disagree");
    326 	sendq(m->c->outq, m);
    327 }
    328 
    329 void
    330 sendomsg(Msg *m)
    331 {
    332 	int n, nn;
    333 
    334 	n = sizeS2M(&m->tx);
    335 	m->tpkt = emalloc(n);
    336 	nn = convS2M(&m->tx, m->tpkt, n);
    337 	if(nn <= BIT16SZ)
    338 		sysfatal("convS2M conversion error");
    339 	if(nn != n)
    340 		sysfatal("sizeS2M and convS2M disagree");
    341 	sendq(outq, m);
    342 }
    343 
    344 void
    345 err(Msg *m, char *ename)
    346 {
    347 	m->rx.type = Rerror;
    348 	m->rx.ename = ename;
    349 	m->rx.tag = m->tx.tag;
    350 	send9pmsg(m);
    351 }
    352 
    353 char*
    354 estrdup(char *s)
    355 {
    356 	char *t;
    357 
    358 	t = emalloc(strlen(s)+1);
    359 	strcpy(t, s);
    360 	return t;
    361 }
    362 
    363 void
    364 connthread(void *arg)
    365 {
    366 	int i, fd;
    367 	Conn *c;
    368 	Hash *h, *hnext;
    369 	Msg *m, *om, *mm, sync;
    370 	Fid *f;
    371 	Ioproc *io;
    372 
    373 	c = arg;
    374 	threadsetname("conn %s", c->dir);
    375 	io = ioproc();
    376 	fd = ioaccept(io, c->fd, c->dir);
    377 	if(fd < 0){
    378 		if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
    379 		goto out;
    380 	}
    381 	close(c->fd);
    382 	c->fd = fd;
    383 	threadcreate(connoutthread, c, STACK);
    384 	while((m = mread9p(io, c->fd)) != nil){
    385 		if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
    386 		m->c = c;
    387 		m->ctag = m->tx.tag;
    388 		c->nmsg++;
    389 		if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
    390 		if(puthash(c->tag, m->tx.tag, m) < 0){
    391 			err(m, "duplicate tag");
    392 			continue;
    393 		}
    394 		msgincref(m);
    395 		switch(m->tx.type){
    396 		case Tversion:
    397 			m->rx.tag = m->tx.tag;
    398 			m->rx.msize = m->tx.msize;
    399 			if(m->rx.msize > msize)
    400 				m->rx.msize = msize;
    401 			m->rx.version = "9P2000";
    402 			m->rx.type = Rversion;
    403 			send9pmsg(m);
    404 			continue;
    405 		case Tflush:
    406 			if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
    407 				m->rx.tag = m->tx.tag;
    408 				m->rx.type = Rflush;
    409 				send9pmsg(m);
    410 				continue;
    411 			}
    412 			msgincref(m->oldm);
    413 			break;
    414 		case Tattach:
    415 			m->afid = nil;
    416 			if(m->tx.afid != NOFID
    417 			&& (m->afid = gethash(c->fid, m->tx.afid)) == nil){
    418 				err(m, "unknown fid");
    419 				continue;
    420 			}
    421 			if(m->afid)
    422 				m->afid->ref++;
    423 			m->fid = fidnew(m->tx.fid);
    424 			if(puthash(c->fid, m->tx.fid, m->fid) < 0){
    425 				err(m, "duplicate fid");
    426 				continue;
    427 			}
    428 			m->fid->ref++;
    429 			if(attached && m->afid==nil){
    430 				if(m->tx.aname[0] && strcmp(xaname, m->tx.aname) != 0){
    431 					err(m, "invalid attach name");
    432 					continue;
    433 				}
    434 				m->tx.afid = xafid;
    435 				m->tx.aname = xaname;
    436 				m->tx.uname = getuser();	/* what srv.c used */
    437 				repack(&m->tx, &m->tpkt);
    438 			}
    439 			break;
    440 		case Twalk:
    441 			if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
    442 				err(m, "unknown fid");
    443 				continue;
    444 			}
    445 			m->fid->ref++;
    446 			if(m->tx.newfid == m->tx.fid){
    447 				m->fid->ref++;
    448 				m->newfid = m->fid;
    449 			}else{
    450 				m->newfid = fidnew(m->tx.newfid);
    451 				if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
    452 					err(m, "duplicate fid");
    453 					continue;
    454 				}
    455 				m->newfid->ref++;
    456 			}
    457 			break;
    458 		case Tauth:
    459 			if(attached){
    460 				err(m, "authentication not required");
    461 				continue;
    462 			}
    463 			if(noauth){
    464 				err(m, "authentication rejected");
    465 				continue;
    466 			}
    467 			m->afid = fidnew(m->tx.afid);
    468 			if(puthash(c->fid, m->tx.afid, m->afid) < 0){
    469 				err(m, "duplicate fid");
    470 				continue;
    471 			}
    472 			m->afid->ref++;
    473 			break;
    474 		case Tcreate:
    475 			if(m->tx.perm&(DMSYMLINK|DMDEVICE|DMNAMEDPIPE|DMSOCKET)){
    476 				err(m, "unsupported file type");
    477 				continue;
    478 			}
    479 			goto caseTopen;
    480 		case Topenfd:
    481 			if(m->tx.mode&~(OTRUNC|3)){
    482 				err(m, "bad openfd mode");
    483 				continue;
    484 			}
    485 			m->isopenfd = 1;
    486 			m->tx.type = Topen;
    487 			m->tpkt[4] = Topen;
    488 			/* fall through */
    489 		caseTopen:
    490 		case Topen:
    491 		case Tclunk:
    492 		case Tread:
    493 		case Twrite:
    494 		case Tremove:
    495 		case Tstat:
    496 		case Twstat:
    497 			if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
    498 				err(m, "unknown fid");
    499 				continue;
    500 			}
    501 			m->fid->ref++;
    502 			break;
    503 		}
    504 
    505 		/* have everything - translate and send */
    506 		m->c = c;
    507 		m->ctag = m->tx.tag;
    508 		m->tx.tag = m->tag;
    509 		if(m->fid)
    510 			m->tx.fid = m->fid->fid;
    511 		if(m->newfid)
    512 			m->tx.newfid = m->newfid->fid;
    513 		if(m->afid)
    514 			m->tx.afid = m->afid->fid;
    515 		if(m->oldm)
    516 			m->tx.oldtag = m->oldm->tag;
    517 		/* reference passes to outq */
    518 		sendq(outq, m);
    519 		while(c->nmsg >= MAXMSG){
    520 			c->inputstalled = 1;
    521 			recvp(c->inc);
    522 		}
    523 	}
    524 
    525 	if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
    526 
    527 	/* flush all outstanding messages */
    528 	for(i=0; i<NHASH; i++){
    529 		while((h = c->tag[i]) != nil){
    530 			om = h->v;
    531 			msgincref(om); /* for us */
    532 			m = msgnew(0);
    533 			m->internal = 1;
    534 			m->c = c;
    535 			c->nmsg++;
    536 			m->tx.type = Tflush;
    537 			m->tx.tag = m->tag;
    538 			m->tx.oldtag = om->tag;
    539 			m->oldm = om;
    540 			msgincref(om);
    541 			msgincref(m);	/* for outq */
    542 			sendomsg(m);
    543 			mm = recvp(c->internal);
    544 			assert(mm == m);
    545 			msgput(m);	/* got from recvp */
    546 			msgput(m);	/* got from msgnew */
    547 			if(delhash(c->tag, om->ctag, om) == 0)
    548 				msgput(om);	/* got from hash table */
    549 			msgput(om);	/* got from msgincref */
    550 		}
    551 	}
    552 
    553 	/*
    554 	 * outputthread has written all its messages
    555 	 * to the remote connection (because we've gotten all the replies!),
    556 	 * but it might not have gotten a chance to msgput
    557 	 * the very last one.  sync up to make sure.
    558 	 */
    559 	memset(&sync, 0, sizeof sync);
    560 	sync.sync = 1;
    561 	sync.c = c;
    562 	sendq(outq, &sync);
    563 	recvp(c->outqdead);
    564 
    565 	/* everything is quiet; can close the local output queue. */
    566 	sendq(c->outq, nil);
    567 	recvp(c->outqdead);
    568 
    569 	/* should be no messages left anywhere. */
    570 	assert(c->nmsg == 0);
    571 
    572 	/* clunk all outstanding fids */
    573 	for(i=0; i<NHASH; i++){
    574 		for(h=c->fid[i]; h; h=hnext){
    575 			f = h->v;
    576 			m = msgnew(0);
    577 			m->internal = 1;
    578 			m->c = c;
    579 			c->nmsg++;
    580 			m->tx.type = Tclunk;
    581 			m->tx.tag = m->tag;
    582 			m->tx.fid = f->fid;
    583 			m->fid = f;
    584 			f->ref++;
    585 			msgincref(m);
    586 			sendomsg(m);
    587 			mm = recvp(c->internal);
    588 			assert(mm == m);
    589 			msgclear(m);
    590 			msgput(m);	/* got from recvp */
    591 			msgput(m);	/* got from msgnew */
    592 			fidput(f);	/* got from hash table */
    593 			hnext = h->next;
    594 			free(h);
    595 		}
    596 	}
    597 
    598 out:
    599 	closeioproc(io);
    600 	assert(c->nmsg == 0);
    601 	assert(c->nfid == 0);
    602 	close(c->fd);
    603 	chanfree(c->internal);
    604 	c->internal = 0;
    605 	chanfree(c->inc);
    606 	c->inc = 0;
    607 	free(c->inq);
    608 	c->inq = 0;
    609 	free(c);
    610 }
    611 
    612 static void
    613 openfdthread(void *v)
    614 {
    615 	Conn *c;
    616 	Fid *fid;
    617 	Msg *m;
    618 	int n;
    619 	vlong tot;
    620 	Ioproc *io;
    621 	char buf[1024];
    622 
    623 	c = v;
    624 	fid = c->fdfid;
    625 	io = ioproc();
    626 	threadsetname("openfd %s", c->fdfid);
    627 	tot = 0;
    628 	m = nil;
    629 	if(c->fdmode == OREAD){
    630 		for(;;){
    631 			if(verbose) fprint(2, "%T tread...");
    632 			m = msgnew(0);
    633 			m->internal = 1;
    634 			m->c = c;
    635 			m->tx.type = Tread;
    636 			m->tx.count = msize - IOHDRSZ;
    637 			m->tx.fid = fid->fid;
    638 			m->tx.tag = m->tag;
    639 			m->tx.offset = tot;
    640 			m->fid = fid;
    641 			fid->ref++;
    642 			msgincref(m);
    643 			sendomsg(m);
    644 			recvp(c->internal);
    645 			if(m->rx.type == Rerror){
    646 			/*	fprint(2, "%T read error: %s\n", m->rx.ename); */
    647 				break;
    648 			}
    649 			if(m->rx.count == 0)
    650 				break;
    651 			tot += m->rx.count;
    652 			if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
    653 				/* fprint(2, "%T pipe write error: %r\n"); */
    654 				break;
    655 			}
    656 			msgput(m);
    657 			msgput(m);
    658 			m = nil;
    659 		}
    660 	}else{
    661 		for(;;){
    662 			if(verbose) fprint(2, "%T twrite...");
    663 			n = sizeof buf;
    664 			if(n > msize)
    665 				n = msize;
    666 			if((n=ioread(io, c->fd, buf, n)) <= 0){
    667 				if(n < 0)
    668 					fprint(2, "%T pipe read error: %r\n");
    669 				break;
    670 			}
    671 			m = msgnew(0);
    672 			m->internal = 1;
    673 			m->c = c;
    674 			m->tx.type = Twrite;
    675 			m->tx.fid = fid->fid;
    676 			m->tx.data = buf;
    677 			m->tx.count = n;
    678 			m->tx.tag = m->tag;
    679 			m->tx.offset = tot;
    680 			m->fid = fid;
    681 			fid->ref++;
    682 			msgincref(m);
    683 			sendomsg(m);
    684 			recvp(c->internal);
    685 			if(m->rx.type == Rerror){
    686 			/*	fprint(2, "%T write error: %s\n", m->rx.ename); */
    687 			}
    688 			tot += n;
    689 			msgput(m);
    690 			msgput(m);
    691 			m = nil;
    692 		}
    693 	}
    694 	if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
    695 	close(c->fd);
    696 	closeioproc(io);
    697 	if(m){
    698 		msgput(m);
    699 		msgput(m);
    700 	}
    701 	if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
    702 	if(--fid->openfd == 0){
    703 		m = msgnew(0);
    704 		m->internal = 1;
    705 		m->c = c;
    706 		m->tx.type = Tclunk;
    707 		m->tx.tag = m->tag;
    708 		m->tx.fid = fid->fid;
    709 		m->fid = fid;
    710 		fid->ref++;
    711 		msgincref(m);
    712 		sendomsg(m);
    713 		recvp(c->internal);
    714 		msgput(m);
    715 		msgput(m);
    716 	}
    717 	fidput(fid);
    718 	c->fdfid = nil;
    719 	chanfree(c->internal);
    720 	c->internal = 0;
    721 	free(c);
    722 }
    723 
    724 int
    725 xopenfd(Msg *m)
    726 {
    727 	char errs[ERRMAX];
    728 	int n, p[2];
    729 	Conn *nc;
    730 
    731 	if(pipe(p) < 0){
    732 		rerrstr(errs, sizeof errs);
    733 		err(m, errs);
    734 		/* XXX return here? */
    735 	}
    736 	if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
    737 
    738 	/* now we're committed. */
    739 
    740 	/* a new connection for this fid */
    741 	nc = emalloc(sizeof(Conn));
    742 	nc->internal = chancreate(sizeof(void*), 0);
    743 
    744 	/* a ref for us */
    745 	nc->fdfid = m->fid;
    746 	m->fid->ref++;
    747 	nc->fdfid->openfd++;
    748 	nc->fdmode = m->tx.mode;
    749 	nc->fd = p[0];
    750 
    751 	/* a thread to tend the pipe */
    752 	threadcreate(openfdthread, nc, STACK);
    753 
    754 	/* if mode is ORDWR, that openfdthread will write; start a reader */
    755 	if((m->tx.mode&3) == ORDWR){
    756 		nc = emalloc(sizeof(Conn));
    757 		nc->internal = chancreate(sizeof(void*), 0);
    758 		nc->fdfid = m->fid;
    759 		m->fid->ref++;
    760 		nc->fdfid->openfd++;
    761 		nc->fdmode = OREAD;
    762 		nc->fd = dup(p[0], -1);
    763 		threadcreate(openfdthread, nc, STACK);
    764 	}
    765 
    766 	/* steal fid from other connection */
    767 	if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
    768 		fidput(m->fid);
    769 
    770 	/* rewrite as Ropenfd */
    771 	m->rx.type = Ropenfd;
    772 	n = GBIT32(m->rpkt);
    773 	m->rpkt = erealloc(m->rpkt, n+4);
    774 	PBIT32(m->rpkt+n, p[1]);
    775 	n += 4;
    776 	PBIT32(m->rpkt, n);
    777 	m->rpkt[4] = Ropenfd;
    778 	m->rx.unixfd = p[1];
    779 	return 0;
    780 }
    781 
    782 void
    783 connoutthread(void *arg)
    784 {
    785 	int err;
    786 	Conn *c;
    787 	Msg *m, *om;
    788 	Ioproc *io;
    789 
    790 	c = arg;
    791 	io = ioproc();
    792 	threadsetname("connout %s", c->dir);
    793 	while((m = recvq(c->outq)) != nil){
    794 		err = m->tx.type+1 != m->rx.type;
    795 		if(!err && m->isopenfd)
    796 			if(xopenfd(m) < 0)
    797 				continue;
    798 		switch(m->tx.type){
    799 		case Tflush:
    800 			om = m->oldm;
    801 			if(om)
    802 				if(delhash(om->c->tag, om->ctag, om) == 0)
    803 					msgput(om);
    804 			break;
    805 		case Tclunk:
    806 		case Tremove:
    807 			if(m->fid)
    808 				if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
    809 					fidput(m->fid);
    810 			break;
    811 		case Tauth:
    812 			if(err && m->afid){
    813 				if(verbose) fprint(2, "%T auth error\n");
    814 				if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
    815 					fidput(m->afid);
    816 			}
    817 			break;
    818 		case Tattach:
    819 			if(err && m->fid)
    820 				if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
    821 					fidput(m->fid);
    822 			break;
    823 		case Twalk:
    824 			if(err || m->rx.nwqid < m->tx.nwname)
    825 			if(m->tx.fid != m->tx.newfid && m->newfid)
    826 				if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
    827 					fidput(m->newfid);
    828 			break;
    829 		case Tread:
    830 			break;
    831 		case Tstat:
    832 			break;
    833 		case Topen:
    834 		case Tcreate:
    835 			m->fid->isdir = (m->rx.qid.type & QTDIR);
    836 			break;
    837 		}
    838 		if(delhash(m->c->tag, m->ctag, m) == 0)
    839 			msgput(m);
    840 		if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
    841 		rewritehdr(&m->rx, m->rpkt);
    842 		if(mwrite9p(io, c->fd, m->rpkt) < 0)
    843 			if(verbose) fprint(2, "%T write error: %r\n");
    844 		msgput(m);
    845 		if(c->inputstalled && c->nmsg < MAXMSG)
    846 			nbsendp(c->inc, 0);
    847 	}
    848 	closeioproc(io);
    849 	free(c->outq);
    850 	c->outq = nil;
    851 	sendp(c->outqdead, nil);
    852 }
    853 
    854 void
    855 outputthread(void *arg)
    856 {
    857 	Msg *m;
    858 	Ioproc *io;
    859 
    860 	USED(arg);
    861 	io = ioproc();
    862 	threadsetname("output");
    863 	while((m = recvq(outq)) != nil){
    864 		if(m->sync){
    865 			sendp(m->c->outqdead, nil);
    866 			continue;
    867 		}
    868 		if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
    869 		rewritehdr(&m->tx, m->tpkt);
    870 		if(mwrite9p(io, 1, m->tpkt) < 0)
    871 			sysfatal("output error: %r");
    872 		msgput(m);
    873 	}
    874 	closeioproc(io);
    875 	fprint(2, "%T output eof\n");
    876 	threadexitsall(0);
    877 }
    878 
    879 void
    880 inputthread(void *arg)
    881 {
    882 	uchar *pkt;
    883 	int n, nn, tag;
    884 	Msg *m;
    885 	Ioproc *io;
    886 
    887 	threadsetname("input");
    888 	if(verbose) fprint(2, "%T input thread\n");
    889 	io = ioproc();
    890 	USED(arg);
    891 	while((pkt = read9ppkt(io, 0)) != nil){
    892 		n = GBIT32(pkt);
    893 		if(n < 7){
    894 			fprint(2, "%T short 9P packet from server\n");
    895 			free(pkt);
    896 			continue;
    897 		}
    898 		if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt);
    899 		tag = GBIT16(pkt+5);
    900 		if((m = msgget(tag)) == nil){
    901 			fprint(2, "%T unexpected 9P response tag %d\n", tag);
    902 			free(pkt);
    903 			continue;
    904 		}
    905 		if((nn = convM2S(pkt, n, &m->rx)) != n){
    906 			fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n);
    907 			free(pkt);
    908 			msgput(m);
    909 			continue;
    910 		}
    911 		if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
    912 			m->internal ? " (internal)" : "");
    913 		m->rpkt = pkt;
    914 		m->rx.tag = m->ctag;
    915 		if(m->internal)
    916 			sendp(m->c->internal, m);
    917 		else if(m->c->outq)
    918 			sendq(m->c->outq, m);
    919 		else
    920 			msgput(m);
    921 	}
    922 	closeioproc(io);
    923 	/*fprint(2, "%T input eof\n"); */
    924 	threadexitsall(0);
    925 }
    926 
    927 void*
    928 gethash(Hash **ht, uint n)
    929 {
    930 	Hash *h;
    931 
    932 	for(h=ht[n%NHASH]; h; h=h->next)
    933 		if(h->n == n)
    934 			return h->v;
    935 	return nil;
    936 }
    937 
    938 int
    939 delhash(Hash **ht, uint n, void *v)
    940 {
    941 	Hash *h, **l;
    942 
    943 	for(l=&ht[n%NHASH]; h=*l; l=&h->next)
    944 		if(h->n == n){
    945 			if(h->v != v){
    946 				if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v);
    947 				return -1;
    948 			}
    949 			*l = h->next;
    950 			free(h);
    951 			return 0;
    952 		}
    953 	return -1;
    954 }
    955 
    956 int
    957 puthash(Hash **ht, uint n, void *v)
    958 {
    959 	Hash *h;
    960 
    961 	if(gethash(ht, n))
    962 		return -1;
    963 	h = emalloc(sizeof(Hash));
    964 	h->next = ht[n%NHASH];
    965 	h->n = n;
    966 	h->v = v;
    967 	ht[n%NHASH] = h;
    968 	return 0;
    969 }
    970 
    971 Fid **fidtab;
    972 int nfidtab;
    973 Fid *freefid;
    974 
    975 Fid*
    976 fidnew(int cfid)
    977 {
    978 	Fid *f;
    979 
    980 	if(freefid == nil){
    981 		fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
    982 		if(nfidtab == xafid){
    983 			fidtab[nfidtab++] = nil;
    984 			fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
    985 		}
    986 		fidtab[nfidtab] = emalloc(sizeof(Fid));
    987 		freefid = fidtab[nfidtab];
    988 		freefid->fid = nfidtab++;
    989 	}
    990 	f = freefid;
    991 	freefid = f->next;
    992 	f->cfid = cfid;
    993 	f->ref = 1;
    994 	f->offset = 0;
    995 	f->coffset = 0;
    996 	f->isdir = -1;
    997 	return f;
    998 }
    999 
   1000 void
   1001 fidput(Fid *f)
   1002 {
   1003 	if(f == nil)
   1004 		return;
   1005 	assert(f->ref > 0);
   1006 	if(--f->ref > 0)
   1007 		return;
   1008 	f->next = freefid;
   1009 	f->cfid = -1;
   1010 	freefid = f;
   1011 }
   1012 
   1013 Msg **msgtab;
   1014 int nmsgtab;
   1015 int nmsg;
   1016 Msg *freemsg;
   1017 
   1018 void
   1019 msgincref(Msg *m)
   1020 {
   1021 	if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
   1022 		getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
   1023 	m->ref++;
   1024 }
   1025 
   1026 Msg*
   1027 msgnew(int x)
   1028 {
   1029 	Msg *m;
   1030 
   1031 	if(freemsg == nil){
   1032 		msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
   1033 		msgtab[nmsgtab] = emalloc(sizeof(Msg));
   1034 		freemsg = msgtab[nmsgtab];
   1035 		freemsg->tag = nmsgtab++;
   1036 	}
   1037 	m = freemsg;
   1038 	freemsg = m->next;
   1039 	m->ref = 1;
   1040 	if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
   1041 		getcallerpc(&x), m, m->tag, m->ref);
   1042 	nmsg++;
   1043 	return m;
   1044 }
   1045 
   1046 /*
   1047  * Clear data associated with connections, so that
   1048  * if all msgs have been msgcleared, the connection
   1049  * can be freed.  Note that this does *not* free the tpkt
   1050  * and rpkt; they are freed in msgput with the msg itself.
   1051  * The io write thread might still be holding a ref to msg
   1052  * even once the connection has finished with it.
   1053  */
   1054 void
   1055 msgclear(Msg *m)
   1056 {
   1057 	if(m->c){
   1058 		m->c->nmsg--;
   1059 		m->c = nil;
   1060 	}
   1061 	if(m->oldm){
   1062 		msgput(m->oldm);
   1063 		m->oldm = nil;
   1064 	}
   1065 	if(m->fid){
   1066 		fidput(m->fid);
   1067 		m->fid = nil;
   1068 	}
   1069 	if(m->afid){
   1070 		fidput(m->afid);
   1071 		m->afid = nil;
   1072 	}
   1073 	if(m->newfid){
   1074 		fidput(m->newfid);
   1075 		m->newfid = nil;
   1076 	}
   1077 	if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
   1078 		close(m->rx.unixfd);
   1079 		m->rx.unixfd = -1;
   1080 	}
   1081 }
   1082 
   1083 void
   1084 msgput(Msg *m)
   1085 {
   1086 	if(m == nil)
   1087 		return;
   1088 
   1089 	if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
   1090 		getcallerpc(&m), m, m->tag, m->ctag, m->ref);
   1091 	assert(m->ref > 0);
   1092 	if(--m->ref > 0)
   1093 		return;
   1094 	nmsg--;
   1095 	msgclear(m);
   1096 	if(m->tpkt){
   1097 		free(m->tpkt);
   1098 		m->tpkt = nil;
   1099 	}
   1100 	if(m->rpkt){
   1101 		free(m->rpkt);
   1102 		m->rpkt = nil;
   1103 	}
   1104 	m->isopenfd = 0;
   1105 	m->internal = 0;
   1106 	m->next = freemsg;
   1107 	freemsg = m;
   1108 }
   1109 
   1110 Msg*
   1111 msgget(int n)
   1112 {
   1113 	Msg *m;
   1114 
   1115 	if(n < 0 || n >= nmsgtab)
   1116 		return nil;
   1117 	m = msgtab[n];
   1118 	if(m->ref == 0)
   1119 		return nil;
   1120 	if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
   1121 	msgincref(m);
   1122 	return m;
   1123 }
   1124 
   1125 
   1126 void*
   1127 emalloc(int n)
   1128 {
   1129 	void *v;
   1130 
   1131 	v = mallocz(n, 1);
   1132 	if(v == nil){
   1133 		abort();
   1134 		sysfatal("out of memory allocating %d", n);
   1135 	}
   1136 	return v;
   1137 }
   1138 
   1139 void*
   1140 erealloc(void *v, int n)
   1141 {
   1142 	v = realloc(v, n);
   1143 	if(v == nil){
   1144 		abort();
   1145 		sysfatal("out of memory reallocating %d", n);
   1146 	}
   1147 	return v;
   1148 }
   1149 
        /*
         * One element of a Queue: a singly linked node carrying one pointer.
         */
   1150 typedef struct Qel Qel;
   1151 struct Qel
   1152 {
   1153 	Qel *next;	/* next element toward the tail */
   1154 	void *p;	/* the queued pointer; nil is a legal value */
   1155 };
   1156 
        /*
         * A linked FIFO of pointers: sendq appends at the tail and
         * recvq removes from the head, sleeping while the queue is empty.
         */
   1157 struct Queue
   1158 {
   1159 	QLock lk;	/* held while head/tail are examined or changed */
   1160 	Rendez r;	/* recvq sleeps here; sendq wakes it; tied to lk by qalloc */
   1161 	Qel *head;	/* oldest element, or nil when empty */
   1162 	Qel *tail;	/* newest element; only meaningful while head != nil */
   1163 };
   1164 
   1165 Queue*
   1166 qalloc(void)
   1167 {
   1168 	Queue *q;
   1169 
   1170 	q = mallocz(sizeof(Queue), 1);
   1171 	if(q == nil)
   1172 		return nil;
   1173 	q->r.l = &q->lk;
   1174 	return q;
   1175 }
   1176 
   1177 int
   1178 sendq(Queue *q, void *p)
   1179 {
   1180 	Qel *e;
   1181 
   1182 	e = emalloc(sizeof(Qel));
   1183 	qlock(&q->lk);
   1184 	e->p = p;
   1185 	e->next = nil;
   1186 	if(q->head == nil)
   1187 		q->head = e;
   1188 	else
   1189 		q->tail->next = e;
   1190 	q->tail = e;
   1191 	rwakeup(&q->r);
   1192 	qunlock(&q->lk);
   1193 	return 0;
   1194 }
   1195 
   1196 void*
   1197 recvq(Queue *q)
   1198 {
   1199 	void *p;
   1200 	Qel *e;
   1201 
   1202 	qlock(&q->lk);
   1203 	while(q->head == nil)
   1204 		rsleep(&q->r);
   1205 	e = q->head;
   1206 	q->head = e->next;
   1207 	qunlock(&q->lk);
   1208 	p = e->p;
   1209 	free(e);
   1210 	return p;
   1211 }
   1212 
   1213 uchar*
   1214 read9ppkt(Ioproc *io, int fd)
   1215 {
   1216 	uchar buf[4], *pkt;
   1217 	int n, nn;
   1218 
   1219 	n = ioreadn(io, fd, buf, 4);
   1220 	if(n != 4)
   1221 		return nil;
   1222 	n = GBIT32(buf);
   1223 	if(n > MAXMSGSIZE)
   1224 		return nil;
   1225 	pkt = emalloc(n);
   1226 	PBIT32(pkt, n);
   1227 	nn = ioreadn(io, fd, pkt+4, n-4);
   1228 	if(nn != n-4){
   1229 		free(pkt);
   1230 		return nil;
   1231 	}
   1232 /* would do this if we ever got one of these, but we only generate them
   1233 	if(pkt[4] == Ropenfd){
   1234 		newfd = iorecvfd(io, fd);
   1235 		PBIT32(pkt+n-4, newfd);
   1236 	}
   1237 */
   1238 	return pkt;
   1239 }
   1240 
   1241 Msg*
   1242 mread9p(Ioproc *io, int fd)
   1243 {
   1244 	int n, nn;
   1245 	uchar *pkt;
   1246 	Msg *m;
   1247 
   1248 	if((pkt = read9ppkt(io, fd)) == nil)
   1249 		return nil;
   1250 
   1251 	m = msgnew(0);
   1252 	m->tpkt = pkt;
   1253 	n = GBIT32(pkt);
   1254 	nn = convM2S(pkt, n, &m->tx);
   1255 	if(nn != n){
   1256 		fprint(2, "%T read bad packet from %d\n", fd);
   1257 		free(m->tpkt);
   1258 		free(m);
   1259 		return nil;
   1260 	}
   1261 	return m;
   1262 }
   1263 
   1264 int
   1265 mwrite9p(Ioproc *io, int fd, uchar *pkt)
   1266 {
   1267 	int n, nfd;
   1268 
   1269 	n = GBIT32(pkt);
   1270 	if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt);
   1271 if(verbose > 1) fprint(2, "%T before iowrite\n");
   1272 	if(iowrite(io, fd, pkt, n) != n){
   1273 		fprint(2, "%T write error: %r\n");
   1274 		return -1;
   1275 	}
   1276 if(verbose > 1) fprint(2, "%T after iowrite\n");
   1277 	if(pkt[4] == Ropenfd){
   1278 		nfd = GBIT32(pkt+n-4);
   1279 		if(iosendfd(io, fd, nfd) < 0){
   1280 			fprint(2, "%T send fd error: %r\n");
   1281 			return -1;
   1282 		}
   1283 	}
   1284 	return 0;
   1285 }
   1286 
   1287 void
   1288 restring(uchar *pkt, int pn, char *s)
   1289 {
   1290 	int n;
   1291 
   1292 	if(s < (char*)pkt || s >= (char*)pkt+pn)
   1293 		return;
   1294 
   1295 	n = strlen(s);
   1296 	memmove(s+1, s, n);
   1297 	PBIT16((uchar*)s-1, n);
   1298 }
   1299 
   1300 void
   1301 repack(Fcall *f, uchar **ppkt)
   1302 {
   1303 	uint n, nn;
   1304 	uchar *pkt;
   1305 
   1306 	pkt = *ppkt;
   1307 	n = GBIT32(pkt);
   1308 	nn = sizeS2M(f);
   1309 	if(nn > n){
   1310 		free(pkt);
   1311 		pkt = emalloc(nn);
   1312 		*ppkt = pkt;
   1313 	}
   1314 	n = convS2M(f, pkt, nn);
   1315 	if(n <= BIT16SZ)
   1316 		sysfatal("convS2M conversion error");
   1317 	if(n != nn)
   1318 		sysfatal("convS2M and sizeS2M disagree");
   1319 }
   1320 
   1321 void
   1322 rewritehdr(Fcall *f, uchar *pkt)
   1323 {
   1324 	int i, n;
   1325 
   1326 	n = GBIT32(pkt);
   1327 	PBIT16(pkt+5, f->tag);
   1328 	switch(f->type){
   1329 	case Tversion:
   1330 	case Rversion:
   1331 		restring(pkt, n, f->version);
   1332 		break;
   1333 	case Tauth:
   1334 		PBIT32(pkt+7, f->afid);
   1335 		restring(pkt, n, f->uname);
   1336 		restring(pkt, n, f->aname);
   1337 		break;
   1338 	case Tflush:
   1339 		PBIT16(pkt+7, f->oldtag);
   1340 		break;
   1341 	case Tattach:
   1342 		restring(pkt, n, f->uname);
   1343 		restring(pkt, n, f->aname);
   1344 		PBIT32(pkt+7, f->fid);
   1345 		PBIT32(pkt+11, f->afid);
   1346 		break;
   1347 	case Twalk:
   1348 		PBIT32(pkt+7, f->fid);
   1349 		PBIT32(pkt+11, f->newfid);
   1350 		for(i=0; i<f->nwname; i++)
   1351 			restring(pkt, n, f->wname[i]);
   1352 		break;
   1353 	case Tcreate:
   1354 		restring(pkt, n, f->name);
   1355 		/* fall through */
   1356 	case Topen:
   1357 	case Tclunk:
   1358 	case Tremove:
   1359 	case Tstat:
   1360 	case Twstat:
   1361 	case Twrite:
   1362 		PBIT32(pkt+7, f->fid);
   1363 		break;
   1364 	case Tread:
   1365 		PBIT32(pkt+7, f->fid);
   1366 		PBIT64(pkt+11, f->offset);
   1367 		break;
   1368 	case Rerror:
   1369 		restring(pkt, n, f->ename);
   1370 		break;
   1371 	}
   1372 }
   1373 
   1374 static long
   1375 _iolisten(va_list *arg)
   1376 {
   1377 	char *a, *b;
   1378 
   1379 	a = va_arg(*arg, char*);
   1380 	b = va_arg(*arg, char*);
   1381 	return listen(a, b);
   1382 }
   1383 
   1384 int
   1385 iolisten(Ioproc *io, char *a, char *b)
   1386 {
   1387 	return iocall(io, _iolisten, a, b);
   1388 }
   1389 
   1390 static long
   1391 _ioaccept(va_list *arg)
   1392 {
   1393 	int fd;
   1394 	char *dir;
   1395 
   1396 	fd = va_arg(*arg, int);
   1397 	dir = va_arg(*arg, char*);
   1398 	return accept(fd, dir);
   1399 }
   1400 
   1401 int
   1402 ioaccept(Ioproc *io, int fd, char *dir)
   1403 {
   1404 	return iocall(io, _ioaccept, fd, dir);
   1405 }
   1406 
   1407 int
   1408 timefmt(Fmt *fmt)
   1409 {
   1410 	static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
   1411 		"Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
   1412 	vlong ns;
   1413 	Tm tm;
   1414 	ns = nsec();
   1415 	tm = *localtime(time(0));
   1416 	return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d",
   1417 		mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec,
   1418 		(int)(ns%1000000000)/1000000);
   1419 }