mirrorarenas.c (12243B)
1 /* 2 * Mirror one arena partition onto another. 3 * Be careful to copy only new data. 4 */ 5 6 #include "stdinc.h" 7 #include "dat.h" 8 #include "fns.h" 9 10 Channel *writechan; 11 12 typedef struct Write Write; 13 struct Write 14 { 15 uchar *p; 16 int n; 17 uvlong o; 18 int error; 19 }; 20 21 Part *src; 22 Part *dst; 23 int force; 24 int verbose; 25 int dosha1 = 1; 26 char *status; 27 uvlong astart, aend; 28 29 void 30 usage(void) 31 { 32 fprint(2, "usage: mirrorarenas [-sv] src dst [ranges]\n"); 33 threadexitsall("usage"); 34 } 35 36 char *tagged; 37 char *tagname; 38 int tagindx; 39 40 void 41 tag(int indx, char *name, char *fmt, ...) 42 { 43 va_list arg; 44 45 if(tagged){ 46 free(tagged); 47 tagged = nil; 48 } 49 tagindx = indx; 50 tagname = name; 51 va_start(arg, fmt); 52 tagged = vsmprint(fmt, arg); 53 va_end(arg); 54 } 55 56 enum 57 { 58 Sealed = 1, 59 Mirrored = 2, 60 Empty = 4, 61 }; 62 63 void 64 setstatus(int bits) 65 { 66 static int startindx = -1; 67 static char *startname, *endname; 68 static int lastbits; 69 char buf[100]; 70 71 if(bits != lastbits) { 72 if(startindx >= 0) { 73 switch(lastbits) { 74 case Sealed: 75 snprint(buf, sizeof buf, "sealed"); 76 break; 77 case Mirrored: 78 snprint(buf, sizeof buf, "mirrored"); 79 break; 80 case Sealed+Mirrored: 81 snprint(buf, sizeof buf, "mirrored sealed"); 82 break; 83 case Empty: 84 snprint(buf, sizeof buf, "empty"); 85 break; 86 default: 87 snprint(buf, sizeof buf, "%d", bits); 88 break; 89 } 90 print("%T %s-%s %s\n", startname, endname, buf); 91 } 92 lastbits = bits; 93 startindx = tagindx; 94 startname = tagname; 95 endname = tagname; 96 } else { 97 endname = tagname; 98 } 99 if(bits < 0) { 100 startindx = -1; 101 return; 102 } 103 } 104 105 void 106 chat(char *fmt, ...) 107 { 108 va_list arg; 109 110 setstatus(-1); 111 112 if(tagged){ 113 write(1, tagged, strlen(tagged)); 114 free(tagged); 115 tagged = nil; 116 } 117 va_start(arg, fmt); 118 vfprint(1, fmt, arg); 119 va_end(arg); 120 } 121 122 #pragma varargck argpos tag 3 123 #pragma varargck argpos chat 1 124 125 126 int 127 ereadpart(Part *p, u64int offset, u8int *buf, u32int count) 128 { 129 if(readpart(p, offset, buf, count) != count){ 130 chat("%T readpart %s at %#llux+%ud: %r\n", p->name, offset, count); 131 return -1; 132 } 133 return 0; 134 } 135 136 int 137 ewritepart(Part *p, u64int offset, u8int *buf, u32int count) 138 { 139 if(writepart(p, offset, buf, count) != count || flushpart(p) < 0){ 140 chat("%T writepart %s at %#llux+%ud: %r\n", p->name, offset, count); 141 return -1; 142 } 143 return 0; 144 } 145 146 /* 147 * Extra proc to do writes to dst, so that we can overlap reading 148 * src with writing dst during copy. This is an easy factor of two 149 * (almost) in performance. 150 */ 151 static Write wsync; 152 static void 153 writeproc(void *v) 154 { 155 Write *w; 156 157 USED(v); 158 while((w = recvp(writechan)) != nil){ 159 if(w == &wsync) 160 continue; 161 if(ewritepart(dst, w->o, w->p, w->n) < 0) 162 w->error = 1; 163 } 164 } 165 166 int 167 copy(uvlong start, uvlong end, char *what, DigestState *ds) 168 { 169 int i, n; 170 uvlong o; 171 enum { 172 Chunk = 1024*1024 173 }; 174 static uchar tmpbuf[2*Chunk+MaxIo]; 175 static uchar *tmp[2]; 176 uchar *p; 177 Write w[2]; 178 179 assert(start <= end); 180 assert(astart <= start && start < aend); 181 assert(astart <= end && end <= aend); 182 183 // align the buffers so readpart/writepart can do big transfers 184 p = tmpbuf; 185 if((uintptr)p%MaxIo) 186 p += MaxIo - (uintptr)p%MaxIo; 187 tmp[0] = p; 188 tmp[1] = p + Chunk; 189 190 if(verbose && start != end) 191 chat("%T copy %,llud-%,llud %s\n", start, end, what); 192 193 i = 0; 194 memset(w, 0, sizeof w); 195 for(o=start; o<end; o+=n){ 196 if(w[i].error) 197 goto error; 198 n = Chunk; 199 if(o+n > end) 200 n = end - o; 201 if(ereadpart(src, o, tmp[i], n) < 0) 202 goto error; 203 w[i].p = tmp[i]; 204 w[i].o = o; 205 w[i].n = n; 206 w[i].error = 0; 207 sendp(writechan, &w[i]); 208 if(ds) 209 sha1(tmp[i], n, nil, ds); 210 i = 1-i; 211 } 212 if(w[i].error) 213 goto error; 214 215 /* 216 * wait for queued write to finish 217 */ 218 sendp(writechan, &wsync); 219 i = 1-i; 220 if(w[i].error) 221 return -1; 222 return 0; 223 224 error: 225 /* 226 * sync with write proc 227 */ 228 w[i].p = nil; 229 w[i].o = 0; 230 w[i].n = 0; 231 w[i].error = 0; 232 sendp(writechan, &w[i]); 233 return -1; 234 } 235 236 /* single-threaded, for reference */ 237 int 238 copy1(uvlong start, uvlong end, char *what, DigestState *ds) 239 { 240 int n; 241 uvlong o; 242 static uchar tmp[1024*1024]; 243 244 assert(start <= end); 245 assert(astart <= start && start < aend); 246 assert(astart <= end && end <= aend); 247 248 if(verbose && start != end) 249 chat("%T copy %,llud-%,llud %s\n", start, end, what); 250 251 for(o=start; o<end; o+=n){ 252 n = sizeof tmp; 253 if(o+n > end) 254 n = end - o; 255 if(ereadpart(src, o, tmp, n) < 0) 256 return -1; 257 if(ds) 258 sha1(tmp, n, nil, ds); 259 if(ewritepart(dst, o, tmp, n) < 0) 260 return -1; 261 } 262 return 0; 263 } 264 265 int 266 asha1(Part *p, uvlong start, uvlong end, DigestState *ds) 267 { 268 int n; 269 uvlong o; 270 static uchar tmp[1024*1024]; 271 272 if(start == end) 273 return 0; 274 assert(start < end); 275 276 if(verbose) 277 chat("%T sha1 %,llud-%,llud\n", start, end); 278 279 for(o=start; o<end; o+=n){ 280 n = sizeof tmp; 281 if(o+n > end) 282 n = end - o; 283 if(ereadpart(p, o, tmp, n) < 0) 284 return -1; 285 sha1(tmp, n, nil, ds); 286 } 287 return 0; 288 } 289 290 uvlong 291 rdown(uvlong a, int b) 292 { 293 return a-a%b; 294 } 295 296 uvlong 297 rup(uvlong a, int b) 298 { 299 if(a%b == 0) 300 return a; 301 return a+b-a%b; 302 } 303 304 void 305 mirror(int indx, Arena *sa, Arena *da) 306 { 307 vlong v, si, di, end; 308 int clumpmax, blocksize, sealed; 309 static uchar buf[MaxIoSize]; 310 ArenaHead h; 311 DigestState xds, *ds; 312 vlong shaoff, base; 313 314 base = sa->base; 315 blocksize = sa->blocksize; 316 end = sa->base + sa->size; 317 318 astart = base - blocksize; 319 aend = end + blocksize; 320 321 tag(indx, sa->name, "%T %s (%,llud-%,llud)\n", sa->name, astart, aend); 322 323 if(force){ 324 copy(astart, aend, "all", nil); 325 return; 326 } 327 328 if(sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){ 329 if(scorecmp(sa->score, da->score) == 0){ 330 setstatus(Sealed+Mirrored); 331 if(verbose > 1) 332 chat("%T %s: %V sealed mirrored\n", sa->name, sa->score); 333 return; 334 } 335 chat("%T %s: warning: sealed score mismatch %V vs %V\n", sa->name, sa->score, da->score); 336 /* Keep executing; will correct seal if possible. */ 337 } 338 if(!sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){ 339 chat("%T %s: dst is sealed, src is not\n", sa->name); 340 status = "errors"; 341 return; 342 } 343 if(sa->diskstats.used < da->diskstats.used){ 344 chat("%T %s: src used %,lld < dst used %,lld\n", sa->name, sa->diskstats.used, da->diskstats.used); 345 status = "errors"; 346 return; 347 } 348 349 if(da->clumpmagic != sa->clumpmagic){ 350 /* 351 * Write this now to reduce the window in which 352 * the head and tail disagree about clumpmagic. 353 */ 354 da->clumpmagic = sa->clumpmagic; 355 memset(buf, 0, sizeof buf); 356 packarena(da, buf); 357 if(ewritepart(dst, end, buf, blocksize) < 0) 358 return; 359 } 360 361 memset(&h, 0, sizeof h); 362 h.version = da->version; 363 strcpy(h.name, da->name); 364 h.blocksize = da->blocksize; 365 h.size = da->size + 2*da->blocksize; 366 h.clumpmagic = da->clumpmagic; 367 memset(buf, 0, sizeof buf); 368 packarenahead(&h, buf); 369 if(ewritepart(dst, base - blocksize, buf, blocksize) < 0) 370 return; 371 372 shaoff = 0; 373 ds = nil; 374 sealed = sa->diskstats.sealed && scorecmp(sa->score, zeroscore) != 0; 375 if(sealed && dosha1){ 376 /* start sha1 state with header */ 377 memset(&xds, 0, sizeof xds); 378 ds = &xds; 379 sha1(buf, blocksize, nil, ds); 380 shaoff = base; 381 } 382 383 if(sa->diskstats.used != da->diskstats.used){ 384 di = base+rdown(da->diskstats.used, blocksize); 385 si = base+rup(sa->diskstats.used, blocksize); 386 if(ds && asha1(dst, shaoff, di, ds) < 0) 387 return; 388 if(copy(di, si, "data", ds) < 0) 389 return; 390 shaoff = si; 391 } 392 393 clumpmax = sa->clumpmax; 394 di = end - da->diskstats.clumps/clumpmax * blocksize; 395 si = end - (sa->diskstats.clumps+clumpmax-1)/clumpmax * blocksize; 396 397 if(sa->diskstats.sealed){ 398 /* 399 * might be a small hole between the end of the 400 * data and the beginning of the directory. 401 */ 402 v = base+rup(sa->diskstats.used, blocksize); 403 if(ds && asha1(dst, shaoff, v, ds) < 0) 404 return; 405 if(copy(v, si, "hole", ds) < 0) 406 return; 407 shaoff = si; 408 } 409 410 if(da->diskstats.clumps != sa->diskstats.clumps){ 411 if(ds && asha1(dst, shaoff, si, ds) < 0) 412 return; 413 if(copy(si, di, "directory", ds) < 0) /* si < di because clumpinfo blocks grow down */ 414 return; 415 shaoff = di; 416 } 417 418 da->ctime = sa->ctime; 419 da->wtime = sa->wtime; 420 da->diskstats = sa->diskstats; 421 da->diskstats.sealed = 0; 422 423 /* 424 * Repack the arena tail information 425 * and save it for next time... 426 */ 427 memset(buf, 0, sizeof buf); 428 packarena(da, buf); 429 if(ewritepart(dst, end, buf, blocksize) < 0) 430 return; 431 432 if(sealed){ 433 /* 434 * ... but on the final pass, copy the encoding 435 * of the tail information from the source 436 * arena itself. There are multiple possible 437 * ways to write the tail info out (the exact 438 * details have changed as venti went through 439 * revisions), and to keep the SHA1 hash the 440 * same, we have to use what the disk uses. 441 */ 442 if(asha1(dst, shaoff, end, ds) < 0 443 || copy(end, end+blocksize-VtScoreSize, "tail", ds) < 0) 444 return; 445 if(dosha1){ 446 memset(buf, 0, VtScoreSize); 447 sha1(buf, VtScoreSize, da->score, ds); 448 if(scorecmp(sa->score, da->score) == 0){ 449 setstatus(Sealed+Mirrored); 450 if(verbose > 1) 451 chat("%T %s: %V sealed mirrored\n", sa->name, sa->score); 452 if(ewritepart(dst, end+blocksize-VtScoreSize, da->score, VtScoreSize) < 0) 453 return; 454 }else{ 455 chat("%T %s: sealing dst: score mismatch: %V vs %V\n", sa->name, sa->score, da->score); 456 memset(&xds, 0, sizeof xds); 457 asha1(dst, base-blocksize, end+blocksize-VtScoreSize, &xds); 458 sha1(buf, VtScoreSize, 0, &xds); 459 chat("%T reseal: %V\n", da->score); 460 status = "errors"; 461 } 462 }else{ 463 setstatus(Mirrored); 464 if(verbose > 1) 465 chat("%T %s: %V mirrored\n", sa->name, sa->score); 466 if(ewritepart(dst, end+blocksize-VtScoreSize, sa->score, VtScoreSize) < 0) 467 return; 468 } 469 }else{ 470 if(sa->diskstats.used > 0 || verbose > 1) { 471 chat("%T %s: %,lld used mirrored\n", 472 sa->name, sa->diskstats.used); 473 } 474 if(sa->diskstats.used > 0) 475 setstatus(Mirrored); 476 else 477 setstatus(Empty); 478 } 479 } 480 481 void 482 mirrormany(ArenaPart *sp, ArenaPart *dp, char *range) 483 { 484 int i, lo, hi; 485 char *s, *t; 486 Arena *sa, *da; 487 488 if(range == nil){ 489 for(i=0; i<sp->narenas; i++){ 490 sa = sp->arenas[i]; 491 da = dp->arenas[i]; 492 mirror(i, sa, da); 493 } 494 setstatus(-1); 495 return; 496 } 497 if(strcmp(range, "none") == 0) 498 return; 499 500 for(s=range; *s; s=t){ 501 t = strchr(s, ','); 502 if(t) 503 *t++ = 0; 504 else 505 t = s+strlen(s); 506 if(*s == '-') 507 lo = 0; 508 else 509 lo = strtol(s, &s, 0); 510 hi = lo; 511 if(*s == '-'){ 512 s++; 513 if(*s == 0) 514 hi = sp->narenas-1; 515 else 516 hi = strtol(s, &s, 0); 517 } 518 if(*s != 0){ 519 chat("%T bad arena range: %s\n", s); 520 continue; 521 } 522 for(i=lo; i<=hi; i++){ 523 sa = sp->arenas[i]; 524 da = dp->arenas[i]; 525 mirror(i, sa, da); 526 } 527 setstatus(-1); 528 } 529 } 530 531 532 void 533 threadmain(int argc, char **argv) 534 { 535 int i; 536 Arena *sa, *da; 537 ArenaPart *s, *d; 538 char *ranges; 539 540 ventifmtinstall(); 541 542 ARGBEGIN{ 543 case 'F': 544 force = 1; 545 break; 546 case 'v': 547 verbose++; 548 break; 549 case 's': 550 dosha1 = 0; 551 break; 552 default: 553 usage(); 554 }ARGEND 555 556 if(argc != 2 && argc != 3) 557 usage(); 558 ranges = nil; 559 if(argc == 3) 560 ranges = argv[2]; 561 562 if((src = initpart(argv[0], OREAD)) == nil) 563 sysfatal("initpart %s: %r", argv[0]); 564 if((dst = initpart(argv[1], ORDWR)) == nil) 565 sysfatal("initpart %s: %r", argv[1]); 566 if((s = initarenapart(src)) == nil) 567 sysfatal("initarenapart %s: %r", argv[0]); 568 for(i=0; i<s->narenas; i++) 569 delarena(s->arenas[i]); 570 if((d = initarenapart(dst)) == nil) 571 sysfatal("loadarenapart %s: %r", argv[1]); 572 for(i=0; i<d->narenas; i++) 573 delarena(d->arenas[i]); 574 575 /* 576 * The arena geometries must match or all bets are off. 577 */ 578 if(s->narenas != d->narenas) 579 sysfatal("arena count mismatch: %d vs %d", s->narenas, d->narenas); 580 for(i=0; i<s->narenas; i++){ 581 sa = s->arenas[i]; 582 da = d->arenas[i]; 583 if(sa->version != da->version) 584 sysfatal("arena %d: version mismatch: %d vs %d", i, sa->version, da->version); 585 if(sa->blocksize != da->blocksize) 586 sysfatal("arena %d: blocksize mismatch: %d vs %d", i, sa->blocksize, da->blocksize); 587 if(sa->size != da->size) 588 sysfatal("arena %d: size mismatch: %,lld vs %,lld", i, sa->size, da->size); 589 if(strcmp(sa->name, da->name) != 0) 590 sysfatal("arena %d: name mismatch: %s vs %s", i, sa->name, da->name); 591 } 592 593 /* 594 * Mirror one arena at a time. 595 */ 596 writechan = chancreate(sizeof(void*), 0); 597 vtproc(writeproc, nil); 598 mirrormany(s, d, ranges); 599 sendp(writechan, nil); 600 threadexitsall(status); 601 }