must batch up reads from pipes
[unix-history] / usr / src / sbin / dump / tape.c
CommitLineData
7a65e725 1#ifndef lint
ca485693 2static char *sccsid = "@(#)tape.c 1.10 (Berkeley) %G%";
7a65e725
SL
3#endif
4
ae4b153c 5#include "dump.h"
23b4aba9 6#include <signal.h>
ae4b153c 7
1ddebffe
SL
8char (*tblock)[TP_BSIZE]; /* Pointer to malloc()ed buffer for tape */
9int writesize; /* Size of malloc()ed buffer for tape */
f5bba473 10int trecno = 0;
23b4aba9 11extern int ntrec; /* blocking factor on tape */
1ddebffe
SL
12
13/*
23b4aba9
KM
14 * Streaming dump mods (Caltech) - disk block reading and tape writing
15 * are exported to several slave processes. While one slave writes the
16 * tape, the others read disk blocks; they pass control of the tape in
17 * a ring via pipes. The parent process traverses the filesystem and
18 * sends daddr's, inode records, etc, through pipes to each slave.
19 * Speed from Eagle to TU77 on VAX/780 is about 140 Kbytes/second.
20 * #ifdef RDUMP version is CPU-limited to about 40 Kbytes/second.
21 */
22struct req { /* instruction packets sent to slaves */
23 daddr_t dblk;
24 int count;
25} *req;
26int reqsiz;
27
28#define SLAVES 3 /* 2 slaves read disk while 3rd writes tape */
29#define LAG 2 /* Write behind by LAG tape blocks (rdump) */
30int slavefd[SLAVES]; /* Pipes from master to each slave */
31int rotor; /* Current slave number */
32int master; /* Pid of master, for sending error signals */
33int trace = 0; /* Protocol trace; easily patchable with adb */
34#define tmsg if (trace) msg
35
36#ifdef RDUMP
37extern int rmtape;
38#endif
39
40/*
41 * Allocate tape buffer contiguous with the array of instruction packets,
42 * so they can be written with a single write call in flusht().
1ddebffe
SL
43 */
44alloctape()
45{
46
47 writesize = ntrec * TP_BSIZE;
23b4aba9
KM
48 reqsiz = ntrec * sizeof(struct req);
49 req = (struct req *)malloc(reqsiz+writesize); /* array of packets */
50 tblock = (char (*)[TP_BSIZE]) &req[ntrec]; /* Tape buffer */
51 return (req != NULL);
1ddebffe
SL
52}
53
23b4aba9
KM
54/*
55 * Send special record to be put on tape
56 */
ae4b153c 57taprec(dp)
b6407c9d 58 char *dp;
ae4b153c 59{
ae4b153c 60
23b4aba9
KM
61 tmsg("taprec %d\n", trecno);
62 req[trecno].dblk = (daddr_t)0;
63 req[trecno].count = 1;
64 *(union u_spcl *)(*tblock++) = *(union u_spcl *)dp;
ae4b153c 65 spcl.c_tapea++;
23b4aba9 66 if (++trecno >= ntrec)
ae4b153c
BJ
67 flusht();
68}
69
f5bba473
KM
70dmpblk(blkno, size)
71 daddr_t blkno;
72 int size;
ae4b153c 73{
23b4aba9
KM
74 int tpblks, dblkno;
75 register int avail;
f5bba473 76
b6407c9d 77 if (size % TP_BSIZE != 0)
f5bba473 78 msg("bad size to dmpblk: %d\n", size);
b6407c9d 79 dblkno = fsbtodb(sblock, blkno);
23b4aba9
KM
80 tpblks = size / TP_BSIZE;
81 while ((avail = MIN(tpblks, ntrec - trecno)) > 0) {
82 tmsg("dmpblk %d\n", avail);
83 req[trecno].dblk = dblkno;
84 req[trecno].count = avail;
f5bba473
KM
85 trecno += avail;
86 spcl.c_tapea += avail;
23b4aba9
KM
87 if (trecno >= ntrec)
88 flusht();
b6407c9d
KM
89 dblkno += avail * (TP_BSIZE / DEV_BSIZE);
90 tpblks -= avail;
f5bba473 91 }
ae4b153c
BJ
92}
93
94int nogripe = 0;
95
23b4aba9
KM
96tperror() {
97 if (pipeout) {
98 msg("Tape write error on %s\n", tape);
99 msg("Cannot recover\n");
100 dumpabort();
101 /* NOTREACHED */
102 }
103 msg("Tape write error on tape %d\n", tapeno);
104 broadcast("TAPE ERROR!\n");
105 if (!query("Do you want to restart?"))
106 dumpabort();
107 msg("This tape will rewind. After it is rewound,\n");
108 msg("replace the faulty tape with a new one;\n");
109 msg("this dump volume will be rewritten.\n");
110 nogripe = 1;
111 close_rewind();
112 Exit(X_REWRITE);
113}
114
115senderr()
116{
117
118 perror("dump: pipe error in command to slave");
119 dumpabort();
120}
121
122#ifdef RDUMP
123tflush(cnt)
124 int cnt;
125{
126 int i;
127
128 for (i = 0; i < ntrec; i++)
129 spclrec();
130}
131#endif RDUMP
132
ae4b153c
BJ
133flusht()
134{
23b4aba9 135 int sig, siz = (char *)tblock - (char *)req;
ae4b153c 136
23b4aba9
KM
137 tmsg("flusht %d\n", siz);
138 sig = sigblock(1<<SIGINT-1 | 1<<SIGIOT-1); /* Don't interrupt write */
139 if (write(slavefd[rotor], req, siz) != siz)
140 senderr();
141 sigsetmask(sig);
142 if (++rotor >= SLAVES) rotor = 0;
143 tblock = (char (*)[TP_BSIZE]) &req[ntrec];
ae4b153c 144 trecno = 0;
1ddebffe 145 asize += writesize/density;
23b4aba9 146 asize += 7; /* inter-record gap (why fixed?) */
1ddebffe 147 blockswritten += ntrec;
a47b7e40 148 if (!pipeout && asize > tsize) {
ae4b153c
BJ
149 close_rewind();
150 otape();
151 }
152 timeest();
153}
154
155rewind()
156{
23b4aba9 157 register int f;
a47b7e40
KM
158
159 if (pipeout)
160 return;
23b4aba9
KM
161 for (f = 0; f < SLAVES; f++)
162 close(slavefd[f]);
163 while (wait(NULL) >= 0) ; /* wait for any signals from slaves */
164 msg("Tape rewinding\n");
165#ifdef RDUMP
166 rmtclose();
167 while (rmtopen(tape, 0) < 0)
168 sleep(10);
169 rmtclose();
ae4b153c 170#else
be3f486f
BJ
171 close(to);
172 while ((f = open(tape, 0)) < 0)
173 sleep (10);
174 close(f);
ae4b153c
BJ
175#endif
176}
177
178close_rewind()
179{
23b4aba9
KM
180 rewind();
181 if (!nogripe) {
ae4b153c
BJ
182 msg("Change Tapes: Mount tape #%d\n", tapeno+1);
183 broadcast("CHANGE TAPES!\7\7\n");
184 }
23b4aba9
KM
185 while (!query("Is the new tape mounted and ready to go?"))
186 if (query("Do you want to abort?"))
ae4b153c 187 dumpabort();
ae4b153c
BJ
188}
189
190/*
23b4aba9 191 * We implement taking and restoring checkpoints on the tape level.
ae4b153c
BJ
192 * When each tape is opened, a new process is created by forking; this
193 * saves all of the necessary context in the parent. The child
194 * continues the dump; the parent waits around, saving the context.
195 * If the child returns X_REWRITE, then it had problems writing that tape;
196 * this causes the parent to fork again, duplicating the context, and
197 * everything continues as if nothing had happened.
198 */
199
200otape()
201{
202 int parentpid;
203 int childpid;
204 int status;
205 int waitpid;
ae4b153c
BJ
206 int interrupt();
207
ae4b153c
BJ
208 parentpid = getpid();
209
210 restore_check_point:
211 signal(SIGINT, interrupt);
212 /*
213 * All signals are inherited...
214 */
215 childpid = fork();
23b4aba9 216 if (childpid < 0) {
ae4b153c
BJ
217 msg("Context save fork fails in parent %d\n", parentpid);
218 Exit(X_ABORT);
219 }
23b4aba9 220 if (childpid != 0) {
ae4b153c
BJ
221 /*
222 * PARENT:
223 * save the context by waiting
224 * until the child doing all of the work returns.
23b4aba9 225 * don't catch the interrupt
ae4b153c
BJ
226 */
227 signal(SIGINT, SIG_IGN);
228#ifdef TDEBUG
229 msg("Tape: %d; parent process: %d child process %d\n",
230 tapeno+1, parentpid, childpid);
231#endif TDEBUG
23b4aba9
KM
232 while ((waitpid = wait(&status)) != childpid)
233 msg("Parent %d waiting for child %d has another child %d return\n",
234 parentpid, childpid, waitpid);
235 if (status & 0xFF) {
ae4b153c
BJ
236 msg("Child %d returns LOB status %o\n",
237 childpid, status&0xFF);
238 }
239 status = (status >> 8) & 0xFF;
240#ifdef TDEBUG
23b4aba9 241 switch(status) {
ae4b153c
BJ
242 case X_FINOK:
243 msg("Child %d finishes X_FINOK\n", childpid);
244 break;
245 case X_ABORT:
246 msg("Child %d finishes X_ABORT\n", childpid);
247 break;
248 case X_REWRITE:
249 msg("Child %d finishes X_REWRITE\n", childpid);
250 break;
251 default:
23b4aba9
KM
252 msg("Child %d finishes unknown %d\n",
253 childpid, status);
ae4b153c
BJ
254 break;
255 }
256#endif TDEBUG
23b4aba9 257 switch(status) {
ae4b153c
BJ
258 case X_FINOK:
259 Exit(X_FINOK);
260 case X_ABORT:
261 Exit(X_ABORT);
262 case X_REWRITE:
263 goto restore_check_point;
264 default:
265 msg("Bad return code from dump: %d\n", status);
266 Exit(X_ABORT);
267 }
268 /*NOTREACHED*/
269 } else { /* we are the child; just continue */
270#ifdef TDEBUG
271 sleep(4); /* allow time for parent's message to get out */
272 msg("Child on Tape %d has parent %d, my pid = %d\n",
273 tapeno+1, parentpid, getpid());
274#endif
23b4aba9
KM
275#ifdef RDUMP
276 while ((to = rmtopen(tape, 2)) < 0)
277#else
278 while ((to = pipeout ? 1 : creat(tape, 0666)) < 0)
279#endif
280 if (!query("Cannot open tape. Do you want to retry the open?"))
281 dumpabort();
282
283 enslave(); /* Share open tape file descriptor with slaves */
ae4b153c
BJ
284
285 asize = 0;
286 tapeno++; /* current tape sequence */
287 newtape++; /* new tape signal */
288 spcl.c_volume++;
289 spcl.c_type = TS_TAPE;
290 spclrec();
291 if (tapeno > 1)
292 msg("Tape %d begins with blocks from ino %d\n",
293 tapeno, ino);
294 }
295}
296
ae4b153c
BJ
297dumpabort()
298{
23b4aba9
KM
299 if (master != 0 && master != getpid())
300 kill(master, SIGIOT);
ed7c701e 301 msg("The ENTIRE dump is aborted.\n");
ae4b153c
BJ
302 Exit(X_ABORT);
303}
304
305Exit(status)
306{
307#ifdef TDEBUG
308 msg("pid = %d exits with status %d\n", getpid(), status);
309#endif TDEBUG
ed7c701e 310 exit(status);
ae4b153c 311}
23b4aba9
KM
312
313#define OK 020
314char tok = OK;
315
316enslave()
317{
318 int prev[2], next[2], cmd[2]; /* file descriptors for pipes */
319 int i, j, slavepid;
320
321 master = getpid();
322 signal(SIGPIPE, dumpabort);
323 signal(SIGIOT, tperror); /* SIGIOT asks for restart from checkpoint */
324 pipe(prev);
325 for (i = rotor = 0; i < SLAVES; ++i) {
326 if ((i < SLAVES - 1 && pipe(next) < 0) || pipe(cmd) < 0
327 || (slavepid = fork()) < 0) {
328 perror(" DUMP: too many slaves");
329 dumpabort();
330 }
331 if (i >= SLAVES - 1)
332 next[1] = prev[1]; /* Last slave loops back */
333 slavefd[i] = cmd[1];
334 if (slavepid == 0) { /* Slave starts up here */
335 for (j = 0; j <= i; j++)
336 close(slavefd[j]);
337 if (i < SLAVES - 1) {
338 close(prev[1]);
339 close(next[0]);
340 } else { /* Insert initial token */
341 if (write(next[1], &tok, 1) != 1)
342 ringerr();
343 }
344 doslave(i, cmd[0], prev[0], next[1]);
345 close(next[1]);
346 j = read(prev[0], &tok, 1); /* Eat the final token */
347#ifdef RDUMP /* Read remaining acknowledges */
348 for (; j > 0 && (tok &~ OK) > 0; tok--) {
349 if (rmtwrite2() != writesize && (tok & OK)) {
350 kill(master, SIGIOT);
351 tok &= ~OK;
352 }
353 }
354#endif
355 Exit(X_FINOK);
356 }
357 close(cmd[0]);
358 close(next[1]);
359 close(prev[0]);
360 prev[0] = next[0];
361 }
362 master = 0;
363}
364
365/*
366 * Somebody must have died, should never happen
367 */
368ringerr()
369{
370 perror(" DUMP: token passing error");
371 kill(master, SIGPIPE);
372 Exit(X_ABORT);
373}
374
375doslave(num, cmd, prev, next)
376 int num, cmd, prev, next;
377{
378 tmsg("slave %d\n", num);
379 signal(SIGINT, SIG_IGN); /* Master handles it */
380 signal(SIGTERM, SIG_IGN);
381 signal(SIGPIPE, ringerr);
382 close(fi);
383 if ((fi = open(disk, 0)) < 0) { /* Need our own seek pointer */
384 perror(" DUMP: can't reopen disk");
385 kill(master, SIGPIPE);
386 Exit(X_ABORT);
387 }
ca485693 388 while (readpipe(cmd, req, reqsiz) == reqsiz) {
23b4aba9
KM
389 register struct req *p = req;
390 for (trecno = 0; trecno < ntrec; trecno += p->count, p += p->count) {
391 if (p->dblk) {
392 tmsg("%d READS %d\n", num, p->count);
393 bread(p->dblk, tblock[trecno],
394 p->count * TP_BSIZE);
395 } else {
396 tmsg("%d PIPEIN %d\n", num, p->count);
397 if (p->count != 1)
398 ringerr();
ca485693 399 if (readpipe(cmd, tblock[trecno], TP_BSIZE) != TP_BSIZE)
23b4aba9
KM
400 senderr();
401 }
402 }
403 if (read(prev, &tok, 1) != 1)
404 ringerr(); /* Wait your turn */
405 tmsg("%d WRITE\n", num);
406#ifdef RDUMP
407 if (tok & OK) {
408 rmtwrite0(writesize);
409 rmtwrite1(tblock[0], writesize);
410 tok++; /* Number of writes in progress */
411 }
412 if (tok > (LAG|OK) && (--tok, rmtwrite2() != writesize)) {
413#else
414 if ((tok & OK) &&
415 write(to, tblock[0], writesize) != writesize) {
416 perror(tape);
417#endif
418 kill(master, SIGIOT); /* restart from checkpoint */
419 tok &= ~OK;
420 }
421 if (write(next, &tok, 1) != 1)
422 ringerr(); /* Next slave's turn */
423 }
424 tmsg("%d CLOSE\n", num);
425}
ca485693
KM
426
427/*
428 * Since a read from a pipe may not return all we asked for
429 * we must loop until we get all we need
430 */
431readpipe(fd, buf, cnt)
432 int fd;
433 char *buf;
434 int cnt;
435{
436 int rd, got;
437
438 for (rd = cnt; rd > 0; rd -= got) {
439 got = read(fd, buf, rd);
440 if (got < 0)
441 return (got);
442 if (got == 0)
443 return (cnt - rd);
444 buf += got;
445 }
446 return (cnt);
447}