plan9port

[fork] Plan 9 from user space
git clone git://src.adamsgaard.dk/plan9port # fast
git clone https://src.adamsgaard.dk/plan9port.git # slow
Log | Files | Refs | README | LICENSE Back to index

mirrorarenas.c (12243B)


      1 /*
      2  * Mirror one arena partition onto another.
      3  * Be careful to copy only new data.
      4  */
      5 
      6 #include "stdinc.h"
      7 #include "dat.h"
      8 #include "fns.h"
      9 
/*
 * Channel of Write pointers from copy() to writeproc().
 * Created in threadmain with a buffer size of 0.
 */
Channel *writechan;
     11 
/*
 * One write request handed from copy() to writeproc().
 */
typedef struct Write Write;
struct Write
{
	uchar *p;	/* data to write */
	int n;		/* number of bytes at p */
	uvlong o;	/* offset in dst at which to write */
	int error;	/* set to 1 by writeproc if the write failed */
};
     20 
Part *src;		/* partition read from (opened OREAD in threadmain) */
Part *dst;		/* partition written to (opened ORDWR in threadmain) */
int force;		/* -F: copy each arena in full, skipping all checks */
int verbose;		/* -v: incremented once per flag */
int dosha1 = 1;		/* cleared by -s: do not compute SHA1 while mirroring */
char *status;		/* exit status handed to threadexitsall; "errors" or nil */
uvlong astart, aend;	/* extent of the arena being mirrored; set by mirror(), asserted by copy() */
     28 
     29 void
     30 usage(void)
     31 {
     32 	fprint(2, "usage: mirrorarenas [-sv] src dst [ranges]\n");
     33 	threadexitsall("usage");
     34 }
     35 
     36 char *tagged;
     37 char *tagname;
     38 int tagindx;
     39 
     40 void
     41 tag(int indx, char *name, char *fmt, ...)
     42 {
     43 	va_list arg;
     44 
     45 	if(tagged){
     46 		free(tagged);
     47 		tagged = nil;
     48 	}
     49 	tagindx = indx;
     50 	tagname = name;
     51 	va_start(arg, fmt);
     52 	tagged = vsmprint(fmt, arg);
     53 	va_end(arg);
     54 }
     55 
     56 enum
     57 {
     58 	Sealed = 1,
     59 	Mirrored = 2,
     60 	Empty = 4,
     61 };
     62 
     63 void
     64 setstatus(int bits)
     65 {
     66 	static int startindx = -1;
     67 	static char *startname, *endname;
     68 	static int lastbits;
     69 	char buf[100];
     70 
     71 	if(bits != lastbits) {
     72 		if(startindx >= 0) {
     73 			switch(lastbits) {
     74 			case Sealed:
     75 				snprint(buf, sizeof buf, "sealed");
     76 				break;
     77 			case Mirrored:
     78 				snprint(buf, sizeof buf, "mirrored");
     79 				break;
     80 			case Sealed+Mirrored:
     81 				snprint(buf, sizeof buf, "mirrored sealed");
     82 				break;
     83 			case Empty:
     84 				snprint(buf, sizeof buf, "empty");
     85 				break;
     86 			default:
     87 				snprint(buf, sizeof buf, "%d", bits);
     88 				break;
     89 			}
     90 			print("%T %s-%s %s\n", startname, endname, buf);
     91 		}
     92 		lastbits = bits;
     93 		startindx = tagindx;
     94 		startname = tagname;
     95 		endname = tagname;
     96 	} else {
     97 		endname = tagname;
     98 	}
     99 	if(bits < 0) {
    100 		startindx = -1;
    101 		return;
    102 	}
    103 }
    104 
    105 void
    106 chat(char *fmt, ...)
    107 {
    108 	va_list arg;
    109 
    110 	setstatus(-1);
    111 
    112 	if(tagged){
    113 		write(1, tagged, strlen(tagged));
    114 		free(tagged);
    115 		tagged = nil;
    116 	}
    117 	va_start(arg, fmt);
    118 	vfprint(1, fmt, arg);
    119 	va_end(arg);
    120 }
    121 
    122 #pragma varargck argpos tag 3
    123 #pragma varargck argpos chat 1
    124 
    125 
    126 int
    127 ereadpart(Part *p, u64int offset, u8int *buf, u32int count)
    128 {
    129 	if(readpart(p, offset, buf, count) != count){
    130 		chat("%T readpart %s at %#llux+%ud: %r\n", p->name, offset, count);
    131 		return -1;
    132 	}
    133 	return 0;
    134 }
    135 
    136 int
    137 ewritepart(Part *p, u64int offset, u8int *buf, u32int count)
    138 {
    139 	if(writepart(p, offset, buf, count) != count || flushpart(p) < 0){
    140 		chat("%T writepart %s at %#llux+%ud: %r\n", p->name, offset, count);
    141 		return -1;
    142 	}
    143 	return 0;
    144 }
    145 
    146 /*
    147  * Extra proc to do writes to dst, so that we can overlap reading
    148  * src with writing dst during copy.  This is an easy factor of two
    149  * (almost) in performance.
    150  */
    151 static Write wsync;
    152 static void
    153 writeproc(void *v)
    154 {
    155 	Write *w;
    156 
    157 	USED(v);
    158 	while((w = recvp(writechan)) != nil){
    159 		if(w == &wsync)
    160 			continue;
    161 		if(ewritepart(dst, w->o, w->p, w->n) < 0)
    162 			w->error = 1;
    163 	}
    164 }
    165 
    166 int
    167 copy(uvlong start, uvlong end, char *what, DigestState *ds)
    168 {
    169 	int i, n;
    170 	uvlong o;
    171 	enum {
    172 		Chunk = 1024*1024
    173 	};
    174 	static uchar tmpbuf[2*Chunk+MaxIo];
    175 	static uchar *tmp[2];
    176 	uchar *p;
    177 	Write w[2];
    178 
    179 	assert(start <= end);
    180 	assert(astart <= start && start < aend);
    181 	assert(astart <= end && end <= aend);
    182 
    183 	// align the buffers so readpart/writepart can do big transfers
    184 	p = tmpbuf;
    185 	if((uintptr)p%MaxIo)
    186 		p += MaxIo - (uintptr)p%MaxIo;
    187 	tmp[0] = p;
    188 	tmp[1] = p + Chunk;
    189 
    190 	if(verbose && start != end)
    191 		chat("%T   copy %,llud-%,llud %s\n", start, end, what);
    192 
    193 	i = 0;
    194 	memset(w, 0, sizeof w);
    195 	for(o=start; o<end; o+=n){
    196 		if(w[i].error)
    197 			goto error;
    198 		n = Chunk;
    199 		if(o+n > end)
    200 			n = end - o;
    201 		if(ereadpart(src, o, tmp[i], n) < 0)
    202 			goto error;
    203 		w[i].p = tmp[i];
    204 		w[i].o = o;
    205 		w[i].n = n;
    206 		w[i].error = 0;
    207 		sendp(writechan, &w[i]);
    208 		if(ds)
    209 			sha1(tmp[i], n, nil, ds);
    210 		i = 1-i;
    211 	}
    212 	if(w[i].error)
    213 		goto error;
    214 
    215 	/*
    216 	 * wait for queued write to finish
    217 	 */
    218 	sendp(writechan, &wsync);
    219 	i = 1-i;
    220 	if(w[i].error)
    221 		return -1;
    222 	return 0;
    223 
    224 error:
    225 	/*
    226 	 * sync with write proc
    227 	 */
    228 	w[i].p = nil;
    229 	w[i].o = 0;
    230 	w[i].n = 0;
    231 	w[i].error = 0;
    232 	sendp(writechan, &w[i]);
    233 	return -1;
    234 }
    235 
    236 /* single-threaded, for reference */
    237 int
    238 copy1(uvlong start, uvlong end, char *what, DigestState *ds)
    239 {
    240 	int n;
    241 	uvlong o;
    242 	static uchar tmp[1024*1024];
    243 
    244 	assert(start <= end);
    245 	assert(astart <= start && start < aend);
    246 	assert(astart <= end && end <= aend);
    247 
    248 	if(verbose && start != end)
    249 		chat("%T   copy %,llud-%,llud %s\n", start, end, what);
    250 
    251 	for(o=start; o<end; o+=n){
    252 		n = sizeof tmp;
    253 		if(o+n > end)
    254 			n = end - o;
    255 		if(ereadpart(src, o, tmp, n) < 0)
    256 			return -1;
    257 		if(ds)
    258 			sha1(tmp, n, nil, ds);
    259 		if(ewritepart(dst, o, tmp, n) < 0)
    260 			return -1;
    261 	}
    262 	return 0;
    263 }
    264 
    265 int
    266 asha1(Part *p, uvlong start, uvlong end, DigestState *ds)
    267 {
    268 	int n;
    269 	uvlong o;
    270 	static uchar tmp[1024*1024];
    271 
    272 	if(start == end)
    273 		return 0;
    274 	assert(start < end);
    275 
    276 	if(verbose)
    277 		chat("%T   sha1 %,llud-%,llud\n", start, end);
    278 
    279 	for(o=start; o<end; o+=n){
    280 		n = sizeof tmp;
    281 		if(o+n > end)
    282 			n = end - o;
    283 		if(ereadpart(p, o, tmp, n) < 0)
    284 			return -1;
    285 		sha1(tmp, n, nil, ds);
    286 	}
    287 	return 0;
    288 }
    289 
    290 uvlong
    291 rdown(uvlong a, int b)
    292 {
    293 	return a-a%b;
    294 }
    295 
    296 uvlong
    297 rup(uvlong a, int b)
    298 {
    299 	if(a%b == 0)
    300 		return a;
    301 	return a+b-a%b;
    302 }
    303 
    304 void
    305 mirror(int indx, Arena *sa, Arena *da)
    306 {
    307 	vlong v, si, di, end;
    308 	int clumpmax, blocksize, sealed;
    309 	static uchar buf[MaxIoSize];
    310 	ArenaHead h;
    311 	DigestState xds, *ds;
    312 	vlong shaoff, base;
    313 
    314 	base = sa->base;
    315 	blocksize = sa->blocksize;
    316 	end = sa->base + sa->size;
    317 
    318 	astart = base - blocksize;
    319 	aend = end + blocksize;
    320 
    321 	tag(indx, sa->name, "%T %s (%,llud-%,llud)\n", sa->name, astart, aend);
    322 
    323 	if(force){
    324 		copy(astart, aend, "all", nil);
    325 		return;
    326 	}
    327 
    328 	if(sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
    329 		if(scorecmp(sa->score, da->score) == 0){
    330 			setstatus(Sealed+Mirrored);
    331 			if(verbose > 1)
    332 				chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
    333 			return;
    334 		}
    335 		chat("%T %s: warning: sealed score mismatch %V vs %V\n", sa->name, sa->score, da->score);
    336 		/* Keep executing; will correct seal if possible. */
    337 	}
    338 	if(!sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
    339 		chat("%T %s: dst is sealed, src is not\n", sa->name);
    340 		status = "errors";
    341 		return;
    342 	}
    343 	if(sa->diskstats.used < da->diskstats.used){
    344 		chat("%T %s: src used %,lld < dst used %,lld\n", sa->name, sa->diskstats.used, da->diskstats.used);
    345 		status = "errors";
    346 		return;
    347 	}
    348 
    349 	if(da->clumpmagic != sa->clumpmagic){
    350 		/*
    351 		 * Write this now to reduce the window in which
    352 		 * the head and tail disagree about clumpmagic.
    353 		 */
    354 		da->clumpmagic = sa->clumpmagic;
    355 		memset(buf, 0, sizeof buf);
    356 		packarena(da, buf);
    357 		if(ewritepart(dst, end, buf, blocksize) < 0)
    358 			return;
    359 	}
    360 
    361 	memset(&h, 0, sizeof h);
    362 	h.version = da->version;
    363 	strcpy(h.name, da->name);
    364 	h.blocksize = da->blocksize;
    365 	h.size = da->size + 2*da->blocksize;
    366 	h.clumpmagic = da->clumpmagic;
    367 	memset(buf, 0, sizeof buf);
    368 	packarenahead(&h, buf);
    369 	if(ewritepart(dst, base - blocksize, buf, blocksize) < 0)
    370 		return;
    371 
    372 	shaoff = 0;
    373 	ds = nil;
    374 	sealed = sa->diskstats.sealed && scorecmp(sa->score, zeroscore) != 0;
    375 	if(sealed && dosha1){
    376 		/* start sha1 state with header */
    377 		memset(&xds, 0, sizeof xds);
    378 		ds = &xds;
    379 		sha1(buf, blocksize, nil, ds);
    380 		shaoff = base;
    381 	}
    382 
    383 	if(sa->diskstats.used != da->diskstats.used){
    384 		di = base+rdown(da->diskstats.used, blocksize);
    385 		si = base+rup(sa->diskstats.used, blocksize);
    386 		if(ds && asha1(dst, shaoff, di, ds) < 0)
    387 			return;
    388 		if(copy(di, si, "data", ds) < 0)
    389 			return;
    390 		shaoff = si;
    391 	}
    392 
    393 	clumpmax = sa->clumpmax;
    394 	di = end - da->diskstats.clumps/clumpmax * blocksize;
    395 	si = end - (sa->diskstats.clumps+clumpmax-1)/clumpmax * blocksize;
    396 
    397 	if(sa->diskstats.sealed){
    398 		/*
    399 		 * might be a small hole between the end of the
    400 		 * data and the beginning of the directory.
    401 		 */
    402 		v = base+rup(sa->diskstats.used, blocksize);
    403 		if(ds && asha1(dst, shaoff, v, ds) < 0)
    404 			return;
    405 		if(copy(v, si, "hole", ds) < 0)
    406 			return;
    407 		shaoff = si;
    408 	}
    409 
    410 	if(da->diskstats.clumps != sa->diskstats.clumps){
    411 		if(ds && asha1(dst, shaoff, si, ds) < 0)
    412 			return;
    413 		if(copy(si, di, "directory", ds) < 0)	/* si < di  because clumpinfo blocks grow down */
    414 			return;
    415 		shaoff = di;
    416 	}
    417 
    418 	da->ctime = sa->ctime;
    419 	da->wtime = sa->wtime;
    420 	da->diskstats = sa->diskstats;
    421 	da->diskstats.sealed = 0;
    422 
    423 	/*
    424 	 * Repack the arena tail information
    425 	 * and save it for next time...
    426 	 */
    427 	memset(buf, 0, sizeof buf);
    428 	packarena(da, buf);
    429 	if(ewritepart(dst, end, buf, blocksize) < 0)
    430 		return;
    431 
    432 	if(sealed){
    433 		/*
    434 		 * ... but on the final pass, copy the encoding
    435 		 * of the tail information from the source
    436 		 * arena itself.  There are multiple possible
    437 		 * ways to write the tail info out (the exact
    438 		 * details have changed as venti went through
    439 		 * revisions), and to keep the SHA1 hash the
    440 		 * same, we have to use what the disk uses.
    441 		 */
    442 		if(asha1(dst, shaoff, end, ds) < 0
    443 		|| copy(end, end+blocksize-VtScoreSize, "tail", ds) < 0)
    444 			return;
    445 		if(dosha1){
    446 			memset(buf, 0, VtScoreSize);
    447 			sha1(buf, VtScoreSize, da->score, ds);
    448 			if(scorecmp(sa->score, da->score) == 0){
    449 				setstatus(Sealed+Mirrored);
    450 				if(verbose > 1)
    451 					chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
    452 				if(ewritepart(dst, end+blocksize-VtScoreSize, da->score, VtScoreSize) < 0)
    453 					return;
    454 			}else{
    455 				chat("%T %s: sealing dst: score mismatch: %V vs %V\n", sa->name, sa->score, da->score);
    456 				memset(&xds, 0, sizeof xds);
    457 				asha1(dst, base-blocksize, end+blocksize-VtScoreSize, &xds);
    458 				sha1(buf, VtScoreSize, 0, &xds);
    459 				chat("%T   reseal: %V\n", da->score);
    460 				status = "errors";
    461 			}
    462 		}else{
    463 			setstatus(Mirrored);
    464 			if(verbose > 1)
    465 				chat("%T %s: %V mirrored\n", sa->name, sa->score);
    466 			if(ewritepart(dst, end+blocksize-VtScoreSize, sa->score, VtScoreSize) < 0)
    467 				return;
    468 		}
    469 	}else{
    470 		if(sa->diskstats.used > 0 || verbose > 1) {
    471 			chat("%T %s: %,lld used mirrored\n",
    472 				sa->name, sa->diskstats.used);
    473 		}
    474 		if(sa->diskstats.used > 0)
    475 			setstatus(Mirrored);
    476 		else
    477 			setstatus(Empty);
    478 	}
    479 }
    480 
    481 void
    482 mirrormany(ArenaPart *sp, ArenaPart *dp, char *range)
    483 {
    484 	int i, lo, hi;
    485 	char *s, *t;
    486 	Arena *sa, *da;
    487 
    488 	if(range == nil){
    489 		for(i=0; i<sp->narenas; i++){
    490 			sa = sp->arenas[i];
    491 			da = dp->arenas[i];
    492 			mirror(i, sa, da);
    493 		}
    494 		setstatus(-1);
    495 		return;
    496 	}
    497 	if(strcmp(range, "none") == 0)
    498 		return;
    499 
    500 	for(s=range; *s; s=t){
    501 		t = strchr(s, ',');
    502 		if(t)
    503 			*t++ = 0;
    504 		else
    505 			t = s+strlen(s);
    506 		if(*s == '-')
    507 			lo = 0;
    508 		else
    509 			lo = strtol(s, &s, 0);
    510 		hi = lo;
    511 		if(*s == '-'){
    512 			s++;
    513 			if(*s == 0)
    514 				hi = sp->narenas-1;
    515 			else
    516 				hi = strtol(s, &s, 0);
    517 		}
    518 		if(*s != 0){
    519 			chat("%T bad arena range: %s\n", s);
    520 			continue;
    521 		}
    522 		for(i=lo; i<=hi; i++){
    523 			sa = sp->arenas[i];
    524 			da = dp->arenas[i];
    525 			mirror(i, sa, da);
    526 		}
    527 		setstatus(-1);
    528 	}
    529 }
    530 
    531 
    532 void
    533 threadmain(int argc, char **argv)
    534 {
    535 	int i;
    536 	Arena *sa, *da;
    537 	ArenaPart *s, *d;
    538 	char *ranges;
    539 
    540 	ventifmtinstall();
    541 
    542 	ARGBEGIN{
    543 	case 'F':
    544 		force = 1;
    545 		break;
    546 	case 'v':
    547 		verbose++;
    548 		break;
    549 	case 's':
    550 		dosha1 = 0;
    551 		break;
    552 	default:
    553 		usage();
    554 	}ARGEND
    555 
    556 	if(argc != 2 && argc != 3)
    557 		usage();
    558 	ranges = nil;
    559 	if(argc == 3)
    560 		ranges = argv[2];
    561 
    562 	if((src = initpart(argv[0], OREAD)) == nil)
    563 		sysfatal("initpart %s: %r", argv[0]);
    564 	if((dst = initpart(argv[1], ORDWR)) == nil)
    565 		sysfatal("initpart %s: %r", argv[1]);
    566 	if((s = initarenapart(src)) == nil)
    567 		sysfatal("initarenapart %s: %r", argv[0]);
    568 	for(i=0; i<s->narenas; i++)
    569 		delarena(s->arenas[i]);
    570 	if((d = initarenapart(dst)) == nil)
    571 		sysfatal("loadarenapart %s: %r", argv[1]);
    572 	for(i=0; i<d->narenas; i++)
    573 		delarena(d->arenas[i]);
    574 
    575 	/*
    576 	 * The arena geometries must match or all bets are off.
    577 	 */
    578 	if(s->narenas != d->narenas)
    579 		sysfatal("arena count mismatch: %d vs %d", s->narenas, d->narenas);
    580 	for(i=0; i<s->narenas; i++){
    581 		sa = s->arenas[i];
    582 		da = d->arenas[i];
    583 		if(sa->version != da->version)
    584 			sysfatal("arena %d: version mismatch: %d vs %d", i, sa->version, da->version);
    585 		if(sa->blocksize != da->blocksize)
    586 			sysfatal("arena %d: blocksize mismatch: %d vs %d", i, sa->blocksize, da->blocksize);
    587 		if(sa->size != da->size)
    588 			sysfatal("arena %d: size mismatch: %,lld vs %,lld", i, sa->size, da->size);
    589 		if(strcmp(sa->name, da->name) != 0)
    590 			sysfatal("arena %d: name mismatch: %s vs %s", i, sa->name, da->name);
    591 	}
    592 
    593 	/*
    594 	 * Mirror one arena at a time.
    595 	 */
    596 	writechan = chancreate(sizeof(void*), 0);
    597 	vtproc(writeproc, nil);
    598 	mirrormany(s, d, ranges);
    599 	sendp(writechan, nil);
    600 	threadexitsall(status);
    601 }